WARNING - OLD ARCHIVES

This is an archived copy of the Xen.org mailing list, which we have preserved to ensure that existing links to archives are not broken. The live archive, which contains the latest emails, can be found at http://lists.xen.org/
   
 
 
Xen 
 
Home Products Support Community News
 
   
 

xen-changelog

[Xen-changelog] Refactor xenbus to break up the xenbus_lock and permit watches

To: xen-changelog@xxxxxxxxxxxxxxxxxxx
Subject: [Xen-changelog] Refactor xenbus to break up the xenbus_lock and permit watches
From: Xen patchbot -unstable <patchbot-unstable@xxxxxxxxxxxxxxxxxxx>
Date: Sun, 09 Oct 2005 18:02:11 +0000
Delivery-date: Sun, 09 Oct 2005 17:59:42 +0000
Envelope-to: www-data@xxxxxxxxxxxxxxxxxxx
List-help: <mailto:xen-changelog-request@lists.xensource.com?subject=help>
List-id: BK change log <xen-changelog.lists.xensource.com>
List-post: <mailto:xen-changelog@lists.xensource.com>
List-subscribe: <http://lists.xensource.com/cgi-bin/mailman/listinfo/xen-changelog>, <mailto:xen-changelog-request@lists.xensource.com?subject=subscribe>
List-unsubscribe: <http://lists.xensource.com/cgi-bin/mailman/listinfo/xen-changelog>, <mailto:xen-changelog-request@lists.xensource.com?subject=unsubscribe>
Reply-to: xen-devel@xxxxxxxxxxxxxxxxxxx
Sender: xen-changelog-bounces@xxxxxxxxxxxxxxxxxxx
# HG changeset patch
# User kaf24@xxxxxxxxxxxxxxxxxxxx
# Node ID 8016551fde9825fc82bfa4762f17b98e7519b823
# Parent  ab93a9a46bd48f9a654b1fdc9caf4b7ae07f6a8b
Refactor xenbus to break up the xenbus_lock and permit watches
to fire concurrently with request/reply pairs. Remove
watch_ack message: no longer needed.

Signed-off-by: Keir Fraser <keir@xxxxxxxxxxxxx>

diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/arch/xen/i386/kernel/smpboot.c
--- a/linux-2.6-xen-sparse/arch/xen/i386/kernel/smpboot.c       Sun Oct  9 16:29:24 2005
+++ b/linux-2.6-xen-sparse/arch/xen/i386/kernel/smpboot.c       Sun Oct  9 17:52:54 2005
@@ -1327,18 +1327,14 @@
        .callback = handle_vcpu_hotplug_event
 };
 
-/* NB: Assumes xenbus_lock is held! */
 static int setup_cpu_watcher(struct notifier_block *notifier,
                              unsigned long event, void *data)
 {
-       int err = 0;
-
-       BUG_ON(down_trylock(&xenbus_lock) == 0);
+       int err;
+
        err = register_xenbus_watch(&cpu_watch);
-
-       if (err) {
+       if (err)
                printk("Failed to register watch on /cpu\n");
-       }
 
        return NOTIFY_DONE;
 }
diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/arch/xen/kernel/reboot.c
--- a/linux-2.6-xen-sparse/arch/xen/kernel/reboot.c     Sun Oct  9 16:29:24 2005
+++ b/linux-2.6-xen-sparse/arch/xen/kernel/reboot.c     Sun Oct  9 17:52:54 2005
@@ -360,9 +360,6 @@
 
 static struct notifier_block xenstore_notifier;
 
-/* Setup our watcher
-   NB: Assumes xenbus_lock is held!
-*/
 static int setup_shutdown_watcher(struct notifier_block *notifier,
                                   unsigned long event,
                                   void *data)
@@ -371,8 +368,6 @@
 #ifdef CONFIG_MAGIC_SYSRQ
        int err2 = 0;
 #endif
-
-       BUG_ON(down_trylock(&xenbus_lock) == 0);
 
        err1 = register_xenbus_watch(&shutdown_watch);
 #ifdef CONFIG_MAGIC_SYSRQ
diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/drivers/xen/balloon/balloon.c
--- a/linux-2.6-xen-sparse/drivers/xen/balloon/balloon.c        Sun Oct  9 16:29:24 2005
+++ b/linux-2.6-xen-sparse/drivers/xen/balloon/balloon.c        Sun Oct  9 17:52:54 2005
@@ -370,16 +370,11 @@
     
 }
 
-/* Setup our watcher
-   NB: Assumes xenbus_lock is held!
-*/
 int balloon_init_watcher(struct notifier_block *notifier,
                          unsigned long event,
                          void *data)
 {
        int err;
-
-       BUG_ON(down_trylock(&xenbus_lock) == 0);
 
        err = register_xenbus_watch(&target_watch);
        if (err)
diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_comms.c
--- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_comms.c    Sun Oct  9 16:29:24 2005
+++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_comms.c    Sun Oct  9 17:52:54 2005
@@ -130,15 +130,10 @@
 
                wait_event(xb_waitq, output_avail(out));
 
-               /* Read, then check: not that we don't trust store.
-                * Hell, some of my best friends are daemons.  But,
-                * in this post-911 world... */
+               mb();
                h = *out;
-               mb();
-               if (!check_buffer(&h)) {
-                       set_current_state(TASK_RUNNING);
-                       return -EIO; /* ETERRORIST! */
-               }
+               if (!check_buffer(&h))
+                       return -EIO;
 
                dst = get_output_chunk(&h, out->buf, &avail);
                if (avail > len)
@@ -173,12 +168,11 @@
                const char *src;
 
                wait_event(xb_waitq, xs_input_avail());
+
+               mb();
                h = *in;
-               mb();
-               if (!check_buffer(&h)) {
-                       set_current_state(TASK_RUNNING);
+               if (!check_buffer(&h))
                        return -EIO;
-               }
 
                src = get_input_chunk(&h, in->buf, &avail);
                if (avail > len)
@@ -195,10 +189,6 @@
                        notify_remote_via_evtchn(xen_start_info->store_evtchn);
        }
 
-       /* If we left something, wake watch thread to deal with it. */
-       if (xs_input_avail())
-               wake_up(&xb_waitq);
-
        return 0;
 }
 
diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c
--- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c      Sun Oct  9 16:29:24 2005
+++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c      Sun Oct  9 17:52:54 2005
@@ -46,85 +46,113 @@
 #include <asm/hypervisor.h>
 
 struct xenbus_dev_data {
-       /* Are there bytes left to be read in this message? */
-       int bytes_left;
-       /* Are we still waiting for the reply to a message we wrote? */
-       int awaiting_reply;
-       /* Buffer for outgoing messages. */
+       int in_transaction;
+
+       /* Partial request. */
        unsigned int len;
        union {
                struct xsd_sockmsg msg;
                char buffer[PAGE_SIZE];
        } u;
+
+       /* Response queue. */
+#define MASK_READ_IDX(idx) ((idx)&(PAGE_SIZE-1))
+       char read_buffer[PAGE_SIZE];
+       unsigned int read_cons, read_prod;
+       wait_queue_head_t read_waitq;
 };
 
 static struct proc_dir_entry *xenbus_dev_intf;
 
