WARNING - OLD ARCHIVES

This is an archived copy of the Xen.org mailing list, which we have preserved to ensure that existing links to archives are not broken. The live archive, which contains the latest emails, can be found at http://lists.xen.org/
   
 
 
Xen 
 
Home Products Support Community News
 
   
 

xen-changelog

[Xen-changelog] Make libxenstore thread-safe. It also spawns an internal

To: xen-changelog@xxxxxxxxxxxxxxxxxxx
Subject: [Xen-changelog] Make libxenstore thread-safe. It also spawns an internal
From: Xen patchbot -unstable <patchbot-unstable@xxxxxxxxxxxxxxxxxxx>
Date: Sat, 08 Oct 2005 18:32:11 +0000
Delivery-date: Sat, 08 Oct 2005 18:29:45 +0000
Envelope-to: www-data@xxxxxxxxxxxxxxxxxxx
List-help: <mailto:xen-changelog-request@lists.xensource.com?subject=help>
List-id: BK change log <xen-changelog.lists.xensource.com>
List-post: <mailto:xen-changelog@lists.xensource.com>
List-subscribe: <http://lists.xensource.com/cgi-bin/mailman/listinfo/xen-changelog>, <mailto:xen-changelog-request@lists.xensource.com?subject=subscribe>
List-unsubscribe: <http://lists.xensource.com/cgi-bin/mailman/listinfo/xen-changelog>, <mailto:xen-changelog-request@lists.xensource.com?subject=unsubscribe>
Reply-to: xen-devel@xxxxxxxxxxxxxxxxxxx
Sender: xen-changelog-bounces@xxxxxxxxxxxxxxxxxxx
# HG changeset patch
# User kaf24@xxxxxxxxxxxxxxxxxxxx
# Node ID 2144de6eabcc7fc6272a8ca088008ef92c05aa6b
# Parent  e69413dca6844c87885121a7360fa7c2b11cdea9
Make libxenstore thread-safe. It also spawns an internal
thread to read messages from the comms channel.

Signed-off-by: Keir Fraser <keir@xxxxxxxxxxxxx>

diff -r e69413dca684 -r 2144de6eabcc tools/python/xen/lowlevel/xs/xs.c
--- a/tools/python/xen/lowlevel/xs/xs.c Sat Oct  8 09:22:01 2005
+++ b/tools/python/xen/lowlevel/xs/xs.c Sat Oct  8 18:19:27 2005
@@ -775,39 +775,6 @@
     return val;
 }
 
-#define xspy_shutdown_doc "\n"                 \
-       "Shutdown the xenstore daemon.\n"       \
-       "\n"                                    \
-       "Returns None on success.\n"            \
-       "Raises RuntimeError on error.\n"       \
-       "\n"
-
-static PyObject *xspy_shutdown(PyObject *self, PyObject *args, PyObject *kwds)
-{
-    static char *kwd_spec[] = { NULL };
-    static char *arg_spec = "";
-
-    struct xs_handle *xh = xshandle(self);
-    PyObject *val = NULL;
-    int xsval = 0;
-
-    if (!xh)
-        goto exit;
-    if (!PyArg_ParseTupleAndKeywords(args, kwds, arg_spec, kwd_spec))
-        goto exit;
-    Py_BEGIN_ALLOW_THREADS
-    xsval = xs_shutdown(xh);
-    Py_END_ALLOW_THREADS
-    if (!xsval) {
-        PyErr_SetFromErrno(PyExc_RuntimeError);
-        goto exit;
-    }
-    Py_INCREF(Py_None);
-    val = Py_None;
- exit:
-    return val;
-}
-
 #define xspy_get_domain_path_doc "\n"                  \
        "Return store path of domain.\n"                \
        " domid [int]: domain id\n"                     \
@@ -846,28 +813,6 @@
     }
     val = PyString_FromString(xsval);
     free(xsval);
- exit:
-    return val;
-}
-
-#define xspy_fileno_doc "\n"                                   \
-       "Get the file descriptor of the xenstore socket.\n"     \
-       "Allows an xs object to be passed to select().\n"       \
-       "\n"                                                    \
-       "Returns: [int] file descriptor.\n"                     \
-       "\n"
-
-static PyObject *xspy_fileno(PyObject *self, PyObject *args, PyObject *kwds)
-{
-    static char *kwd_spec[] = { NULL };
-    static char *arg_spec = "";
-
-    struct xs_handle *xh = xshandle(self);
-    PyObject *val = NULL;
-
-    if (!PyArg_ParseTupleAndKeywords(args, kwds, arg_spec, kwd_spec))
-        goto exit;
-    val = PyInt_FromLong((xh ? xs_fileno(xh) : -1));
  exit:
     return val;
 }
