[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-devel] [PATCH VERY RFC 1/5] tools/libxl: Add support for writing a set of buffers asynchronously



From: Ross Lagerwall <ross.lagerwall@xxxxxxxxxx>

Signed-off-by: Ross Lagerwall <ross.lagerwall@xxxxxxxxxx>
---
 tools/libxl/libxl_aoutils.c  |  118 ++++++++++++++++++++++++++++++++++++++++++
 tools/libxl/libxl_internal.h |   44 ++++++++++++++++
 2 files changed, 162 insertions(+)

diff --git a/tools/libxl/libxl_aoutils.c b/tools/libxl/libxl_aoutils.c
index b10d2e1..6027d05 100644
--- a/tools/libxl/libxl_aoutils.c
+++ b/tools/libxl/libxl_aoutils.c
@@ -324,6 +324,124 @@ int libxl__datacopier_start(libxl__datacopier_state *dc)
     return rc;
 }
 
+
+/*----- writer -----*/
+
+void libxl__writer_kill(libxl__writer_state *dw)
+{
+    STATE_AO_GC(dw->ao);
+    libxl__writer_buf *buf, *tbuf;
+
+    libxl__ev_fd_deregister(gc, &dw->towrite);
+    LIBXL_TAILQ_FOREACH_SAFE(buf, &dw->bufs, entry, tbuf)
+        free(buf);
+    LIBXL_TAILQ_INIT(&dw->bufs);
+}
+
+void libxl__writer_append(libxl__egc *egc, libxl__writer_state *dw,
+                          const void *data, size_t len)
+{
+    EGC_GC;
+    libxl__writer_buf *buf;
+
+    assert(len < dw->maxsz - dw->used);
+
+    buf = libxl__zalloc(NOGC, sizeof(*buf));
+    buf->used = len;
+    memcpy(buf->buf, data, len);
+
+    dw->used += len;
+    LIBXL_TAILQ_INSERT_TAIL(&dw->bufs, buf, entry);
+}
+
+static int writer_pollhup_handled(libxl__egc *egc,
+                                  libxl__writer_state *dw,
+                                  short revents)
+{
+    STATE_AO_GC(dw->ao);
+
+    if (dw->callback_pollhup && (revents & POLLHUP)) {
+        LOG(DEBUG, "received POLLHUP on %s during writing of %s",
+            dw->towhat, dw->writewhat);
+        libxl__writer_kill(dw);
+        dw->callback_pollhup(egc, dw, 1, -1);
+        return 1;
+    }
+    return 0;
+}
+
+static void writer_writable(libxl__egc *egc, libxl__ev_fd *ev,
+                            int fd, short events, short revents) {
+    libxl__writer_state *dw = CONTAINER_OF(ev, *dw, towrite);
+    STATE_AO_GC(dw->ao);
+
+    if (writer_pollhup_handled(egc, dw, revents))
+        return;
+
+    if (revents & ~POLLOUT) {
+        LOG(ERROR, "unexpected poll event 0x%x (should be POLLOUT)"
+            " during writing %s to %s", revents, dw->writewhat, dw->towhat);
+        libxl__writer_kill(dw);
+        dw->callback(egc, dw, -1, 0);
+        return;
+    }
+    assert(revents & POLLOUT);
+    for (;;) {
+        libxl__writer_buf *buf = LIBXL_TAILQ_FIRST(&dw->bufs);
+        if (!buf) {
+            libxl__writer_kill(dw);
+            dw->callback(egc, dw, 0, 0);
+            break;
+        }
+        if (!buf->used) {
+            LIBXL_TAILQ_REMOVE(&dw->bufs, buf, entry);
+            free(buf);
+            continue;
+        }
+        int r = write(ev->fd, buf->buf, buf->used);
+        if (r < 0) {
+            if (errno == EINTR) continue;
+            if (errno == EWOULDBLOCK) break;
+            LOGE(ERROR, "error writing %s to %s",
+                 dw->writewhat, dw->towhat);
+            libxl__writer_kill(dw);
+            dw->callback(egc, dw, 1, errno);
+            return;
+        }
+        assert(r > 0);
+        assert(r <= buf->used);
+        buf->used -= r;
+        dw->used -= r;
+        assert(dw->used >= 0);
+        memmove(buf->buf, buf->buf+r, buf->used);
+    }
+}
+
+void libxl__writer_init(libxl__writer_state *dw)
+{
+    assert(dw->ao);
+    libxl__ev_fd_init(&dw->towrite);
+    LIBXL_TAILQ_INIT(&dw->bufs);
+}
+
+int libxl__writer_start(libxl__writer_state *dw)
+{
+    int rc;
+    STATE_AO_GC(dw->ao);
+
+    libxl__writer_init(dw);
+
+    rc = libxl__ev_fd_register(gc, &dw->towrite, writer_writable,
+                               dw->writefd, POLLOUT);
+    if (rc) goto out;
+
+    return 0;
+
+ out:
+    libxl__writer_kill(dw);
+    return rc;
+}
+
 /*----- openpty -----*/
 
 /* implementation */
diff --git a/tools/libxl/libxl_internal.h b/tools/libxl/libxl_internal.h
index 04c9378..47fbf45 100644
--- a/tools/libxl/libxl_internal.h
+++ b/tools/libxl/libxl_internal.h
@@ -2471,6 +2471,50 @@ typedef struct libxl__save_helper_state {
                       * marshalling and xc callback functions */
 } libxl__save_helper_state;
 
+/*----- writer: writes a set of buffers to an fd asynchronously -----*/
+
+typedef struct libxl__writer_state libxl__writer_state;
+typedef struct libxl__writer_buf libxl__writer_buf;
+
+/* onwrite==1 means failure happened when writing, logged, errnoval is valid
+ * onwrite==-1 means some other internal failure, errnoval not valid, logged
+ * If we get POLLHUP, we call callback_pollhup(..., 1, -1);
+ * or if callback_pollhup==0 this is an internal failure, as above.
+ * In all cases copier is killed before calling this callback */
+typedef void libxl__writer_callback(libxl__egc *egc,
+     libxl__writer_state *dw, int state, int errnoval);
+
+struct libxl__writer_buf {
+    /* private to writer */
+    LIBXL_TAILQ_ENTRY(libxl__writer_buf) entry;
+    int used;
+    char buf[1000];
+};
+
+struct libxl__writer_state {
+    /* caller must fill these in, and they must all remain valid */
+    libxl__ao *ao;
+    int writefd;
+    ssize_t maxsz;
+    const char *towhat, *writewhat; /* for error msgs */
+    libxl__writer_callback *callback;
+    libxl__writer_callback *callback_pollhup;
+    /* remaining fields are private to writer */
+    libxl__ev_fd towrite;
+    ssize_t used;
+    LIBXL_TAILQ_HEAD(libxl__writer_bufs, libxl__writer_buf) bufs;
+};
+
+_hidden void libxl__writer_init(libxl__writer_state *dc);
+_hidden void libxl__writer_kill(libxl__writer_state *dc);
+_hidden int libxl__writer_start(libxl__writer_state *dc);
+
+/* Inserts literal data into the output stream.  The data is copied.
+ * May safely be used only immediately after libxl__writer_start
+ * (before the ctx is unlocked).  But may be called multiple times.
+ * NB exceeding maxsz will fail an assertion! */
+_hidden void libxl__writer_append(libxl__egc*, libxl__writer_state*,
+                                  const void *data, size_t len);
 
 /*----- Domain suspend (save) state structure -----*/
 
-- 
1.7.10.4


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.