[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Xen-devel] [PATCH 5/9] libxl: Permit multithreaded event waiting



On Fri, 2012-01-13 at 19:25 +0000, Ian Jackson wrote:
> Previously, the context would be locked whenever we were waiting in
> libxl's own call to poll (waiting for operating system events).
> 
> This would mean that multiple simultaneous calls to libxl_event_wait
> in different threads with different parameters would not work
> properly.
> 
> If we simply unlock the context, it would be possible for another
> thread to discover the occurrence of the event we were waiting for,
> without us even waking up, and we would remain in poll.  So we need a
> way to wake up other threads: a pipe, one for each thread in poll.
> 
> We also need to move some variables from globals in the ctx to be
> per-polling-thread.

I don't think this relates to this patch; it's just that the mention of
multithreaded waiting brought it to mind. What are the intended
semantics of two calls to libxl_event_wait with overlapping event masks?

Do we expect that the caller must have called the appropriate
libxl_evenable_* functions twice, so that both waits get an event
(possibly discriminated via the predicate)?

Presumably we want to ensure that one of the waits doesn't sleep for
ever.

How does this interact with events generated via the hooks mechanism? Do
we always deliver to the explicit wait in preference?

> 
> Signed-off-by: Ian Jackson <ian.jackson@xxxxxxxxxxxxx>
> ---
>  tools/libxl/libxl.c          |   18 +++-
>  tools/libxl/libxl_event.c    |  196 ++++++++++++++++++++++++++++++++++--------
>  tools/libxl/libxl_internal.h |   50 ++++++++++-
>  3 files changed, 218 insertions(+), 46 deletions(-)
[...]
> diff --git a/tools/libxl/libxl_event.c b/tools/libxl/libxl_event.c
> index 621a7cc..82889f6 100644
> --- a/tools/libxl/libxl_event.c
> +++ b/tools/libxl/libxl_event.c