@@ -895,9 +840,7 @@
      XSPY_METH(introduce_domain),
      XSPY_METH(release_domain),
      XSPY_METH(close),
-     XSPY_METH(shutdown),
      XSPY_METH(get_domain_path),
-     XSPY_METH(fileno),
      { /* Terminator. */ },
 };
 
diff -r e69413dca684 -r 2144de6eabcc tools/python/xen/xend/xenstore/xsutil.py
--- a/tools/python/xen/xend/xenstore/xsutil.py  Sat Oct  8 09:22:01 2005
+++ b/tools/python/xen/xend/xenstore/xsutil.py  Sat Oct  8 18:19:27 2005
@@ -7,14 +7,17 @@
 import threading
 from xen.lowlevel import xs
 
-handles = {}
+xs_lock = threading.Lock()
+xs_handle = None
 
-# XXX need to g/c handles from dead threads
 def xshandle():
-    if not handles.has_key(threading.currentThread()):
-        handles[threading.currentThread()] = xs.open()
-    return handles[threading.currentThread()]
-
+    global xs_handle, xs_lock
+    if not xs_handle:
+        xs_lock.acquire()
+        if not xs_handle:
+            xs_handle = xs.open()
+        xs_lock.release()
+    return xs_handle
 
 def IntroduceDomain(domid, page, port, path):
     return xshandle().introduce_domain(domid, page, port, path)
diff -r e69413dca684 -r 2144de6eabcc tools/python/xen/xend/xenstore/xswatch.py
--- a/tools/python/xen/xend/xenstore/xswatch.py Sat Oct  8 09:22:01 2005
+++ b/tools/python/xen/xend/xenstore/xswatch.py Sat Oct  8 18:19:27 2005
@@ -12,7 +12,6 @@
 class xswatch:
 
     watchThread = None
-    threadcond = threading.Condition()
     xs = None
     xslock = threading.Lock()
     
@@ -21,43 +20,31 @@
         self.args = args
         self.kwargs = kwargs
         xswatch.watchStart()
-        xswatch.xslock.acquire()
         xswatch.xs.watch(path, self)
-        xswatch.xslock.release()
 
     def watchStart(cls):
-        cls.threadcond.acquire()
+        cls.xslock.acquire()
         if cls.watchThread:
-            cls.threadcond.release()
+            cls.xslock.release()
             return
+        # XXX: When we fix xenstored to have better watch semantics,
+        # this can change to shared xshandle(). Currently that would result
+        # in duplicate watch firings, thus failed extra xs.acknowledge_watch.
+        cls.xs = xs.open()
         cls.watchThread = threading.Thread(name="Watcher",
                                            target=cls.watchMain)
         cls.watchThread.setDaemon(True)
         cls.watchThread.start()
-        while cls.xs == None:
-            cls.threadcond.wait()
-        cls.threadcond.release()
+        cls.xslock.release()
 
     watchStart = classmethod(watchStart)
 
     def watchMain(cls):
-        cls.threadcond.acquire()
-        cls.xs = xs.open()
-        cls.threadcond.notifyAll()
-        cls.threadcond.release()
         while True:
             try:
-                (fd, _1, _2) = select.select([ cls.xs ], [], [])
-                cls.xslock.acquire()
-                # reconfirm ready to read with lock
-                (fd, _1, _2) = select.select([ cls.xs ], [], [], 0.001)
-                if not cls.xs in fd:
-                    cls.xslock.release()
-                    continue
                 we = cls.xs.read_watch()
                 watch = we[1]
                 cls.xs.acknowledge_watch(watch)
-                cls.xslock.release()
             except RuntimeError, ex:
                 print ex
                 raise
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/Makefile
--- a/tools/xenstore/Makefile   Sat Oct  8 09:22:01 2005
+++ b/tools/xenstore/Makefile   Sat Oct  8 18:19:27 2005
@@ -34,7 +34,6 @@
 xenstored: xenstored_core.o xenstored_watch.o xenstored_domain.o xenstored_transaction.o xs_lib.o talloc.o utils.o tdb.o
        $(LINK.o) $^ $(LOADLIBES) $(LDLIBS) -lxenctrl -o $@
 
