[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-devel] [PATCH 16/27] tools/libxl: Infrastructure for reading a libxl migration v2 stream



From: Ross Lagerwall <ross.lagerwall@xxxxxxxxxx>

Signed-off-by: Ross Lagerwall <ross.lagerwall@xxxxxxxxxx>
Signed-off-by: Andrew Cooper <andrew.cooper3@xxxxxxxxxx>
CC: Ian Campbell <Ian.Campbell@xxxxxxxxxx>
CC: Ian Jackson <Ian.Jackson@xxxxxxxxxxxxx>
CC: Wei Liu <wei.liu2@xxxxxxxxxx>
---
 tools/libxl/Makefile            |    1 +
 tools/libxl/libxl_internal.h    |   39 ++++
 tools/libxl/libxl_stream_read.c |  485 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 525 insertions(+)
 create mode 100644 tools/libxl/libxl_stream_read.c

diff --git a/tools/libxl/Makefile b/tools/libxl/Makefile
index cc9c152..c71c5fe 100644
--- a/tools/libxl/Makefile
+++ b/tools/libxl/Makefile
@@ -94,6 +94,7 @@ LIBXL_OBJS = flexarray.o libxl.o libxl_create.o libxl_dm.o libxl_pci.o \
                        libxl_dom.o libxl_exec.o libxl_xshelp.o libxl_device.o \
                        libxl_internal.o libxl_utils.o libxl_uuid.o \
                        libxl_json.o libxl_aoutils.o libxl_numa.o libxl_vnuma.o \
+                       libxl_stream_read.o \
                        libxl_save_callout.o _libxl_save_msgs_callout.o \
                        libxl_qmp.o libxl_event.o libxl_fork.o $(LIBXL_OBJS-y)
 LIBXL_OBJS += libxl_genid.o
diff --git a/tools/libxl/libxl_internal.h b/tools/libxl/libxl_internal.h
index 101994f..4f33cb8 100644
--- a/tools/libxl/libxl_internal.h
+++ b/tools/libxl/libxl_internal.h
@@ -19,6 +19,8 @@
 
 #include "libxl_osdeps.h" /* must come before any other headers */
 
+#include "libxl_sr_stream_format.h"
+
 #include <assert.h>
 #include <dirent.h>
 #include <errno.h>
@@ -3121,6 +3123,42 @@ typedef void libxl__domain_create_cb(libxl__egc *egc,
                                      libxl__domain_create_state*,
                                      int rc, uint32_t domid);
 
