# HG changeset patch
# User kaf24@xxxxxxxxxxxxxxxxxxxx
# Node ID 2144de6eabcc7fc6272a8ca088008ef92c05aa6b
# Parent e69413dca6844c87885121a7360fa7c2b11cdea9
Make libxenstore thread-safe. It also spawns an internal
thread to read messages from the comms channel.
Signed-off-by: Keir Fraser <keir@xxxxxxxxxxxxx>
diff -r e69413dca684 -r 2144de6eabcc tools/python/xen/lowlevel/xs/xs.c
--- a/tools/python/xen/lowlevel/xs/xs.c Sat Oct 8 09:22:01 2005
+++ b/tools/python/xen/lowlevel/xs/xs.c Sat Oct 8 18:19:27 2005
@@ -775,39 +775,6 @@
return val;
}
-#define xspy_shutdown_doc "\n" \
- "Shutdown the xenstore daemon.\n" \
- "\n" \
- "Returns None on success.\n" \
- "Raises RuntimeError on error.\n" \
- "\n"
-
-static PyObject *xspy_shutdown(PyObject *self, PyObject *args, PyObject *kwds)
-{
- static char *kwd_spec[] = { NULL };
- static char *arg_spec = "";
-
- struct xs_handle *xh = xshandle(self);
- PyObject *val = NULL;
- int xsval = 0;
-
- if (!xh)
- goto exit;
- if (!PyArg_ParseTupleAndKeywords(args, kwds, arg_spec, kwd_spec))
- goto exit;
- Py_BEGIN_ALLOW_THREADS
- xsval = xs_shutdown(xh);
- Py_END_ALLOW_THREADS
- if (!xsval) {
- PyErr_SetFromErrno(PyExc_RuntimeError);
- goto exit;
- }
- Py_INCREF(Py_None);
- val = Py_None;
- exit:
- return val;
-}
-
#define xspy_get_domain_path_doc "\n" \
"Return store path of domain.\n" \
" domid [int]: domain id\n" \
@@ -846,28 +813,6 @@
}
val = PyString_FromString(xsval);
free(xsval);
- exit:
- return val;
-}
-
-#define xspy_fileno_doc "\n" \
- "Get the file descriptor of the xenstore socket.\n" \
- "Allows an xs object to be passed to select().\n" \
- "\n" \
- "Returns: [int] file descriptor.\n" \
- "\n"
-
-static PyObject *xspy_fileno(PyObject *self, PyObject *args, PyObject *kwds)
-{
- static char *kwd_spec[] = { NULL };
- static char *arg_spec = "";
-
- struct xs_handle *xh = xshandle(self);
- PyObject *val = NULL;
-
- if (!PyArg_ParseTupleAndKeywords(args, kwds, arg_spec, kwd_spec))
- goto exit;
- val = PyInt_FromLong((xh ? xs_fileno(xh) : -1));
exit:
return val;
}
@@ -895,9 +840,7 @@
XSPY_METH(introduce_domain),
XSPY_METH(release_domain),
XSPY_METH(close),
- XSPY_METH(shutdown),
XSPY_METH(get_domain_path),
- XSPY_METH(fileno),
{ /* Terminator. */ },
};
diff -r e69413dca684 -r 2144de6eabcc tools/python/xen/xend/xenstore/xsutil.py
--- a/tools/python/xen/xend/xenstore/xsutil.py Sat Oct 8 09:22:01 2005
+++ b/tools/python/xen/xend/xenstore/xsutil.py Sat Oct 8 18:19:27 2005
@@ -7,14 +7,17 @@
import threading
from xen.lowlevel import xs
-handles = {}
+xs_lock = threading.Lock()
+xs_handle = None
-# XXX need to g/c handles from dead threads
def xshandle():
-    if not handles.has_key(threading.currentThread()):
-        handles[threading.currentThread()] = xs.open()
-    return handles[threading.currentThread()]
-
+    """Return the shared xenstore handle, opening it on first use."""
+    global xs_handle, xs_lock
+    if not xs_handle:
+        xs_lock.acquire()
+        try:
+            # Re-check under the lock: another thread may have opened the
+            # handle between the unlocked test above and acquire().
+            if not xs_handle:
+                xs_handle = xs.open()
+        finally:
+            # Release even if xs.open() raises; otherwise every later
+            # caller would block forever in acquire().
+            xs_lock.release()
+    return xs_handle
def IntroduceDomain(domid, page, port, path):
return xshandle().introduce_domain(domid, page, port, path)
diff -r e69413dca684 -r 2144de6eabcc tools/python/xen/xend/xenstore/xswatch.py
--- a/tools/python/xen/xend/xenstore/xswatch.py Sat Oct 8 09:22:01 2005
+++ b/tools/python/xen/xend/xenstore/xswatch.py Sat Oct 8 18:19:27 2005
@@ -12,7 +12,6 @@
class xswatch:
watchThread = None
- threadcond = threading.Condition()
xs = None
xslock = threading.Lock()
@@ -21,43 +20,31 @@
self.args = args
self.kwargs = kwargs
xswatch.watchStart()
- xswatch.xslock.acquire()
xswatch.xs.watch(path, self)
- xswatch.xslock.release()
def watchStart(cls):
- cls.threadcond.acquire()
+ cls.xslock.acquire()
if cls.watchThread:
- cls.threadcond.release()
+ cls.xslock.release()
return
+ # XXX: When we fix xenstored to have better watch semantics,
+ # this can change to shared xshandle(). Currently that would result
+ # in duplicate watch firings, thus failed extra xs.acknowledge_watch.
+ cls.xs = xs.open()
cls.watchThread = threading.Thread(name="Watcher",
target=cls.watchMain)
cls.watchThread.setDaemon(True)
cls.watchThread.start()
- while cls.xs == None:
- cls.threadcond.wait()
- cls.threadcond.release()
+ cls.xslock.release()
watchStart = classmethod(watchStart)
def watchMain(cls):
- cls.threadcond.acquire()
- cls.xs = xs.open()
- cls.threadcond.notifyAll()
- cls.threadcond.release()
while True:
try:
- (fd, _1, _2) = select.select([ cls.xs ], [], [])
- cls.xslock.acquire()
- # reconfirm ready to read with lock
- (fd, _1, _2) = select.select([ cls.xs ], [], [], 0.001)
- if not cls.xs in fd:
- cls.xslock.release()
- continue
we = cls.xs.read_watch()
watch = we[1]
cls.xs.acknowledge_watch(watch)
- cls.xslock.release()
except RuntimeError, ex:
print ex
raise
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/Makefile
--- a/tools/xenstore/Makefile Sat Oct 8 09:22:01 2005
+++ b/tools/xenstore/Makefile Sat Oct 8 18:19:27 2005
@@ -34,7 +34,6 @@
xenstored: xenstored_core.o xenstored_watch.o xenstored_domain.o
xenstored_transaction.o xs_lib.o talloc.o utils.o tdb.o
$(LINK.o) $^ $(LOADLIBES) $(LDLIBS) -lxenctrl -o $@
-$(CLIENTS): libxenstore.so
$(CLIENTS): xenstore-%: xenstore_%.o
$(LINK.o) $^ $(LOADLIBES) $(LDLIBS) -lxenctrl -L. -lxenstore -o $@
@@ -47,6 +46,7 @@
xs_tdb_dump: xs_tdb_dump.o utils.o tdb.o talloc.o
$(LINK.o) $^ $(LOADLIBES) $(LDLIBS) -o $@
+xs_test xs_random xs_stress xs_crashme: LDFLAGS+=-lpthread
xs_test: xs_test.o xs_lib.o utils.o
xs_random: xs_random.o xs_test_lib.o xs_lib.o talloc.o utils.o
xs_stress: xs_stress.o xs_test_lib.o xs_lib.o talloc.o utils.o
@@ -69,7 +69,7 @@
$(COMPILE.c) -o $@ $<
libxenstore.so: xs.opic xs_lib.opic
- $(CC) $(CFLAGS) $(LDFLAGS) -Wl,-soname -Wl,libxenstore.so -shared -o $@
$^
+ $(CC) $(CFLAGS) $(LDFLAGS) -Wl,-soname -Wl,libxenstore.so -shared -o $@
$^ -lpthread
clean: testsuite-clean
rm -f *.o *.opic *.so
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/testsuite/12readonly.test
--- a/tools/xenstore/testsuite/12readonly.test Sat Oct 8 09:22:01 2005
+++ b/tools/xenstore/testsuite/12readonly.test Sat Oct 8 18:19:27 2005
@@ -27,8 +27,6 @@
setperm /test 100 NONE
expect setperm failed: Permission denied
setperm /test 100 NONE
-expect shutdown failed: Permission denied
-shutdown
expect introduce failed: Permission denied
introduce 1 100 7 /home
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/testsuite/test.sh
--- a/tools/xenstore/testsuite/test.sh Sat Oct 8 09:22:01 2005
+++ b/tools/xenstore/testsuite/test.sh Sat Oct 8 18:19:27 2005
@@ -23,7 +23,8 @@
cat testsuite/tmp/xenstored_errors
return 1
fi
- echo shutdown | ./xs_test
+ kill $PID
+ sleep 1
return 0
else
# In case daemon is wedged.
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/xenstored_core.c
--- a/tools/xenstore/xenstored_core.c Sat Oct 8 09:22:01 2005
+++ b/tools/xenstore/xenstored_core.c Sat Oct 8 18:19:27 2005
@@ -150,7 +150,6 @@
{
switch (type) {
case XS_DEBUG: return "DEBUG";
- case XS_SHUTDOWN: return "SHUTDOWN";
case XS_DIRECTORY: return "DIRECTORY";
case XS_READ: return "READ";
case XS_GET_PERMS: return "GET_PERMS";
@@ -1082,17 +1081,6 @@
case XS_SET_PERMS:
do_set_perms(conn, in);
break;
-
- case XS_SHUTDOWN:
- /* FIXME: Implement gentle shutdown too. */
- /* Only tools can do this. */
- if (conn->id != 0 || !conn->can_write) {
- send_error(conn, EACCES);
- break;
- }
- send_ack(conn, XS_SHUTDOWN);
- /* Everything hangs off auto-free context, freed at exit. */
- exit(0);
case XS_DEBUG:
if (streq(in->buffer, "print"))
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/xs.c
--- a/tools/xenstore/xs.c Sat Oct 8 09:22:01 2005
+++ b/tools/xenstore/xs.c Sat Oct 8 18:19:27 2005
@@ -32,82 +32,153 @@
#include <stdint.h>
#include <errno.h>
#include <sys/ioctl.h>
+#include <pthread.h>
#include "xs.h"
+#include "list.h"
#include "utils.h"
-struct xs_handle
-{
+struct xs_stored_msg {
+ struct list_head list;
+ struct xsd_sockmsg hdr;
+ char *body;
+};
+
+struct xs_handle {
+ /* Communications channel to xenstore daemon. */
int fd;
+
+ /*
+ * A read thread which pulls messages off the comms channel and
+ * signals waiters.
+ */
+ pthread_t read_thr;
+
+ /*
+ * A list of fired watch messages, protected by a mutex. Users can
+ * wait on the condition variable until a watch is pending.
+ */
+ struct list_head watch_list;
+ pthread_mutex_t watch_mutex;
+ pthread_cond_t watch_condvar;
+
+ /* Clients can select() on this pipe to wait for a watch to fire. */
+ int watch_pipe[2];
+
+ /*
+ * A list of replies. Currently only one will ever be outstanding
+ * because we serialise requests. The requester can wait on the
+ * conditional variable for its response.
+ */
+ struct list_head reply_list;
+ pthread_mutex_t reply_mutex;
+ pthread_cond_t reply_condvar;
+
+ /* One request at a time. */
+ pthread_mutex_t request_mutex;
+
+ /* One transaction at a time. */
+ pthread_mutex_t transaction_mutex;
};
-/* Get the socket from the store daemon handle.
- */
+static void *read_thread(void *arg);
+
int xs_fileno(struct xs_handle *h)
{
- return h->fd;
-}
-
-static struct xs_handle *get_socket(const char *connect_to)
+ char c = 0;
+
+ pthread_mutex_lock(&h->watch_mutex);
+
+ if ((h->watch_pipe[0] == -1) && (pipe(h->watch_pipe) != -1)) {
+ /* Kick things off if the watch list is already non-empty. */
+ if (!list_empty(&h->watch_list))
+ while (write(h->watch_pipe[1], &c, 1) != 1)
+ continue;
+ }
+
+ pthread_mutex_unlock(&h->watch_mutex);
+
+ return h->watch_pipe[0];
+}
+
+static int get_socket(const char *connect_to)
{
	struct sockaddr_un addr;
	int sock, saved_errno;
-	struct xs_handle *h = NULL;
+
+	/*
+	 * Connect to the Unix-domain socket named by connect_to.
+	 * Returns the connected descriptor, or -1 with errno set.
+	 */
+
+	/* sun_path is a fixed-size array: refuse paths strcpy() would overflow. */
+	if (strlen(connect_to) >= sizeof(addr.sun_path)) {
+		errno = ENAMETOOLONG;
+		return -1;
+	}
	sock = socket(PF_UNIX, SOCK_STREAM, 0);
	if (sock < 0)
-		return NULL;
+		return -1;
	addr.sun_family = AF_UNIX;
	strcpy(addr.sun_path, connect_to);
-	if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) == 0) {
-		h = malloc(sizeof(*h));
-		if (h) {
-			h->fd = sock;
-			return h;
-		}
-	}
-
+	if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) != 0) {
+		/* close() may clobber errno; preserve connect()'s error. */
+		saved_errno = errno;
+		close(sock);
+		errno = saved_errno;
+		return -1;
+	}
+
+	return sock;
+}
+
+static int get_dev(const char *connect_to)
+{
+ return open(connect_to, O_RDWR);
+}
+
+static struct xs_handle *get_handle(const char *connect_to)
+{
+	struct stat buf;
+	struct xs_handle *h = NULL;
+	int fd = -1, saved_errno, rc;
+
+	/*
+	 * Open a connection to connect_to (a socket or a device node),
+	 * allocate and initialise a handle for it, and start its read thread.
+	 * Returns the handle, or NULL with errno set on failure.
+	 */
+	if (stat(connect_to, &buf) != 0)
+		goto error;
+
+	if (S_ISSOCK(buf.st_mode))
+		fd = get_socket(connect_to);
+	else
+		fd = get_dev(connect_to);
+
+	if (fd == -1)
+		goto error;
+
+	h = malloc(sizeof(*h));
+	if (h == NULL)
+		goto error;
+
+	h->fd = fd;
+
+	/* Watch pipe is allocated on demand in xs_fileno(). */
+	h->watch_pipe[0] = h->watch_pipe[1] = -1;
+
+	INIT_LIST_HEAD(&h->watch_list);
+	pthread_mutex_init(&h->watch_mutex, NULL);
+	pthread_cond_init(&h->watch_condvar, NULL);
+
+	INIT_LIST_HEAD(&h->reply_list);
+	pthread_mutex_init(&h->reply_mutex, NULL);
+	pthread_cond_init(&h->reply_condvar, NULL);
+
+	pthread_mutex_init(&h->request_mutex, NULL);
+	pthread_mutex_init(&h->transaction_mutex, NULL);
+
+	/*
+	 * pthread_create() reports failure through its return value and
+	 * does not set errno; copy it so the error path below hands the
+	 * caller a meaningful errno.
+	 */
+	rc = pthread_create(&h->read_thr, NULL, read_thread, h);
+	if (rc != 0) {
+		errno = rc;
+		goto error;
+	}
+
+	return h;
+
+ error:
	saved_errno = errno;
-	close(sock);
+	free(h);
+	if (fd != -1)
+		close(fd);
	errno = saved_errno;
	return NULL;
}
-static struct xs_handle *get_dev(const char *connect_to)
-{
- int fd, saved_errno;
- struct xs_handle *h;
-
- fd = open(connect_to, O_RDWR);
- if (fd < 0)
- return NULL;
-
- h = malloc(sizeof(*h));
- if (h) {
- h->fd = fd;
- return h;
- }
-
- saved_errno = errno;
- close(fd);
- errno = saved_errno;
- return NULL;
-}
-
-static struct xs_handle *get_handle(const char *connect_to)
-{
- struct stat buf;
-
- if (stat(connect_to, &buf) != 0)
- return NULL;
-
- if (S_ISSOCK(buf.st_mode))
- return get_socket(connect_to);
- else
- return get_dev(connect_to);
-}
-
struct xs_handle *xs_daemon_open(void)
{
return get_handle(xs_daemon_socket());
@@ -125,8 +196,39 @@
void xs_daemon_close(struct xs_handle *h)
{
-	if (h->fd >= 0)
-		close(h->fd);
+	struct xs_stored_msg *msg, *tmsg;
+
+	/*
+	 * Tear down a handle: stop the read thread, discard any queued
+	 * replies and watch events, close the descriptors and free h.
+	 * The handle must not be used by any thread once this is called.
+	 */
+
+	/* Wait for any in-flight transaction and request to finish. */
+	pthread_mutex_lock(&h->transaction_mutex);
+	pthread_mutex_lock(&h->request_mutex);
+
+	/*
+	 * Do not hold reply_mutex or watch_mutex across the cancel/join:
+	 * a read thread blocked acquiring one of them never reaches a
+	 * cancellation point (pthread_mutex_lock() is not one), so
+	 * pthread_join() would wait forever.
+	 *
+	 * XXX FIXME: May leak an unpublished message buffer.
+	 */
+	pthread_cancel(h->read_thr);
+	pthread_join(h->read_thr, NULL);
+
+	/*
+	 * The read thread has exited, so the lists are no longer modified
+	 * behind our back. The list mutexes are deliberately not taken:
+	 * the thread may have been cancelled in write() while holding
+	 * watch_mutex, which would leave it locked.
+	 */
+	list_for_each_entry_safe(msg, tmsg, &h->reply_list, list) {
+		free(msg->body);
+		free(msg);
+	}
+
+	list_for_each_entry_safe(msg, tmsg, &h->watch_list, list) {
+		free(msg->body);
+		free(msg);
+	}
+
+	pthread_mutex_unlock(&h->request_mutex);
+	pthread_mutex_unlock(&h->transaction_mutex);
+
+	if (h->watch_pipe[0] != -1) {
+		close(h->watch_pipe[0]);
+		close(h->watch_pipe[1]);
+	}
+
+	close(h->fd);
+
	free(h);
}
@@ -169,31 +271,28 @@
}
/* Adds extra nul terminator, because we generally (always?) hold strings. */
-static void *read_reply(int fd, enum xsd_sockmsg_type *type, unsigned int *len)
-{
- struct xsd_sockmsg msg;
- void *ret;
- int saved_errno;
-
- if (!read_all(fd, &msg, sizeof(msg)))
- return NULL;
-
- ret = malloc(msg.len + 1);
- if (!ret)
- return NULL;
-
- if (!read_all(fd, ret, msg.len)) {
- saved_errno = errno;
- free(ret);
- errno = saved_errno;
- return NULL;
- }
-
- *type = msg.type;
+static void *read_reply(
+ struct xs_handle *h, enum xsd_sockmsg_type *type, unsigned int *len)
+{
+ struct xs_stored_msg *msg;
+ char *body;
+
+ pthread_mutex_lock(&h->reply_mutex);
+ while (list_empty(&h->reply_list))
+ pthread_cond_wait(&h->reply_condvar, &h->reply_mutex);
+ msg = list_top(&h->reply_list, struct xs_stored_msg, list);
+ list_del(&msg->list);
+ assert(list_empty(&h->reply_list));
+ pthread_mutex_unlock(&h->reply_mutex);
+
+ *type = msg->hdr.type;
if (len)
- *len = msg.len;
- ((char *)ret)[msg.len] = '\0';
- return ret;
+ *len = msg->hdr.len;
+ body = msg->body;
+
+ free(msg);
+
+ return body;
}
/* Send message to xs, get malloc'ed reply. NULL and set errno on error. */
@@ -217,6 +316,8 @@
ignorepipe.sa_flags = 0;
sigaction(SIGPIPE, &ignorepipe, &oldact);
+ pthread_mutex_lock(&h->request_mutex);
+
if (!xs_write_all(h->fd, &msg, sizeof(msg)))
goto fail;
@@ -224,14 +325,11 @@
if (!xs_write_all(h->fd, iovec[i].iov_base, iovec[i].iov_len))
goto fail;
- /* Watches can have fired before reply comes: daemon detects
- * and re-transmits, so we can ignore this. */
- do {
- free(ret);
- ret = read_reply(h->fd, &msg.type, len);
- if (!ret)
- goto fail;
- } while (msg.type == XS_WATCH_EVENT);
+ ret = read_reply(h, &msg.type, len);
+ if (!ret)
+ goto fail;
+
+ pthread_mutex_unlock(&h->request_mutex);
sigaction(SIGPIPE, &oldact, NULL);
if (msg.type == XS_ERROR) {
@@ -252,6 +350,7 @@
fail:
/* We're in a bad state, so close fd. */
saved_errno = errno;
+ pthread_mutex_unlock(&h->request_mutex);
sigaction(SIGPIPE, &oldact, NULL);
close_fd:
close(h->fd);
@@ -449,39 +548,45 @@
*/
char **xs_read_watch(struct xs_handle *h, unsigned int *num)
{
- struct xsd_sockmsg msg;
- char **ret;
- char *strings;
+ struct xs_stored_msg *msg;
+ char **ret, *strings, c = 0;
unsigned int num_strings, i;
- if (!read_all(h->fd, &msg, sizeof(msg)))
- return NULL;
-
- assert(msg.type == XS_WATCH_EVENT);
- strings = malloc(msg.len);
- if (!strings)
- return NULL;
-
- if (!read_all(h->fd, strings, msg.len)) {
- free_no_errno(strings);
- return NULL;
- }
-
- num_strings = xs_count_strings(strings, msg.len);
-
- ret = malloc(sizeof(char*) * num_strings + msg.len);
+ pthread_mutex_lock(&h->watch_mutex);
+
+ /* Wait on the condition variable for a watch to fire. */
+ while (list_empty(&h->watch_list))
+ pthread_cond_wait(&h->watch_condvar, &h->watch_mutex);
+ msg = list_top(&h->watch_list, struct xs_stored_msg, list);
+ list_del(&msg->list);
+
+ /* Clear the pipe token if there are no more pending watches. */
+ if (list_empty(&h->watch_list) && (h->watch_pipe[0] != -1))
+ while (read(h->watch_pipe[0], &c, 1) != 1)
+ continue;
+
+ pthread_mutex_unlock(&h->watch_mutex);
+
+ assert(msg->hdr.type == XS_WATCH_EVENT);
+
+ strings = msg->body;
+ num_strings = xs_count_strings(strings, msg->hdr.len);
+
+ ret = malloc(sizeof(char*) * num_strings + msg->hdr.len);
if (!ret) {
free_no_errno(strings);
+ free_no_errno(msg);
return NULL;
}
ret[0] = (char *)(ret + num_strings);
- memcpy(ret[0], strings, msg.len);
+ memcpy(ret[0], strings, msg->hdr.len);
+
free(strings);
-
- for (i = 1; i < num_strings; i++) {
+ free(msg);
+
+ for (i = 1; i < num_strings; i++)
ret[i] = ret[i - 1] + strlen(ret[i - 1]) + 1;
- }
*num = num_strings;
@@ -519,6 +624,7 @@
*/
bool xs_transaction_start(struct xs_handle *h)
{
+	bool rc;
+
+	/*
+	 * One transaction at a time per handle: the mutex taken here is
+	 * released by xs_transaction_end().
+	 */
+	pthread_mutex_lock(&h->transaction_mutex);
-	return xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL));
+
+	rc = xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL));
+
+	/*
+	 * If the start failed there is no transaction for the caller to
+	 * end, so release the mutex here; otherwise the next
+	 * xs_transaction_start() on this handle would block forever.
+	 */
+	if (!rc)
+		pthread_mutex_unlock(&h->transaction_mutex);
+
+	return rc;
}
@@ -530,12 +636,18 @@
bool xs_transaction_end(struct xs_handle *h, bool abort)
{
char abortstr[2];
+ bool rc;
if (abort)
strcpy(abortstr, "F");
else
strcpy(abortstr, "T");
- return xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL));
+
+ rc = xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL));
+
+ pthread_mutex_unlock(&h->transaction_mutex);
+
+ return rc;
}
/* Introduce a new domain.
@@ -584,18 +696,6 @@
return xs_single(h, XS_GET_DOMAIN_PATH, domid_str, NULL);
}
-bool xs_shutdown(struct xs_handle *h)
-{
- bool ret = xs_bool(xs_single(h, XS_SHUTDOWN, "", NULL));
- if (ret) {
- char c;
- /* Wait for it to actually shutdown. */
- while ((read(h->fd, &c, 1) < 0) && (errno == EINTR))
- continue;
- }
- return ret;
-}
-
/* Only useful for DEBUG versions */
char *xs_debug_command(struct xs_handle *h, const char *cmd,
void *data, unsigned int len)
@@ -609,3 +709,75 @@
return xs_talkv(h, XS_DEBUG, iov, ARRAY_SIZE(iov), NULL);
}
+
+static void *read_thread(void *arg)
+{
+ struct xs_handle *h = arg;
+ struct xs_stored_msg *msg = NULL;
+ char *body = NULL;
+
+ for (;;) {
+ msg = NULL;
+ body = NULL;
+
+ /* Allocate message structure and read the message header. */
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto error;
+ if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr)))
+ goto error;
+
+ /* Allocate and read the message body. */
+ body = msg->body = malloc(msg->hdr.len + 1);
+ if (body == NULL)
+ goto error;
+ if (!read_all(h->fd, body, msg->hdr.len))
+ goto error;
+ body[msg->hdr.len] = '\0';
+
+ if (msg->hdr.type == XS_WATCH_EVENT) {
+ pthread_mutex_lock(&h->watch_mutex);
+
+ /* Kick users out of their select() loop. */
+ if (list_empty(&h->watch_list) &&
+ (h->watch_pipe[1] != -1))
+ while (write(h->watch_pipe[1], body, 1) != 1)
+ continue;
+
+ list_add_tail(&msg->list, &h->watch_list);
+ pthread_cond_signal(&h->watch_condvar);
+
+ pthread_mutex_unlock(&h->watch_mutex);
+ } else {
+ pthread_mutex_lock(&h->reply_mutex);
+
+ /* There should only ever be one response pending! */
+ if (!list_empty(&h->reply_list)) {
+ pthread_mutex_unlock(&h->reply_mutex);
+ goto error;
+ }
+
+ list_add_tail(&msg->list, &h->reply_list);
+ pthread_cond_signal(&h->reply_condvar);
+
+ pthread_mutex_unlock(&h->reply_mutex);
+ }
+ }
+
+ error:
+ if (body != NULL)
+ free(body);
+ if (msg != NULL)
+ free(msg);
+ return NULL;
+}
+
+/*
+ * Local variables:
+ * c-file-style: "linux"
+ * indent-tabs-mode: t
+ * c-indent-level: 8
+ * c-basic-offset: 8
+ * tab-width: 8
+ * End:
+ */
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/xs.h
--- a/tools/xenstore/xs.h Sat Oct 8 09:22:01 2005
+++ b/tools/xenstore/xs.h Sat Oct 8 18:19:27 2005
@@ -141,7 +141,4 @@
char *xs_debug_command(struct xs_handle *h, const char *cmd,
void *data, unsigned int len);
-/* Shut down the daemon. */
-bool xs_shutdown(struct xs_handle *h);
-
#endif /* _XS_H */
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/xs_random.c
--- a/tools/xenstore/xs_random.c Sat Oct 8 09:22:01 2005
+++ b/tools/xenstore/xs_random.c Sat Oct 8 18:19:27 2005
@@ -879,20 +879,11 @@
static void cleanup_xs_ops(void)
{
char *cmd;
+
if (daemon_pid) {
- struct xs_handle *h;
- h = xs_daemon_open();
- if (h) {
- if (xs_shutdown(h)) {
- waitpid(daemon_pid, NULL, 0);
- daemon_pid = 0;
- }
- xs_daemon_close(h);
- }
- if (daemon_pid) {
- kill(daemon_pid, SIGTERM);
- waitpid(daemon_pid, NULL, 0);
- }
+ kill(daemon_pid, SIGTERM);
+ waitpid(daemon_pid, NULL, 0);
+ daemon_pid = 0;
}
cmd = talloc_asprintf(NULL, "rm -rf testsuite/tmp/*");
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/xs_test.c
--- a/tools/xenstore/xs_test.c Sat Oct 8 09:22:01 2005
+++ b/tools/xenstore/xs_test.c Sat Oct 8 18:19:27 2005
@@ -198,7 +198,6 @@
" rm <path>\n"
" getperm <path>\n"
" setperm <path> <id> <flags> ...\n"
- " shutdown\n"
" watch <path> <token>\n"
" watchnoack <path> <token>\n"
" waitwatch\n"
@@ -214,8 +213,6 @@
" notimeout\n"
" readonly\n"
" readwrite\n"
- " noackwrite <path> <value>...\n"
- " readack\n"
" dump\n");
}
@@ -353,37 +350,6 @@
{
if (!xs_write(handles[handle], path, data, strlen(data)))
failed(handle);
-}
-
-static void do_noackwrite(unsigned int handle,
- char *path, char *data)
-{
- struct xsd_sockmsg msg;
-
- msg.len = strlen(path) + 1 + strlen(data);
- msg.type = XS_WRITE;
- if (!write_all_choice(handles[handle]->fd, &msg, sizeof(msg)))
- failed(handle);
- if (!write_all_choice(handles[handle]->fd, path, strlen(path) + 1))
- failed(handle);
- if (!write_all_choice(handles[handle]->fd, data, strlen(data)))
- failed(handle);
- /* Do not wait for ack. */
-}
-
-static void do_readack(unsigned int handle)
-{
- enum xsd_sockmsg_type type;
- char *ret = NULL;
-
- /* Watches can have fired before reply comes: daemon detects
- * and re-transmits, so we can ignore this. */
- do {
- free(ret);
- ret = read_reply(handles[handle]->fd, &type, NULL);
- if (!ret)
- failed(handle);
- } while (type == XS_WATCH_EVENT);
}
static void do_setid(unsigned int handle, char *id)
@@ -472,12 +438,6 @@
}
if (!xs_set_permissions(handles[handle], path, perms, i))
- failed(handle);
-}
-
-static void do_shutdown(unsigned int handle)
-{
- if (!xs_shutdown(handles[handle]))
failed(handle);
}
@@ -780,8 +740,6 @@
do_getperm(handle, arg(line, 1));
else if (streq(command, "setperm"))
do_setperm(handle, arg(line, 1), line);
- else if (streq(command, "shutdown"))
- do_shutdown(handle);
else if (streq(command, "watch"))
do_watch(handle, arg(line, 1), arg(line, 2), true);
else if (streq(command, "watchnoack"))
@@ -823,11 +781,7 @@
readonly = false;
xs_daemon_close(handles[handle]);
handles[handle] = NULL;
- } else if (streq(command, "noackwrite"))
- do_noackwrite(handle, arg(line,1), arg(line,2));
- else if (streq(command, "readack"))
- do_readack(handle);
- else
+ } else
barf("Unknown command %s", command);
fflush(stdout);
disarm_timeout();
diff -r e69413dca684 -r 2144de6eabcc xen/include/public/io/xs_wire.h
--- a/xen/include/public/io/xs_wire.h Sat Oct 8 09:22:01 2005
+++ b/xen/include/public/io/xs_wire.h Sat Oct 8 18:19:27 2005
@@ -31,7 +31,6 @@
enum xsd_sockmsg_type
{
XS_DEBUG,
- XS_SHUTDOWN,
XS_DIRECTORY,
XS_READ,
XS_GET_PERMS,
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/testsuite/15nowait.test
--- a/tools/xenstore/testsuite/15nowait.test Sat Oct 8 09:22:01 2005
+++ /dev/null Sat Oct 8 18:19:27 2005
@@ -1,25 +0,0 @@
-# If we don't wait for an ack, we can crash daemon as it never expects to be
-# sending out two replies on top of each other.
-noackwrite /1 1
-noackwrite /2 2
-noackwrite /3 3
-noackwrite /4 4
-noackwrite /5 5
-readack
-readack
-readack
-readack
-readack
-
-expect handle is 1
-introduce 1 100 7 /my/home
-1 noackwrite /1 1
-1 noackwrite /2 2
-1 noackwrite /3 3
-1 noackwrite /4 4
-1 noackwrite /5 5
-1 readack
-1 readack
-1 readack
-1 readack
-1 readack
diff -r e69413dca684 -r 2144de6eabcc
tools/xenstore/testsuite/16block-watch-crash.test
--- a/tools/xenstore/testsuite/16block-watch-crash.test Sat Oct 8 09:22:01 2005
+++ /dev/null Sat Oct 8 18:19:27 2005
@@ -1,14 +0,0 @@
-# Test case where blocked connection gets sent watch.
-
-# FIXME: We no longer block connections
-# mkdir /test
-# watch /test token
-# 1 start
-# # This will block on above
-# noackwrite /test/entry contents
-# 1 write /test/entry2 contents
-# 1 commit
-# readack
-# expect /test/entry2:token
-# waitwatch
-# ackwatch token
_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-changelog
|