-$(CLIENTS): libxenstore.so
 $(CLIENTS): xenstore-%: xenstore_%.o
        $(LINK.o) $^ $(LOADLIBES) $(LDLIBS) -lxenctrl -L. -lxenstore -o $@
 
@@ -47,6 +46,7 @@
 xs_tdb_dump: xs_tdb_dump.o utils.o tdb.o talloc.o
        $(LINK.o) $^ $(LOADLIBES) $(LDLIBS) -o $@
 
+xs_test xs_random xs_stress xs_crashme: LDFLAGS+=-lpthread
 xs_test: xs_test.o xs_lib.o utils.o
 xs_random: xs_random.o xs_test_lib.o xs_lib.o talloc.o utils.o
 xs_stress: xs_stress.o xs_test_lib.o xs_lib.o talloc.o utils.o
@@ -69,7 +69,7 @@
        $(COMPILE.c) -o $@ $<
 
 libxenstore.so: xs.opic xs_lib.opic
-       $(CC) $(CFLAGS) $(LDFLAGS) -Wl,-soname -Wl,libxenstore.so -shared -o $@ $^
+       $(CC) $(CFLAGS) $(LDFLAGS) -Wl,-soname -Wl,libxenstore.so -shared -o $@ $^ -lpthread
 
 clean: testsuite-clean
        rm -f *.o *.opic *.so
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/testsuite/12readonly.test
--- a/tools/xenstore/testsuite/12readonly.test  Sat Oct  8 09:22:01 2005
+++ b/tools/xenstore/testsuite/12readonly.test  Sat Oct  8 18:19:27 2005
@@ -27,8 +27,6 @@
 setperm /test 100 NONE
 expect setperm failed: Permission denied
 setperm /test 100 NONE
-expect shutdown failed: Permission denied
-shutdown
 expect introduce failed: Permission denied
 introduce 1 100 7 /home
 
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/testsuite/test.sh
--- a/tools/xenstore/testsuite/test.sh  Sat Oct  8 09:22:01 2005
+++ b/tools/xenstore/testsuite/test.sh  Sat Oct  8 18:19:27 2005
@@ -23,7 +23,8 @@
            cat testsuite/tmp/xenstored_errors
            return 1
        fi
