|
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [PATCH VERY RFC 1/5] tools/libxl: Add support for writing a set of buffers asynchronously
From: Ross Lagerwall <ross.lagerwall@xxxxxxxxxx>
Signed-off-by: Ross Lagerwall <ross.lagerwall@xxxxxxxxxx>
---
tools/libxl/libxl_aoutils.c | 118 ++++++++++++++++++++++++++++++++++++++++++
tools/libxl/libxl_internal.h | 44 ++++++++++++++++
2 files changed, 162 insertions(+)
diff --git a/tools/libxl/libxl_aoutils.c b/tools/libxl/libxl_aoutils.c
index b10d2e1..6027d05 100644
--- a/tools/libxl/libxl_aoutils.c
+++ b/tools/libxl/libxl_aoutils.c
@@ -324,6 +324,124 @@ int libxl__datacopier_start(libxl__datacopier_state *dc)
return rc;
}
+
+/*----- writer -----*/
+
+/*
+ * Stops the writer: deregisters the fd event and frees every buffer
+ * that is still queued, leaving the buffer queue empty.
+ */
+void libxl__writer_kill(libxl__writer_state *dw)
+{
+    STATE_AO_GC(dw->ao);
+    libxl__writer_buf *pending;
+
+    libxl__ev_fd_deregister(gc, &dw->towrite);
+
+    /* Pop and release buffers one at a time until none are left. */
+    while ((pending = LIBXL_TAILQ_FIRST(&dw->bufs)) != NULL) {
+        LIBXL_TAILQ_REMOVE(&dw->bufs, pending, entry);
+        free(pending);
+    }
+    LIBXL_TAILQ_INIT(&dw->bufs);
+}
+
+/*
+ * Queues a copy of data[0..len) at the tail of dw's buffer list.
+ *
+ * Each libxl__writer_buf holds at most sizeof(buf->buf) bytes, so the
+ * data is split across as many buffers as are needed; copying it all
+ * into a single buffer would overrun the fixed-size array whenever
+ * len exceeds that size.  Nothing is queued when len is 0.
+ *
+ * egc is used only to obtain the gc.  The total queued must stay
+ * below dw->maxsz; this is checked with an assertion.
+ */
+void libxl__writer_append(libxl__egc *egc, libxl__writer_state *dw,
+                          const void *data, size_t len)
+{
+    EGC_GC;
+    const char *src = data;
+
+    assert(len < dw->maxsz - dw->used);
+
+    while (len > 0) {
+        libxl__writer_buf *buf = libxl__zalloc(NOGC, sizeof(*buf));
+        size_t chunk = len < sizeof(buf->buf) ? len : sizeof(buf->buf);
+
+        memcpy(buf->buf, src, chunk);
+        buf->used = (int)chunk; /* chunk <= sizeof(buf->buf), fits in int */
+        LIBXL_TAILQ_INSERT_TAIL(&dw->bufs, buf, entry);
+
+        dw->used += chunk;
+        src += chunk;
+        len -= chunk;
+    }
+}
+
+/*
+ * Deals with POLLHUP on behalf of writer_writable.
+ *
+ * If the caller supplied callback_pollhup and revents contains POLLHUP,
+ * the writer is killed, callback_pollhup(egc, dw, 1, -1) is invoked and
+ * 1 is returned.  Otherwise nothing is done and 0 is returned, leaving
+ * the event to the caller.
+ */
+static int writer_pollhup_handled(libxl__egc *egc,
+                                  libxl__writer_state *dw,
+                                  short revents)
+{
+    STATE_AO_GC(dw->ao);
+
+    if (!dw->callback_pollhup || !(revents & POLLHUP))
+        return 0;
+
+    LOG(DEBUG, "received POLLHUP on %s during writing of %s",
+        dw->towhat, dw->writewhat);
+    libxl__writer_kill(dw);
+    dw->callback_pollhup(egc, dw, 1, -1);
+    return 1;
+}
+
+/*
+ * fd-event callback registered by libxl__writer_start for POLLOUT.
+ *
+ * Writes queued buffers to ev->fd until the queue is empty (the writer
+ * is killed and callback(egc, dw, 0, 0) is called), the fd would block
+ * (return and wait for the next event), or an error occurs (the writer
+ * is killed and callback is called with state 1 and the errno value
+ * from write()).  Unexpected poll events kill the writer and report
+ * state -1.  POLLHUP is first offered to writer_pollhup_handled.
+ */
+static void writer_writable(libxl__egc *egc, libxl__ev_fd *ev,
+                            int fd, short events, short revents)
+{
+    libxl__writer_state *dw = CONTAINER_OF(ev, *dw, towrite);
+    STATE_AO_GC(dw->ao);
+
+    if (writer_pollhup_handled(egc, dw, revents))
+        return;
+
+    if (revents & ~POLLOUT) {
+        LOG(ERROR, "unexpected poll event 0x%x (should be POLLOUT)"
+            " during writing %s to %s", revents, dw->writewhat, dw->towhat);
+        libxl__writer_kill(dw);
+        dw->callback(egc, dw, -1, 0);
+        return;
+    }
+    assert(revents & POLLOUT);
+    for (;;) {
+        libxl__writer_buf *buf = LIBXL_TAILQ_FIRST(&dw->bufs);
+        if (!buf) {
+            /* Everything queued has been written: report success. */
+            libxl__writer_kill(dw);
+            dw->callback(egc, dw, 0, 0);
+            break;
+        }
+        if (!buf->used) {
+            /* Fully drained buffer: drop it and move on to the next. */
+            LIBXL_TAILQ_REMOVE(&dw->bufs, buf, entry);
+            free(buf);
+            continue;
+        }
+        ssize_t r = write(ev->fd, buf->buf, buf->used);
+        if (r < 0) {
+            /* Capture errno now: the cleanup below (deregistering the
+             * event, free) may overwrite it before the callback runs. */
+            int errnoval = errno;
+
+            if (errnoval == EINTR) continue;
+            if (errnoval == EWOULDBLOCK) break;
+            LOGE(ERROR, "error writing %s to %s",
+                 dw->writewhat, dw->towhat);
+            libxl__writer_kill(dw);
+            dw->callback(egc, dw, 1, errnoval);
+            return;
+        }
+        assert(r > 0);
+        assert(r <= buf->used);
+        buf->used -= r;
+        dw->used -= r;
+        assert(dw->used >= 0);
+        /* Partial write: shift the unwritten tail to the buffer start. */
+        memmove(buf->buf, buf->buf+r, buf->used);
+    }
+}
+
+/*
+ * Puts the private part of dw into its idle state: fd event
+ * initialised (not registered), buffer queue empty, and the
+ * queued-byte counter reset to match the empty queue.
+ *
+ * dw->ao must already have been filled in by the caller.
+ */
+void libxl__writer_init(libxl__writer_state *dw)
+{
+    assert(dw->ao);
+    libxl__ev_fd_init(&dw->towrite);
+    LIBXL_TAILQ_INIT(&dw->bufs);
+    /* The counter is private to the writer, so it must be set here;
+     * libxl__writer_append relies on it for its maxsz assertion. */
+    dw->used = 0;
+}
+
+/*
+ * Initialises dw and registers dw->writefd for POLLOUT, with
+ * writer_writable as the event handler.
+ *
+ * Returns 0 on success.  If registration fails the writer is killed
+ * and the error code from libxl__ev_fd_register is returned.
+ */
+int libxl__writer_start(libxl__writer_state *dw)
+{
+    STATE_AO_GC(dw->ao);
+    int rc;
+
+    libxl__writer_init(dw);
+
+    rc = libxl__ev_fd_register(gc, &dw->towrite, writer_writable,
+                               dw->writefd, POLLOUT);
+    if (rc)
+        libxl__writer_kill(dw);
+
+    return rc;
+}
+
/*----- openpty -----*/
/* implementation */
diff --git a/tools/libxl/libxl_internal.h b/tools/libxl/libxl_internal.h
index 04c9378..47fbf45 100644
--- a/tools/libxl/libxl_internal.h
+++ b/tools/libxl/libxl_internal.h
@@ -2471,6 +2471,50 @@ typedef struct libxl__save_helper_state {
* marshalling and xc callback functions */
} libxl__save_helper_state;
+/*----- writer: writes a set of buffers to an fd asynchronously -----*/
+
+typedef struct libxl__writer_state libxl__writer_state;
+typedef struct libxl__writer_buf libxl__writer_buf;
+
+/* Callback type used for both callback and callback_pollhup.
+ * state==0 means all queued buffers were written, errnoval is 0
+ * state==1 means failure happened when writing, logged, errnoval is valid
+ * state==-1 means some other internal failure, errnoval not valid, logged
+ * If we get POLLHUP, we call callback_pollhup(..., 1, -1);
+ * or if callback_pollhup==0 this is an internal failure, as above.
+ * In all cases the writer is killed before calling this callback */
+typedef void libxl__writer_callback(libxl__egc *egc,
+ libxl__writer_state *dw, int state, int errnoval);
+
+struct libxl__writer_buf {
+ /* private to writer
+ * One queued chunk of pending output, linked into the bufs queue
+ * of a libxl__writer_state. */
+ LIBXL_TAILQ_ENTRY(libxl__writer_buf) entry;
+ int used; /* bytes of buf[] still to be written, starting at buf[0] */
+ char buf[1000]; /* payload storage */
+};
+
+struct libxl__writer_state {
+ /* caller must fill these in, and they must all remain valid */
+ libxl__ao *ao;
+ int writefd; /* registered for POLLOUT by libxl__writer_start */
+ ssize_t maxsz; /* libxl__writer_append asserts len < maxsz - used */
+ const char *towhat, *writewhat; /* for error msgs */
+ libxl__writer_callback *callback; /* success and failure reports */
+ libxl__writer_callback *callback_pollhup; /* may be 0, see libxl__writer_callback */
+ /* remaining fields are private to writer */
+ libxl__ev_fd towrite; /* fd event whose handler does the writing */
+ ssize_t used; /* total bytes queued in bufs and not yet written */
+ LIBXL_TAILQ_HEAD(libxl__writer_bufs, libxl__writer_buf) bufs;
+};
+
+/* libxl__writer_init: asserts dw->ao is set, then initialises the fd
+ *   event and the (empty) buffer queue.
+ * libxl__writer_kill: deregisters the fd event and frees all queued
+ *   buffers.
+ * libxl__writer_start: calls libxl__writer_init and registers writefd
+ *   for POLLOUT.  Returns 0, or on registration failure kills the
+ *   writer and returns the error code from libxl__ev_fd_register. */
+_hidden void libxl__writer_init(libxl__writer_state *dw);
+_hidden void libxl__writer_kill(libxl__writer_state *dw);
+_hidden int libxl__writer_start(libxl__writer_state *dw);
+
+/* Inserts literal data into the output stream. The data is copied,
+ * so the caller's buffer need not outlive the call.
+ * May safely be used only immediately after libxl__writer_start
+ * (before the ctx is unlocked). But may be called multiple times.
+ * NB the total queued (including len) must stay strictly below
+ * maxsz, or an assertion fails! */
+_hidden void libxl__writer_append(libxl__egc*, libxl__writer_state*,
+ const void *data, size_t len);
/*----- Domain suspend (save) state structure -----*/
--
1.7.10.4
_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel
|
![]() |
Lists.xenproject.org is hosted with RackSpace, monitoring our |