-/* Reply can be long (dir, getperm): don't buffer, just examine
- * headers so we can discard rest if they die. */
 static ssize_t xenbus_dev_read(struct file *filp,
                               char __user *ubuf,
                               size_t len, loff_t *ppos)
 {
-       struct xenbus_dev_data *data = filp->private_data;
-       struct xsd_sockmsg msg;
-       int err;
-
-       /* Refill empty buffer? */
-       if (data->bytes_left == 0) {
-               if (len < sizeof(msg))
-                       return -EINVAL;
-
-               err = xb_read(&msg, sizeof(msg));
-               if (err)
-                       return err;
-               data->bytes_left = msg.len;
-               if (ubuf && copy_to_user(ubuf, &msg, sizeof(msg)) != 0)
-                       return -EFAULT;
-               /* We can receive spurious XS_WATCH_EVENT messages. */
-               if (msg.type != XS_WATCH_EVENT)
-                       data->awaiting_reply = 0;
-               return sizeof(msg);
+       struct xenbus_dev_data *u = filp->private_data;
+       int i;
+
+       if (wait_event_interruptible(u->read_waitq,
+                                    u->read_prod != u->read_cons))
+               return -EINTR;
+
+       for (i = 0; i < len; i++) {
+               if (u->read_cons == u->read_prod)
+                       break;
+               put_user(u->read_buffer[MASK_READ_IDX(u->read_cons)], ubuf+i);
+               u->read_cons++;
        }
 
-       /* Don't read over next header, or over temporary buffer. */
-       if (len > sizeof(data->u.buffer))
-               len = sizeof(data->u.buffer);
-       if (len > data->bytes_left)
-               len = data->bytes_left;
-
-       err = xb_read(data->u.buffer, len);
-       if (err)
-               return err;
-
-       data->bytes_left -= len;
-       if (ubuf && copy_to_user(ubuf, data->u.buffer, len) != 0)
-               return -EFAULT;
-       return len;
-}
-
-/* We do v. basic sanity checking so they don't screw up kernel later. */
+       return i;
+}
+
+static void queue_reply(struct xenbus_dev_data *u,
+                       char *data, unsigned int len)
+{
+       int i;
+
+       for (i = 0; i < len; i++, u->read_prod++)
+               u->read_buffer[MASK_READ_IDX(u->read_prod)] = data[i];
+
+       BUG_ON((u->read_prod - u->read_cons) > sizeof(u->read_buffer));
+
+       wake_up(&u->read_waitq);
+}
+
 static ssize_t xenbus_dev_write(struct file *filp,
                                const char __user *ubuf,
                                size_t len, loff_t *ppos)
 {
-       struct xenbus_dev_data *data = filp->private_data;
-       int err;
-
-       /* We gather data in buffer until we're ready to send it. */
-       if (len > data->len + sizeof(data->u))
+       struct xenbus_dev_data *u = filp->private_data;
+       void *reply;
+       int err = 0;
+
+       if ((len + u->len) > sizeof(u->u.buffer))
                return -EINVAL;
-       if (copy_from_user(data->u.buffer + data->len, ubuf, len) != 0)
+
+       if (copy_from_user(u->u.buffer + u->len, ubuf, len) != 0)
                return -EFAULT;
-       data->len += len;
-       if (data->len >= sizeof(data->u.msg) + data->u.msg.len) {
-               err = xb_write(data->u.buffer, data->len);
-               if (err)
-                       return err;
-               data->len = 0;
-               data->awaiting_reply = 1;
+
+       u->len += len;
+       if (u->len < (sizeof(u->u.msg) + u->u.msg.len))
+               return len;
+
+       switch (u->u.msg.type) {
+       case XS_TRANSACTION_START:
+       case XS_TRANSACTION_END:
+       case XS_DIRECTORY:
+       case XS_READ:
+       case XS_GET_PERMS:
+       case XS_RELEASE:
+       case XS_GET_DOMAIN_PATH:
+       case XS_WRITE:
+       case XS_MKDIR:
+       case XS_RM:
+       case XS_SET_PERMS:
+               reply = xenbus_dev_request_and_reply(&u->u.msg);
+               if (IS_ERR(reply))
+                       err = PTR_ERR(reply);
+               else {
+                       if (u->u.msg.type == XS_TRANSACTION_START)
+                               u->in_transaction = 1;
+                       if (u->u.msg.type == XS_TRANSACTION_END)
+                               u->in_transaction = 0;
+                       queue_reply(u, (char *)&u->u.msg, sizeof(u->u.msg));
+                       queue_reply(u, (char *)reply, u->u.msg.len);
+                       kfree(reply);
+               }
+               break;
+
+       default:
+               err = -EINVAL;
+               break;
        }
-       return len;
+
+       if (err == 0) {
+               u->len = 0;
+               err = len;
+       }
+
+       return err;
 }
 
 static int xenbus_dev_open(struct inode *inode, struct file *filp)
@@ -134,7 +162,6 @@
        if (xen_start_info->store_evtchn == 0)
                return -ENOENT;
 
-       /* Don't try seeking. */
        nonseekable_open(inode, filp);
 
        u = kmalloc(sizeof(*u), GFP_KERNEL);
@@ -142,28 +169,21 @@
                return -ENOMEM;
 
        memset(u, 0, sizeof(*u));
+       init_waitqueue_head(&u->read_waitq);
 
        filp->private_data = u;
 
-       down(&xenbus_lock);
-
        return 0;
 }
 
 static int xenbus_dev_release(struct inode *inode, struct file *filp)
 {
-       struct xenbus_dev_data *data = filp->private_data;
-
-       /* Discard any unread replies. */
-       while (data->bytes_left || data->awaiting_reply)
-               xenbus_dev_read(filp, NULL, sizeof(data->u.buffer), NULL);
-
-       /* Harmless if no transaction in progress. */
-       xenbus_transaction_end(1);
-
-       up(&xenbus_lock);
-
-       kfree(data);
+       struct xenbus_dev_data *u = filp->private_data;
+
+       if (u->in_transaction)
+               xenbus_transaction_end(1);
+
+       kfree(u);
 
        return 0;
 }
diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_probe.c
--- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_probe.c    Sun Oct  9 16:29:24 2005
+++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_probe.c    Sun Oct  9 17:52:54 2005
@@ -43,6 +43,9 @@
 
 static struct notifier_block *xenstore_chain;
 
+/* Now used to protect xenbus probes against save/restore. */
+static DECLARE_MUTEX(xenbus_lock);
+
 /* If something in array of ids matches this device, return it. */
 static const struct xenbus_device_id *
 match_device(const struct xenbus_device_id *arr, struct xenbus_device *dev)
@@ -625,12 +628,13 @@
        down(&xenbus_lock);
        bus_for_each_dev(&xenbus_frontend.bus, NULL, NULL, suspend_dev);
        bus_for_each_dev(&xenbus_backend.bus, NULL, NULL, suspend_dev);
+       xs_suspend();
 }
 
 void xenbus_resume(void)
 {
        xb_init_comms();
-       reregister_xenbus_watches();
+       xs_resume();
        bus_for_each_dev(&xenbus_frontend.bus, NULL, NULL, resume_dev);
        bus_for_each_dev(&xenbus_backend.bus, NULL, NULL, resume_dev);
        up(&xenbus_lock);
@@ -685,6 +689,7 @@
        /* Notify others that xenstore is up */
        notifier_call_chain(&xenstore_chain, 0, 0);
        up(&xenbus_lock);
+
        return 0;
 }
 
diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c
--- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c       Sun Oct  9 16:29:24 2005
+++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c       Sun Oct  9 17:52:54 2005
@@ -42,11 +42,67 @@
 
 #define streq(a, b) (strcmp((a), (b)) == 0)
 