-       echo shutdown | ./xs_test
+       kill $PID
+       sleep 1
        return 0
     else
        # In case daemon is wedged.
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/xenstored_core.c
--- a/tools/xenstore/xenstored_core.c   Sat Oct  8 09:22:01 2005
+++ b/tools/xenstore/xenstored_core.c   Sat Oct  8 18:19:27 2005
@@ -150,7 +150,6 @@
 {
        switch (type) {
        case XS_DEBUG: return "DEBUG";
-       case XS_SHUTDOWN: return "SHUTDOWN";
        case XS_DIRECTORY: return "DIRECTORY";
        case XS_READ: return "READ";
        case XS_GET_PERMS: return "GET_PERMS";
@@ -1082,17 +1081,6 @@
        case XS_SET_PERMS:
                do_set_perms(conn, in);
                break;
-
-       case XS_SHUTDOWN:
-               /* FIXME: Implement gentle shutdown too. */
-               /* Only tools can do this. */
-               if (conn->id != 0 || !conn->can_write) {
-                       send_error(conn, EACCES);
-                       break;
-               }
-               send_ack(conn, XS_SHUTDOWN);
-               /* Everything hangs off auto-free context, freed at exit. */
-               exit(0);
 
        case XS_DEBUG:
                if (streq(in->buffer, "print"))
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/xs.c
--- a/tools/xenstore/xs.c       Sat Oct  8 09:22:01 2005
+++ b/tools/xenstore/xs.c       Sat Oct  8 18:19:27 2005
@@ -32,82 +32,153 @@
 #include <stdint.h>
 #include <errno.h>
 #include <sys/ioctl.h>
+#include <pthread.h>
 #include "xs.h"
+#include "list.h"
 #include "utils.h"
 
-struct xs_handle
-{
+struct xs_stored_msg {
+       struct list_head list;
+       struct xsd_sockmsg hdr;
+       char *body;
+};
+
+struct xs_handle {
+       /* Communications channel to xenstore daemon. */
        int fd;
+
+       /*
+         * A read thread which pulls messages off the comms channel and
+         * signals waiters.
+         */
+       pthread_t read_thr;
+
+       /*
+         * A list of fired watch messages, protected by a mutex. Users can
+         * wait on the conditional variable until a watch is pending.
+         */
+       struct list_head watch_list;
+       pthread_mutex_t watch_mutex;
+       pthread_cond_t watch_condvar;
+
+       /* Clients can select() on this pipe to wait for a watch to fire. */
+       int watch_pipe[2];
+
+       /*
+         * A list of replies. Currently only one will ever be outstanding
+         * because we serialise requests. The requester can wait on the
+         * conditional variable for its response.
+         */
+       struct list_head reply_list;
+       pthread_mutex_t reply_mutex;
+       pthread_cond_t reply_condvar;
+
+       /* One request at a time. */
+       pthread_mutex_t request_mutex;
+
+       /* One transaction at a time. */
+       pthread_mutex_t transaction_mutex;
 };
 
-/* Get the socket from the store daemon handle.
- */
+static void *read_thread(void *arg);
+
 int xs_fileno(struct xs_handle *h)
 {
-       return h->fd;
-}
-
-static struct xs_handle *get_socket(const char *connect_to)
+       char c = 0;
+
+       pthread_mutex_lock(&h->watch_mutex);
+
+       if ((h->watch_pipe[0] == -1) && (pipe(h->watch_pipe) != -1)) {
+               /* Kick things off if the watch list is already non-empty. */
+               if (!list_empty(&h->watch_list))
+                       while (write(h->watch_pipe[1], &c, 1) != 1)
+                               continue;
+       }
+
+       pthread_mutex_unlock(&h->watch_mutex);
+
+       return h->watch_pipe[0];
+}
+
+static int get_socket(const char *connect_to)
 {
        struct sockaddr_un addr;
        int sock, saved_errno;
-       struct xs_handle *h = NULL;
 
        sock = socket(PF_UNIX, SOCK_STREAM, 0);
        if (sock < 0)
-               return NULL;
+               return -1;
 
        addr.sun_family = AF_UNIX;
        strcpy(addr.sun_path, connect_to);
 
-       if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) == 0) {
-               h = malloc(sizeof(*h));
-               if (h) {
-                       h->fd = sock;
-                       return h;
-               }
-       }
-
+       if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) != 0) {
+               saved_errno = errno;
+               close(sock);
+               errno = saved_errno;
+               return -1;
+       }
+
+       return sock;
+}
+
+static int get_dev(const char *connect_to)
+{
+       return open(connect_to, O_RDWR);
+}
+
+static struct xs_handle *get_handle(const char *connect_to)
+{
+       struct stat buf;
+       struct xs_handle *h = NULL;
+       int fd = -1, saved_errno;
+
+       if (stat(connect_to, &buf) != 0)
+               goto error;
+
+       if (S_ISSOCK(buf.st_mode))
+               fd = get_socket(connect_to);
+       else
+               fd = get_dev(connect_to);
+
+       if (fd == -1)
+               goto error;
+
+       h = malloc(sizeof(*h));
+       if (h == NULL)
+               goto error;
+
+       h->fd = fd;
+
+       /* Watch pipe is allocated on demand in xs_fileno(). */
+       h->watch_pipe[0] = h->watch_pipe[1] = -1;
+
+       INIT_LIST_HEAD(&h->watch_list);
+       pthread_mutex_init(&h->watch_mutex, NULL);
+       pthread_cond_init(&h->watch_condvar, NULL);
+
+       INIT_LIST_HEAD(&h->reply_list);
+       pthread_mutex_init(&h->reply_mutex, NULL);
+       pthread_cond_init(&h->reply_condvar, NULL);
+
+       pthread_mutex_init(&h->request_mutex, NULL);
+       pthread_mutex_init(&h->transaction_mutex, NULL);
+
+       if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0)
+               goto error;
+
+       return h;
+
+ error:
        saved_errno = errno;
-       close(sock);
+       if (h != NULL)
+               free(h);
+       if (fd != -1)
+               close(fd);
        errno = saved_errno;
        return NULL;
 }
 
