# HG changeset patch
# User kaf24@xxxxxxxxxxxxxxxxxxxx
# Node ID 8016551fde9825fc82bfa4762f17b98e7519b823
# Parent ab93a9a46bd48f9a654b1fdc9caf4b7ae07f6a8b
Refactor xenbus to break up the xenbus_lock and permit watches
to fire concurrently with request/reply pairs. Remove
watch_ack message: no longer needed.
Signed-off-by: Keir Fraser <keir@xxxxxxxxxxxxx>
diff -r ab93a9a46bd4 -r 8016551fde98
linux-2.6-xen-sparse/arch/xen/i386/kernel/smpboot.c
--- a/linux-2.6-xen-sparse/arch/xen/i386/kernel/smpboot.c Sun Oct 9
16:29:24 2005
+++ b/linux-2.6-xen-sparse/arch/xen/i386/kernel/smpboot.c Sun Oct 9
17:52:54 2005
@@ -1327,18 +1327,14 @@
.callback = handle_vcpu_hotplug_event
};
-/* NB: Assumes xenbus_lock is held! */
static int setup_cpu_watcher(struct notifier_block *notifier,
unsigned long event, void *data)
{
- int err = 0;
-
- BUG_ON(down_trylock(&xenbus_lock) == 0);
+ int err;
+
err = register_xenbus_watch(&cpu_watch);
-
- if (err) {
+ if (err) /* failure is only logged; NOTIFY_DONE is returned regardless */
printk("Failed to register watch on /cpu\n");
- }
return NOTIFY_DONE;
}
diff -r ab93a9a46bd4 -r 8016551fde98
linux-2.6-xen-sparse/arch/xen/kernel/reboot.c
--- a/linux-2.6-xen-sparse/arch/xen/kernel/reboot.c Sun Oct 9 16:29:24 2005
+++ b/linux-2.6-xen-sparse/arch/xen/kernel/reboot.c Sun Oct 9 17:52:54 2005
@@ -360,9 +360,6 @@
static struct notifier_block xenstore_notifier;
-/* Setup our watcher
- NB: Assumes xenbus_lock is held!
-*/
static int setup_shutdown_watcher(struct notifier_block *notifier,
unsigned long event,
void *data)
@@ -371,8 +368,6 @@
#ifdef CONFIG_MAGIC_SYSRQ
int err2 = 0;
#endif
-
- BUG_ON(down_trylock(&xenbus_lock) == 0);
err1 = register_xenbus_watch(&shutdown_watch);
#ifdef CONFIG_MAGIC_SYSRQ
diff -r ab93a9a46bd4 -r 8016551fde98
linux-2.6-xen-sparse/drivers/xen/balloon/balloon.c
--- a/linux-2.6-xen-sparse/drivers/xen/balloon/balloon.c Sun Oct 9
16:29:24 2005
+++ b/linux-2.6-xen-sparse/drivers/xen/balloon/balloon.c Sun Oct 9
17:52:54 2005
@@ -370,16 +370,11 @@
}
-/* Setup our watcher
- NB: Assumes xenbus_lock is held!
-*/
int balloon_init_watcher(struct notifier_block *notifier,
unsigned long event,
void *data)
{
int err;
-
- BUG_ON(down_trylock(&xenbus_lock) == 0);
err = register_xenbus_watch(&target_watch);
if (err)
diff -r ab93a9a46bd4 -r 8016551fde98
linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_comms.c
--- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_comms.c Sun Oct 9
16:29:24 2005
+++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_comms.c Sun Oct 9
17:52:54 2005
@@ -130,15 +130,10 @@
wait_event(xb_waitq, output_avail(out));
- /* Read, then check: not that we don't trust store.
- * Hell, some of my best friends are daemons. But,
- * in this post-911 world... */
+ mb(); /* barrier between the availability check above and the header snapshot below */
h = *out;
- mb();
- if (!check_buffer(&h)) {
- set_current_state(TASK_RUNNING);
- return -EIO; /* ETERRORIST! */
- }
+ if (!check_buffer(&h))
+ return -EIO;
dst = get_output_chunk(&h, out->buf, &avail);
if (avail > len)
@@ -173,12 +168,11 @@
const char *src;
wait_event(xb_waitq, xs_input_avail());
+
+ mb(); /* barrier between the availability check above and the header snapshot below */
h = *in;
- mb();
- if (!check_buffer(&h)) {
- set_current_state(TASK_RUNNING);
+ if (!check_buffer(&h))
return -EIO;
- }
src = get_input_chunk(&h, in->buf, &avail);
if (avail > len)
@@ -195,10 +189,6 @@
notify_remote_via_evtchn(xen_start_info->store_evtchn);
}
- /* If we left something, wake watch thread to deal with it. */
- if (xs_input_avail())
- wake_up(&xb_waitq);
-
return 0;
}
diff -r ab93a9a46bd4 -r 8016551fde98
linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c
--- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c Sun Oct 9
16:29:24 2005
+++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c Sun Oct 9
17:52:54 2005
@@ -46,85 +46,132 @@
#include <asm/hypervisor.h>
struct xenbus_dev_data {
- /* Are there bytes left to be read in this message? */
- int bytes_left;
- /* Are we still waiting for the reply to a message we wrote? */
- int awaiting_reply;
- /* Buffer for outgoing messages. */
+ int in_transaction; /* set by a TRANSACTION_START reply, cleared by END */
+
+ /* Partial request. */
unsigned int len;
union {
struct xsd_sockmsg msg;
char buffer[PAGE_SIZE];
} u;
+
+ /* Response queue. */
+#define MASK_READ_IDX(idx) ((idx)&(PAGE_SIZE-1))
+ char read_buffer[PAGE_SIZE];
+ unsigned int read_cons, read_prod;
+ wait_queue_head_t read_waitq;
};
static struct proc_dir_entry *xenbus_dev_intf;
-/* Reply can be long (dir, getperm): don't buffer, just examine
- * headers so we can discard rest if they die. */
static ssize_t xenbus_dev_read(struct file *filp,
char __user *ubuf,
size_t len, loff_t *ppos)
{
- struct xenbus_dev_data *data = filp->private_data;
- struct xsd_sockmsg msg;
- int err;
-
- /* Refill empty buffer? */
- if (data->bytes_left == 0) {
- if (len < sizeof(msg))
- return -EINVAL;
-
- err = xb_read(&msg, sizeof(msg));
- if (err)
- return err;
- data->bytes_left = msg.len;
- if (ubuf && copy_to_user(ubuf, &msg, sizeof(msg)) != 0)
- return -EFAULT;
- /* We can receive spurious XS_WATCH_EVENT messages. */
- if (msg.type != XS_WATCH_EVENT)
- data->awaiting_reply = 0;
- return sizeof(msg);
+ struct xenbus_dev_data *u = filp->private_data;
+ int i;
+
+ if (wait_event_interruptible(u->read_waitq,
+ u->read_prod != u->read_cons))
+ return -EINTR;
+
+ for (i = 0; i < len; i++) {
+ if (u->read_cons == u->read_prod)
+ break;
+ if (put_user(u->read_buffer[MASK_READ_IDX(u->read_cons)], ubuf+i))
+ return i ? i : -EFAULT; /* report bytes already copied, if any */
+ u->read_cons++;
}
- /* Don't read over next header, or over temporary buffer. */
- if (len > sizeof(data->u.buffer))
- len = sizeof(data->u.buffer);
- if (len > data->bytes_left)
- len = data->bytes_left;
-
- err = xb_read(data->u.buffer, len);
- if (err)
- return err;
-
- data->bytes_left -= len;
- if (ubuf && copy_to_user(ubuf, data->u.buffer, len) != 0)
- return -EFAULT;
- return len;
-}
-
-/* We do v. basic sanity checking so they don't screw up kernel later. */
+ return i;
+}
+
+static void queue_reply(struct xenbus_dev_data *u,
+ char *data, unsigned int len)
+{
+ int i;
+
+ for (i = 0; i < len; i++, u->read_prod++)
+ u->read_buffer[MASK_READ_IDX(u->read_prod)] = data[i];
+
+ BUG_ON((u->read_prod - u->read_cons) > sizeof(u->read_buffer)); /* caller checks space first */
+
+ wake_up(&u->read_waitq);
+}
+
static ssize_t xenbus_dev_write(struct file *filp,
const char __user *ubuf,
size_t len, loff_t *ppos)
{
- struct xenbus_dev_data *data = filp->private_data;
- int err;
-
- /* We gather data in buffer until we're ready to send it. */
- if (len > data->len + sizeof(data->u))
+ struct xenbus_dev_data *u = filp->private_data;
+ void *reply;
+ int err = 0;
+
+ /* Overflow-safe form of (len + u->len > sizeof(u->u.buffer)). */
+ if (len > sizeof(u->u.buffer) - u->len)
return -EINVAL;
- if (copy_from_user(data->u.buffer + data->len, ubuf, len) != 0)
+
+ if (copy_from_user(u->u.buffer + u->len, ubuf, len) != 0)
return -EFAULT;
- data->len += len;
- if (data->len >= sizeof(data->u.msg) + data->u.msg.len) {
- err = xb_write(data->u.buffer, data->len);
- if (err)
- return err;
- data->len = 0;
- data->awaiting_reply = 1;
+
+ u->len += len;
+ /* Need a complete header before u->u.msg.len can be trusted. */
+ if (u->len < sizeof(u->u.msg))
+ return len;
+ /* A payload that can never fit would wedge this buffer forever. */
+ if (u->u.msg.len > (sizeof(u->u.buffer) - sizeof(u->u.msg))) {
+ u->len = 0;
+ return -EINVAL;
+ }
+ if (u->len < (sizeof(u->u.msg) + u->u.msg.len))
+ return len;
+
+ switch (u->u.msg.type) {
+ case XS_TRANSACTION_START:
+ case XS_TRANSACTION_END:
+ case XS_DIRECTORY:
+ case XS_READ:
+ case XS_GET_PERMS:
+ case XS_RELEASE:
+ case XS_GET_DOMAIN_PATH:
+ case XS_WRITE:
+ case XS_MKDIR:
+ case XS_RM:
+ case XS_SET_PERMS:
+ reply = xenbus_dev_request_and_reply(&u->u.msg);
+ if (IS_ERR(reply))
+ err = PTR_ERR(reply);
+ else {
+ if (u->u.msg.type == XS_TRANSACTION_START)
+ u->in_transaction = 1;
+ if (u->u.msg.type == XS_TRANSACTION_END)
+ u->in_transaction = 0;
+ /*
+ * Drop the reply rather than overrun read_buffer. NB. the
+ * request has already been sent and answered by the store.
+ */
+ if ((sizeof(u->u.msg) + u->u.msg.len) >
+ (sizeof(u->read_buffer) - (u->read_prod - u->read_cons)))
+ err = -ENOSPC;
+ else {
+ queue_reply(u, (char *)&u->u.msg, sizeof(u->u.msg));
+ queue_reply(u, (char *)reply, u->u.msg.len);
+ }
+ kfree(reply);
+ }
+ break;
+
+ default:
+ err = -EINVAL;
+ break;
}
- return len;
+
+ /* The buffered message is consumed whether or not it succeeded. */
+ u->len = 0;
+
+ if (err)
+ return err;
+ return len;
}
static int xenbus_dev_open(struct inode *inode, struct file *filp)
@@ -134,7 +181,6 @@
if (xen_start_info->store_evtchn == 0)
return -ENOENT;
- /* Don't try seeking. */
nonseekable_open(inode, filp);
u = kmalloc(sizeof(*u), GFP_KERNEL);
@@ -142,28 +188,21 @@
return -ENOMEM;
memset(u, 0, sizeof(*u));
+ init_waitqueue_head(&u->read_waitq);
filp->private_data = u;
- down(&xenbus_lock);
-
return 0;
}
static int xenbus_dev_release(struct inode *inode, struct file *filp)
{
- struct xenbus_dev_data *data = filp->private_data;
-
- /* Discard any unread replies. */
- while (data->bytes_left || data->awaiting_reply)
- xenbus_dev_read(filp, NULL, sizeof(data->u.buffer), NULL);
-
- /* Harmless if no transaction in progress. */
- xenbus_transaction_end(1);
-
- up(&xenbus_lock);
-
- kfree(data);
+ struct xenbus_dev_data *u = filp->private_data;
+
+ if (u->in_transaction)
+ xenbus_transaction_end(1);
+
+ kfree(u);
return 0;
}
diff -r ab93a9a46bd4 -r 8016551fde98
linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_probe.c
--- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_probe.c Sun Oct 9
16:29:24 2005
+++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_probe.c Sun Oct 9
17:52:54 2005
@@ -43,6 +43,9 @@
static struct notifier_block *xenstore_chain;
+/* Serialises xenbus probing against save/restore (held from the suspend path until xenbus_resume() completes). */
+static DECLARE_MUTEX(xenbus_lock);
+
/* If something in array of ids matches this device, return it. */
static const struct xenbus_device_id *
match_device(const struct xenbus_device_id *arr, struct xenbus_device *dev)
@@ -625,12 +628,13 @@
down(&xenbus_lock);
bus_for_each_dev(&xenbus_frontend.bus, NULL, NULL, suspend_dev);
bus_for_each_dev(&xenbus_backend.bus, NULL, NULL, suspend_dev);
+ xs_suspend();
}
void xenbus_resume(void)
{
xb_init_comms();
- reregister_xenbus_watches();
+ xs_resume();
bus_for_each_dev(&xenbus_frontend.bus, NULL, NULL, resume_dev);
bus_for_each_dev(&xenbus_backend.bus, NULL, NULL, resume_dev);
up(&xenbus_lock);
@@ -685,6 +689,7 @@
/* Notify others that xenstore is up */
notifier_call_chain(&xenstore_chain, 0, 0);
up(&xenbus_lock);
+
return 0;
}
diff -r ab93a9a46bd4 -r 8016551fde98
linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c
--- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c Sun Oct 9
16:29:24 2005
+++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c Sun Oct 9
17:52:54 2005
@@ -42,11 +42,67 @@
#define streq(a, b) (strcmp((a), (b)) == 0)
-static char printf_buffer[4096];
+struct xs_stored_msg {
+ struct xsd_sockmsg hdr;
+
+ union {
+ /* Stored replies. */
+ struct {
+ struct list_head list;
+ char *body;
+ } reply;
+
+ /* Queued watch callbacks. */
+ struct {
+ struct work_struct work;
+ struct xenbus_watch *handle;
+ char **vec;
+ unsigned int vec_size;
+ } watch;
+ } u;
+};
+
+struct xs_handle {
+ /* A list of replies. Currently only one will ever be outstanding. */
+ struct list_head reply_list;
+ spinlock_t reply_lock;
+ wait_queue_head_t reply_waitq;
+
+ /* One request at a time. */
+ struct semaphore request_mutex;
+
+ /* One transaction at a time. */
+ struct semaphore transaction_mutex;
+ int transaction_pid; /* pid of the task holding transaction_mutex for a transaction, else -1 */
+};
+
+static struct xs_handle xs_state;
+
static LIST_HEAD(watches);
-
-DECLARE_MUTEX(xenbus_lock);
-EXPORT_SYMBOL(xenbus_lock);
+static DEFINE_SPINLOCK(watches_lock); /* taken around every access to the watches list */
+
+/* Can wait on !xs_resuming for suspend/resume cycle to complete. */
+static int xs_resuming;
+static DECLARE_WAIT_QUEUE_HEAD(xs_resuming_waitq);
+
+static void request_mutex_acquire(void)
+{
+ /*
+ * We can't distinguish non-transactional from transactional
+ * requests right now. So temporarily acquire the transaction mutex
+ * if this task is outside transaction context.
+ */
+ if (xs_state.transaction_pid != current->pid)
+ down(&xs_state.transaction_mutex);
+ down(&xs_state.request_mutex);
+}
+
+static void request_mutex_release(void)
+{
+ up(&xs_state.request_mutex);
+ if (xs_state.transaction_pid != current->pid)
+ up(&xs_state.transaction_mutex);
+}
static int get_error(const char *errorstring)
{
@@ -65,29 +121,32 @@
static void *read_reply(enum xsd_sockmsg_type *type, unsigned int *len)
{
- struct xsd_sockmsg msg;
- void *ret;
- int err;
-
- err = xb_read(&msg, sizeof(msg));
- if (err)
- return ERR_PTR(err);
-
- ret = kmalloc(msg.len + 1, GFP_KERNEL);
- if (!ret)
- return ERR_PTR(-ENOMEM);
-
- err = xb_read(ret, msg.len);
- if (err) {
- kfree(ret);
- return ERR_PTR(err);
- }
- ((char*)ret)[msg.len] = '\0';
-
- *type = msg.type;
+ struct xs_stored_msg *msg;
+ char *body;
+
+ spin_lock(&xs_state.reply_lock);
+
+ while (list_empty(&xs_state.reply_list)) { /* replies are queued by process_msg() in the reader thread */
+ spin_unlock(&xs_state.reply_lock);
+ wait_event(xs_state.reply_waitq,
+ !list_empty(&xs_state.reply_list));
+ spin_lock(&xs_state.reply_lock);
+ }
+
+ msg = list_entry(xs_state.reply_list.next,
+ struct xs_stored_msg, u.reply.list);
+ list_del(&msg->u.reply.list);
+
+ spin_unlock(&xs_state.reply_lock);
+
+ *type = msg->hdr.type;
if (len)
- *len = msg.len;
- return ret;
+ *len = msg->hdr.len;
+ body = msg->u.reply.body;
+
+ kfree(msg);
+
+ return body;
}
/* Emergency write. */
@@ -98,10 +157,45 @@
msg.type = XS_DEBUG;
msg.len = sizeof("print") + count + 1;
+ request_mutex_acquire();
xb_write(&msg, sizeof(msg));
xb_write("print", sizeof("print"));
xb_write(str, count);
xb_write("", 1);
+ request_mutex_release();
+}
+
+void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg)
+{
+ void *ret;
+ struct xsd_sockmsg req_msg = *msg;
+ int err;
+
+ if (req_msg.type == XS_TRANSACTION_START) { /* held until an END reply, or released below if START fails */
+ down(&xs_state.transaction_mutex);
+ xs_state.transaction_pid = current->pid;
+ }
+
+ request_mutex_acquire();
+
+ err = xb_write(msg, sizeof(*msg) + msg->len);
+ if (err) {
+ msg->type = XS_ERROR;
+ ret = ERR_PTR(err);
+ } else {
+ ret = read_reply(&msg->type, &msg->len);
+ }
+
+ request_mutex_release();
+
+ if ((msg->type == XS_TRANSACTION_END) ||
+ ((req_msg.type == XS_TRANSACTION_START) &&
+ (msg->type == XS_ERROR))) {
+ xs_state.transaction_pid = -1;
+ up(&xs_state.transaction_mutex);
+ }
+
+ return ret;
}
/* Send message to xs, get kmalloc'ed reply. ERR_PTR() on error. */
@@ -115,31 +209,33 @@
unsigned int i;
int err;
- WARN_ON(down_trylock(&xenbus_lock) == 0);
-
msg.type = type;
msg.len = 0;
for (i = 0; i < num_vecs; i++)
msg.len += iovec[i].iov_len;
+ request_mutex_acquire();
+
err = xb_write(&msg, sizeof(msg));
- if (err)
+ if (err) {
+ request_mutex_release(); /* must also drop transaction_mutex if it was taken */
return ERR_PTR(err);
+ }
for (i = 0; i < num_vecs; i++) {
err = xb_write(iovec[i].iov_base, iovec[i].iov_len);;
- if (err)
+ if (err) {
+ request_mutex_release();
return ERR_PTR(err);
- }
-
- /* Watches can have fired before reply comes: daemon detects
- * and re-transmits, so we can ignore this. */
- do {
- kfree(ret);
- ret = read_reply(&msg.type, len);
- if (IS_ERR(ret))
- return ret;
- } while (msg.type == XS_WATCH_EVENT);
+ }
+ }
+
+ ret = read_reply(&msg.type, len);
+
+ request_mutex_release();
+
+ if (IS_ERR(ret))
+ return ret;
if (msg.type == XS_ERROR) {
err = get_error(ret);
@@ -187,8 +283,6 @@
{
- static char buffer[4096];
- BUG_ON(down_trylock(&xenbus_lock) == 0);
- /* XXX FIXME: might not be correct if name == "" */
+ static char buffer[4096]; /* FIXME: shared by all callers and no longer serialised by xenbus_lock; concurrent callers race on it */
BUG_ON(strlen(dir) + strlen("/") + strlen(name) + 1 > sizeof(buffer));
strcpy(buffer, dir);
@@ -207,7 +301,7 @@
*num = count_strings(strings, len);
/* Transfer to one big alloc for easy freeing. */
- ret = kmalloc(*num * sizeof(char *) + len, GFP_ATOMIC);
+ ret = kmalloc(*num * sizeof(char *) + len, GFP_KERNEL);
if (!ret) {
kfree(strings);
return ERR_PTR(-ENOMEM);
@@ -298,7 +392,18 @@
*/
int xenbus_transaction_start(void)
{
- return xs_error(xs_single(XS_TRANSACTION_START, "", NULL));
+ int err;
+
+ down(&xs_state.transaction_mutex); /* released by xenbus_transaction_end(), or below on failure */
+ xs_state.transaction_pid = current->pid;
+
+ err = xs_error(xs_single(XS_TRANSACTION_START, "", NULL));
+ if (err) {
+ xs_state.transaction_pid = -1;
+ up(&xs_state.transaction_mutex);
+ }
+
+ return err;
}
EXPORT_SYMBOL(xenbus_transaction_start);
@@ -308,12 +413,19 @@
int xenbus_transaction_end(int abort)
{
char abortstr[2];
+ int err;
if (abort)
strcpy(abortstr, "F");
else
strcpy(abortstr, "T");
- return xs_error(xs_single(XS_TRANSACTION_END, abortstr, NULL));
+
+ err = xs_error(xs_single(XS_TRANSACTION_END, abortstr, NULL));
+
+ xs_state.transaction_pid = -1; /* transaction state is dropped even if the END request failed */
+ up(&xs_state.transaction_mutex);
+
+ return err;
}
EXPORT_SYMBOL(xenbus_transaction_end);
@@ -344,14 +456,23 @@
{
va_list ap;
int ret;
-
- BUG_ON(down_trylock(&xenbus_lock) == 0);
+#define PRINTF_BUFFER_SIZE 4096
+ char *printf_buffer;
+
+ printf_buffer = kmalloc(PRINTF_BUFFER_SIZE, GFP_KERNEL); /* per-call buffer: the old static one relied on xenbus_lock */
+ if (printf_buffer == NULL)
+ return -ENOMEM;
+
va_start(ap, fmt);
- ret = vsnprintf(printf_buffer, sizeof(printf_buffer), fmt, ap);
+ ret = vsnprintf(printf_buffer, PRINTF_BUFFER_SIZE, fmt, ap);
va_end(ap);
- BUG_ON(ret > sizeof(printf_buffer)-1);
- return xenbus_write(dir, node, printf_buffer);
+ BUG_ON(ret > PRINTF_BUFFER_SIZE-1);
+ ret = xenbus_write(dir, node, printf_buffer);
+
+ kfree(printf_buffer);
+
+ return ret;
}
EXPORT_SYMBOL(xenbus_printf);
@@ -361,19 +482,28 @@
va_list ap;
int ret;
unsigned int len;
-
- BUG_ON(down_trylock(&xenbus_lock) == 0);
+ char *printf_buffer;
+
+ printf_buffer = kmalloc(PRINTF_BUFFER_SIZE, GFP_KERNEL);
+ if (printf_buffer == NULL)
+ goto fail;
len = sprintf(printf_buffer, "%i ", -err);
va_start(ap, fmt);
- ret = vsnprintf(printf_buffer+len, sizeof(printf_buffer)-len, fmt, ap);
+ ret = vsnprintf(printf_buffer+len, PRINTF_BUFFER_SIZE-len, fmt, ap);
va_end(ap);
- BUG_ON(len + ret > sizeof(printf_buffer)-1);
+ BUG_ON(len + ret > PRINTF_BUFFER_SIZE-1);
dev->has_error = 1;
if (xenbus_write(dev->nodename, "error", printf_buffer) != 0)
- printk("xenbus: failed to write error node for %s (%s)\n",
- dev->nodename, printf_buffer);
+ goto fail;
+
+ kfree(printf_buffer);
+ return;
+ fail:
+ printk("xenbus: failed to write error node for %s (%s)\n",
+ dev->nodename, printf_buffer);
+ kfree(printf_buffer); /* may be NULL here; kfree(NULL) is a no-op */
}
EXPORT_SYMBOL(xenbus_dev_error);
@@ -432,26 +562,6 @@
return xs_error(xs_talkv(XS_WATCH, iov, ARRAY_SIZE(iov), NULL));
}
-static char **xs_read_watch(unsigned int *num)
-{
- enum xsd_sockmsg_type type;
- char *strings;
- unsigned int len;
-
- strings = read_reply(&type, &len);
- if (IS_ERR(strings))
- return (char **)strings;
-
- BUG_ON(type != XS_WATCH_EVENT);
-
- return split(strings, len, num);
-}
-
-static int xs_acknowledge_watch(const char *token)
-{
- return xs_error(xs_single(XS_WATCH_ACK, token, NULL));
-}
-
static int xs_unwatch(const char *path, const char *token)
{
struct kvec iov[2];
@@ -464,7 +574,6 @@
return xs_error(xs_talkv(XS_UNWATCH, iov, ARRAY_SIZE(iov), NULL));
}
-/* A little paranoia: we don't just trust token. */
static struct xenbus_watch *find_watch(const char *token)
{
struct xenbus_watch *i, *cmp;
@@ -474,6 +583,7 @@
list_for_each_entry(i, &watches, list)
if (i == cmp)
return i;
+
return NULL;
}
@@ -485,11 +595,20 @@
int err;
sprintf(token, "%lX", (long)watch);
+
+ spin_lock(&watches_lock);
BUG_ON(find_watch(token));
+ spin_unlock(&watches_lock);
err = xs_watch(watch->node, token);
- if (!err)
+
+ /* Ignore errors due to multiple registration: treat as success. */
+ if ((err == 0) || (err == -EEXIST)) {
+ spin_lock(&watches_lock);
list_add(&watch->list, &watches);
+ spin_unlock(&watches_lock);
+ err = 0; /* watch is on the list now, so callers must not free it */
+ }
return err;
}
EXPORT_SYMBOL(register_xenbus_watch);
@@ -500,77 +619,190 @@
int err;
sprintf(token, "%lX", (long)watch);
+
+ spin_lock(&watches_lock);
BUG_ON(!find_watch(token));
+ list_del(&watch->list);
+ spin_unlock(&watches_lock);
+
+ /* Ensure xs_resume() is not in progress (see comments there). */
+ wait_event(xs_resuming_waitq, !xs_resuming);
err = xs_unwatch(watch->node, token);
- list_del(&watch->list);
-
if (err)
printk(KERN_WARNING
"XENBUS Failed to release watch %s: %i\n",
watch->node, err);
+
+ /* Make sure no already-queued callback still references this watch. */
+ flush_scheduled_work();
}
EXPORT_SYMBOL(unregister_xenbus_watch);
-/* Re-register callbacks to all watches. */
-void reregister_xenbus_watches(void)
-{
+void xs_suspend(void)
+{
+ down(&xs_state.transaction_mutex);
+ down(&xs_state.request_mutex);
+}
+
+void xs_resume(void)
+{
+ struct list_head *ent, *prev_ent = &watches;
struct xenbus_watch *watch;
char token[sizeof(watch) * 2 + 1];
- list_for_each_entry(watch, &watches, list) {
- sprintf(token, "%lX", (long)watch);
- xs_watch(watch->node, token);
- }
-}
-
-static int watch_thread(void *unused)
-{
+ /* Protect against concurrent unregistration and freeing of watches. */
+ BUG_ON(xs_resuming);
+ xs_resuming = 1;
+
+ up(&xs_state.request_mutex);
+ up(&xs_state.transaction_mutex);
+
+ /*
+ * Iterate over the watch list re-registering each node. We must
+ * be careful about concurrent registrations and unregistrations.
+ * We search for the node immediately following the previously
+ * re-registered node. If we get no match then either we are done
+ * (previous node is last in list) or the node was unregistered, in
+ * which case we restart from the beginning of the list.
+ * register_xenbus_watch() + unregister_xenbus_watch() is safe because
+ * it will only ever move a watch node earlier in the list, so it
+ * cannot cause us to skip nodes.
+ */
for (;;) {
- char **vec = NULL;
- unsigned int num;
-
- wait_event(xb_waitq, xs_input_avail());
-
- /* If this is a spurious wakeup caused by someone
- * doing an op, they'll hold the lock and the buffer
- * will be empty by the time we get there.
- */
- down(&xenbus_lock);
- if (xs_input_avail())
- vec = xs_read_watch(&num);
-
- if (vec && !IS_ERR(vec)) {
- struct xenbus_watch *w;
- int err;
-
- err = xs_acknowledge_watch(vec[XS_WATCH_TOKEN]);
- if (err)
- printk(KERN_WARNING "XENBUS ack %s fail %i\n",
- vec[XS_WATCH_TOKEN], err);
- w = find_watch(vec[XS_WATCH_TOKEN]);
- BUG_ON(!w);
- w->callback(w, (const char **)vec, num);
- kfree(vec);
- } else if (vec)
- printk(KERN_WARNING "XENBUS xs_read_watch: %li\n",
- PTR_ERR(vec));
- up(&xenbus_lock);
+ spin_lock(&watches_lock);
+ list_for_each(ent, &watches)
+ if (ent->prev == prev_ent)
+ break;
+ spin_unlock(&watches_lock);
+
+ /* No match because prev_ent is at the end of the list? */
+ if ((ent == &watches) && (watches.prev == prev_ent))
+ break; /* We're done! */
+
+ if ((prev_ent = ent) != &watches) {
+ /*
+ * Safe even with watches_lock not held: unregister_xenbus_watch()
+ * waits for xs_resuming to clear before it returns.
+ */
+ watch = list_entry(ent, struct xenbus_watch, list);
+ sprintf(token, "%lX", (long)watch);
+ xs_watch(watch->node, token);
+ }
+ }
+
+ xs_resuming = 0;
+ wake_up(&xs_resuming_waitq);
+}
+
+static void xenbus_fire_watch(void *arg)
+{
+ struct xs_stored_msg *msg = arg;
+
+ msg->u.watch.handle->callback(msg->u.watch.handle,
+ (const char **)msg->u.watch.vec,
+ msg->u.watch.vec_size);
+
+ kfree(msg->u.watch.vec);
+ kfree(msg);
+}
+
+static int process_msg(void)
+{
+ struct xs_stored_msg *msg;
+ char *body;
+ int err;
+
+ msg = kmalloc(sizeof(*msg), GFP_KERNEL);
+ if (msg == NULL)
+ return -ENOMEM;
+
+ err = xb_read(&msg->hdr, sizeof(msg->hdr));
+ if (err) {
+ kfree(msg);
+ return err;
+ }
+
+ body = kmalloc(msg->hdr.len + 1, GFP_KERNEL);
+ if (body == NULL) {
+ kfree(msg);
+ return -ENOMEM;
+ }
+
+ err = xb_read(body, msg->hdr.len);
+ if (err) {
+ kfree(body);
+ kfree(msg);
+ return err;
+ }
+ body[msg->hdr.len] = '\0';
+
+ if (msg->hdr.type == XS_WATCH_EVENT) {
+ INIT_WORK(&msg->u.watch.work, xenbus_fire_watch, msg);
+
+ msg->u.watch.vec = split(body, msg->hdr.len,
+ &msg->u.watch.vec_size);
+ if (IS_ERR(msg->u.watch.vec)) {
+ /* Read the error code before freeing msg, which holds it. */
+ err = PTR_ERR(msg->u.watch.vec);
+ kfree(msg);
+ return err;
+ }
+
+ spin_lock(&watches_lock);
+ msg->u.watch.handle = find_watch(
+ msg->u.watch.vec[XS_WATCH_TOKEN]);
+ if (msg->u.watch.handle != NULL) {
+ schedule_work(&msg->u.watch.work);
+ } else {
+ kfree(msg->u.watch.vec);
+ kfree(msg);
+ }
+ spin_unlock(&watches_lock);
+ } else {
+ msg->u.reply.body = body;
+ spin_lock(&xs_state.reply_lock);
+ list_add_tail(&msg->u.reply.list, &xs_state.reply_list);
+ spin_unlock(&xs_state.reply_lock);
+ wake_up(&xs_state.reply_waitq);
+ }
+
+ return 0;
+}
+
+static int read_thread(void *unused)
+{
+ int err;
+
+ for (;;) {
+ err = process_msg();
+ if (err)
+ printk(KERN_WARNING "XENBUS error %d while reading "
+ "message\n", err);
}
}
int xs_init(void)
{
int err;
- struct task_struct *watcher;
+ struct task_struct *reader;
+
+ INIT_LIST_HEAD(&xs_state.reply_list);
+ spin_lock_init(&xs_state.reply_lock);
+ init_waitqueue_head(&xs_state.reply_waitq);
+
+ init_MUTEX(&xs_state.request_mutex);
+ init_MUTEX(&xs_state.transaction_mutex);
+ xs_state.transaction_pid = -1;
err = xb_init_comms();
if (err)
return err;
- watcher = kthread_run(watch_thread, NULL, "kxbwatch");
- if (IS_ERR(watcher))
- return PTR_ERR(watcher);
+ reader = kthread_run(read_thread, NULL, "xenbusd");
+ if (IS_ERR(reader))
+ return PTR_ERR(reader);
+
return 0;
}
diff -r ab93a9a46bd4 -r 8016551fde98
linux-2.6-xen-sparse/include/asm-xen/xenbus.h
--- a/linux-2.6-xen-sparse/include/asm-xen/xenbus.h Sun Oct 9 16:29:24 2005
+++ b/linux-2.6-xen-sparse/include/asm-xen/xenbus.h Sun Oct 9 17:52:54 2005
@@ -78,10 +78,6 @@
int xenbus_register_backend(struct xenbus_driver *drv);
void xenbus_unregister_driver(struct xenbus_driver *drv);
-/* Caller must hold this lock to call these functions: it's also held
- * across watch callbacks. */
-extern struct semaphore xenbus_lock;
-
char **xenbus_directory(const char *dir, const char *node, unsigned int *num);
void *xenbus_read(const char *dir, const char *node, unsigned int *len);
int xenbus_write(const char *dir, const char *node, const char *string);
@@ -113,7 +109,11 @@
struct xenbus_watch
{
struct list_head list;
+
+ /* Path being watched. */
char *node;
+
+ /* Callback (executed in a process context with no locks held). */
void (*callback)(struct xenbus_watch *,
const char **vec, unsigned int len);
};
@@ -124,7 +124,11 @@
int register_xenbus_watch(struct xenbus_watch *watch);
void unregister_xenbus_watch(struct xenbus_watch *watch);
-void reregister_xenbus_watches(void);
+void xs_suspend(void); /* blocks further requests and transactions until xs_resume() */
+void xs_resume(void);
+
+/* Used by xenbus_dev to borrow kernel's store connection. */
+void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg);
/* Called from xen core code. */
void xenbus_suspend(void);
diff -r ab93a9a46bd4 -r 8016551fde98 tools/blktap/xenbus.c
--- a/tools/blktap/xenbus.c Sun Oct 9 16:29:24 2005
+++ b/tools/blktap/xenbus.c Sun Oct 9 17:52:54 2005
@@ -260,10 +260,6 @@
node = res[XS_WATCH_PATH];
token = res[XS_WATCH_TOKEN];
- er = xs_acknowledge_watch(h, token);
- if (er == 0)
- warn("Couldn't acknowledge watch (%s)", token);
-
w = find_watch(token);
if (!w)
{
diff -r ab93a9a46bd4 -r 8016551fde98 tools/console/daemon/io.c
--- a/tools/console/daemon/io.c Sun Oct 9 16:29:24 2005
+++ b/tools/console/daemon/io.c Sun Oct 9 17:52:54 2005
@@ -505,7 +505,6 @@
domain_create_ring(dom);
}
- xs_acknowledge_watch(xs, vec[1]);
free(vec);
}
diff -r ab93a9a46bd4 -r 8016551fde98 tools/python/xen/lowlevel/xs/xs.c
--- a/tools/python/xen/lowlevel/xs/xs.c Sun Oct 9 16:29:24 2005
+++ b/tools/python/xen/lowlevel/xs/xs.c Sun Oct 9 17:52:54 2005
@@ -442,9 +442,6 @@
#define xspy_read_watch_doc "\n" \
"Read a watch notification.\n" \
- "The notification must be acknowledged by passing\n" \
- "the token to acknowledge_watch().\n" \
- " path [string]: xenstore path.\n" \
"\n" \
"Returns: [tuple] (path, token).\n" \
"Raises RuntimeError on error.\n" \
@@ -492,44 +489,6 @@
exit:
if (xsval)
free(xsval);
- return val;
-}
-
-#define xspy_acknowledge_watch_doc "\n"
\
- "Acknowledge a watch notification that has been read.\n" \
- " token [string] : from the watch notification\n" \
- "\n" \
- "Returns None on success.\n" \
- "Raises RuntimeError on error.\n" \
- "\n"
-
-static PyObject *xspy_acknowledge_watch(PyObject *self, PyObject *args,
- PyObject *kwds)
-{
- static char *kwd_spec[] = { "token", NULL };
- static char *arg_spec = "O";
- PyObject *token;
- char token_str[MAX_STRLEN(unsigned long) + 1];
-
- struct xs_handle *xh = xshandle(self);
- PyObject *val = NULL;
- int xsval = 0;
-
- if (!xh)
- goto exit;
- if (!PyArg_ParseTupleAndKeywords(args, kwds, arg_spec, kwd_spec, &token))
- goto exit;
- sprintf(token_str, "%li", (unsigned long)token);
- Py_BEGIN_ALLOW_THREADS
- xsval = xs_acknowledge_watch(xh, token_str);
- Py_END_ALLOW_THREADS
- if (!xsval) {
- PyErr_SetFromErrno(PyExc_RuntimeError);
- goto exit;
- }
- Py_INCREF(Py_None);
- val = Py_None;
- exit:
return val;
}
@@ -833,7 +792,6 @@
XSPY_METH(set_permissions),
XSPY_METH(watch),
XSPY_METH(read_watch),
- XSPY_METH(acknowledge_watch),
XSPY_METH(unwatch),
XSPY_METH(transaction_start),
XSPY_METH(transaction_end),
diff -r ab93a9a46bd4 -r 8016551fde98 tools/python/xen/xend/xenstore/xswatch.py
--- a/tools/python/xen/xend/xenstore/xswatch.py Sun Oct 9 16:29:24 2005
+++ b/tools/python/xen/xend/xenstore/xswatch.py Sun Oct 9 17:52:54 2005
@@ -8,6 +8,7 @@
import select
import threading
from xen.lowlevel import xs
+from xen.xend.xenstore.xsutil import xshandle
class xswatch:
@@ -27,10 +28,7 @@
if cls.watchThread:
cls.xslock.release()
return
- # XXX: When we fix xenstored to have better watch semantics,
- # this can change to shared xshandle(). Currently that would result
- # in duplicate watch firings, thus failed extra xs.acknowledge_watch.
- cls.xs = xs.open()
+ cls.xs = xshandle()  # shared handle; a private one was only needed because of watch acks
cls.watchThread = threading.Thread(name="Watcher",
target=cls.watchMain)
cls.watchThread.setDaemon(True)
@@ -43,11 +41,10 @@
while True:
try:
we = cls.xs.read_watch()
- watch = we[1]
- cls.xs.acknowledge_watch(watch)
except RuntimeError, ex:
print ex
raise
+ watch = we[1]
watch.fn(*watch.args, **watch.kwargs)
watchMain = classmethod(watchMain)
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/07watch.test
--- a/tools/xenstore/testsuite/07watch.test Sun Oct 9 16:29:24 2005
+++ b/tools/xenstore/testsuite/07watch.test Sun Oct 9 17:52:54 2005
@@ -5,7 +5,6 @@
2 write /test contents2
expect 1:/test:token
1 waitwatch
-1 ackwatch token
1 close
# Check that reads don't set it off.
@@ -22,15 +21,12 @@
2 mkdir /dir/newdir
expect 1:/dir/newdir:token
1 waitwatch
-1 ackwatch token
2 setperm /dir/newdir 0 READ
expect 1:/dir/newdir:token
1 waitwatch
-1 ackwatch token
2 rm /dir/newdir
expect 1:/dir/newdir:token
1 waitwatch
-1 ackwatch token
1 close
2 close
@@ -49,7 +45,6 @@
read /dir/test
expect /dir/test:token
waitwatch
-ackwatch token
close
# watch priority test: all simultaneous
@@ -59,13 +54,10 @@
write /dir/test contents
expect 3:/dir/test:token3
3 waitwatch
-3 ackwatch token3
expect 2:/dir/test:token2
2 waitwatch
-2 ackwatch token2
expect 1:/dir/test:token1
1 waitwatch
-1 ackwatch token1
1 close
2 close
3 close
@@ -79,7 +71,6 @@
2 close
expect 1:/dir/test:token1
1 waitwatch
-1 ackwatch token1
1 close
# If one dies (without reading at all), the other should still get ack.
@@ -89,7 +80,6 @@
2 close
expect 1:/dir/test:token1
1 waitwatch
-1 ackwatch token1
1 close
2 close
@@ -111,7 +101,6 @@
2 unwatch /dir token2
expect 1:/dir/test:token1
1 waitwatch
-1 ackwatch token1
1 close
2 close
@@ -123,14 +112,12 @@
write /dir/test contents2
expect 1:/dir/test:token2
1 waitwatch
-1 ackwatch token2
# check we only get notified once.
1 watch /test token
2 write /test contents2
expect 1:/test:token
1 waitwatch
-1 ackwatch token
expect 1: waitwatch failed: Connection timed out
1 waitwatch
1 close
@@ -142,13 +129,10 @@
2 write /test3 contents
expect 1:/test1:token
1 waitwatch
-1 ackwatch token
expect 1:/test2:token
1 waitwatch
-1 ackwatch token
expect 1:/test3:token
1 waitwatch
-1 ackwatch token
1 close
# Creation of subpaths should be covered correctly.
@@ -157,10 +141,8 @@
2 write /test/subnode/subnode contents2
expect 1:/test/subnode:token
1 waitwatch
-1 ackwatch token
expect 1:/test/subnode/subnode:token
1 waitwatch
-1 ackwatch token
expect 1: waitwatch failed: Connection timed out
1 waitwatch
1 close
@@ -171,7 +153,6 @@
1 watchnoack / token2 0
expect 1:/test/subnode:token
1 waitwatch
-1 ackwatch token
expect 1:/:token2
1 waitwatch
expect 1: waitwatch failed: Connection timed out
@@ -183,7 +164,6 @@
2 rm /test
expect 1:/test/subnode:token
1 waitwatch
-1 ackwatch token
-# Watch should not double-send after we ack, even if we did something in between.
+# Watch should not double-send, even if we did something in between.
1 watch /test2 token
@@ -192,6 +172,5 @@
1 waitwatch
expect 1:contents2
1 read /test2/foo
-1 ackwatch token
expect 1: waitwatch failed: Connection timed out
1 waitwatch
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/08transaction.test
--- a/tools/xenstore/testsuite/08transaction.test Sun Oct 9 16:29:24 2005
+++ b/tools/xenstore/testsuite/08transaction.test Sun Oct 9 17:52:54 2005
@@ -68,7 +68,6 @@
2 commit
expect 1:/test/dir/sub:token
1 waitwatch
-1 ackwatch token
1 close
# Rm inside transaction works like rm outside: children get notified.
@@ -78,7 +77,6 @@
2 commit
expect 1:/test/dir/sub:token
1 waitwatch
-1 ackwatch token
1 close
# Multiple events from single transaction don't trigger assert
@@ -89,8 +87,6 @@
2 commit
expect 1:/test/1:token
1 waitwatch
-1 ackwatch token
expect 1:/test/2:token
1 waitwatch
-1 ackwatch token
1 close
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/10domain-homedir.test
--- a/tools/xenstore/testsuite/10domain-homedir.test Sun Oct 9 16:29:24 2005
+++ b/tools/xenstore/testsuite/10domain-homedir.test Sun Oct 9 17:52:54 2005
@@ -16,4 +16,3 @@
write /home/foo/bar contents
expect 1:foo/bar:token
1 waitwatch
-1 ackwatch token
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/11domain-watch.test
--- a/tools/xenstore/testsuite/11domain-watch.test Sun Oct 9 16:29:24 2005
+++ b/tools/xenstore/testsuite/11domain-watch.test Sun Oct 9 17:52:54 2005
@@ -10,7 +10,6 @@
write /test contents2
expect 1:/test:token
1 waitwatch
-1 ackwatch token
1 unwatch /test token
release 1
1 close
@@ -25,7 +24,6 @@
1 write /dir/test4 contents4
expect 1:/dir/test:token
1 waitwatch
-1 ackwatch token
release 1
1 close
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/12readonly.test
--- a/tools/xenstore/testsuite/12readonly.test Sun Oct 9 16:29:24 2005
+++ b/tools/xenstore/testsuite/12readonly.test Sun Oct 9 17:52:54 2005
@@ -36,4 +36,3 @@
1 write /test contents
expect /test:token
waitwatch
-ackwatch token
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/13watch-ack.test
--- a/tools/xenstore/testsuite/13watch-ack.test Sun Oct 9 16:29:24 2005
+++ b/tools/xenstore/testsuite/13watch-ack.test Sun Oct 9 17:52:54 2005
@@ -18,5 +18,4 @@
1 waitwatch
3 write /test/1 contents1
4 write /test/3 contents3
-1 ackwatch token2
1 close
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xenstored_core.c
--- a/tools/xenstore/xenstored_core.c Sun Oct 9 16:29:24 2005
+++ b/tools/xenstore/xenstored_core.c Sun Oct 9 17:52:54 2005
@@ -154,7 +154,6 @@
case XS_READ: return "READ";
case XS_GET_PERMS: return "GET_PERMS";
case XS_WATCH: return "WATCH";
- case XS_WATCH_ACK: return "WATCH_ACK";
case XS_UNWATCH: return "UNWATCH";
case XS_TRANSACTION_START: return "TRANSACTION_START";
case XS_TRANSACTION_END: return "TRANSACTION_END";
@@ -1103,10 +1102,6 @@
do_watch(conn, in);
break;
- case XS_WATCH_ACK:
- do_watch_ack(conn, onearg(in));
- break;
-
case XS_UNWATCH:
do_unwatch(conn, in);
break;
@@ -1167,11 +1162,6 @@
if (verbose)
xprintf("Got message %s len %i from %p\n",
sockmsg_string(type), conn->in->hdr.msg.len, conn);
-
- /* We might get a command while waiting for an ack: this means
- * the other end discarded it: we will re-transmit. */
- if (type != XS_WATCH_ACK)
- conn->waiting_for_ack = NULL;
/* Careful: process_message may free connection. We detach
* "in" beforehand and allocate the new buffer to avoid
@@ -1266,7 +1256,6 @@
new->state = OK;
new->out = new->waiting_reply = NULL;
- new->waiting_for_ack = NULL;
new->fd = -1;
new->id = 0;
new->domain = NULL;
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xenstored_core.h
--- a/tools/xenstore/xenstored_core.h Sun Oct 9 16:29:24 2005
+++ b/tools/xenstore/xenstored_core.h Sun Oct 9 17:52:54 2005
@@ -70,9 +70,6 @@
/* Is this a read-only connection? */
bool can_write;
-
- /* Are we waiting for a watch event ack? */
- struct watch *waiting_for_ack;
/* Buffered incoming data. */
struct buffered_data *in;
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xenstored_watch.c
--- a/tools/xenstore/xenstored_watch.c Sun Oct 9 16:29:24 2005
+++ b/tools/xenstore/xenstored_watch.c Sun Oct 9 17:52:54 2005
@@ -69,18 +69,14 @@
if (conn->waiting_reply) {
conn->out = conn->waiting_reply;
conn->waiting_reply = NULL;
- conn->waiting_for_ack = NULL;
- return;
- }
-
- /* If we're already waiting for ack, don't queue more. */
- if (conn->waiting_for_ack)
- return;
+ return;
+ }
list_for_each_entry(watch, &conn->watches, list) {
event = list_top(&watch->events, struct watch_event, list);
if (event) {
- conn->waiting_for_ack = watch;
send_reply(conn,XS_WATCH_EVENT,event->data,event->len);
+ list_del(&event->list);
+ talloc_free(event); /* only after send_reply() has read event->data/len */
break;
}
@@ -181,6 +177,15 @@
}
}
+ /* Check for duplicates. */
+ list_for_each_entry(watch, &conn->watches, list) {
+ if (streq(watch->node, vec[0]) &&
+ streq(watch->token, vec[1])) {
+ send_error(conn, EEXIST);
+ return;
+ }
+ }
+
watch = talloc(conn, struct watch);
watch->node = talloc_strdup(watch, vec[0]);
watch->token = talloc_strdup(watch, vec[1]);
@@ -200,37 +205,6 @@
add_event(conn, watch, watch->node);
}
-void do_watch_ack(struct connection *conn, const char *token)
-{
- struct watch_event *event;
-
- if (!token) {
- send_error(conn, EINVAL);
- return;
- }
-
- if (!conn->waiting_for_ack) {
- send_error(conn, ENOENT);
- return;
- }
-
- if (!streq(conn->waiting_for_ack->token, token)) {
- /* They're confused: this will cause us to send event again */
- conn->waiting_for_ack = NULL;
- send_error(conn, EINVAL);
- return;
- }
-
- /* Remove event: after ack sent, core will call queue_next_event */
- event = list_top(&conn->waiting_for_ack->events, struct watch_event,
- list);
- list_del(&event->list);
- talloc_free(event);
-
- conn->waiting_for_ack = NULL;
- send_ack(conn, XS_WATCH_ACK);
-}
-
void do_unwatch(struct connection *conn, struct buffered_data *in)
{
struct watch *watch;
@@ -241,9 +215,6 @@
return;
}
- /* We don't need to worry if we're waiting for an ack for the
- * watch we're deleting: conn->waiting_for_ack was reset by
- * this command in consider_message anyway. */
node = canonicalize(conn, vec[0]);
list_for_each_entry(watch, &conn->watches, list) {
if (streq(watch->node, node) && streq(watch->token, vec[1])) {
@@ -262,11 +233,6 @@
struct watch *watch;
struct watch_event *event;
- if (conn->waiting_for_ack)
- printf(" waiting_for_ack for watch on %s token %s\n",
- conn->waiting_for_ack->node,
- conn->waiting_for_ack->token);
-
list_for_each_entry(watch, &conn->watches, list) {
printf(" watch on %s token %s\n",
watch->node, watch->token);
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xs.c
--- a/tools/xenstore/xs.c Sun Oct 9 16:29:24 2005
+++ b/tools/xenstore/xs.c Sun Oct 9 17:52:54 2005
@@ -78,9 +78,29 @@
/* One transaction at a time. */
pthread_mutex_t transaction_mutex;
+ pthread_t transaction_pthread;
};
static void *read_thread(void *arg);
+
+static void request_mutex_acquire(struct xs_handle *h)
+{
+ /*
+ * We can't distinguish non-transactional from transactional
+ * requests right now. So temporarily acquire the transaction mutex
+ * if this task is outside transaction context.
+ */
+ if (h->transaction_pthread != pthread_self())
+ pthread_mutex_lock(&h->transaction_mutex);
+ pthread_mutex_lock(&h->request_mutex);
+}
+
+static void request_mutex_release(struct xs_handle *h)
+{
+ pthread_mutex_unlock(&h->request_mutex);
+ if (h->transaction_pthread != pthread_self())
+ pthread_mutex_unlock(&h->transaction_mutex);
+}
int xs_fileno(struct xs_handle *h)
{
@@ -163,6 +183,7 @@
pthread_mutex_init(&h->request_mutex, NULL);
pthread_mutex_init(&h->transaction_mutex, NULL);
+ h->transaction_pthread = -1;
if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0)
goto error;
@@ -316,7 +337,7 @@
ignorepipe.sa_flags = 0;
sigaction(SIGPIPE, &ignorepipe, &oldact);
- pthread_mutex_lock(&h->request_mutex);
+ request_mutex_acquire(h);
if (!xs_write_all(h->fd, &msg, sizeof(msg)))
goto fail;
@@ -329,7 +350,7 @@
if (!ret)
goto fail;
- pthread_mutex_unlock(&h->request_mutex);
+ request_mutex_release(h);
sigaction(SIGPIPE, &oldact, NULL);
if (msg.type == XS_ERROR) {
@@ -350,7 +371,7 @@
fail:
/* We're in a bad state, so close fd. */
saved_errno = errno;
- pthread_mutex_unlock(&h->request_mutex);
+ request_mutex_release(h);
sigaction(SIGPIPE, &oldact, NULL);
close_fd:
close(h->fd);
@@ -593,15 +614,6 @@
return ret;
}
-/* Acknowledge watch on node. Watches must be acknowledged before
- * any other watches can be read.
- * Returns false on failure.
- */
-bool xs_acknowledge_watch(struct xs_handle *h, const char *token)
-{
- return xs_bool(xs_single(h, XS_WATCH_ACK, token, NULL));
-}
-
/* Remove a watch on a node.
* Returns false on failure (no watch on that node).
*/
@@ -624,8 +636,18 @@
*/
bool xs_transaction_start(struct xs_handle *h)
{
+ bool rc;
+
pthread_mutex_lock(&h->transaction_mutex);
- return xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL));
+ h->transaction_pthread = pthread_self();
+
+ rc = xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL));
+ if (!rc) {
+ h->transaction_pthread = -1;
+ pthread_mutex_unlock(&h->transaction_mutex);
+ }
+
+ return rc;
}
/* End a transaction.
@@ -645,6 +667,7 @@
rc = xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL));
+ h->transaction_pthread = -1;
pthread_mutex_unlock(&h->transaction_mutex);
return rc;
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xs.h
--- a/tools/xenstore/xs.h Sun Oct 9 16:29:24 2005
+++ b/tools/xenstore/xs.h Sun Oct 9 17:52:54 2005
@@ -96,12 +96,6 @@
*/
char **xs_read_watch(struct xs_handle *h, unsigned int *num);
-/* Acknowledge watch on node. Watches must be acknowledged before
- * any other watches can be read.
- * Returns false on failure.
- */
-bool xs_acknowledge_watch(struct xs_handle *h, const char *token);
-
/* Remove a watch on a node: implicitly acks any outstanding watch.
* Returns false on failure (no watch on that node).
*/
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xs_test.c
--- a/tools/xenstore/xs_test.c Sun Oct 9 16:29:24 2005
+++ b/tools/xenstore/xs_test.c Sun Oct 9 17:52:54 2005
@@ -201,7 +201,6 @@
" watch <path> <token>\n"
" watchnoack <path> <token>\n"
" waitwatch\n"
- " ackwatch <token>\n"
" unwatch <path> <token>\n"
" close\n"
" start <node>\n"
@@ -455,8 +454,6 @@
!streq(vec[XS_WATCH_PATH], node) ||
!streq(vec[XS_WATCH_TOKEN], token))
failed(handle);
- if (!xs_acknowledge_watch(handles[handle], token))
- failed(handle);
}
}
@@ -513,12 +510,6 @@
else
output("%s:%s\n", vec[XS_WATCH_PATH], vec[XS_WATCH_TOKEN]);
free(vec);
-}
-
-static void do_ackwatch(unsigned int handle, const char *token)
-{
- if (!xs_acknowledge_watch(handles[handle], token))
- failed(handle);
}
static void do_unwatch(unsigned int handle, const char *node, const char
*token)
@@ -746,8 +737,6 @@
do_watch(handle, arg(line, 1), arg(line, 2), false);
else if (streq(command, "waitwatch"))
do_waitwatch(handle);
- else if (streq(command, "ackwatch"))
- do_ackwatch(handle, arg(line, 1));
else if (streq(command, "unwatch"))
do_unwatch(handle, arg(line, 1), arg(line, 2));
else if (streq(command, "close")) {
diff -r ab93a9a46bd4 -r 8016551fde98 xen/include/public/io/xs_wire.h
--- a/xen/include/public/io/xs_wire.h Sun Oct 9 16:29:24 2005
+++ b/xen/include/public/io/xs_wire.h Sun Oct 9 17:52:54 2005
@@ -35,11 +35,9 @@
XS_READ,
XS_GET_PERMS,
XS_WATCH,
- XS_WATCH_ACK,
XS_UNWATCH,
XS_TRANSACTION_START,
XS_TRANSACTION_END,
- XS_OP_READ_ONLY = XS_TRANSACTION_END,
XS_INTRODUCE,
XS_RELEASE,
XS_GET_DOMAIN_PATH,
_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-changelog
|