-static char printf_buffer[4096];
+struct xs_stored_msg {
+       struct xsd_sockmsg hdr;
+
+       union {
+               /* Stored replies. */
+               struct {
+                       struct list_head list;
+                       char *body;
+               } reply;
+
+               /* Queued watch callbacks. */
+               struct {
+                       struct work_struct work;
+                       struct xenbus_watch *handle;
+                       char **vec;
+                       unsigned int vec_size;
+               } watch;
+       } u;
+};
+
+struct xs_handle {
+       /* A list of replies. Currently only one will ever be outstanding. */
+       struct list_head reply_list;
+       spinlock_t reply_lock;
+       wait_queue_head_t reply_waitq;
+
+       /* One request at a time. */
+       struct semaphore request_mutex;
+
+       /* One transaction at a time. */
+       struct semaphore transaction_mutex;
+       int transaction_pid;
+};
+
+static struct xs_handle xs_state;
+
 static LIST_HEAD(watches);
-
-DECLARE_MUTEX(xenbus_lock);
-EXPORT_SYMBOL(xenbus_lock);
+static DEFINE_SPINLOCK(watches_lock);
+
+/* Can wait on !xs_resuming for suspend/resume cycle to complete. */
+static int xs_resuming;
+static DECLARE_WAIT_QUEUE_HEAD(xs_resuming_waitq);
+
+static void request_mutex_acquire(void)
+{
+       /*
+        * We can't distinguish non-transactional from transactional
+        * requests right now. So temporarily acquire the transaction mutex
+        * if this task is outside transaction context.
+        */
+       if (xs_state.transaction_pid != current->pid)
+               down(&xs_state.transaction_mutex);
+       down(&xs_state.request_mutex);
+}
+
+static void request_mutex_release(void)
+{
+       up(&xs_state.request_mutex);
+       if (xs_state.transaction_pid != current->pid)
+               up(&xs_state.transaction_mutex);
+}
 
 static int get_error(const char *errorstring)
 {
@@ -65,29 +121,32 @@
 
 static void *read_reply(enum xsd_sockmsg_type *type, unsigned int *len)
 {
-       struct xsd_sockmsg msg;
-       void *ret;
-       int err;
-
-       err = xb_read(&msg, sizeof(msg));
-       if (err)
-               return ERR_PTR(err);
-
-       ret = kmalloc(msg.len + 1, GFP_KERNEL);
-       if (!ret)
-               return ERR_PTR(-ENOMEM);
-
-       err = xb_read(ret, msg.len);
-       if (err) {
-               kfree(ret);
-               return ERR_PTR(err);
-       }
-       ((char*)ret)[msg.len] = '\0';
-
-       *type = msg.type;
+       struct xs_stored_msg *msg;
+       char *body;
+
+       spin_lock(&xs_state.reply_lock);
+
+       while (list_empty(&xs_state.reply_list)) {
+               spin_unlock(&xs_state.reply_lock);
+               wait_event(xs_state.reply_waitq,
+                          !list_empty(&xs_state.reply_list));
+               spin_lock(&xs_state.reply_lock);
+       }
+
+       msg = list_entry(xs_state.reply_list.next,
+                        struct xs_stored_msg, u.reply.list);
+       list_del(&msg->u.reply.list);
+
+       spin_unlock(&xs_state.reply_lock);
+
+       *type = msg->hdr.type;
        if (len)
-               *len = msg.len;
-       return ret;
+               *len = msg->hdr.len;
+       body = msg->u.reply.body;
+
+       kfree(msg);
+
+       return body;
 }
 
 /* Emergency write. */
@@ -98,10 +157,45 @@
        msg.type = XS_DEBUG;
        msg.len = sizeof("print") + count + 1;
 
+       request_mutex_acquire();
        xb_write(&msg, sizeof(msg));
        xb_write("print", sizeof("print"));
        xb_write(str, count);
        xb_write("", 1);
+       request_mutex_release();
+}
+
+void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg)
+{
+       void *ret;
+       struct xsd_sockmsg req_msg = *msg;
+       int err;
+
+       if (req_msg.type == XS_TRANSACTION_START) {
+               down(&xs_state.transaction_mutex);
+               xs_state.transaction_pid = current->pid;
+       }
+
+       request_mutex_acquire();
+
+       err = xb_write(msg, sizeof(*msg) + msg->len);
+       if (err) {
+               msg->type = XS_ERROR;
+               ret = ERR_PTR(err);
+       } else {
+               ret = read_reply(&msg->type, &msg->len);
+       }
+
+       request_mutex_release();
+
+       if ((msg->type == XS_TRANSACTION_END) ||
+           ((req_msg.type == XS_TRANSACTION_START) &&
+            (msg->type == XS_ERROR))) {
+               xs_state.transaction_pid = -1;
+               up(&xs_state.transaction_mutex);
+       }
+
+       return ret;
 }
 
 /* Send message to xs, get kmalloc'ed reply.  ERR_PTR() on error. */
@@ -115,31 +209,33 @@
        unsigned int i;
        int err;
 
-       WARN_ON(down_trylock(&xenbus_lock) == 0);
-
        msg.type = type;
        msg.len = 0;
        for (i = 0; i < num_vecs; i++)
                msg.len += iovec[i].iov_len;
 
+       request_mutex_acquire();
+
        err = xb_write(&msg, sizeof(msg));
