[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-devel] [PATCH 07/11] libxl: Permit multithreaded event waiting



Previously, the context would be locked whenever we were waiting in
libxl's own call to poll (waiting for operating system events).

This would mean that multiple simultaneous calls to libxl_event_wait
in different threads with different parameters would not work
properly.

If we simply unlocked the context, it would be possible for another
thread to discover the occurrence of the event we were waiting for,
without us even waking up, and we would remain in poll.  So we need a
way to wake up other threads: a pipe, one for each thread in poll.

We also need to move some variables from globals in the ctx to be
per-polling-thread.

Signed-off-by: Ian Jackson <ian.jackson@xxxxxxxxxxxxx>
---
 tools/libxl/libxl.c          |   18 +++-
 tools/libxl/libxl_event.c    |  196 ++++++++++++++++++++++++++++++++++--------
 tools/libxl/libxl_internal.h |   50 ++++++++++-
 3 files changed, 218 insertions(+), 46 deletions(-)

diff --git a/tools/libxl/libxl.c b/tools/libxl/libxl.c
index 0094541..59e8cb8 100644
--- a/tools/libxl/libxl.c
+++ b/tools/libxl/libxl.c
@@ -49,8 +49,9 @@ int libxl_ctx_alloc(libxl_ctx **pctx, int version,
 
     ctx->osevent_hooks = 0;
 
-    ctx->fd_polls = 0;
-    ctx->fd_rindex = 0;
+    LIBXL_LIST_INIT(&ctx->pollers_event);
+    LIBXL_LIST_INIT(&ctx->pollers_idle);
+
     LIBXL_LIST_INIT(&ctx->efds);
     LIBXL_TAILQ_INIT(&ctx->etimes);
 
@@ -61,6 +62,9 @@ int libxl_ctx_alloc(libxl_ctx **pctx, int version,
     LIBXL_TAILQ_INIT(&ctx->death_list);
     libxl__ev_xswatch_init(&ctx->death_watch);
 
+    rc = libxl__poller_init(ctx, &ctx->poller_app);
+    if (rc) goto out;
+
     if ( stat(XENSTORE_PID_FILE, &stat_buf) != 0 ) {
         LIBXL__LOG_ERRNO(ctx, LIBXL__LOG_ERROR, "Is xenstore daemon running?\n"
                      "failed to stat %s", XENSTORE_PID_FILE);
@@ -135,8 +139,14 @@ int libxl_ctx_free(libxl_ctx *ctx)
     libxl_version_info_dispose(&ctx->version_info);
     if (ctx->xsh) xs_daemon_close(ctx->xsh);
 
-    free(ctx->fd_polls);
-    free(ctx->fd_rindex);
+    libxl__poller_dispose(&ctx->poller_app);
+    assert(LIBXL_LIST_EMPTY(&ctx->pollers_event));
+    libxl__poller *poller, *poller_tmp;
+    LIBXL_LIST_FOREACH_SAFE(poller, &ctx->pollers_idle, entry, poller_tmp) {
+        libxl__poller_dispose(poller);
+        free(poller);
+    }
+
     free(ctx->watch_slots);
 
     discard_events(&ctx->occurred);
diff --git a/tools/libxl/libxl_event.c b/tools/libxl/libxl_event.c
index 69ad318..73dfd9d 100644
--- a/tools/libxl/libxl_event.c
+++ b/tools/libxl/libxl_event.c
@@ -510,9 +510,9 @@ void libxl__ev_xswatch_deregister(libxl__gc *gc, libxl__ev_xswatch *w)
  * osevent poll
  */
 
-static int beforepoll_internal(libxl__gc *gc, int *nfds_io,
-                               struct pollfd *fds, int *timeout_upd,
-                               struct timeval now)
+static int beforepoll_internal(libxl__gc *gc, libxl__poller *poller,
+                               int *nfds_io, struct pollfd *fds,
+                               int *timeout_upd, struct timeval now)
 {
     libxl__ev_fd *efd;
     int rc;
@@ -534,7 +534,7 @@ static int beforepoll_internal(libxl__gc *gc, int *nfds_io,
          * not to mess with fd_rindex.
          */
 
-        int maxfd = 0;
+        int maxfd = poller->wakeup_pipe[0] + 1;
         LIBXL_LIST_FOREACH(efd, &CTX->efds, entry) {
             if (!efd->events)
                 continue;
@@ -542,30 +542,39 @@ static int beforepoll_internal(libxl__gc *gc, int *nfds_io,
                 maxfd = efd->fd + 1;
         }
         /* make sure our array is as big as *nfds_io */
-        if (CTX->fd_rindex_allocd < maxfd) {
+        if (poller->fd_rindex_allocd < maxfd) {
             assert(maxfd < INT_MAX / sizeof(int) / 2);
-            int *newarray = realloc(CTX->fd_rindex, sizeof(int) * maxfd);
+            int *newarray = realloc(poller->fd_rindex, sizeof(int) * maxfd);
             if (!newarray) { rc = ERROR_NOMEM; goto out; }
-            memset(newarray + CTX->fd_rindex_allocd, 0,
-                   sizeof(int) * (maxfd - CTX->fd_rindex_allocd));
-            CTX->fd_rindex = newarray;
-            CTX->fd_rindex_allocd = maxfd;
+            memset(newarray + poller->fd_rindex_allocd, 0,
+                   sizeof(int) * (maxfd - poller->fd_rindex_allocd));
+            poller->fd_rindex = newarray;
+            poller->fd_rindex_allocd = maxfd;
         }
     }
 
     int used = 0;
-    LIBXL_LIST_FOREACH(efd, &CTX->efds, entry) {
-        if (!efd->events)
-            continue;
-        if (used < *nfds_io) {
-            fds[used].fd = efd->fd;
-            fds[used].events = efd->events;
-            fds[used].revents = 0;
-            assert(efd->fd < CTX->fd_rindex_allocd);
-            CTX->fd_rindex[efd->fd] = used;
-        }
-        used++;
-    }
+
+#define REQUIRE_FD(req_fd, req_events, efd) do{                 \
+        if ((req_events)) {                                     \
+            if (used < *nfds_io) {                              \
+                fds[used].fd = (req_fd);                        \
+                fds[used].events = (req_events);                \
+                fds[used].revents = 0;                          \
+                assert((req_fd) < poller->fd_rindex_allocd);    \
+                poller->fd_rindex[(req_fd)] = used;             \
+            }                                                   \
+            used++;                                             \
+        }                                                       \
+    }while(0)
+
+    LIBXL_LIST_FOREACH(efd, &CTX->efds, entry)
+        REQUIRE_FD(efd->fd, efd->events, efd);
+
+    REQUIRE_FD(poller->wakeup_pipe[0], POLLIN, 0);
+
+#undef REQUIRE_FD
+
     rc = used <= *nfds_io ? 0 : ERROR_BUFFERFULL;
 
     *nfds_io = used;
@@ -599,22 +608,23 @@ int libxl_osevent_beforepoll(libxl_ctx *ctx, int *nfds_io,
 {
     EGC_INIT(ctx);
     CTX_LOCK;
-    int rc = beforepoll_internal(gc, nfds_io, fds, timeout_upd, now);
+    int rc = beforepoll_internal(gc, &ctx->poller_app,
+                                 nfds_io, fds, timeout_upd, now);
     CTX_UNLOCK;
     EGC_FREE;
     return rc;
 }
 
-static int afterpoll_check_fd(libxl_ctx *ctx,
+static int afterpoll_check_fd(libxl__poller *poller,
                               const struct pollfd *fds, int nfds,
                               int fd, int events)
     /* returns mask of events which were requested and occurred */
 {
-    if (fd >= ctx->fd_rindex_allocd)
+    if (fd >= poller->fd_rindex_allocd)
         /* added after we went into poll, have to try again */
         return 0;
 
-    int slot = ctx->fd_rindex[fd];
+    int slot = poller->fd_rindex[fd];
 
     if (slot >= nfds)
         /* stale slot entry; again, added afterwards */
@@ -630,22 +640,31 @@ static int afterpoll_check_fd(libxl_ctx *ctx,
     return revents;
 }
 
-static void afterpoll_internal(libxl__egc *egc,
+static void afterpoll_internal(libxl__egc *egc, libxl__poller *poller,
                                int nfds, const struct pollfd *fds,
                                struct timeval now)
 {
     EGC_GC;
     libxl__ev_fd *efd;
 
+
     LIBXL_LIST_FOREACH(efd, &CTX->efds, entry) {
         if (!efd->events)
             continue;
 
-        int revents = afterpoll_check_fd(CTX,fds,nfds, efd->fd,efd->events);
+        int revents = afterpoll_check_fd(poller,fds,nfds, efd->fd,efd->events);
         if (revents)
             efd->func(egc, efd, efd->fd, efd->events, revents);
     }
 
+    if (afterpoll_check_fd(poller,fds,nfds, poller->wakeup_pipe[0],POLLIN)) {
+        char buf[256];
+        int r = read(poller->wakeup_pipe[0], buf, sizeof(buf));
+        if (r < 0)
+            if (errno != EINTR && errno != EWOULDBLOCK)
+                LIBXL__EVENT_DISASTER(egc, "read wakeup", errno, 0);
+    }
+
     for (;;) {
         libxl__ev_time *etime = LIBXL_TAILQ_FIRST(&CTX->etimes);
         if (!etime)
@@ -667,7 +686,7 @@ void libxl_osevent_afterpoll(libxl_ctx *ctx, int nfds, const struct pollfd *fds,
 {
     EGC_INIT(ctx);
     CTX_LOCK;
-    afterpoll_internal(egc, nfds, fds, now);
+    afterpoll_internal(egc, &ctx->poller_app, nfds, fds, now);
     CTX_UNLOCK;
     EGC_FREE;
 }
@@ -790,7 +809,10 @@ void libxl__event_occurred(libxl__egc *egc, libxl_event *event)
         LIBXL_TAILQ_INSERT_TAIL(&egc->occurred_for_callback, event, link);
         return;
     } else {
+        libxl__poller *poller;
         LIBXL_TAILQ_INSERT_TAIL(&CTX->occurred, event, link);
+        LIBXL_LIST_FOREACH(poller, &CTX->pollers_event, entry)
+            libxl__poller_wakeup(egc, poller);
     }
 }
 
@@ -858,7 +880,94 @@ int libxl_event_check(libxl_ctx *ctx, libxl_event **event_r,
     return rc;
 }
 
-static int eventloop_iteration(libxl__egc *egc) {
+/*
+ * Manipulation of pollers
+ */
+
+int libxl__poller_init(libxl_ctx *ctx, libxl__poller *p)
+{
+    int r, rc;
+    p->fd_polls = 0;
+    p->fd_rindex = 0;
+
+    r = pipe(p->wakeup_pipe);
+    if (r) {
+        LIBXL__LOG_ERRNO(ctx, LIBXL__LOG_ERROR, "cannot create poller pipe");
+        rc = ERROR_FAIL;
+        goto out;
+    }
+
+    rc = libxl_fd_set_nonblock(ctx, p->wakeup_pipe[0], 1);
+    if (rc) goto out;
+
+    rc = libxl_fd_set_nonblock(ctx, p->wakeup_pipe[1], 1);
+    if (rc) goto out;
+
+    return 0;
+
+ out:
+    libxl__poller_dispose(p);
+    return rc;
+}
+
+void libxl__poller_dispose(libxl__poller *p)
+{
+    if (p->wakeup_pipe[1] > 0) close(p->wakeup_pipe[1]);
+    if (p->wakeup_pipe[0] > 0) close(p->wakeup_pipe[0]);
+    free(p->fd_polls);
+    free(p->fd_rindex);
+}
+
+libxl__poller *libxl__poller_get(libxl_ctx *ctx)
+{
+    /* must be called with ctx locked */
+    int rc;
+
+    libxl__poller *p = LIBXL_LIST_FIRST(&ctx->pollers_idle);
+    if (p)
+        return p;
+
+    p = malloc(sizeof(*p));
+    if (!p) {
+        LIBXL__LOG_ERRNO(ctx, LIBXL__LOG_ERROR, "cannot allocate poller");
+        return 0;
+    }
+    memset(p, 0, sizeof(*p));
+
+    rc = libxl__poller_init(ctx, p);
+    if (rc) return NULL;
+
+    return p;
+}
+
+void libxl__poller_put(libxl_ctx *ctx, libxl__poller *p)
+{
+    LIBXL_LIST_INSERT_HEAD(&ctx->pollers_idle, p, entry);
+}
+
+void libxl__poller_wakeup(libxl__egc *egc, libxl__poller *p)
+{
+    static const char buf[1] = "";
+
+    for (;;) {
+        int r = write(p->wakeup_pipe[1], buf, 1);
+        if (r==1) return;
+        assert(r==-1);
+        if (errno == EINTR) continue;
+        if (errno == EWOULDBLOCK) return;
+        LIBXL__EVENT_DISASTER(egc, "cannot poke watch pipe", errno, 0);
+        return;
+    }
+}
+
+/*
+ * Main event loop iteration
+ */
+
+static int eventloop_iteration(libxl__egc *egc, libxl__poller *poller) {
+    /* The CTX must be locked EXACTLY ONCE so that this function
+     * can unlock it when it polls.
+     */
     EGC_GC;
     int rc;
     struct timeval now;
@@ -871,23 +980,27 @@ static int eventloop_iteration(libxl__egc *egc) {
     int timeout;
 
     for (;;) {
-        int nfds = CTX->fd_polls_allocd;
+        int nfds = poller->fd_polls_allocd;
         timeout = -1;
-        rc = beforepoll_internal(gc, &nfds, CTX->fd_polls, &timeout, now);
+        rc = beforepoll_internal(gc, poller, &nfds, poller->fd_polls,
+                                 &timeout, now);
         if (!rc) break;
         if (rc != ERROR_BUFFERFULL) goto out;
 
         struct pollfd *newarray =
             (nfds > INT_MAX / sizeof(struct pollfd) / 2) ? 0 :
-            realloc(CTX->fd_polls, sizeof(*newarray) * nfds);
+            realloc(poller->fd_polls, sizeof(*newarray) * nfds);
 
         if (!newarray) { rc = ERROR_NOMEM; goto out; }
 
-        CTX->fd_polls = newarray;
-        CTX->fd_polls_allocd = nfds;
+        poller->fd_polls = newarray;
+        poller->fd_polls_allocd = nfds;
     }
 
-    rc = poll(CTX->fd_polls, CTX->fd_polls_allocd, timeout);
+    CTX_UNLOCK;
+    rc = poll(poller->fd_polls, poller->fd_polls_allocd, timeout);
+    CTX_LOCK;
+
     if (rc < 0) {
         if (errno == EINTR)
             return 0; /* will go round again if caller requires */
@@ -900,7 +1013,8 @@ static int eventloop_iteration(libxl__egc *egc) {
     rc = libxl__gettimeofday(gc, &now);
     if (rc) goto out;
 
-    afterpoll_internal(egc, CTX->fd_polls_allocd, CTX->fd_polls, now);
+    afterpoll_internal(egc, poller,
+                       poller->fd_polls_allocd, poller->fd_polls, now);
 
     CTX_UNLOCK;
 
@@ -914,15 +1028,19 @@ int libxl_event_wait(libxl_ctx *ctx, libxl_event **event_r,
                      libxl_event_predicate *pred, void *pred_user)
 {
     int rc;
+    libxl__poller *poller = NULL;
 
     EGC_INIT(ctx);
     CTX_LOCK;
 
+    poller = libxl__poller_get(ctx);
+    if (!poller) { rc = ERROR_FAIL; goto out; }
+
     for (;;) {
         rc = event_check_internal(egc, event_r, typemask, pred, pred_user);
         if (rc != ERROR_NOT_READY) goto out;
 
-        rc = eventloop_iteration(egc);
+        rc = eventloop_iteration(egc, poller);
         if (rc) goto out;
 
         /* we unlock and cleanup the egc each time we go through this loop,
@@ -936,6 +1054,8 @@ int libxl_event_wait(libxl_ctx *ctx, libxl_event **event_r,
     }
 
  out:
+    libxl__poller_put(ctx, poller);
+
     CTX_UNLOCK;
     EGC_FREE;
     return rc;
diff --git a/tools/libxl/libxl_internal.h b/tools/libxl/libxl_internal.h
index 8e80f24..d1b96c1 100644
--- a/tools/libxl/libxl_internal.h
+++ b/tools/libxl/libxl_internal.h
@@ -207,6 +207,33 @@ struct libxl__evgen_disk_eject {
 _hidden void
 libxl__evdisable_disk_eject(libxl__gc*, libxl_evgen_disk_eject*);
 
+typedef struct libxl__poller libxl__poller;
+struct libxl__poller {
+    /*
+     * These are used to allow other threads to wake up a thread which
+     * may be stuck in poll, because whatever it was waiting for
+     * hadn't happened yet.  Threads which generate events will write
+     * a byte to each pipe.  A thread which is waiting will empty its
+     * own pipe, and put its poller on the pollers_event list, before
+     * releasing the ctx lock and going into poll; when it comes out
+     * of poll it will take the poller off the pollers_event list.
+     *
+     * When a thread is done with a poller it should put it onto
+     * pollers_idle, where it can be reused later.
+     *
+     * The "poller_app" is never idle, but is sometimes on
+     * pollers_event.
+     */
+    LIBXL_LIST_ENTRY(libxl__poller) entry;
+
+    struct pollfd *fd_polls;
+    int fd_polls_allocd;
+
+    int fd_rindex_allocd;
+    int *fd_rindex; /* see libxl_osevent_beforepoll */
+
+    int wakeup_pipe[2]; /* 0 means no fd allocated */
+};
 
 struct libxl__ctx {
     xentoollog_logger *lg;
@@ -237,10 +264,9 @@ struct libxl__ctx {
       /* See the comment for OSEVENT_HOOK_INTERN in libxl_event.c
        * for restrictions on the use of the osevent fields. */
 
-    struct pollfd *fd_polls;
-    int fd_polls_allocd;
-    int fd_rindex_allocd;
-    int *fd_rindex; /* see libxl_osevent_beforepoll */
+    libxl__poller poller_app; /* libxl_osevent_beforepoll and _afterpoll */
+    LIBXL_LIST_HEAD(, libxl__poller) pollers_event, pollers_idle;
+
     LIBXL_LIST_HEAD(, libxl__ev_fd) efds;
     LIBXL_TAILQ_HEAD(, libxl__ev_time) etimes;
 
@@ -526,6 +552,22 @@ _hidden void libxl__event_disaster(libxl__egc*, const char *msg, int errnoval,
     libxl__event_disaster(egc, msg, errnoval, type, __FILE__,__LINE__,__func__)
 
 
+/* Fills in, or disposes of, the resources held by, a poller whose
+ * space the caller has allocated.  ctx must be locked. */
+int libxl__poller_init(libxl_ctx *ctx, libxl__poller *p);
+void libxl__poller_dispose(libxl__poller *p);
+
+/* Obtain a fresh poller from malloc or the idle list, and put it
+ * away again afterwards.  _get can fail, returning NULL.
+ * ctx must be locked. */
+libxl__poller *libxl__poller_get(libxl_ctx *ctx);
+void libxl__poller_put(libxl_ctx *ctx, libxl__poller *p);
+
+/* Notifies whoever is polling using p that they should wake up.
+ * ctx must be locked. */
+void libxl__poller_wakeup(libxl__egc *egc, libxl__poller *p);
+
+
 /* from xl_dom */
 _hidden libxl_domain_type libxl__domain_type(libxl__gc *gc, uint32_t domid);
 _hidden int libxl__domain_shutdown_reason(libxl__gc *gc, uint32_t domid);
-- 
1.7.2.5


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-devel


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.