-static struct xs_handle *get_dev(const char *connect_to)
-{
-       int fd, saved_errno;
-       struct xs_handle *h;
-
-       fd = open(connect_to, O_RDWR);
-       if (fd < 0)
-               return NULL;
-
-       h = malloc(sizeof(*h));
-       if (h) {
-               h->fd = fd;
-               return h;
-       }
-
-       saved_errno = errno;
-       close(fd);
-       errno = saved_errno;
-       return NULL;
-}
-
-static struct xs_handle *get_handle(const char *connect_to)
-{
-       struct stat buf;
-
-       if (stat(connect_to, &buf) != 0)
-               return NULL;
-
-       if (S_ISSOCK(buf.st_mode))
-               return get_socket(connect_to);
-       else
-               return get_dev(connect_to);
-}
-
 struct xs_handle *xs_daemon_open(void)
 {
        return get_handle(xs_daemon_socket());
@@ -125,8 +196,39 @@
 
 void xs_daemon_close(struct xs_handle *h)
 {
-       if (h->fd >= 0)
-               close(h->fd);
+       struct xs_stored_msg *msg, *tmsg;
+
+       pthread_mutex_lock(&h->transaction_mutex);
+       pthread_mutex_lock(&h->request_mutex);
+       pthread_mutex_lock(&h->reply_mutex);
+       pthread_mutex_lock(&h->watch_mutex);
+
+       /* XXX FIXME: May leak an unpublished message buffer. */
+       pthread_cancel(h->read_thr);
+       pthread_join(h->read_thr, NULL);
+
+       list_for_each_entry_safe(msg, tmsg, &h->reply_list, list) {
+               free(msg->body);
+               free(msg);
+       }
+
+       list_for_each_entry_safe(msg, tmsg, &h->watch_list, list) {
+               free(msg->body);
+               free(msg);
+       }
+
+       pthread_mutex_unlock(&h->transaction_mutex);
+       pthread_mutex_unlock(&h->request_mutex);
+       pthread_mutex_unlock(&h->reply_mutex);
+       pthread_mutex_unlock(&h->watch_mutex);
+
+       if (h->watch_pipe[0] != -1) {
+               close(h->watch_pipe[0]);
+               close(h->watch_pipe[1]);
+       }
+
+       close(h->fd);
+
        free(h);
 }
 
@@ -169,31 +271,28 @@
 }
 
 /* Adds extra nul terminator, because we generally (always?) hold strings. */