> @@ -542,30 +542,39 @@ static int beforepoll_internal(libxl__gc *gc, int *nfds_io,
>                  maxfd = efd->fd + 1;
>          }
>          /* make sure our array is as big as *nfds_io */
> -        if (CTX->fd_rindex_allocd < maxfd) {
> +        if (poller->fd_rindex_allocd < maxfd) {
>              assert(maxfd < INT_MAX / sizeof(int) / 2);
> -            int *newarray = realloc(CTX->fd_rindex, sizeof(int) * maxfd);
> +            int *newarray = realloc(poller->fd_rindex, sizeof(int) * maxfd);
>              if (!newarray) { rc = ERROR_NOMEM; goto out; }
> -            memset(newarray + CTX->fd_rindex_allocd, 0,
> -                   sizeof(int) * (maxfd - CTX->fd_rindex_allocd));
> -            CTX->fd_rindex = newarray;
> -            CTX->fd_rindex_allocd = maxfd;
> +            memset(newarray + poller->fd_rindex_allocd, 0,
> +                   sizeof(int) * (maxfd - poller->fd_rindex_allocd));
> +            poller->fd_rindex = newarray;
> +            poller->fd_rindex_allocd = maxfd;
>          }
>      }
> 
>      int used = 0;
[...]
> +
> +#define REQUIRE_FD(req_fd, req_events, efd) do{                 \
> +        if ((req_events)) {                                     \
> +            if (used < *nfds_io) {                              \
> +                fds[used].fd = (req_fd);                        \
> +                fds[used].events = (req_events);                \
> +                fds[used].revents = 0;                          \
> +                assert((req_fd) < poller->fd_rindex_allocd);    \
> +                poller->fd_rindex[(req_fd)] = used;             \
> +            }                                                   \
> +            used++;                                             \

Is `used` expected to be in the calling context? OIC -- this is defined
temporarily within a function; the diff context (which I've now trimmed)
confused me.

Does this actually add anything above doing
        LIBXL_LIST_FOREACH(...) {
                /* the body of require_fd */
        }
?

> +        }                                                       \
> +    }while(0)
> +
> +    LIBXL_LIST_FOREACH(efd, &CTX->efds, entry)
> +        REQUIRE_FD(efd->fd, efd->events, efd);
> +
> +    REQUIRE_FD(poller->wakeup_pipe[0], POLLIN, 0);
> +
> +#undef REQUIRE_FD
> +
>      rc = used <= *nfds_io ? 0 : ERROR_BUFFERFULL;
> 
>      *nfds_io = used;
[...]
> @@ -630,22 +640,31 @@ static int afterpoll_check_fd(libxl_ctx *ctx,
>      return revents;
>  }
> 
> -static void afterpoll_internal(libxl__egc *egc,
> +static void afterpoll_internal(libxl__egc *egc, libxl__poller *poller,
>                                 int nfds, const struct pollfd *fds,
>                                 struct timeval now)
>  {
>      EGC_GC;
>      libxl__ev_fd *efd;
> 
> +
>      LIBXL_LIST_FOREACH(efd, &CTX->efds, entry) {
>          if (!efd->events)
>              continue;
> 
> -        int revents = afterpoll_check_fd(CTX,fds,nfds, efd->fd,efd->events);
> +        int revents = afterpoll_check_fd(poller,fds,nfds, efd->fd,efd->events);
>          if (revents)
>              efd->func(egc, efd, efd->fd, efd->events, revents);
>      }
> 
> +    if (afterpoll_check_fd(poller,fds,nfds, poller->wakeup_pipe[0],POLLIN)) {
> +        char buf[256];

Is it (theoretically) possible to have more than 256 events pending?

> +        int r = read(poller->wakeup_pipe[0], buf, sizeof(buf));
> +        if (r < 0)
> +            if (errno != EINTR && errno != EWOULDBLOCK)
> +                LIBXL__EVENT_DISASTER(egc, "read wakeup", errno, 0);
> +    }
> +
>      for (;;) {
>          libxl__ev_time *etime = LIBXL_TAILQ_FIRST(&CTX->etimes);
>          if (!etime)
[...]
> @@ -858,7 +880,94 @@ int libxl_event_check(libxl_ctx *ctx, libxl_event **event_r,
>      return rc;
>  }
> 
> -static int eventloop_iteration(libxl__egc *egc) {
> +/*
> + * Manipulation of pollers
> + */
> +
> +int libxl__poller_init(libxl_ctx *ctx, libxl__poller *p)
> +{
> +    int r, rc;
> +    p->fd_polls = 0;
> +    p->fd_rindex = 0;
> +
> +    r = pipe(p->wakeup_pipe);
> +    if (r) {
> +        LIBXL__LOG_ERRNO(ctx, LIBXL__LOG_ERROR, "cannot create poller pipe");
> +        rc = ERROR_FAIL;
> +        goto out;
> +    }
> +
> +    rc = libxl_fd_set_nonblock(ctx, p->wakeup_pipe[0], 1);
> +    if (rc) goto out;
> +
> +    rc = libxl_fd_set_nonblock(ctx, p->wakeup_pipe[1], 1);
> +    if (rc) goto out;
> +
> +    return 0;
> +
> + out:
> +    libxl__poller_dispose(p);

The dispose function checks for fd > 0 before closing, but if you take
the first goto out (pipe failed) then wakeup_pipe[{0,1}] are still
undefined?

> +    return rc;
> +}
> +
> +void libxl__poller_dispose(libxl__poller *p)
> +{
> +    if (p->wakeup_pipe[1] > 0) close(p->wakeup_pipe[1]);
> +    if (p->wakeup_pipe[0] > 0) close(p->wakeup_pipe[0]);

Strictly speaking 0 is a valid value for an open fd.

I once saw a bug (in gzip iirc) where, because stdin had inadvertently
been closed, a dup() of some sort returned 0 but the subsequent checks
were for >0 rather than >=. Slightly unusual case but it took an age
(for someone else...) to debug.

> +    free(p->fd_polls);
> +    free(p->fd_rindex);
> +}
> +
> +libxl__poller *libxl__poller_get(libxl_ctx *ctx)
> +{
> +    /* must be called with ctx locked */
> +    int rc;
> +
> +    libxl__poller *p = LIBXL_LIST_FIRST(&ctx->pollers_idle);
> +    if (p)
> +        return p;
> +
> +    p = malloc(sizeof(*p));
> +    if (!p) {
> +        LIBXL__LOG_ERRNO(ctx, LIBXL__LOG_ERROR, "cannot allocate poller");
> +        return 0;
> +    }
> +    memset(p, 0, sizeof(*p));

Hrm, I guess this is where p->wakeup_pipe gets initialised. Likewise the
call to _init in ctx_alloc is preceded by a memset. Not entirely obvious,
but I guess it's OK. (Initialising the fds to -1 in _init would solve both
this and the 0-is-a-valid-fd problem.)

> +
> +    rc = libxl__poller_init(ctx, p);
> +    if (rc) return NULL;
> +
> +    return p;
> +}
> +
> +void libxl__poller_put(libxl_ctx *ctx, libxl__poller *p)
> +{
> +    LIBXL_LIST_INSERT_HEAD(&ctx->pollers_idle, p, entry);
> +}
> +
> +void libxl__poller_wakeup(libxl__egc *egc, libxl__poller *p)
> +{
> +    static const char buf[1] = "";
> +
> +    for (;;) {
> +        int r = write(p->wakeup_pipe[1], buf, 1);
> +        if (r==1) return;
> +        assert(r==-1);

There's no possibility of r == 0 here?

> +        if (errno == EINTR) continue;
> +        if (errno == EWOULDBLOCK) return;

write(2) says that both EWOULDBLOCK and EAGAIN are valid returns for a
non-blocking fd and may have different values so apps should check for
both.

> +        LIBXL__EVENT_DISASTER(egc, "cannot poke watch pipe", errno, 0);
> +        return;
> +    }
> +}
> +
> +/*
> + * Main event loop iteration
> + */
> +
> +static int eventloop_iteration(libxl__egc *egc, libxl__poller *poller) {
> +    /* The CTX must be locked EXACTLY ONCE so that this function
> +     * can unlock it when it polls.
> +     */
>      EGC_GC;
>      int rc;
>      struct timeval now;
[...]
> @@ -900,7 +1013,8 @@ static int eventloop_iteration(libxl__egc *egc) {
>      rc = libxl__gettimeofday(gc, &now);
>      if (rc) goto out;
> 
> -    afterpoll_internal(egc, CTX->fd_polls_allocd, CTX->fd_polls, now);
> +    afterpoll_internal(egc, poller,
> +                       poller->fd_polls_allocd, poller->fd_polls, now);

Can this function be simplified to take just (egc, poller, now)?
Likewise beforepoll_internal?

> 
>      CTX_UNLOCK;
> 

> diff --git a/tools/libxl/libxl_internal.h b/tools/libxl/libxl_internal.h
> index edb73eb..53d2462 100644
> --- a/tools/libxl/libxl_internal.h
> +++ b/tools/libxl/libxl_internal.h
> @@ -205,6 +205,33 @@ struct libxl__evgen_disk_eject {
>  _hidden void
>  libxl__evdisable_disk_eject(libxl__gc*, libxl_evgen_disk_eject*);
> 
> +typedef struct libxl__poller libxl__poller;
> +struct libxl__poller {
> +    /*
> +     * These are used to allow other threads to wake up a thread which
> +     * may be stuck in poll, because whatever it was waiting for
> +     * hadn't happened yet.  Threads which generate events will write
> +     * a byte to each pipe.  A thread which is waiting will empty its
> +     * own pipe, and put its poller on the pollers_event list, before
> +     * releasing the ctx lock and going into poll; when it comes out
> +     * of poll it will take the poller off the pollers_event list.
> +     *
> +     * When a thread is done with a poller it should put it onto
> +     * pollers_idle, where it can be reused later.
> +     *
> +     * The "poller_app" is never idle, but is sometimes on
> +     * pollers_event.
> +     */
> +    LIBXL_LIST_ENTRY(libxl__poller) entry;
> +
> +    struct pollfd *fd_polls;
> +    int fd_polls_allocd;
> +
> +    int fd_rindex_allocd;
> +    int *fd_rindex; /* see libxl_osevent_beforepoll */
> +
> +    int wakeup_pipe[2]; /* 0 means no fd allocated */

Or does it ;-)
> +};
> 
>  struct libxl__ctx {
>      xentoollog_logger *lg;
> @@ -235,10 +262,9 @@ struct libxl__ctx {
>        /* See the comment for OSEVENT_HOOK_INTERN in libxl_event.c
>         * for restrictions on the use of the osevent fields. */
> 
> -    struct pollfd *fd_polls;
> -    int fd_polls_allocd;
> -    int fd_rindex_allocd;
> -    int *fd_rindex; /* see libxl_osevent_beforepoll */
> +    libxl__poller poller_app; /* libxl_osevent_beforepoll and _afterpoll */

This presumably means that an app can only use before/afterpoll from one
thread at a time. Hardly an onerous requirement, but worth noting
perhaps?

Could also check that poller_app.entry is not currently on a list?

> +    LIBXL_LIST_HEAD(, libxl__poller) pollers_event, pollers_idle;
> +
>      LIBXL_LIST_HEAD(, libxl__ev_fd) efds;
>      LIBXL_TAILQ_HEAD(, libxl__ev_time) etimes;
> 
> @@ -524,6 +550,22 @@ _hidden void libxl__event_disaster(libxl__egc*, const char *msg, int errnoval,
>      libxl__event_disaster(egc, msg, errnoval, type, __FILE__,__LINE__,__func__)
> 
> 
> +/* Fills in, or disposes of, the resources held by, a poller whose

That third comma reads weirdly to me.

> + * space the caller has allocated.  ctx must be locked. */

init doesn't appear to do anything which needs a lock?

> +int libxl__poller_init(libxl_ctx *ctx, libxl__poller *p);
> +void libxl__poller_dispose(libxl__poller *p);
> +
> +/* Obtain a fresh poller from malloc or the idle list, and put it
> + * away again afterwards.  _get can fail, returning NULL.
> + * ctx must be locked. */
> +libxl__poller *libxl__poller_get(libxl_ctx *ctx);
> +void libxl__poller_put(libxl_ctx *ctx, libxl__poller *p);
> +
> +/* Notifies whoever is polling using p that they should wake up.
> + * ctx must be locked. */
> +void libxl__poller_wakeup(libxl__egc *egc, libxl__poller *p);
> +

Ian.



_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-devel


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.