-       if (err)
+       if (err) {
+               up(&xs_state.request_mutex);
                return ERR_PTR(err);
+       }
 
        for (i = 0; i < num_vecs; i++) {
                err = xb_write(iovec[i].iov_base, iovec[i].iov_len);;
-               if (err)
+               if (err) {
+                       request_mutex_release();
                        return ERR_PTR(err);
-       }
-
-       /* Watches can have fired before reply comes: daemon detects
-        * and re-transmits, so we can ignore this. */
-       do {
-               kfree(ret);
-               ret = read_reply(&msg.type, len);
-               if (IS_ERR(ret))
-                       return ret;
-       } while (msg.type == XS_WATCH_EVENT);
+               }
+       }
+
+       ret = read_reply(&msg.type, len);
+
+       request_mutex_release();
+
+       if (IS_ERR(ret))
+               return ret;
 
        if (msg.type == XS_ERROR) {
                err = get_error(ret);
@@ -187,8 +283,6 @@
 {
        static char buffer[4096];
 
-       BUG_ON(down_trylock(&xenbus_lock) == 0);
-       /* XXX FIXME: might not be correct if name == "" */
        BUG_ON(strlen(dir) + strlen("/") + strlen(name) + 1 > sizeof(buffer));
 
        strcpy(buffer, dir);
@@ -207,7 +301,7 @@
        *num = count_strings(strings, len);
 
        /* Transfer to one big alloc for easy freeing. */
-       ret = kmalloc(*num * sizeof(char *) + len, GFP_ATOMIC);
+       ret = kmalloc(*num * sizeof(char *) + len, GFP_KERNEL);
        if (!ret) {
                kfree(strings);
                return ERR_PTR(-ENOMEM);
@@ -298,7 +392,18 @@
  */
 int xenbus_transaction_start(void)
 {
-       return xs_error(xs_single(XS_TRANSACTION_START, "", NULL));
+       int err;
+
+       down(&xs_state.transaction_mutex);
+       xs_state.transaction_pid = current->pid;
+
+       err = xs_error(xs_single(XS_TRANSACTION_START, "", NULL));
+       if (err) {
+               xs_state.transaction_pid = -1;
+               up(&xs_state.transaction_mutex);
+       }
+
+       return err;
 }
 EXPORT_SYMBOL(xenbus_transaction_start);
 
@@ -308,12 +413,19 @@
 int xenbus_transaction_end(int abort)
 {
        char abortstr[2];
+       int err;
 
        if (abort)
                strcpy(abortstr, "F");
        else
                strcpy(abortstr, "T");
-       return xs_error(xs_single(XS_TRANSACTION_END, abortstr, NULL));
+
+       err = xs_error(xs_single(XS_TRANSACTION_END, abortstr, NULL));
+
+       xs_state.transaction_pid = -1;
+       up(&xs_state.transaction_mutex);
+
+       return err;
 }
 EXPORT_SYMBOL(xenbus_transaction_end);
 
@@ -344,14 +456,23 @@
 {
        va_list ap;
        int ret;
-
-       BUG_ON(down_trylock(&xenbus_lock) == 0);
+#define PRINTF_BUFFER_SIZE 4096
+       char *printf_buffer;
+
+       printf_buffer = kmalloc(PRINTF_BUFFER_SIZE, GFP_KERNEL);
+       if (printf_buffer == NULL)
+               return -ENOMEM;
+
        va_start(ap, fmt);
-       ret = vsnprintf(printf_buffer, sizeof(printf_buffer), fmt, ap);
+       ret = vsnprintf(printf_buffer, PRINTF_BUFFER_SIZE, fmt, ap);
        va_end(ap);
 
-       BUG_ON(ret > sizeof(printf_buffer)-1);
-       return xenbus_write(dir, node, printf_buffer);
+       BUG_ON(ret > PRINTF_BUFFER_SIZE-1);
+       ret = xenbus_write(dir, node, printf_buffer);
+
+       kfree(printf_buffer);
+
+       return ret;
 }
 EXPORT_SYMBOL(xenbus_printf);
 
@@ -361,19 +482,28 @@
        va_list ap;
        int ret;
        unsigned int len;
-
-       BUG_ON(down_trylock(&xenbus_lock) == 0);
+       char *printf_buffer;
+
+       printf_buffer = kmalloc(PRINTF_BUFFER_SIZE, GFP_KERNEL);
+       if (printf_buffer == NULL)
+               goto fail;
 
        len = sprintf(printf_buffer, "%i ", -err);
        va_start(ap, fmt);
-       ret = vsnprintf(printf_buffer+len, sizeof(printf_buffer)-len, fmt, ap);
+       ret = vsnprintf(printf_buffer+len, PRINTF_BUFFER_SIZE-len, fmt, ap);
        va_end(ap);
 
-       BUG_ON(len + ret > sizeof(printf_buffer)-1);
+       BUG_ON(len + ret > PRINTF_BUFFER_SIZE-1);
        dev->has_error = 1;
        if (xenbus_write(dev->nodename, "error", printf_buffer) != 0)
-               printk("xenbus: failed to write error node for %s (%s)\n",
-                      dev->nodename, printf_buffer);
+               goto fail;
+
+       kfree(printf_buffer);
+       return;
+
+ fail:
+       printk("xenbus: failed to write error node for %s (%s)\n",
+              dev->nodename, printf_buffer);
 }
 EXPORT_SYMBOL(xenbus_dev_error);
 
@@ -432,26 +562,6 @@
        return xs_error(xs_talkv(XS_WATCH, iov, ARRAY_SIZE(iov), NULL));
 }
 
-static char **xs_read_watch(unsigned int *num)
-{
-       enum xsd_sockmsg_type type;
-       char *strings;
-       unsigned int len;
-
-       strings = read_reply(&type, &len);
-       if (IS_ERR(strings))
-               return (char **)strings;
-
-       BUG_ON(type != XS_WATCH_EVENT);
-
-       return split(strings, len, num);
-}
-
-static int xs_acknowledge_watch(const char *token)
-{
-       return xs_error(xs_single(XS_WATCH_ACK, token, NULL));
-}
-
 static int xs_unwatch(const char *path, const char *token)
 {
        struct kvec iov[2];
@@ -464,7 +574,6 @@
        return xs_error(xs_talkv(XS_UNWATCH, iov, ARRAY_SIZE(iov), NULL));
 }
 
-/* A little paranoia: we don't just trust token. */
 static struct xenbus_watch *find_watch(const char *token)
 {
        struct xenbus_watch *i, *cmp;
@@ -474,6 +583,7 @@
        list_for_each_entry(i, &watches, list)
                if (i == cmp)
                        return i;
+
        return NULL;
 }
 
@@ -485,11 +595,20 @@
        int err;
 
        sprintf(token, "%lX", (long)watch);
+
+       spin_lock(&watches_lock);
        BUG_ON(find_watch(token));
+       spin_unlock(&watches_lock);
 
        err = xs_watch(watch->node, token);
-       if (!err)
+
+       /* Ignore errors due to multiple registration. */
+       if ((err == 0) || (err == -EEXIST)) {
+               spin_lock(&watches_lock);
                list_add(&watch->list, &watches);
+               spin_unlock(&watches_lock);
+       }
+
        return err;
 }
 EXPORT_SYMBOL(register_xenbus_watch);
@@ -500,77 +619,188 @@
        int err;
 
        sprintf(token, "%lX", (long)watch);
+
+       spin_lock(&watches_lock);
        BUG_ON(!find_watch(token));
+       list_del(&watch->list);
+       spin_unlock(&watches_lock);
+
+       /* Ensure xs_resume() is not in progress (see comments there). */
+       wait_event(xs_resuming_waitq, !xs_resuming);
 
        err = xs_unwatch(watch->node, token);
-       list_del(&watch->list);
-
        if (err)
                printk(KERN_WARNING
                       "XENBUS Failed to release watch %s: %i\n",
                       watch->node, err);
+
+       /* Make sure watch is not in use. */
+       flush_scheduled_work();
 }
 EXPORT_SYMBOL(unregister_xenbus_watch);
 