-static void *read_reply(int fd, enum xsd_sockmsg_type *type, unsigned int *len)
-{
-       struct xsd_sockmsg msg;
-       void *ret;
-       int saved_errno;
-
-       if (!read_all(fd, &msg, sizeof(msg)))
-               return NULL;
-
-       ret = malloc(msg.len + 1);
-       if (!ret)
-               return NULL;
-
-       if (!read_all(fd, ret, msg.len)) {
-               saved_errno = errno;
-               free(ret);
-               errno = saved_errno;
-               return NULL;
-       }
-
-       *type = msg.type;
+static void *read_reply(
+       struct xs_handle *h, enum xsd_sockmsg_type *type, unsigned int *len)
+{
+       struct xs_stored_msg *msg;
+       char *body;
+
+       pthread_mutex_lock(&h->reply_mutex);
+       while (list_empty(&h->reply_list))
+               pthread_cond_wait(&h->reply_condvar, &h->reply_mutex);
+       msg = list_top(&h->reply_list, struct xs_stored_msg, list);
+       list_del(&msg->list);
+       assert(list_empty(&h->reply_list));
+       pthread_mutex_unlock(&h->reply_mutex);
+
+       *type = msg->hdr.type;
        if (len)
-               *len = msg.len;
-       ((char *)ret)[msg.len] = '\0';
-       return ret;
+               *len = msg->hdr.len;
+       body = msg->body;
+
+       free(msg);
+
+       return body;
 }
 
 /* Send message to xs, get malloc'ed reply.  NULL and set errno on error. */
@@ -217,6 +316,8 @@
        ignorepipe.sa_flags = 0;
        sigaction(SIGPIPE, &ignorepipe, &oldact);
 
+       pthread_mutex_lock(&h->request_mutex);
+
        if (!xs_write_all(h->fd, &msg, sizeof(msg)))
                goto fail;
 
@@ -224,14 +325,11 @@
                if (!xs_write_all(h->fd, iovec[i].iov_base, iovec[i].iov_len))
                        goto fail;
 
-       /* Watches can have fired before reply comes: daemon detects
-        * and re-transmits, so we can ignore this. */
-       do {
-               free(ret);
-               ret = read_reply(h->fd, &msg.type, len);
-               if (!ret)
-                       goto fail;
-       } while (msg.type == XS_WATCH_EVENT);
+       ret = read_reply(h, &msg.type, len);
+       if (!ret)
+               goto fail;
+
+       pthread_mutex_unlock(&h->request_mutex);
 
        sigaction(SIGPIPE, &oldact, NULL);
        if (msg.type == XS_ERROR) {
@@ -252,6 +350,7 @@
 fail:
        /* We're in a bad state, so close fd. */
        saved_errno = errno;
+       pthread_mutex_unlock(&h->request_mutex);
        sigaction(SIGPIPE, &oldact, NULL);
 close_fd:
        close(h->fd);
@@ -449,39 +548,45 @@
  */
 char **xs_read_watch(struct xs_handle *h, unsigned int *num)
 {
-       struct xsd_sockmsg msg;
-       char **ret;
-       char *strings;
+       struct xs_stored_msg *msg;
+       char **ret, *strings, c = 0;
        unsigned int num_strings, i;
 
-       if (!read_all(h->fd, &msg, sizeof(msg)))
-               return NULL;
-
-       assert(msg.type == XS_WATCH_EVENT);
-       strings = malloc(msg.len);
-       if (!strings)
-               return NULL;
-
-       if (!read_all(h->fd, strings, msg.len)) {
-               free_no_errno(strings);
-               return NULL;
-       }
-
-       num_strings = xs_count_strings(strings, msg.len);
-
-       ret = malloc(sizeof(char*) * num_strings + msg.len);
+       pthread_mutex_lock(&h->watch_mutex);
+
+       /* Wait on the condition variable for a watch to fire. */
+       while (list_empty(&h->watch_list))
+               pthread_cond_wait(&h->watch_condvar, &h->watch_mutex);
+       msg = list_top(&h->watch_list, struct xs_stored_msg, list);
+       list_del(&msg->list);
+
+       /* Clear the pipe token if there are no more pending watches. */
+       if (list_empty(&h->watch_list) && (h->watch_pipe[0] != -1))
+               while (read(h->watch_pipe[0], &c, 1) != 1)
+                       continue;
+
+       pthread_mutex_unlock(&h->watch_mutex);
+
+       assert(msg->hdr.type == XS_WATCH_EVENT);
+
+       strings     = msg->body;
+       num_strings = xs_count_strings(strings, msg->hdr.len);
+
+       ret = malloc(sizeof(char*) * num_strings + msg->hdr.len);
        if (!ret) {
                free_no_errno(strings);
+               free_no_errno(msg);
                return NULL;
        }
 
        ret[0] = (char *)(ret + num_strings);
-       memcpy(ret[0], strings, msg.len);
+       memcpy(ret[0], strings, msg->hdr.len);
+
        free(strings);
-
-       for (i = 1; i < num_strings; i++) {
+       free(msg);
+
+       for (i = 1; i < num_strings; i++)
                ret[i] = ret[i - 1] + strlen(ret[i - 1]) + 1;
-       }
 
        *num = num_strings;
 
@@ -519,6 +624,7 @@
  */
 bool xs_transaction_start(struct xs_handle *h)
 {
+       pthread_mutex_lock(&h->transaction_mutex);
        return xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL));
 }
 
@@ -530,12 +636,18 @@
 bool xs_transaction_end(struct xs_handle *h, bool abort)
 {
        char abortstr[2];
+       bool rc;
 
        if (abort)
                strcpy(abortstr, "F");
        else
                strcpy(abortstr, "T");
-       return xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL));
+       
+       rc = xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL));
+
+       pthread_mutex_unlock(&h->transaction_mutex);
+
+       return rc;
 }
 
 /* Introduce a new domain.
@@ -584,18 +696,6 @@
        return xs_single(h, XS_GET_DOMAIN_PATH, domid_str, NULL);
 }
 
-bool xs_shutdown(struct xs_handle *h)
-{
-       bool ret = xs_bool(xs_single(h, XS_SHUTDOWN, "", NULL));
-       if (ret) {
-               char c;
-               /* Wait for it to actually shutdown. */
-               while ((read(h->fd, &c, 1) < 0) && (errno == EINTR))
-                       continue;
-       }
-       return ret;
-}
-
 /* Only useful for DEBUG versions */
 char *xs_debug_command(struct xs_handle *h, const char *cmd,
                       void *data, unsigned int len)
@@ -609,3 +709,75 @@
 
        return xs_talkv(h, XS_DEBUG, iov, ARRAY_SIZE(iov), NULL);
 }
