[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH 08/10] tools/xenstored: Extend restore code to handle multiple input buffers



From: Julien Grall <jgrall@xxxxxxxxxx>

Currently, the restore code assumes the stream will contain at
most one in-flight request per connection. In follow-up changes, we
will want to transfer multiple in-flight requests.

The function read_state_buffered_data() is now extended to restore
multiple in-flight requests. Complete requests will be queued as delayed
requests. If there is a partial request (only the last one can be
partial), then it will be used as the current in-flight request.

Note that we want to bypass the quota check for delayed requests as
the new Xenstore may have a lower limit.

Lastly, there is no need to change the specification as there was
no restriction on the number of in-flight requests preserved.

Signed-off-by: Julien Grall <jgrall@xxxxxxxxxx>
---
 tools/xenstore/xenstored_core.c | 56 ++++++++++++++++++++++++++++-----
 1 file changed, 48 insertions(+), 8 deletions(-)

diff --git a/tools/xenstore/xenstored_core.c b/tools/xenstore/xenstored_core.c
index a5084a5b173d..5b7ab7f74013 100644
--- a/tools/xenstore/xenstored_core.c
+++ b/tools/xenstore/xenstored_core.c
@@ -1486,6 +1486,10 @@ static void process_message(struct connection *conn, 
struct buffered_data *in)
        enum xsd_sockmsg_type type = in->hdr.msg.type;
        int ret;
 
+       /* At least send_error() and send_reply() expects conn->in == in */
+       assert(conn->in == in);
+       trace_io(conn, in, 0);
+
        if ((unsigned int)type >= XS_TYPE_COUNT || !wire_funcs[type].func) {
                eprintf("Client unknown operation %i", type);
                send_error(conn, ENOSYS);
@@ -1515,6 +1519,23 @@ static void process_message(struct connection *conn, 
struct buffered_data *in)
        conn->transaction = NULL;
 }
 
+static bool process_delayed_message(struct delayed_request *req)
+{
+       struct connection *conn = req->data;
+       struct buffered_data *saved_in = conn->in;
+
+       /*
+        * Parts of process_message() expect conn->in to point at the
+        * message being processed. So save the current conn->in and
+        * restore it afterwards.
+        */
+       conn->in = req->in;
+       process_message(req->data, req->in);
+       conn->in = saved_in;
+
+       return true;
+}
+
 static void consider_message(struct connection *conn)
 {
        if (verbose)
@@ -1582,7 +1603,6 @@ static void handle_input(struct connection *conn)
        if (in->used != in->hdr.msg.len)
                return;
 
-       trace_io(conn, in, 0);
        consider_message(conn);
        return;
 
@@ -2611,14 +2631,20 @@ void read_state_buffered_data(const void *ctx, struct 
connection *conn,
        unsigned int len;
        bool partial = sc->data_resp_len;
 
-       if (sc->data_in_len) {
+       for (data = sc->data; data < sc->data + sc->data_in_len; data += len) {
                bdata = new_buffer(conn);
                if (!bdata)
                        barf("error restoring read data");
-               if (sc->data_in_len < sizeof(bdata->hdr)) {
+
+               /*
+                * We don't know yet if there is more than one message
+                * to process. So the len is the size of the leftover data.
+                */
+               len = sc->data_in_len - (data - sc->data);
+               if (len < sizeof(bdata->hdr)) {
                        bdata->inhdr = true;
-                       memcpy(&bdata->hdr, sc->data, sc->data_in_len);
-                       bdata->used = sc->data_in_len;
+                       memcpy(&bdata->hdr, sc->data, len);
+                       bdata->used = len;
                } else {
                        bdata->inhdr = false;
                        memcpy(&bdata->hdr, sc->data, sizeof(bdata->hdr));
@@ -2629,12 +2655,26 @@ void read_state_buffered_data(const void *ctx, struct 
connection *conn,
                                                        bdata->hdr.msg.len);
                        if (!bdata->buffer)
                                barf("Error allocating in buffer");
-                       bdata->used = sc->data_in_len - sizeof(bdata->hdr);
-                       memcpy(bdata->buffer, sc->data + sizeof(bdata->hdr),
+                       bdata->used = min_t(unsigned int,
+                                           len - sizeof(bdata->hdr),
+                                           bdata->hdr.msg.len);
+                       memcpy(bdata->buffer, data + sizeof(bdata->hdr),
                               bdata->used);
+                       /* Update len to match the size of the message. */
+                       len = bdata->used + sizeof(bdata->hdr);
                }
 
-               conn->in = bdata;
+               /*
+                * If the message is not complete, then it means this was
+                * the current processed message. All the other messages
+                * will be queued to be handled after restoring.
+                */
+               if (bdata->inhdr || bdata->used != bdata->hdr.msg.len) {
+                       assert(conn->in == NULL);
+                       conn->in = bdata;
+               } else if (delay_request(conn, bdata, process_delayed_message,
+                                        conn, true))
+                       barf("Unable to delay the request");
        }
 
        for (data = sc->data + sc->data_in_len;
-- 
2.17.1




 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.