-/* Re-register callbacks to all watches. */
-void reregister_xenbus_watches(void)
-{
+void xs_suspend(void)
+{
+       down(&xs_state.transaction_mutex);
+       down(&xs_state.request_mutex);
+}
+
+void xs_resume(void)
+{
+       struct list_head *ent, *prev_ent = &watches;
        struct xenbus_watch *watch;
        char token[sizeof(watch) * 2 + 1];
 
-       list_for_each_entry(watch, &watches, list) {
-               sprintf(token, "%lX", (long)watch);
-               xs_watch(watch->node, token);
-       }
-}
-
-static int watch_thread(void *unused)
-{
+       /* Protect against concurrent unregistration and freeing of watches. */
+       BUG_ON(xs_resuming);
+       xs_resuming = 1;
+
+       up(&xs_state.request_mutex);
+       up(&xs_state.transaction_mutex);
+
+       /*
+        * Iterate over the watch list re-registering each node. We must
+        * be careful about concurrent registrations and unregistrations.
+        * We search for the node immediately following the previously
+        * re-registered node. If we get no match then either we are done
+        * (previous node is last in list) or the node was unregistered, in
+        * which case we restart from the beginning of the list.
+        * register_xenbus_watch() + unregister_xenbus_watch() is safe because
+        * it will only ever move a watch node earlier in the list, so it
+        * cannot cause us to skip nodes.
+        */
        for (;;) {
-               char **vec = NULL;
-               unsigned int num;
-
-               wait_event(xb_waitq, xs_input_avail());
-
-               /* If this is a spurious wakeup caused by someone
-                * doing an op, they'll hold the lock and the buffer
-                * will be empty by the time we get there.               
-                */
-               down(&xenbus_lock);
-               if (xs_input_avail())
-                       vec = xs_read_watch(&num);
-
-               if (vec && !IS_ERR(vec)) {
-                       struct xenbus_watch *w;
-                       int err;
-
-                       err = xs_acknowledge_watch(vec[XS_WATCH_TOKEN]);
-                       if (err)
-                               printk(KERN_WARNING "XENBUS ack %s fail %i\n",
-                                      vec[XS_WATCH_TOKEN], err);
-                       w = find_watch(vec[XS_WATCH_TOKEN]);
-                       BUG_ON(!w);
-                       w->callback(w, (const char **)vec, num);
-                       kfree(vec);
-               } else if (vec)
-                       printk(KERN_WARNING "XENBUS xs_read_watch: %li\n",
-                              PTR_ERR(vec));
-               up(&xenbus_lock);
+               spin_lock(&watches_lock);
+               list_for_each(ent, &watches)
+                       if (ent->prev == prev_ent)
+                               break;
+               spin_unlock(&watches_lock);
+
+               /* No match because prev_ent is at the end of the list? */
+               if ((ent == &watches) && (watches.prev == prev_ent))
+                        break; /* We're done! */
+
+               if ((prev_ent = ent) != &watches) {
+                       /*
+                        * Safe even with watch_lock not held. We are saved by
+                        * (xs_resumed==1) check in unregister_xenbus_watch.
+                        */
+                       watch = list_entry(ent, struct xenbus_watch, list);
+                       sprintf(token, "%lX", (long)watch);
+                       xs_watch(watch->node, token);
+               }
+       }
+
+       xs_resuming = 0;
+       wake_up(&xs_resuming_waitq);
+}
+
+static void xenbus_fire_watch(void *arg)
+{
+       struct xs_stored_msg *msg = arg;
+
+       msg->u.watch.handle->callback(msg->u.watch.handle,
+                                     (const char **)msg->u.watch.vec,
+                                     msg->u.watch.vec_size);
+
+       kfree(msg->u.watch.vec);
+       kfree(msg);
+}
+
+static int process_msg(void)
+{
+       struct xs_stored_msg *msg;
+       char *body;
+       int err;
+
+       msg = kmalloc(sizeof(*msg), GFP_KERNEL);
+       if (msg == NULL)
+               return -ENOMEM;
+
+       err = xb_read(&msg->hdr, sizeof(msg->hdr));
+       if (err) {
+               kfree(msg);
+               return err;
+       }
+
+       body = kmalloc(msg->hdr.len + 1, GFP_KERNEL);
+       if (body == NULL) {
+               kfree(msg);
+               return -ENOMEM;
+       }
+
+       err = xb_read(body, msg->hdr.len);
+       if (err) {
+               kfree(body);
+               kfree(msg);
+               return err;
+       }
+       body[msg->hdr.len] = '\0';
+
+       if (msg->hdr.type == XS_WATCH_EVENT) {
+               INIT_WORK(&msg->u.watch.work, xenbus_fire_watch, msg);
+
+               msg->u.watch.vec = split(body, msg->hdr.len,
+                                        &msg->u.watch.vec_size);
+               if (IS_ERR(msg->u.watch.vec)) {
+                       kfree(msg);
+                       return PTR_ERR(msg->u.watch.vec);
+               }
+
+               spin_lock(&watches_lock);
+               msg->u.watch.handle = find_watch(
+                       msg->u.watch.vec[XS_WATCH_TOKEN]);
+               if (msg->u.watch.handle != NULL) {
+                       schedule_work(&msg->u.watch.work);
+               } else {
+                       kfree(msg->u.watch.vec);
+                       kfree(msg);
+               }
+               spin_unlock(&watches_lock);
+       } else {
+               msg->u.reply.body = body;
+               spin_lock(&xs_state.reply_lock);
+               list_add_tail(&msg->u.reply.list, &xs_state.reply_list);
+               spin_unlock(&xs_state.reply_lock);
+               wake_up(&xs_state.reply_waitq);
+       }
+
+       return 0;
+}
+
+static int read_thread(void *unused)
+{
+       int err;
+
+       for (;;) {
+               err = process_msg();
+               if (err)
+                       printk(KERN_WARNING "XENBUS error %d while reading "
+                              "message\n", err);
        }
 }
 
 int xs_init(void)
 {
        int err;
-       struct task_struct *watcher;
+       struct task_struct *reader;
+
+       INIT_LIST_HEAD(&xs_state.reply_list);
+       spin_lock_init(&xs_state.reply_lock);
+       init_waitqueue_head(&xs_state.reply_waitq);
+
+       init_MUTEX(&xs_state.request_mutex);
+       init_MUTEX(&xs_state.transaction_mutex);
+       xs_state.transaction_pid = -1;
 
        err = xb_init_comms();
        if (err)
                return err;
        
-       watcher = kthread_run(watch_thread, NULL, "kxbwatch");
-       if (IS_ERR(watcher))
-               return PTR_ERR(watcher);
+       reader = kthread_run(read_thread, NULL, "xenbusd");
+       if (IS_ERR(reader))
+               return PTR_ERR(reader);
+
        return 0;
 }
 
diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/include/asm-xen/xenbus.h
--- a/linux-2.6-xen-sparse/include/asm-xen/xenbus.h     Sun Oct  9 16:29:24 2005
+++ b/linux-2.6-xen-sparse/include/asm-xen/xenbus.h     Sun Oct  9 17:52:54 2005
@@ -78,10 +78,6 @@
 int xenbus_register_backend(struct xenbus_driver *drv);
 void xenbus_unregister_driver(struct xenbus_driver *drv);
 
-/* Caller must hold this lock to call these functions: it's also held
- * across watch callbacks. */
-extern struct semaphore xenbus_lock;
-
 char **xenbus_directory(const char *dir, const char *node, unsigned int *num);
 void *xenbus_read(const char *dir, const char *node, unsigned int *len);
 int xenbus_write(const char *dir, const char *node, const char *string);
@@ -113,7 +109,11 @@
 struct xenbus_watch
 {
        struct list_head list;
+
+       /* Path being watched. */
        char *node;
+       /* Callback (executed in a process context with no locks held);
+        * vec presumably indexed by XS_WATCH_PATH / XS_WATCH_TOKEN. */
        void (*callback)(struct xenbus_watch *,
                         const char **vec, unsigned int len);
 };
@@ -124,7 +124,11 @@
 
 int register_xenbus_watch(struct xenbus_watch *watch);
 void unregister_xenbus_watch(struct xenbus_watch *watch);
-void reregister_xenbus_watches(void);
+void xs_suspend(void);
+void xs_resume(void);
+
+/* Used by xenbus_dev to borrow kernel's store connection. */
+void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg);
 
 /* Called from xen core code. */
 void xenbus_suspend(void);
diff -r ab93a9a46bd4 -r 8016551fde98 tools/blktap/xenbus.c
--- a/tools/blktap/xenbus.c     Sun Oct  9 16:29:24 2005
+++ b/tools/blktap/xenbus.c     Sun Oct  9 17:52:54 2005
@@ -260,10 +260,6 @@
     node  = res[XS_WATCH_PATH];
     token = res[XS_WATCH_TOKEN];
 
-    er = xs_acknowledge_watch(h, token);
-    if (er == 0)
-        warn("Couldn't acknowledge watch (%s)", token);
-
     w = find_watch(token);
     if (!w)
     {
diff -r ab93a9a46bd4 -r 8016551fde98 tools/console/daemon/io.c
--- a/tools/console/daemon/io.c Sun Oct  9 16:29:24 2005
+++ b/tools/console/daemon/io.c Sun Oct  9 17:52:54 2005
@@ -505,7 +505,6 @@
                        domain_create_ring(dom);
        }
 