+
+static void *read_thread(void *arg)
+{
+       struct xs_handle *h = arg;
+       struct xs_stored_msg *msg = NULL;
+       char *body = NULL;
+
+       for (;;) {
+               msg = NULL;
+               body = NULL;
+
+               /* Allocate message structure and read the message header. */
+               msg = malloc(sizeof(*msg));
+               if (msg == NULL)
+                       goto error;
+               if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr)))
+                       goto error;
+
+               /* Allocate and read the message body. */
+               body = msg->body = malloc(msg->hdr.len + 1);
+               if (body == NULL)
+                       goto error;
+               if (!read_all(h->fd, body, msg->hdr.len))
+                       goto error;
+               body[msg->hdr.len] = '\0';
+
+               if (msg->hdr.type == XS_WATCH_EVENT) {
+                       pthread_mutex_lock(&h->watch_mutex);
+
+                       /* Kick users out of their select() loop. */
+                       if (list_empty(&h->watch_list) &&
+                           (h->watch_pipe[1] != -1))
+                               while (write(h->watch_pipe[1], body, 1) != 1)
+                                       continue;
+
+                       list_add_tail(&msg->list, &h->watch_list);
+                       pthread_cond_signal(&h->watch_condvar);
+
+                       pthread_mutex_unlock(&h->watch_mutex);
+               } else {
+                       pthread_mutex_lock(&h->reply_mutex);
+
+                       /* There should only ever be one response pending! */
+                       if (!list_empty(&h->reply_list)) {
+                               pthread_mutex_unlock(&h->reply_mutex);
+                               goto error;
+                       }
+
+                       list_add_tail(&msg->list, &h->reply_list);
+                       pthread_cond_signal(&h->reply_condvar);
+
+                       pthread_mutex_unlock(&h->reply_mutex);
+               }
+       }
+
+ error:
+       if (body != NULL)
+               free(body);
+       if (msg != NULL)
+               free(msg);
+       return NULL;
+}
+
+/*
+ * Local variables:
+ *  c-file-style: "linux"
+ *  indent-tabs-mode: t
+ *  c-indent-level: 8
+ *  c-basic-offset: 8
+ *  tab-width: 8
+ * End:
+ */
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/xs.h
--- a/tools/xenstore/xs.h       Sat Oct  8 09:22:01 2005
+++ b/tools/xenstore/xs.h       Sat Oct  8 18:19:27 2005
@@ -141,7 +141,4 @@
 char *xs_debug_command(struct xs_handle *h, const char *cmd,
                       void *data, unsigned int len);
 
-/* Shut down the daemon. */
-bool xs_shutdown(struct xs_handle *h);
-
 #endif /* _XS_H */
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/xs_random.c
--- a/tools/xenstore/xs_random.c        Sat Oct  8 09:22:01 2005
+++ b/tools/xenstore/xs_random.c        Sat Oct  8 18:19:27 2005
@@ -879,20 +879,11 @@
 static void cleanup_xs_ops(void)
 {
        char *cmd;
+
        if (daemon_pid) {
-               struct xs_handle *h;
-               h = xs_daemon_open();
-               if (h) {
-                       if (xs_shutdown(h)) {
-                               waitpid(daemon_pid, NULL, 0);
-                               daemon_pid = 0;
-                       }
-                       xs_daemon_close(h);
-               }
-               if (daemon_pid) {
-                       kill(daemon_pid, SIGTERM);
-                       waitpid(daemon_pid, NULL, 0);
-               }
+               kill(daemon_pid, SIGTERM);
+               waitpid(daemon_pid, NULL, 0);
+               daemon_pid = 0;
        }
        
        cmd = talloc_asprintf(NULL, "rm -rf testsuite/tmp/*");
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/xs_test.c
--- a/tools/xenstore/xs_test.c  Sat Oct  8 09:22:01 2005
+++ b/tools/xenstore/xs_test.c  Sat Oct  8 18:19:27 2005
@@ -198,7 +198,6 @@
             "  rm <path>\n"
             "  getperm <path>\n"
             "  setperm <path> <id> <flags> ...\n"
-            "  shutdown\n"
             "  watch <path> <token>\n"
             "  watchnoack <path> <token>\n"
             "  waitwatch\n"
@@ -214,8 +213,6 @@
             "  notimeout\n"
             "  readonly\n"
             "  readwrite\n"
-            "  noackwrite <path> <value>...\n"
-            "  readack\n"
             "  dump\n");
 }
 