+/* State for manipulating a libxl migration v2 stream */
+typedef struct libxl__stream_read_state libxl__stream_read_state;
+
+struct libxl__stream_read_state {
+    /* filled by the user */
+    libxl__ao *ao;
+    int fd; /* Stream source; handed to the datacopier as its readfd. */
+    /* Invoked by stream_done() with the owning dcs and the final rc. */
+    void (*completion_callback)(libxl__egc *egc,
+                                libxl__domain_create_state *dcs,
+                                int rc);
+    /* Private */
+    int rc; /* 0 after stream_success(), non-zero after stream_failed(). */
+    bool running; /* Set once started; cleared on success or failure. */
+    libxl__datacopier_state dc; /* Performs every read from fd. */
+    size_t expected_len; /* Value dc.used must equal when a copy completes. */
+    libxl_sr_hdr hdr; /* Stream header; fields converted from big endian. */
+    libxl_sr_rec_hdr rec_hdr; /* Header of the record currently in hand. */
+    void *rec_body; /* Heap buffer for the record body; NULL when unused. */
+};
+
+/*
+ * Begin reading a stream.  The stream must not already be running.  Sets up
+ * the datacopier to read from stream->fd and queues a read of the stream
+ * header; the end of the stream, or an error while reading it, is reported
+ * through stream->completion_callback.
+ */
+_hidden void libxl__stream_read_start(libxl__egc *egc,
+                                      libxl__stream_read_state *stream);
+
+/*
+ * Queue a read of the next record header.  The stream must be running.
+ */
+_hidden void libxl__stream_read_continue(libxl__egc *egc,
+                                         libxl__stream_read_state *stream);
+
+/*
+ * Fail the stream with rc, which must be non-zero.  rc is recorded in the
+ * stream; if the stream is running it is also marked as stopped and
+ * stream->completion_callback is invoked with rc.
+ */
+_hidden void libxl__stream_read_abort(libxl__egc *egc,
+                                      libxl__stream_read_state *stream,
+                                      int rc);
+
+/* Report whether the stream is currently marked as running. */
+static inline bool libxl__stream_read_inuse(
+    const libxl__stream_read_state *stream)
+{
+    return stream->running;
+}
+
+
 struct libxl__domain_create_state {
     /* filled in by user */
     libxl__ao *ao;
@@ -3137,6 +3175,7 @@ struct libxl__domain_create_state {
     libxl__stub_dm_spawn_state dmss;
         /* If we're not doing stubdom, we use only dmss.dm,
          * for the non-stubdom device model. */
+    libxl__stream_read_state srs;
     libxl__save_helper_state shs;
     /* necessary if the domain creation failed and we have to destroy it */
     libxl__domain_destroy_state dds;
diff --git a/tools/libxl/libxl_stream_read.c b/tools/libxl/libxl_stream_read.c
new file mode 100644
index 0000000..9cdaadf
--- /dev/null
+++ b/tools/libxl/libxl_stream_read.c
@@ -0,0 +1,485 @@
+/*
+ * Copyright (C) 2015      Citrix Ltd.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; version 2.1 only. with the special
+ * exception on linking described in file LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Lesser General Public License for more details.
+ */
+
+#include "libxl_osdeps.h" /* must come before any other headers */
+
+#include "libxl_internal.h"
+
+#include <unistd.h>
+
+/*
+ * Infrastructure for reading and acting on the contents of a libxl migration
+ * stream. There are a lot of moving parts here.
+ *
+ * Entry points from outside:
+ *  - libxl__stream_read_start()
+ *     - Set up reading a stream from the start.
+ *
+ *  - libxl__stream_read_continue()
+ *     - Set up reading the next record from a started stream.
+ *
+ * The principal loop functionality involves reading the stream header, then
+ * reading a record at a time and acting upon it.  It follows the callbacks:
+ *
+ *  - stream_header_done()
+ *  - record_header_done()
+ *  - record_body_done()
+ *  - process_record()
+ *
+ * process_record() will choose the correct next action based upon the
+ * record.  Upon completion of the action, the next record header will be read
+ * from the stream.
+ */
+
+static void stream_success(libxl__egc *egc,
+                           libxl__stream_read_state *stream);
+static void stream_failed(libxl__egc *egc,
+                          libxl__stream_read_state *stream, int rc);
+static void stream_done(libxl__egc *egc,
+                        libxl__stream_read_state *stream);
+
+/* Event callbacks for main reading loop. */
+static void stream_header_done(libxl__egc *egc,
+                               libxl__datacopier_state *dc,
+                               int onwrite, int errnoval);
+static void record_header_done(libxl__egc *egc,
+                               libxl__datacopier_state *dc,
+                               int onwrite, int errnoval);
+static void record_body_done(libxl__egc *egc,
+                             libxl__datacopier_state *dc,
+                             int onwrite, int errnoval);
+static void process_record(libxl__egc *egc,
+                           libxl__stream_read_state *stream);
+
+/* Mini-event loop for splicing an emulator record out of the stream. */
+static void read_emulator_body(libxl__egc *egc,
+                               libxl__stream_read_state *stream);
+static void emulator_body_done(libxl__egc *egc,
+                               libxl__datacopier_state *dc,
+                               int onwrite, int errnoval);
+static void emulator_padding_done(libxl__egc *egc,
+                                  libxl__datacopier_state *dc,
+                                  int onwrite, int errnoval);
+
+/*
+ * Begin reading a migration stream.
+ *
+ * Resets the embedded datacopier, points it at stream->fd and queues a read
+ * of the stream header; stream_header_done() runs once that read finishes.
+ * The stream must not already be running.
+ *
+ * If the datacopier cannot be started, the failure is reported through
+ * stream->completion_callback like any other stream error.
+ */
+void libxl__stream_read_start(libxl__egc *egc,
+                              libxl__stream_read_state *stream)
+{
+    libxl__datacopier_state *dc = &stream->dc;
+    int ret = 0;
+
+    /* State initialisation. */
+    assert(!stream->running);
+
+    /*
+     * Mark the stream as running before anything can fail.  stream_failed()
+     * only invokes the completion callback for a running stream, so setting
+     * this any later would silently swallow a failure to start.
+     */
+    stream->running = true;
+
+    memset(dc, 0, sizeof(*dc));
+    dc->ao = stream->ao;
+    dc->readfd = stream->fd;
+    dc->writefd = -1;
+
+    /* Start reading the stream header. */
+    dc->readwhat = "stream header";
+    dc->readbuf = &stream->hdr;
+    stream->expected_len = dc->bytes_to_read = sizeof(stream->hdr);
+    dc->used = 0;
+    dc->callback = stream_header_done;
+
+    ret = libxl__datacopier_start(dc);
+    if (ret)
+        goto err;
+
+    return;
+
+ err:
+    assert(ret);
+    stream_failed(egc, stream, ret);
+}
+
+/*
+ * Queue a read of the next record header from a running stream.
+ *
+ * record_header_done() is installed as the datacopier callback.  If the
+ * datacopier cannot be started, the stream is failed with its return code.
+ */
+void libxl__stream_read_continue(libxl__egc *egc,
+                                 libxl__stream_read_state *stream)
+{
+    libxl__datacopier_state *copier = &stream->dc;
+    int rc;
+
+    assert(stream->running);
+
+    /* Aim the datacopier at the record header buffer. */
+    copier->readwhat = "record header";
+    copier->readbuf = &stream->rec_hdr;
+    copier->bytes_to_read = sizeof(stream->rec_hdr);
+    stream->expected_len = copier->bytes_to_read;
+    copier->used = 0;
+    copier->callback = record_header_done;
+
+    rc = libxl__datacopier_start(copier);
+    if (rc)
+        stream_failed(egc, stream, rc);
+}
+
+/*
+ * Abort the stream with the given error.  This simply forwards to
+ * stream_failed(), which asserts that rc is non-zero, records it, and only
+ * invokes the completion callback if the stream is currently running.
+ */
+void libxl__stream_read_abort(libxl__egc *egc,
+                              libxl__stream_read_state *stream, int rc)
+{
+    stream_failed(egc, stream, rc);
+}
+
+/* Finish the stream cleanly: stop it, record rc 0 and notify the owner. */
+static void stream_success(libxl__egc *egc, libxl__stream_read_state *stream)
+{
+    stream->running = false;
+    stream->rc = 0;
+    stream_done(egc, stream);
+}
+
+/*
+ * Record a failure (rc must be non-zero).  The owner is only notified if the
+ * stream was running; a stream which is not running just has rc stored.
+ */
+static void stream_failed(libxl__egc *egc,
+                          libxl__stream_read_state *stream, int rc)
+{
+    assert(rc);
+    stream->rc = rc;
+
+    /* Nothing in flight, so there is nobody to notify. */
+    if (!stream->running)
+        return;
+
+    stream->running = false;
+    stream_done(egc, stream);
+}
+
+/*
+ * Hand the final result to the owner.  The stream is embedded in a
+ * libxl__domain_create_state as its 'srs' member, which is recovered here
+ * and passed to the completion callback along with stream->rc.
+ */
+static void stream_done(libxl__egc *egc,
+                        libxl__stream_read_state *stream)
+{
+    libxl__domain_create_state *owner;
+
+    assert(!stream->running);
+
+    owner = CONTAINER_OF(stream, *owner, srs);
+    stream->completion_callback(egc, owner, stream->rc);
+}
+
+/*
+ * Datacopier callback: the stream header has been read (or reading failed).
+ *
+ * Converts the header fields from big endian, then checks the ident, the
+ * version and that the stream is not flagged big endian.  On success the
+ * first record header is requested; any problem fails the stream with
+ * ERROR_FAIL.
+ */
+static void stream_header_done(libxl__egc *egc,
+                               libxl__datacopier_state *dc,
+                               int onwrite, int errnoval)
+{
+    libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, dc);
+    libxl_sr_hdr *hdr = &stream->hdr;
+    STATE_AO_GC(dc->ao);
+    const char *origin;
+
+    if (onwrite || dc->used != stream->expected_len) {
+        LOG(ERROR, "write %d, err %d, expected %zu, got %zu",
+            onwrite, errnoval, stream->expected_len, dc->used);
+        goto bad;
+    }
+
+    /* Convert the header in place before looking at any field. */
+    hdr->ident   = be64toh(hdr->ident);
+    hdr->version = be32toh(hdr->version);
+    hdr->options = be32toh(hdr->options);
+
+    if (hdr->ident != RESTORE_STREAM_IDENT) {
+        LOG(ERROR,
+            "Invalid ident: expected 0x%016"PRIx64", got 0x%016"PRIx64,
+            RESTORE_STREAM_IDENT, hdr->ident);
+        goto bad;
+    }
+
+    if (hdr->version != RESTORE_STREAM_VERSION) {
+        LOG(ERROR, "Unexpected Version: expected %u, got %u",
+            RESTORE_STREAM_VERSION, hdr->version);
+        goto bad;
+    }
+
+    if (hdr->options & RESTORE_OPT_BIG_ENDIAN) {
+        LOG(ERROR, "Unable to handle big endian streams");
+        goto bad;
+    }
+
+    origin = (hdr->options & RESTORE_OPT_LEGACY) ? " (from legacy)" : "";
+    LOG(INFO, "Stream v%u%s", hdr->version, origin);
+
+    libxl__stream_read_continue(egc, stream);
+    return;
+
+ bad:
+    stream_failed(egc, stream, ERROR_FAIL);
+}
+
+/*
+ * Datacopier callback: a record header has been read (or reading failed).
+ *
+ * A record with no body is processed immediately.  Otherwise a buffer is
+ * allocated and a read of the body is queued, completing in
+ * record_body_done().  For emulator context records only the emulator header
+ * is read here; the blob itself stays in the stream so that it can be
+ * spliced out later.
+ *
+ * Any failure fails the stream; the body buffer is released if the read of
+ * the body could not be started.
+ */
+static void record_header_done(libxl__egc *egc,
+                               libxl__datacopier_state *dc,
+                               int onwrite, int errnoval)
+{
+    libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, dc);
+    libxl_sr_rec_hdr *rec_hdr = &stream->rec_hdr;
+    STATE_AO_GC(dc->ao);
+    size_t bytes_to_read;
+    int ret = 0;
+
+    if (onwrite || dc->used != stream->expected_len) {
+        ret = ERROR_FAIL;
+        LOG(ERROR, "write %d, err %d, expected %zu, got %zu",
+            onwrite, errnoval, stream->expected_len, dc->used);
+        goto err;
+    }
+
+    assert(stream->rec_body == NULL);
+
+    /*
+     * An emulator record must at least contain its own header.  Without this
+     * check a short record would have bytes belonging to the following
+     * record consumed as its header, and the blob length computed from it
+     * would wrap around.
+     */
+    if (rec_hdr->type == REC_TYPE_EMULATOR_CONTEXT &&
+        rec_hdr->length < sizeof(struct libxl_sr_emulator_hdr)) {
+        ret = ERROR_FAIL;
+        LOG(ERROR, "Emulator record too short: length %u", rec_hdr->length);
+        goto err;
+    }
+
+    /* No body? Process straight away. */
+    if (rec_hdr->length == 0) {
+        process_record(egc, stream);
+        return;
+    }
+
+    /* Queue up reading the body. */
+    switch (rec_hdr->type) {
+        /*
+         * Emulator records want to retain the blob in the pipe, for a further
+         * datacopier call to move elsewhere.  Just read the emulator header.
+         */
+    case REC_TYPE_EMULATOR_CONTEXT:
+        bytes_to_read = sizeof(struct libxl_sr_emulator_hdr);
+        break;
+
+    default:
+        bytes_to_read = rec_hdr->length;
+        break;
+    }
+
+    /* Records are padded in the stream; consume the padding with the body. */
+    bytes_to_read = ROUNDUP(bytes_to_read, REC_ALIGN_ORDER);
+
+    dc->readwhat = "record body";
+    stream->rec_body = dc->readbuf = libxl__malloc(NOGC, bytes_to_read);
+    stream->expected_len = dc->bytes_to_read = bytes_to_read;
+    dc->used = 0;
+    dc->callback = record_body_done;
+
+    ret = libxl__datacopier_start(dc);
+    if (ret) {
+        /* Nothing will ever complete this read, so drop the buffer now. */
+        free(stream->rec_body);
+        stream->rec_body = dc->readbuf = NULL;
+        goto err;
+    }
+    return;
+
+ err:
+    assert(ret);
+    stream_failed(egc, stream, ret);
+}
+
+/*
+ * Datacopier callback: a record body has been read (or reading failed).
+ *
+ * A complete body is handed to process_record().  Otherwise the body buffer
+ * is released and the stream is failed with ERROR_FAIL.
+ */
+static void record_body_done(libxl__egc *egc,
+                             libxl__datacopier_state *dc,
+                             int onwrite, int errnoval)
+{
+    libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, dc);
+    STATE_AO_GC(dc->ao);
+
+    if (!onwrite && dc->used == stream->expected_len) {
+        process_record(egc, stream);
+        return;
+    }
+
+    LOG(ERROR, "write %d, err %d, expected %zu, got %zu",
+        onwrite, errnoval, stream->expected_len, dc->used);
+
+    /* The partial body is of no use to anyone. */
+    free(stream->rec_body);
+    dc->readbuf = NULL;
+    stream->rec_body = NULL;
+
+    stream_failed(egc, stream, ERROR_FAIL);
+}
+
+/*
+ * Act on the record described by stream->rec_hdr and stream->rec_body.
+ *
+ *  - END:              finishes the stream successfully.
+ *  - XENSTORE_DATA:    passed to libxl__toolstack_restore(), after which the
+ *                      next record header is requested.
+ *  - EMULATOR_CONTEXT: starts splicing the emulator blob out of the stream.
+ *  - anything else:    fails the stream with ERROR_FAIL.
+ *
+ * The record body is released before the stream is completed or failed.
+ */
+static void process_record(libxl__egc *egc,
+                           libxl__stream_read_state *stream)
+{
+    libxl__domain_create_state *dcs = CONTAINER_OF(stream, *dcs, srs);
+    libxl_sr_rec_hdr *rec_hdr = &stream->rec_hdr;
+    STATE_AO_GC(stream->ao);
+    int rc = 0;
+
+    LOG(DEBUG, "Record: 0x%08x, length %u", rec_hdr->type, rec_hdr->length);
+
+    switch (rec_hdr->type) {
+
+    case REC_TYPE_END:
+        /* Completion is signalled below, once the body has been released. */
+        break;
+
+    case REC_TYPE_XENSTORE_DATA:
+        rc = libxl__toolstack_restore(dcs->guest_domid, stream->rec_body,
+                                      rec_hdr->length, &dcs->shs);
+        /*
+         * libxl__toolstack_restore() is a synchronous function, so the next
+         * record has to be requested by hand.
+         */
+        if (!rc)
+            libxl__stream_read_continue(egc, &dcs->srs);
+        break;
+
+    case REC_TYPE_EMULATOR_CONTEXT:
+        read_emulator_body(egc, stream);
+        break;
+
+    default:
+        LOG(ERROR, "Unrecognised record 0x%08x", rec_hdr->type);
+        rc = ERROR_FAIL;
+        break;
+    }
+
+    /* Common cleanup for both the success and the failure path. */
+    if (rec_hdr->length) {
+        free(stream->rec_body);
+        stream->rec_body = NULL;
+    }
+
+    if (rc)
+        stream_failed(egc, stream, rc);
+    else if (rec_hdr->type == REC_TYPE_END)
+        stream_success(egc, stream);
+}
+
+/*
+ * Start splicing an emulator context blob out of the stream and into the
+ * device model restore file for the guest.
+ *
+ * The emulator header has already been consumed by the main loop, so the
+ * blob is the record length minus the size of that header.  The datacopier
+ * is reconfigured to copy that many bytes from the stream to the file,
+ * completing in emulator_body_done().
+ *
+ * Any failure fails the stream; the file is closed again if the copy could
+ * not be started.
+ */
+static void read_emulator_body(libxl__egc *egc,
+                               libxl__stream_read_state *stream)
+{
+    libxl__domain_create_state *dcs = CONTAINER_OF(stream, *dcs, srs);
+    libxl__datacopier_state *dc = &stream->dc;
+    libxl_sr_rec_hdr *rec_hdr = &stream->rec_hdr;
+    STATE_AO_GC(stream->ao);
+    char path[256];
+    int len;
+    int ret = 0;
+
+    /* Guard the subtraction below against wrapping on a short record. */
+    if (rec_hdr->length < sizeof(libxl_sr_emulator_hdr)) {
+        ret = ERROR_FAIL;
+        LOG(ERROR, "Emulator record too short: length %u", rec_hdr->length);
+        goto err;
+    }
+
+    len = snprintf(path, sizeof(path), XC_DEVICE_MODEL_RESTORE_FILE".%u",
+                   dcs->guest_domid);
+    if (len < 0 || (size_t)len >= sizeof(path)) {
+        ret = ERROR_FAIL;
+        LOG(ERROR, "Unable to construct emulator restore file path");
+        goto err;
+    }
+
+    dc->readwhat = "save/migration stream";
+    dc->copywhat = "emulator context";
+    dc->writewhat = "qemu save file";
+    dc->readbuf = NULL;
+    dc->writefd = open(path, O_WRONLY | O_CREAT | O_TRUNC, 0666);
+    if (dc->writefd == -1) {
+        ret = ERROR_FAIL;
+        LOGE(ERROR, "Unable to open '%s'", path);
+        goto err;
+    }
+    dc->maxsz = dc->bytes_to_read =
+        rec_hdr->length - sizeof(libxl_sr_emulator_hdr);
+    /* Nothing should be left buffered once everything has been written. */
+    stream->expected_len = dc->used = 0;
+    dc->callback = emulator_body_done;
+
+    ret = libxl__datacopier_start(dc);
+    if (ret) {
+        /* emulator_body_done() will never run, so close the file here. */
+        close(dc->writefd);
+        dc->writefd = -1;
+        goto err;
+    }
+    return;
+
+ err:
+    assert(ret);
+    stream_failed(egc, stream, ret);
+}
+
+/*
+ * Datacopier callback: the emulator blob has been copied to the save file
+ * (or the copy failed).
+ *
+ * The save file is closed in either case.  On success the datacopier is
+ * returned to its plain "read from the stream" configuration and any
+ * padding following the blob is read and discarded, completing in
+ * emulator_padding_done().  Any failure fails the stream.
+ */
+static void emulator_body_done(libxl__egc *egc,
+                               libxl__datacopier_state *dc,
+                               int onwrite, int errnoval)
+{
+    /* Safe to be static, as it is a write-only discard buffer. */
+    static char padding[1U << REC_ALIGN_ORDER];
+
+    libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, dc);
+    libxl_sr_rec_hdr *rec_hdr = &stream->rec_hdr;
+    STATE_AO_GC(dc->ao);
+    unsigned int nr_padding_bytes = (1U << REC_ALIGN_ORDER);
+    int ret = 0;
+
+    /*
+     * The save file was opened by read_emulator_body() and nothing else
+     * owns it.  Close it now, before the datacopier state is wiped below,
+     * as otherwise the descriptor would be leaked.
+     */
+    if (dc->writefd != -1) {
+        if (close(dc->writefd)) {
+            LOGE(ERROR, "Unable to close qemu save file");
+            ret = ERROR_FAIL;
+        }
+        dc->writefd = -1;
+    }
+
+    if (onwrite || dc->used != stream->expected_len) {
+        ret = ERROR_FAIL;
+        LOG(ERROR, "write %d, err %d, expected %zu, got %zu",
+            onwrite, errnoval, stream->expected_len, dc->used);
+        goto err;
+    }
+
+    /* A failed close may mean that the save file is incomplete. */
+    if (ret)
+        goto err;
+
+    /* Undo modifications for splicing the emulator context. */
+    memset(dc, 0, sizeof(*dc));
+    dc->ao = stream->ao;
+    dc->readfd = stream->fd;
+    dc->writefd = -1;
+
+    /* Do we need to eat some padding out of the stream? */
+    if (rec_hdr->length & (nr_padding_bytes - 1)) {
+        unsigned int bytes_to_discard =
+            nr_padding_bytes - (rec_hdr->length & (nr_padding_bytes - 1));
+
+        dc->readwhat = "padding bytes";
+        dc->readbuf = padding;
+        stream->expected_len = dc->bytes_to_read = bytes_to_discard;
+        dc->used = 0;
+        dc->callback = emulator_padding_done;
+
+        ret = libxl__datacopier_start(dc);
+        if (ret)
+            goto err;
+    } else {
+        /* Already aligned: go straight to the next step. */
+        stream->expected_len = dc->bytes_to_read = 0;
+        dc->used = 0;
+
+        emulator_padding_done(egc, dc, 0, 0);
+    }
+    return;
+
+ err:
+    assert(ret);
+    stream_failed(egc, stream, ret);
+}
+
+/*
+ * Datacopier callback: the padding after an emulator blob has been discarded
+ * (also called directly when there was no padding to discard).
+ *
+ * On success the next record header is requested; otherwise the stream is
+ * failed with ERROR_FAIL.
+ */
+static void emulator_padding_done(libxl__egc *egc,
+                                  libxl__datacopier_state *dc,
+                                  int onwrite, int errnoval)
+{
+    libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, dc);
+    STATE_AO_GC(dc->ao);
+
+    if (!onwrite && dc->used == stream->expected_len) {
+        libxl__stream_read_continue(egc, stream);
+        return;
+    }
+
+    LOG(ERROR, "write %d, err %d, expected %zu, got %zu",
+        onwrite, errnoval, stream->expected_len, dc->used);
+    stream_failed(egc, stream, ERROR_FAIL);
+}
+
+/*
+ * Local variables:
+ * mode: C
+ * c-basic-offset: 4
+ * indent-tabs-mode: nil
+ * End:
+ */
-- 
1.7.10.4


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.