-       xs_acknowledge_watch(xs, vec[1]);
        free(vec);
 }
 
diff -r ab93a9a46bd4 -r 8016551fde98 tools/python/xen/lowlevel/xs/xs.c
--- a/tools/python/xen/lowlevel/xs/xs.c Sun Oct  9 16:29:24 2005
+++ b/tools/python/xen/lowlevel/xs/xs.c Sun Oct  9 17:52:54 2005
@@ -442,9 +442,6 @@
 
 #define xspy_read_watch_doc "\n"                               \
        "Read a watch notification.\n"                          \
-       "The notification must be acknowledged by passing\n"    \
-       "the token to acknowledge_watch().\n"                   \
-       " path [string]: xenstore path.\n"                      \
        "\n"                                                    \
        "Returns: [tuple] (path, token).\n"                     \
        "Raises RuntimeError on error.\n"                       \
@@ -492,44 +489,6 @@
  exit:
     if (xsval)
         free(xsval);
-    return val;
-}
-
-#define xspy_acknowledge_watch_doc "\n"                                        \
-       "Acknowledge a watch notification that has been read.\n"        \
-       " token [string] : from the watch notification\n"               \
-       "\n"                                                            \
-       "Returns None on success.\n"                                    \
-       "Raises RuntimeError on error.\n"                               \
-       "\n"
-
-static PyObject *xspy_acknowledge_watch(PyObject *self, PyObject *args,
-                                        PyObject *kwds)
-{
-    static char *kwd_spec[] = { "token", NULL };
-    static char *arg_spec = "O";
-    PyObject *token;
-    char token_str[MAX_STRLEN(unsigned long) + 1];
-
-    struct xs_handle *xh = xshandle(self);
-    PyObject *val = NULL;
-    int xsval = 0;
-
-    if (!xh)
-        goto exit;
-    if (!PyArg_ParseTupleAndKeywords(args, kwds, arg_spec, kwd_spec, &token))
-        goto exit;
-    sprintf(token_str, "%li", (unsigned long)token);
-    Py_BEGIN_ALLOW_THREADS
-    xsval = xs_acknowledge_watch(xh, token_str);
-    Py_END_ALLOW_THREADS
-    if (!xsval) {
-        PyErr_SetFromErrno(PyExc_RuntimeError);
-        goto exit;
-    }
-    Py_INCREF(Py_None);
-    val = Py_None;
- exit:
     return val;
 }
 
@@ -833,7 +792,6 @@
      XSPY_METH(set_permissions),
      XSPY_METH(watch),
      XSPY_METH(read_watch),
-     XSPY_METH(acknowledge_watch),
      XSPY_METH(unwatch),
      XSPY_METH(transaction_start),
      XSPY_METH(transaction_end),
diff -r ab93a9a46bd4 -r 8016551fde98 tools/python/xen/xend/xenstore/xswatch.py
--- a/tools/python/xen/xend/xenstore/xswatch.py Sun Oct  9 16:29:24 2005
+++ b/tools/python/xen/xend/xenstore/xswatch.py Sun Oct  9 17:52:54 2005
@@ -8,6 +8,7 @@
 import select
 import threading
 from xen.lowlevel import xs
+from xen.xend.xenstore.xsutil import xshandle
 
 class xswatch:
 
@@ -27,10 +28,7 @@
         if cls.watchThread:
             cls.xslock.release()
             return
-        # XXX: When we fix xenstored to have better watch semantics,
-        # this can change to shared xshandle(). Currently that would result
-        # in duplicate watch firings, thus failed extra xs.acknowledge_watch.
-        cls.xs = xs.open()
+        cls.xs = xshandle()
         cls.watchThread = threading.Thread(name="Watcher",
                                            target=cls.watchMain)
         cls.watchThread.setDaemon(True)
@@ -43,11 +41,10 @@
         while True:
             try:
                 we = cls.xs.read_watch()
-                watch = we[1]
-                cls.xs.acknowledge_watch(watch)
             except RuntimeError, ex:
                 print ex
                 raise
+            watch = we[1]
             watch.fn(*watch.args, **watch.kwargs)
 
     watchMain = classmethod(watchMain)
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/07watch.test
--- a/tools/xenstore/testsuite/07watch.test     Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/testsuite/07watch.test     Sun Oct  9 17:52:54 2005
@@ -5,7 +5,6 @@
 2 write /test contents2
 expect 1:/test:token
 1 waitwatch
-1 ackwatch token
 1 close
 
 # Check that reads don't set it off.
@@ -22,15 +21,12 @@
 2 mkdir /dir/newdir
 expect 1:/dir/newdir:token
 1 waitwatch
-1 ackwatch token
 2 setperm /dir/newdir 0 READ
 expect 1:/dir/newdir:token
 1 waitwatch
-1 ackwatch token
 2 rm /dir/newdir
 expect 1:/dir/newdir:token
 1 waitwatch
-1 ackwatch token
 1 close
 2 close
 
@@ -49,7 +45,6 @@
 read /dir/test
 expect /dir/test:token
 waitwatch
-ackwatch token
 close
 
 # watch priority test: all simultaneous
@@ -59,13 +54,10 @@
 write /dir/test contents
 expect 3:/dir/test:token3
 3 waitwatch
-3 ackwatch token3
 expect 2:/dir/test:token2
 2 waitwatch
-2 ackwatch token2
 expect 1:/dir/test:token1
 1 waitwatch
-1 ackwatch token1
 1 close
 2 close
 3 close
@@ -79,7 +71,6 @@
 2 close
 expect 1:/dir/test:token1
 1 waitwatch
-1 ackwatch token1
 1 close
 
 # If one dies (without reading at all), the other should still get ack.
@@ -89,7 +80,6 @@
 2 close
 expect 1:/dir/test:token1
 1 waitwatch
-1 ackwatch token1
 1 close
 2 close
 
@@ -111,7 +101,6 @@
 2 unwatch /dir token2
 expect 1:/dir/test:token1
 1 waitwatch
-1 ackwatch token1
 1 close
 2 close
 
@@ -123,14 +112,12 @@
 write /dir/test contents2
 expect 1:/dir/test:token2
 1 waitwatch
-1 ackwatch token2
 
 # check we only get notified once.
 1 watch /test token
 2 write /test contents2
 expect 1:/test:token
 1 waitwatch
-1 ackwatch token
 expect 1: waitwatch failed: Connection timed out
 1 waitwatch
 1 close
@@ -142,13 +129,10 @@
 2 write /test3 contents
 expect 1:/test1:token
 1 waitwatch
-1 ackwatch token
 expect 1:/test2:token
 1 waitwatch
-1 ackwatch token
 expect 1:/test3:token
 1 waitwatch
-1 ackwatch token
 1 close
 
 # Creation of subpaths should be covered correctly.
@@ -157,10 +141,8 @@
 2 write /test/subnode/subnode contents2
 expect 1:/test/subnode:token
 1 waitwatch
-1 ackwatch token
 expect 1:/test/subnode/subnode:token
 1 waitwatch
-1 ackwatch token
 expect 1: waitwatch failed: Connection timed out
 1 waitwatch
 1 close
@@ -171,7 +153,6 @@
 1 watchnoack / token2 0
 expect 1:/test/subnode:token
 1 waitwatch
-1 ackwatch token
 expect 1:/:token2
 1 waitwatch
 expect 1: waitwatch failed: Connection timed out