@@ -353,37 +350,6 @@
 {
        if (!xs_write(handles[handle], path, data, strlen(data)))
                failed(handle);
-}
-
-static void do_noackwrite(unsigned int handle,
-                         char *path, char *data)
-{
-       struct xsd_sockmsg msg;
-
-       msg.len = strlen(path) + 1 + strlen(data);
-       msg.type = XS_WRITE;
-       if (!write_all_choice(handles[handle]->fd, &msg, sizeof(msg)))
-               failed(handle);
-       if (!write_all_choice(handles[handle]->fd, path, strlen(path) + 1))
-               failed(handle);
-       if (!write_all_choice(handles[handle]->fd, data, strlen(data)))
-               failed(handle);
-       /* Do not wait for ack. */
-}
-
-static void do_readack(unsigned int handle)
-{
-       enum xsd_sockmsg_type type;
-       char *ret = NULL;
-
-       /* Watches can have fired before reply comes: daemon detects
-        * and re-transmits, so we can ignore this. */
-       do {
-               free(ret);
-               ret = read_reply(handles[handle]->fd, &type, NULL);
-               if (!ret)
-                       failed(handle);
-       } while (type == XS_WATCH_EVENT);
 }
 
 static void do_setid(unsigned int handle, char *id)
@@ -472,12 +438,6 @@
        }
 
        if (!xs_set_permissions(handles[handle], path, perms, i))
-               failed(handle);
-}
-
-static void do_shutdown(unsigned int handle)
-{
-       if (!xs_shutdown(handles[handle]))
                failed(handle);
 }
 
@@ -780,8 +740,6 @@
                do_getperm(handle, arg(line, 1));
        else if (streq(command, "setperm"))
                do_setperm(handle, arg(line, 1), line);
-       else if (streq(command, "shutdown"))
-               do_shutdown(handle);
        else if (streq(command, "watch"))
                do_watch(handle, arg(line, 1), arg(line, 2), true);
        else if (streq(command, "watchnoack"))
@@ -823,11 +781,7 @@
                readonly = false;
                xs_daemon_close(handles[handle]);
                handles[handle] = NULL;
-       } else if (streq(command, "noackwrite"))
-               do_noackwrite(handle, arg(line,1), arg(line,2));
-       else if (streq(command, "readack"))
-               do_readack(handle);
-       else
+       } else
                barf("Unknown command %s", command);
        fflush(stdout);
        disarm_timeout();
diff -r e69413dca684 -r 2144de6eabcc xen/include/public/io/xs_wire.h
--- a/xen/include/public/io/xs_wire.h   Sat Oct  8 09:22:01 2005
+++ b/xen/include/public/io/xs_wire.h   Sat Oct  8 18:19:27 2005
@@ -31,7 +31,6 @@
 enum xsd_sockmsg_type
 {
        XS_DEBUG,
-       XS_SHUTDOWN,
        XS_DIRECTORY,
        XS_READ,
        XS_GET_PERMS,
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/testsuite/15nowait.test
--- a/tools/xenstore/testsuite/15nowait.test    Sat Oct  8 09:22:01 2005
+++ /dev/null   Sat Oct  8 18:19:27 2005
@@ -1,25 +0,0 @@
-# If we don't wait for an ack, we can crash daemon as it never expects to be
-# sending out two replies on top of each other.
-noackwrite /1 1
-noackwrite /2 2
-noackwrite /3 3
-noackwrite /4 4
-noackwrite /5 5
-readack
-readack
-readack
-readack
-readack
-
-expect handle is 1
-introduce 1 100 7 /my/home
-1 noackwrite /1 1
-1 noackwrite /2 2
-1 noackwrite /3 3
-1 noackwrite /4 4
-1 noackwrite /5 5
-1 readack
-1 readack
-1 readack
-1 readack
-1 readack
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/testsuite/16block-watch-crash.test
--- a/tools/xenstore/testsuite/16block-watch-crash.test Sat Oct  8 09:22:01 2005
+++ /dev/null   Sat Oct  8 18:19:27 2005
@@ -1,14 +0,0 @@
-# Test case where blocked connection gets sent watch.
-
-# FIXME: We no longer block connections 
-# mkdir /test
-# watch /test token
-# 1 start
-# # This will block on above
-# noackwrite /test/entry contents
-# 1 write /test/entry2 contents
-# 1 commit
-# readack
-# expect /test/entry2:token
-# waitwatch
-# ackwatch token

_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-changelog

<Prev in Thread] Current Thread [Next in Thread>
  • [Xen-changelog] Make libxenstore thread-safe. It also spawns an internal, Xen patchbot -unstable <=