@@ -183,7 +164,6 @@
 2 rm /test
 expect 1:/test/subnode:token
 1 waitwatch
-1 ackwatch token
 
 # Watch should not double-send after we ack, even if we did something in between.
 1 watch /test2 token
@@ -192,6 +172,5 @@
 1 waitwatch
 expect 1:contents2
 1 read /test2/foo
-1 ackwatch token
 expect 1: waitwatch failed: Connection timed out
 1 waitwatch
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/08transaction.test
--- a/tools/xenstore/testsuite/08transaction.test       Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/testsuite/08transaction.test       Sun Oct  9 17:52:54 2005
@@ -68,7 +68,6 @@
 2 commit
 expect 1:/test/dir/sub:token
 1 waitwatch
-1 ackwatch token
 1 close
 
 # Rm inside transaction works like rm outside: children get notified.
@@ -78,7 +77,6 @@
 2 commit
 expect 1:/test/dir/sub:token
 1 waitwatch
-1 ackwatch token
 1 close
 
 # Multiple events from single transaction don't trigger assert
@@ -89,8 +87,6 @@
 2 commit
 expect 1:/test/1:token
 1 waitwatch
-1 ackwatch token
 expect 1:/test/2:token
 1 waitwatch
-1 ackwatch token
 1 close
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/10domain-homedir.test
--- a/tools/xenstore/testsuite/10domain-homedir.test    Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/testsuite/10domain-homedir.test    Sun Oct  9 17:52:54 2005
@@ -16,4 +16,3 @@
 write /home/foo/bar contents
 expect 1:foo/bar:token
 1 waitwatch
-1 ackwatch token
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/11domain-watch.test
--- a/tools/xenstore/testsuite/11domain-watch.test      Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/testsuite/11domain-watch.test      Sun Oct  9 17:52:54 2005
@@ -10,7 +10,6 @@
 write /test contents2
 expect 1:/test:token
 1 waitwatch
-1 ackwatch token
 1 unwatch /test token
 release 1
 1 close
@@ -25,7 +24,6 @@
 1 write /dir/test4 contents4
 expect 1:/dir/test:token
 1 waitwatch
-1 ackwatch token
 release 1
 1 close
 
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/12readonly.test
--- a/tools/xenstore/testsuite/12readonly.test  Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/testsuite/12readonly.test  Sun Oct  9 17:52:54 2005
@@ -36,4 +36,3 @@
 1 write /test contents
 expect /test:token
 waitwatch
-ackwatch token
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/13watch-ack.test
--- a/tools/xenstore/testsuite/13watch-ack.test Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/testsuite/13watch-ack.test Sun Oct  9 17:52:54 2005
@@ -18,5 +18,4 @@
 1 waitwatch
 3 write /test/1 contents1
 4 write /test/3 contents3
-1 ackwatch token2
 1 close
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xenstored_core.c
--- a/tools/xenstore/xenstored_core.c   Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/xenstored_core.c   Sun Oct  9 17:52:54 2005
@@ -154,7 +154,6 @@
        case XS_READ: return "READ";
        case XS_GET_PERMS: return "GET_PERMS";
        case XS_WATCH: return "WATCH";
-       case XS_WATCH_ACK: return "WATCH_ACK";
        case XS_UNWATCH: return "UNWATCH";
        case XS_TRANSACTION_START: return "TRANSACTION_START";
        case XS_TRANSACTION_END: return "TRANSACTION_END";
@@ -1103,10 +1102,6 @@
                do_watch(conn, in);
                break;
 
-       case XS_WATCH_ACK:
-               do_watch_ack(conn, onearg(in));
-               break;
-
        case XS_UNWATCH:
                do_unwatch(conn, in);
                break;
@@ -1167,11 +1162,6 @@
        if (verbose)
                xprintf("Got message %s len %i from %p\n",
                        sockmsg_string(type), conn->in->hdr.msg.len, conn);
-
-       /* We might get a command while waiting for an ack: this means
-        * the other end discarded it: we will re-transmit. */
-       if (type != XS_WATCH_ACK)
-               conn->waiting_for_ack = NULL;
 
        /* Careful: process_message may free connection.  We detach
         * "in" beforehand and allocate the new buffer to avoid
@@ -1266,7 +1256,6 @@
 
        new->state = OK;
        new->out = new->waiting_reply = NULL;
-       new->waiting_for_ack = NULL;
        new->fd = -1;
        new->id = 0;
        new->domain = NULL;
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xenstored_core.h
--- a/tools/xenstore/xenstored_core.h   Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/xenstored_core.h   Sun Oct  9 17:52:54 2005
@@ -70,9 +70,6 @@
 
        /* Is this a read-only connection? */
        bool can_write;
-
-       /* Are we waiting for a watch event ack? */
-       struct watch *waiting_for_ack;
 
        /* Buffered incoming data. */
        struct buffered_data *in;
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xenstored_watch.c
--- a/tools/xenstore/xenstored_watch.c  Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/xenstored_watch.c  Sun Oct  9 17:52:54 2005
@@ -69,18 +69,14 @@
        if (conn->waiting_reply) {
                conn->out = conn->waiting_reply;
                conn->waiting_reply = NULL;
-               conn->waiting_for_ack = NULL;
-               return;
-       }
-
-       /* If we're already waiting for ack, don't queue more. */
-       if (conn->waiting_for_ack)
-               return;
+               return;
+       }
 
        list_for_each_entry(watch, &conn->watches, list) {
                event = list_top(&watch->events, struct watch_event, list);
                if (event) {
-                       conn->waiting_for_ack = watch;
                        send_reply(conn,XS_WATCH_EVENT,event->data,event->len);
+                       list_del(&event->list);
+                       talloc_free(event); /* only after send_reply() has read event->data/len */
                        break;
                }
@@ -181,6 +177,15 @@
                }
        }
 
+       /* Check for duplicates. */
+       list_for_each_entry(watch, &conn->watches, list) {
+               if (streq(watch->node, vec[0]) &&
+                    streq(watch->token, vec[1])) {
+                       send_error(conn, EEXIST);
+                       return;
+               }
+       }
+
        watch = talloc(conn, struct watch);
        watch->node = talloc_strdup(watch, vec[0]);
        watch->token = talloc_strdup(watch, vec[1]);
@@ -200,37 +205,6 @@
        add_event(conn, watch, watch->node);
 }
 
-void do_watch_ack(struct connection *conn, const char *token)
-{
-       struct watch_event *event;
-
-       if (!token) {
-               send_error(conn, EINVAL);
-               return;
-       }
-
-       if (!conn->waiting_for_ack) {
-               send_error(conn, ENOENT);
-               return;
-       }
-
-       if (!streq(conn->waiting_for_ack->token, token)) {
-               /* They're confused: this will cause us to send event again */
-               conn->waiting_for_ack = NULL;
-               send_error(conn, EINVAL);
-               return;
-       }
-
-       /* Remove event: after ack sent, core will call queue_next_event */
-       event = list_top(&conn->waiting_for_ack->events, struct watch_event,
-                        list);
-       list_del(&event->list);
-       talloc_free(event);
-
-       conn->waiting_for_ack = NULL;
-       send_ack(conn, XS_WATCH_ACK);
-}
-
 void do_unwatch(struct connection *conn, struct buffered_data *in)
 {
        struct watch *watch;
@@ -241,9 +215,6 @@
                return;
        }
 
-       /* We don't need to worry if we're waiting for an ack for the
-        * watch we're deleting: conn->waiting_for_ack was reset by
-        * this command in consider_message anyway. */
        node = canonicalize(conn, vec[0]);
        list_for_each_entry(watch, &conn->watches, list) {
                if (streq(watch->node, node) && streq(watch->token, vec[1])) {
@@ -262,11 +233,6 @@
        struct watch *watch;
        struct watch_event *event;
 
-       if (conn->waiting_for_ack)
-               printf("    waiting_for_ack for watch on %s token %s\n",
-                      conn->waiting_for_ack->node,
-                      conn->waiting_for_ack->token);
-
        list_for_each_entry(watch, &conn->watches, list) {
                printf("    watch on %s token %s\n",
                       watch->node, watch->token);
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xs.c
--- a/tools/xenstore/xs.c       Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/xs.c       Sun Oct  9 17:52:54 2005
@@ -78,9 +78,29 @@
 
        /* One transaction at a time. */
        pthread_mutex_t transaction_mutex;
+       pthread_t transaction_pthread;
 };
 
 static void *read_thread(void *arg);
+
+static void request_mutex_acquire(struct xs_handle *h)
+{
+       /*
+        * Requests are not yet tagged as transactional, so a thread outside
+        * the open transaction also takes transaction_mutex. pthread_t is
+        * opaque: compare with pthread_equal(), never with != or ==.
+        */
+       if (!pthread_equal(h->transaction_pthread, pthread_self()))
+               pthread_mutex_lock(&h->transaction_mutex);
+       pthread_mutex_lock(&h->request_mutex);
+}
+
+static void request_mutex_release(struct xs_handle *h)
+{ /* Undo request_mutex_acquire(), releasing in reverse lock order. */
+       pthread_mutex_unlock(&h->request_mutex);
+       if (!pthread_equal(h->transaction_pthread, pthread_self()))
+               pthread_mutex_unlock(&h->transaction_mutex);
+}
 
 int xs_fileno(struct xs_handle *h)
 {
@@ -163,6 +183,7 @@
 
        pthread_mutex_init(&h->request_mutex, NULL);
        pthread_mutex_init(&h->transaction_mutex, NULL);
+       h->transaction_pthread = -1;
 
        if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0)
                goto error;
@@ -316,7 +337,7 @@
        ignorepipe.sa_flags = 0;
        sigaction(SIGPIPE, &ignorepipe, &oldact);
 
-       pthread_mutex_lock(&h->request_mutex);
+       request_mutex_acquire(h);
 
        if (!xs_write_all(h->fd, &msg, sizeof(msg)))
                goto fail;
@@ -329,7 +350,7 @@
        if (!ret)
                goto fail;
 
-       pthread_mutex_unlock(&h->request_mutex);
+       request_mutex_release(h);
 
        sigaction(SIGPIPE, &oldact, NULL);
        if (msg.type == XS_ERROR) {
@@ -350,7 +371,7 @@
 fail:
        /* We're in a bad state, so close fd. */
        saved_errno = errno;
-       pthread_mutex_unlock(&h->request_mutex);
+       request_mutex_release(h);
        sigaction(SIGPIPE, &oldact, NULL);
 close_fd:
        close(h->fd);
@@ -593,15 +614,6 @@
        return ret;
 }
 
-/* Acknowledge watch on node.  Watches must be acknowledged before
- * any other watches can be read.
- * Returns false on failure.
- */
-bool xs_acknowledge_watch(struct xs_handle *h, const char *token)
-{
-       return xs_bool(xs_single(h, XS_WATCH_ACK, token, NULL));
-}
-
 /* Remove a watch on a node.
  * Returns false on failure (no watch on that node).
  */
@@ -624,8 +636,18 @@
  */
 bool xs_transaction_start(struct xs_handle *h)
 {
+       bool rc;
+       /* On success, transaction_mutex stays held until the transaction ends. */
        pthread_mutex_lock(&h->transaction_mutex);
-       return xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL));
+       h->transaction_pthread = pthread_self(); /* request_mutex_acquire() then won't re-lock transaction_mutex here */
+
+       rc = xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL));
+       if (!rc) { /* start failed: give up ownership and the mutex */
+               h->transaction_pthread = -1; /* NOTE(review): -1 is not a portable pthread_t value */
+               pthread_mutex_unlock(&h->transaction_mutex);
+       }
+
+       return rc;
 }
 
 /* End a transaction.
@@ -645,6 +667,7 @@
        
        rc = xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL));
 
+       h->transaction_pthread = -1;
        pthread_mutex_unlock(&h->transaction_mutex);
 
        return rc;
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xs.h
--- a/tools/xenstore/xs.h       Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/xs.h       Sun Oct  9 17:52:54 2005
@@ -96,12 +96,6 @@
  */
 char **xs_read_watch(struct xs_handle *h, unsigned int *num);
 
-/* Acknowledge watch on node.  Watches must be acknowledged before
- * any other watches can be read.
- * Returns false on failure.
- */
-bool xs_acknowledge_watch(struct xs_handle *h, const char *token);
-
 /* Remove a watch on a node: implicitly acks any outstanding watch.
  * Returns false on failure (no watch on that node).
  */
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xs_test.c
--- a/tools/xenstore/xs_test.c  Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/xs_test.c  Sun Oct  9 17:52:54 2005
@@ -201,7 +201,6 @@
             "  watch <path> <token>\n"
             "  watchnoack <path> <token>\n"
             "  waitwatch\n"
-            "  ackwatch <token>\n"
             "  unwatch <path> <token>\n"
             "  close\n"
             "  start <node>\n"
@@ -455,8 +454,6 @@
                    !streq(vec[XS_WATCH_PATH], node) ||
                    !streq(vec[XS_WATCH_TOKEN], token))
                        failed(handle);
-               if (!xs_acknowledge_watch(handles[handle], token))
-                       failed(handle);
        }
 }
 
@@ -513,12 +510,6 @@
        else
                output("%s:%s\n", vec[XS_WATCH_PATH], vec[XS_WATCH_TOKEN]);
        free(vec);
-}
-
-static void do_ackwatch(unsigned int handle, const char *token)
-{
-       if (!xs_acknowledge_watch(handles[handle], token))
-               failed(handle);
 }
 
 static void do_unwatch(unsigned int handle, const char *node, const char *token)
@@ -746,8 +737,6 @@
                do_watch(handle, arg(line, 1), arg(line, 2), false);
        else if (streq(command, "waitwatch"))
                do_waitwatch(handle);
-       else if (streq(command, "ackwatch"))
-               do_ackwatch(handle, arg(line, 1));
        else if (streq(command, "unwatch"))
                do_unwatch(handle, arg(line, 1), arg(line, 2));
        else if (streq(command, "close")) {
diff -r ab93a9a46bd4 -r 8016551fde98 xen/include/public/io/xs_wire.h
--- a/xen/include/public/io/xs_wire.h   Sun Oct  9 16:29:24 2005
+++ b/xen/include/public/io/xs_wire.h   Sun Oct  9 17:52:54 2005
@@ -35,11 +35,9 @@
        XS_READ,
        XS_GET_PERMS,
        XS_WATCH,
-       XS_WATCH_ACK,
        XS_UNWATCH,
        XS_TRANSACTION_START,
        XS_TRANSACTION_END,
-       XS_OP_READ_ONLY = XS_TRANSACTION_END,
        XS_INTRODUCE,
        XS_RELEASE,
        XS_GET_DOMAIN_PATH,

_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-changelog

<Prev in Thread] Current Thread [Next in Thread>
  • [Xen-changelog] Refactor xenbus to break up the xenbus_lock and permit watches, Xen patchbot -unstable <=