WARNING - OLD ARCHIVES

This is an archived copy of the Xen.org mailing list, which we have preserved to ensure that existing links to archives are not broken. The live archive, which contains the latest emails, can be found at http://lists.xen.org/
   
 
 
Xen 
 
Home Products Support Community News
 
   
 

xen-changelog

[Xen-changelog] xenstored now supports multiple concurrent transactions

To: xen-changelog@xxxxxxxxxxxxxxxxxxx
Subject: [Xen-changelog] xenstored now supports multiple concurrent transactions per
From: Xen patchbot -unstable <patchbot-unstable@xxxxxxxxxxxxxxxxxxx>
Date: Mon, 10 Oct 2005 14:46:11 +0000
Delivery-date: Mon, 10 Oct 2005 14:43:41 +0000
Envelope-to: www-data@xxxxxxxxxxxxxxxxxxx
List-help: <mailto:xen-changelog-request@lists.xensource.com?subject=help>
List-id: BK change log <xen-changelog.lists.xensource.com>
List-post: <mailto:xen-changelog@lists.xensource.com>
List-subscribe: <http://lists.xensource.com/cgi-bin/mailman/listinfo/xen-changelog>, <mailto:xen-changelog-request@lists.xensource.com?subject=subscribe>
List-unsubscribe: <http://lists.xensource.com/cgi-bin/mailman/listinfo/xen-changelog>, <mailto:xen-changelog-request@lists.xensource.com?subject=unsubscribe>
Reply-to: xen-devel@xxxxxxxxxxxxxxxxxxx
Sender: xen-changelog-bounces@xxxxxxxxxxxxxxxxxxx
# HG changeset patch
# User kaf24@xxxxxxxxxxxxxxxxxxxx
# Node ID 015f8ae8127649f0c69904fd063ca45d304d4e0c
# Parent  1ac39c7a043541cfa94655f0e9ab98d4503c29a2
xenstored now supports multiple concurrent transactions per
connection, plus interleaving of transactional and
non-transactional accesses. A transaction identifier is added
to the xsd_sockmsg header structure (0 means 'not in the context
of a transaction'). The user and kernel xs interfaces accept
a pointer to a transaction handle where appropriate. Currently
the handle is directly cast to an integer identifier in the
client library / kernel driver, but passing a pointer will allow
extra dynamic client-side state to be kept in future if we need it.

The transaction mutex is now gone. It is replaced with a
read-write mutex, which is acquired for exclusive access only
during suspend/resume, to ensure that there are no in-progress
transactions.

Signed-off-by: Keir Fraser <keir@xxxxxxxxxxxxx>

diff -r 1ac39c7a0435 -r 015f8ae81276 linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c
--- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c      Mon Oct 10 13:46:53 2005
+++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c      Mon Oct 10 14:38:01 2005
@@ -45,8 +45,14 @@
 #include <asm-xen/xen_proc.h>
 #include <asm/hypervisor.h>
 
+struct xenbus_dev_transaction {
+       struct list_head list;
+       struct xenbus_transaction *handle;
+};
+
 struct xenbus_dev_data {
-       int in_transaction;
+       /* In-progress transactions. */
+       struct list_head transactions;
 
        /* Partial request. */
        unsigned int len;
@@ -103,6 +109,7 @@
                                size_t len, loff_t *ppos)
 {
        struct xenbus_dev_data *u = filp->private_data;
+       struct xenbus_dev_transaction *trans;
        void *reply;
        int err = 0;
 
@@ -129,13 +136,24 @@
        case XS_RM:
        case XS_SET_PERMS:
                reply = xenbus_dev_request_and_reply(&u->u.msg);
-               if (IS_ERR(reply))
+               if (IS_ERR(reply)) {
                        err = PTR_ERR(reply);
-               else {
-                       if (u->u.msg.type == XS_TRANSACTION_START)
-                               u->in_transaction = 1;
-                       if (u->u.msg.type == XS_TRANSACTION_END)
-                               u->in_transaction = 0;
+               } else {
+                       if (u->u.msg.type == XS_TRANSACTION_START) {
+                               trans = kmalloc(sizeof(*trans), GFP_KERNEL);
+                               trans->handle = (struct xenbus_transaction *)
+                                       simple_strtoul(reply, NULL, 0);
+                               list_add(&trans->list, &u->transactions);
+                       } else if (u->u.msg.type == XS_TRANSACTION_END) {
+                               list_for_each_entry(trans, &u->transactions,
+                                                   list)
+                                       if ((unsigned long)trans->handle ==
+                                           (unsigned long)u->u.msg.tx_id)
+                                               break;
+                               BUG_ON(&trans->list == &u->transactions);
+                               list_del(&trans->list);
+                               kfree(trans);
+                       }
                        queue_reply(u, (char *)&u->u.msg, sizeof(u->u.msg));
                        queue_reply(u, (char *)reply, u->u.msg.len);
                        kfree(reply);
@@ -169,6 +187,7 @@
                return -ENOMEM;
 
        memset(u, 0, sizeof(*u));
+       INIT_LIST_HEAD(&u->transactions);
        init_waitqueue_head(&u->read_waitq);
 
        filp->private_data = u;
@@ -179,9 +198,13 @@
 static int xenbus_dev_release(struct inode *inode, struct file *filp)
 {
        struct xenbus_dev_data *u = filp->private_data;
-
-       if (u->in_transaction)
-               xenbus_transaction_end((struct xenbus_transaction *)1, 1);
+       struct xenbus_dev_transaction *trans, *tmp;
+
+       list_for_each_entry_safe(trans, tmp, &u->transactions, list) {
+               xenbus_transaction_end(trans->handle, 1);
+               list_del(&trans->list);
+               kfree(trans);
+       }
 
        kfree(u);
 
diff -r 1ac39c7a0435 -r 015f8ae81276 linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c
--- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c       Mon Oct 10 13:46:53 2005
+++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c       Mon Oct 10 14:38:01 2005
@@ -71,38 +71,14 @@
        /* One request at a time. */
        struct semaphore request_mutex;
 
-       /* One transaction at a time. */
-       struct semaphore transaction_mutex;
-       int transaction_pid;
+       /* Protect transactions against save/restore. */
+       struct rw_semaphore suspend_mutex;
 };
 
 static struct xs_handle xs_state;
 
 static LIST_HEAD(watches);
 static DEFINE_SPINLOCK(watches_lock);
-
-/* Can wait on !xs_resuming for suspend/resume cycle to complete. */
-static int xs_resuming;
-static DECLARE_WAIT_QUEUE_HEAD(xs_resuming_waitq);
-
-static void request_mutex_acquire(void)
-{
-       /*
-        * We can't distinguish non-transactional from transactional
-        * requests right now. So temporarily acquire the transaction mutex
-        * if this task is outside transaction context.
-        */
-       if (xs_state.transaction_pid != current->pid)
-               down(&xs_state.transaction_mutex);
-       down(&xs_state.request_mutex);
-}
-
-static void request_mutex_release(void)
-{
-       up(&xs_state.request_mutex);
-       if (xs_state.transaction_pid != current->pid)
-               up(&xs_state.transaction_mutex);
-}
 
 static int get_error(const char *errorstring)
 {
@@ -152,17 +128,17 @@
 /* Emergency write. */
 void xenbus_debug_write(const char *str, unsigned int count)
 {
-       struct xsd_sockmsg msg;
+       struct xsd_sockmsg msg = { 0 };
 
        msg.type = XS_DEBUG;
        msg.len = sizeof("print") + count + 1;
 
-       request_mutex_acquire();
+       down(&xs_state.request_mutex);
        xb_write(&msg, sizeof(msg));
        xb_write("print", sizeof("print"));
        xb_write(str, count);
        xb_write("", 1);
-       request_mutex_release();
+       up(&xs_state.request_mutex);
 }
 
 void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg)
@@ -171,12 +147,10 @@
        struct xsd_sockmsg req_msg = *msg;
        int err;
 
-       if (req_msg.type == XS_TRANSACTION_START) {
-               down(&xs_state.transaction_mutex);
-               xs_state.transaction_pid = current->pid;
-       }
-
-       request_mutex_acquire();
+       if (req_msg.type == XS_TRANSACTION_START)
+               down_read(&xs_state.suspend_mutex);
+
+       down(&xs_state.request_mutex);
 
        err = xb_write(msg, sizeof(*msg) + msg->len);
        if (err) {
@@ -186,20 +160,19 @@
                ret = read_reply(&msg->type, &msg->len);
        }
 
-       request_mutex_release();
+       up(&xs_state.request_mutex);
 
        if ((msg->type == XS_TRANSACTION_END) ||
            ((req_msg.type == XS_TRANSACTION_START) &&
-            (msg->type == XS_ERROR))) {
-               xs_state.transaction_pid = -1;
-               up(&xs_state.transaction_mutex);
-       }
+            (msg->type == XS_ERROR)))
+               up_read(&xs_state.suspend_mutex);
 
        return ret;
 }
 
 /* Send message to xs, get kmalloc'ed reply.  ERR_PTR() on error. */
-static void *xs_talkv(enum xsd_sockmsg_type type,
+static void *xs_talkv(struct xenbus_transaction *t,
+                     enum xsd_sockmsg_type type,
                      const struct kvec *iovec,
                      unsigned int num_vecs,
                      unsigned int *len)
@@ -209,12 +182,13 @@
        unsigned int i;
        int err;
 
+       msg.tx_id = (u32)(unsigned long)t;
        msg.type = type;
        msg.len = 0;
        for (i = 0; i < num_vecs; i++)
                msg.len += iovec[i].iov_len;
 
-       request_mutex_acquire();
+       down(&xs_state.request_mutex);
 
        err = xb_write(&msg, sizeof(msg));
        if (err) {
@@ -225,14 +199,14 @@
        for (i = 0; i < num_vecs; i++) {
                err = xb_write(iovec[i].iov_base, iovec[i].iov_len);;
                if (err) {
-                       request_mutex_release();
+                       up(&xs_state.request_mutex);
                        return ERR_PTR(err);
                }
        }
 
        ret = read_reply(&msg.type, len);
 
-       request_mutex_release();
+       up(&xs_state.request_mutex);
 
        if (IS_ERR(ret))
                return ret;
@@ -248,14 +222,16 @@
 }
 
 /* Simplified version of xs_talkv: single message. */
-static void *xs_single(enum xsd_sockmsg_type type,
-                      const char *string, unsigned int *len)
+static void *xs_single(struct xenbus_transaction *t,
+                      enum xsd_sockmsg_type type,
+                      const char *string,
+                      unsigned int *len)
 {
        struct kvec iovec;
 
        iovec.iov_base = (void *)string;
        iovec.iov_len = strlen(string) + 1;
-       return xs_talkv(type, &iovec, 1, len);
+       return xs_talkv(t, type, &iovec, 1, len);
 }
 
 /* Many commands only need an ack, don't care what it says. */
@@ -322,7 +298,7 @@
        char *strings;
        unsigned int len;
 
-       strings = xs_single(XS_DIRECTORY, join(dir, node), &len);
+       strings = xs_single(t, XS_DIRECTORY, join(dir, node), &len);
        if (IS_ERR(strings))
                return (char **)strings;
 
@@ -352,7 +328,7 @@
 void *xenbus_read(struct xenbus_transaction *t,
                  const char *dir, const char *node, unsigned int *len)
 {
-       return xs_single(XS_READ, join(dir, node), len);
+       return xs_single(t, XS_READ, join(dir, node), len);
 }
 EXPORT_SYMBOL(xenbus_read);
 
@@ -372,7 +348,7 @@
        iovec[1].iov_base = (void *)string;
        iovec[1].iov_len = strlen(string);
 
-       return xs_error(xs_talkv(XS_WRITE, iovec, ARRAY_SIZE(iovec), NULL));
+       return xs_error(xs_talkv(t, XS_WRITE, iovec, ARRAY_SIZE(iovec), NULL));
 }
 EXPORT_SYMBOL(xenbus_write);
 
@@ -380,14 +356,14 @@
 int xenbus_mkdir(struct xenbus_transaction *t,
                 const char *dir, const char *node)
 {
-       return xs_error(xs_single(XS_MKDIR, join(dir, node), NULL));
+       return xs_error(xs_single(t, XS_MKDIR, join(dir, node), NULL));
 }
 EXPORT_SYMBOL(xenbus_mkdir);
 
 /* Destroy a file or directory (directories must be empty). */
 int xenbus_rm(struct xenbus_transaction *t, const char *dir, const char *node)
 {
-       return xs_error(xs_single(XS_RM, join(dir, node), NULL));
+       return xs_error(xs_single(t, XS_RM, join(dir, node), NULL));
 }
 EXPORT_SYMBOL(xenbus_rm);
 
@@ -396,18 +372,21 @@
  */
 struct xenbus_transaction *xenbus_transaction_start(void)
 {
-       int err;
-
-       down(&xs_state.transaction_mutex);
-       xs_state.transaction_pid = current->pid;
-
-       err = xs_error(xs_single(XS_TRANSACTION_START, "", NULL));
-       if (err) {
-               xs_state.transaction_pid = -1;
-               up(&xs_state.transaction_mutex);
-       }
-
-       return err ? ERR_PTR(err) : (struct xenbus_transaction *)1;
+       char *id_str;
+       unsigned long id;
+
+       down_read(&xs_state.suspend_mutex);
+
+       id_str = xs_single(NULL, XS_TRANSACTION_START, "", NULL);
+       if (IS_ERR(id_str)) {
+               up_read(&xs_state.suspend_mutex);
+               return (struct xenbus_transaction *)id_str;
+       }
+
+       id = simple_strtoul(id_str, NULL, 0);
+       kfree(id_str);
+
+       return (struct xenbus_transaction *)id;
 }
 EXPORT_SYMBOL(xenbus_transaction_start);
 
@@ -419,17 +398,14 @@
        char abortstr[2];
        int err;
 
-       BUG_ON(t == NULL);
-
        if (abort)
                strcpy(abortstr, "F");
        else
                strcpy(abortstr, "T");
 
-       err = xs_error(xs_single(XS_TRANSACTION_END, abortstr, NULL));
-
-       xs_state.transaction_pid = -1;
-       up(&xs_state.transaction_mutex);
+       err = xs_error(xs_single(t, XS_TRANSACTION_END, abortstr, NULL));
+
+       up_read(&xs_state.suspend_mutex);
 
        return err;
 }
@@ -567,7 +543,8 @@
        iov[1].iov_base = (void *)token;
        iov[1].iov_len = strlen(token) + 1;
 
-       return xs_error(xs_talkv(XS_WATCH, iov, ARRAY_SIZE(iov), NULL));
+       return xs_error(xs_talkv(NULL, XS_WATCH, iov,
+                                ARRAY_SIZE(iov), NULL));
 }
 
 static int xs_unwatch(const char *path, const char *token)
@@ -579,7 +556,8 @@
        iov[1].iov_base = (char *)token;
        iov[1].iov_len = strlen(token) + 1;
 
-       return xs_error(xs_talkv(XS_UNWATCH, iov, ARRAY_SIZE(iov), NULL));
+       return xs_error(xs_talkv(NULL, XS_UNWATCH, iov,
+                                ARRAY_SIZE(iov), NULL));
 }
 
 static struct xenbus_watch *find_watch(const char *token)
@@ -603,6 +581,8 @@
        int err;
 
        sprintf(token, "%lX", (long)watch);
+
+       down_read(&xs_state.suspend_mutex);
 
        spin_lock(&watches_lock);
        BUG_ON(find_watch(token));
@@ -617,6 +597,8 @@
                spin_unlock(&watches_lock);
        }
 
+       up_read(&xs_state.suspend_mutex);
+
        return err;
 }
 EXPORT_SYMBOL(register_xenbus_watch);
@@ -627,14 +609,13 @@
        int err;
 
        sprintf(token, "%lX", (long)watch);
+
+       down_read(&xs_state.suspend_mutex);
 
        spin_lock(&watches_lock);
        BUG_ON(!find_watch(token));
        list_del(&watch->list);
        spin_unlock(&watches_lock);
-
-       /* Ensure xs_resume() is not in progress (see comments there). */
-       wait_event(xs_resuming_waitq, !xs_resuming);
 
        err = xs_unwatch(watch->node, token);
        if (err)
@@ -642,6 +623,8 @@
                       "XENBUS Failed to release watch %s: %i\n",
                       watch->node, err);
 
+       up_read(&xs_state.suspend_mutex);
+
        /* Make sure watch is not in use. */
        flush_scheduled_work();
 }
@@ -649,58 +632,24 @@
 
 void xs_suspend(void)
 {
-       down(&xs_state.transaction_mutex);
+       down_write(&xs_state.suspend_mutex);
        down(&xs_state.request_mutex);
 }
 
 void xs_resume(void)
 {
-       struct list_head *ent, *prev_ent = &watches;
        struct xenbus_watch *watch;
        char token[sizeof(watch) * 2 + 1];
 
-       /* Protect against concurrent unregistration and freeing of watches. */
-       BUG_ON(xs_resuming);
-       xs_resuming = 1;
-
        up(&xs_state.request_mutex);
-       up(&xs_state.transaction_mutex);
-
-       /*
-        * Iterate over the watch list re-registering each node. We must
-        * be careful about concurrent registrations and unregistrations.
-        * We search for the node immediately following the previously
-        * re-registered node. If we get no match then either we are done
-        * (previous node is last in list) or the node was unregistered, in
-        * which case we restart from the beginning of the list.
-        * register_xenbus_watch() + unregister_xenbus_watch() is safe because
-        * it will only ever move a watch node earlier in the list, so it
-        * cannot cause us to skip nodes.
-        */
-       for (;;) {
-               spin_lock(&watches_lock);
-               list_for_each(ent, &watches)
-                       if (ent->prev == prev_ent)
-                               break;
-               spin_unlock(&watches_lock);
-
-               /* No match because prev_ent is at the end of the list? */
-               if ((ent == &watches) && (watches.prev == prev_ent))
-                        break; /* We're done! */
-
-               if ((prev_ent = ent) != &watches) {
-                       /*
-                        * Safe even with watch_lock not held. We are saved by
-                        * (xs_resumed==1) check in unregister_xenbus_watch.
-                        */
-                       watch = list_entry(ent, struct xenbus_watch, list);
-                       sprintf(token, "%lX", (long)watch);
-                       xs_watch(watch->node, token);
-               }
-       }
-
-       xs_resuming = 0;
-       wake_up(&xs_resuming_waitq);
+
+       /* No need for watches_lock: the suspend_mutex is sufficient. */
+       list_for_each_entry(watch, &watches, list) {
+               sprintf(token, "%lX", (long)watch);
+               xs_watch(watch->node, token);
+       }
+
+       up_write(&xs_state.suspend_mutex);
 }
 
 static void xenbus_fire_watch(void *arg)
@@ -801,8 +750,7 @@
        init_waitqueue_head(&xs_state.reply_waitq);
 
        init_MUTEX(&xs_state.request_mutex);
-       init_MUTEX(&xs_state.transaction_mutex);
-       xs_state.transaction_pid = -1;
+       init_rwsem(&xs_state.suspend_mutex);
 
        /* Initialize the shared memory rings to talk to xenstored */
        err = xb_init_comms();
diff -r 1ac39c7a0435 -r 015f8ae81276 tools/xenstore/utils.h
--- a/tools/xenstore/utils.h    Mon Oct 10 13:46:53 2005
+++ b/tools/xenstore/utils.h    Mon Oct 10 14:38:01 2005
@@ -55,4 +55,34 @@
 #define dprintf(_fmt, _args...) ((void)0)
 #endif
 
+/*
+ * Mux errno values onto returned pointers.
+ */
+
+static inline void *ERR_PTR(long error)
+{
+       return (void *)error;
+}
+
+static inline long PTR_ERR(const void *ptr)
+{
+       return (long)ptr;
+}
+
+static inline long IS_ERR(const void *ptr)
+{
+       return ((unsigned long)ptr > (unsigned long)-1000L);
+}
+
+
 #endif /* _UTILS_H */
+
+/*
+ * Local variables:
+ *  c-file-style: "linux"
+ *  indent-tabs-mode: t
+ *  c-indent-level: 8
+ *  c-basic-offset: 8
+ *  tab-width: 8
+ * End:
+ */
diff -r 1ac39c7a0435 -r 015f8ae81276 tools/xenstore/xenstore_client.c
--- a/tools/xenstore/xenstore_client.c  Mon Oct 10 13:46:53 2005
+++ b/tools/xenstore/xenstore_client.c  Mon Oct 10 14:38:01 2005
@@ -34,14 +34,10 @@
     struct xs_handle *xsh;
     struct xs_transaction_handle *xth;
     bool success;
-    int ret = 0;
+    int ret = 0, socket = 0;
 #if defined(CLIENT_read) || defined(CLIENT_list)
     int prefix = 0;
 #endif
-
-    xsh = xs_domain_open();
-    if (xsh == NULL)
-       err(1, "xs_domain_open");
 
     while (1) {
        int c, index = 0;
@@ -50,10 +46,11 @@
 #if defined(CLIENT_read) || defined(CLIENT_list)
            {"prefix", 0, 0, 'p'},
 #endif
+            {"socket", 0, 0, 's'},
            {0, 0, 0, 0}
        };
 
-       c = getopt_long(argc, argv, "h"
+       c = getopt_long(argc, argv, "hs"
 #if defined(CLIENT_read) || defined(CLIENT_list)
                        "p"
 #endif
@@ -65,6 +62,9 @@
        case 'h':
            usage(argv[0]);
            /* NOTREACHED */
+        case 's':
+            socket = 1;
+            break;
 #if defined(CLIENT_read) || defined(CLIENT_list)
        case 'p':
            prefix = 1;
@@ -83,6 +83,10 @@
        /* NOTREACHED */
     }
 #endif
+
+    xsh = socket ? xs_daemon_open() : xs_domain_open();
+    if (xsh == NULL)
+       err(1, socket ? "xs_daemon_open" : "xs_domain_open");
 
   again:
     xth = xs_transaction_start(xsh);
diff -r 1ac39c7a0435 -r 015f8ae81276 tools/xenstore/xenstored_core.c
--- a/tools/xenstore/xenstored_core.c   Mon Oct 10 13:46:53 2005
+++ b/tools/xenstore/xenstored_core.c   Mon Oct 10 14:38:01 2005
@@ -238,46 +238,47 @@
 static bool write_messages(struct connection *conn)
 {
        int ret;
-       struct buffered_data *out, *tmp;
-
-       list_for_each_entry_safe(out, tmp, &conn->out_list, list) {
-               if (out->inhdr) {
-                       if (verbose)
-                               xprintf("Writing msg %s (%s) out to %p\n",
-                                       sockmsg_string(out->hdr.msg.type),
-                                       out->buffer, conn);
-                       ret = conn->write(conn, out->hdr.raw + out->used,
-                                         sizeof(out->hdr) - out->used);
-                       if (ret < 0)
-                               return false;
-
-                       out->used += ret;
-                       if (out->used < sizeof(out->hdr))
-                               return true;
-
-                       out->inhdr = false;
-                       out->used = 0;
-
-                       /* Second write might block if non-zero. */
-                       if (out->hdr.msg.len && !conn->domain)
-                               return true;
-               }
-
-               ret = conn->write(conn, out->buffer + out->used,
-                                 out->hdr.msg.len - out->used);
-
+       struct buffered_data *out;
+
+       out = list_top(&conn->out_list, struct buffered_data, list);
+       if (out == NULL)
+               return true;
+
+       if (out->inhdr) {
+               if (verbose)
+                       xprintf("Writing msg %s (%s) out to %p\n",
+                               sockmsg_string(out->hdr.msg.type),
+                               out->buffer, conn);
+               ret = conn->write(conn, out->hdr.raw + out->used,
+                                 sizeof(out->hdr) - out->used);
                if (ret < 0)
                        return false;
 
                out->used += ret;
-               if (out->used != out->hdr.msg.len)
+               if (out->used < sizeof(out->hdr))
                        return true;
 
-               trace_io(conn, "OUT", out);
-
-               list_del(&out->list);
-               talloc_free(out);
-       }
+               out->inhdr = false;
+               out->used = 0;
+
+               /* Second write might block if non-zero. */
+               if (out->hdr.msg.len && !conn->domain)
+                       return true;
+       }
+
+       ret = conn->write(conn, out->buffer + out->used,
+                         out->hdr.msg.len - out->used);
+       if (ret < 0)
+               return false;
+
+       out->used += ret;
+       if (out->used != out->hdr.msg.len)
+               return true;
+
+       trace_io(conn, "OUT", out);
+
+       list_del(&out->list);
+       talloc_free(out);
 
        return true;
 }
@@ -1042,6 +1043,17 @@
  */
 static void process_message(struct connection *conn, struct buffered_data *in)
 {
+       struct transaction *trans;
+
+       trans = transaction_lookup(conn, in->hdr.msg.tx_id);
+       if (IS_ERR(trans)) {
+               send_error(conn, -PTR_ERR(trans));
+               return;
+       }
+
+       assert(conn->transaction == NULL);
+       conn->transaction = trans;
+
        switch (in->hdr.msg.type) {
        case XS_DIRECTORY:
                send_directory(conn, onearg(in));
@@ -1116,11 +1128,13 @@
                do_get_domain_path(conn, onearg(in));
                break;
 
-       case XS_WATCH_EVENT:
        default:
                eprintf("Client unknown operation %i", in->hdr.msg.type);
                send_error(conn, ENOSYS);
-       }
+               break;
+       }
+
+       conn->transaction = NULL;
 }
 
 static int out_of_mem(void *data)
@@ -1239,15 +1253,14 @@
        if (!new)
                return NULL;
 
+       memset(new, 0, sizeof(*new));
        new->fd = -1;
-       new->id = 0;
-       new->domain = NULL;
-       new->transaction = NULL;
        new->write = write;
        new->read = read;
        new->can_write = true;
        INIT_LIST_HEAD(&new->out_list);
        INIT_LIST_HEAD(&new->watches);
+       INIT_LIST_HEAD(&new->transaction_list);
 
        talloc_set_fail_handler(out_of_mem, &talloc_fail);
        if (setjmp(talloc_fail)) {
@@ -1410,6 +1423,7 @@
 
 
 static struct option options[] = {
+       { "no-domain-init", 0, NULL, 'D' },
        { "pid-file", 1, NULL, 'F' },
        { "no-fork", 0, NULL, 'N' },
        { "output-pid", 0, NULL, 'P' },
@@ -1424,11 +1438,15 @@
        fd_set inset, outset;
        bool dofork = true;
        bool outputpid = false;
+       bool no_domain_init = false;
        const char *pidfile = NULL;
 
-       while ((opt = getopt_long(argc, argv, "F:NPT:V", options,
+       while ((opt = getopt_long(argc, argv, "DF:NPT:V", options,
                                  NULL)) != -1) {
                switch (opt) {
+               case 'D':
+                       no_domain_init = true;
+                       break;
                case 'F':
                        pidfile = optarg;
                        break;
@@ -1501,7 +1519,8 @@
        setup_structure();
 
        /* Listen to hypervisor. */
-       event_fd = domain_init();
+       if (!no_domain_init)
+               event_fd = domain_init();
 
        /* Restore existing connections. */
        restore_existing_connections();
diff -r 1ac39c7a0435 -r 015f8ae81276 tools/xenstore/xenstored_core.h
--- a/tools/xenstore/xenstored_core.h   Mon Oct 10 13:46:53 2005
+++ b/tools/xenstore/xenstored_core.h   Mon Oct 10 14:38:01 2005
@@ -71,8 +71,12 @@
        /* Buffered output data */
        struct list_head out_list;
 
-       /* My transaction, if any. */
+       /* Transaction context for current request (NULL if none). */
        struct transaction *transaction;
+
+       /* List of in-progress transactions. */
+       struct list_head transaction_list;
+       u32 next_transaction_id;
 
        /* The domain I'm associated with, if any. */
        struct domain *domain;
diff -r 1ac39c7a0435 -r 015f8ae81276 tools/xenstore/xenstored_transaction.c
--- a/tools/xenstore/xenstored_transaction.c    Mon Oct 10 13:46:53 2005
+++ b/tools/xenstore/xenstored_transaction.c    Mon Oct 10 14:38:01 2005
@@ -37,7 +37,7 @@
 
 struct changed_node
 {
-       /* The list within this transaction. */
+       /* List of all changed nodes in the context of this transaction. */
        struct list_head list;
 
        /* The name of the node. */
@@ -49,14 +49,14 @@
 
 struct transaction
 {
-       /* Global list of transactions. */
+       /* List of all transactions active on this connection. */
        struct list_head list;
+
+       /* Connection-local identifier for this transaction. */
+       u32 id;
 
        /* Generation when transaction started. */
        unsigned int generation;
-
-       /* My owner (conn->transaction == me). */
-       struct connection *conn;
 
        /* TDB to work on, and filename */
        TDB_CONTEXT *tdb;
@@ -65,7 +65,7 @@
        /* List of changed nodes. */
        struct list_head changes;
 };
-static LIST_HEAD(transactions);
+
 static unsigned int generation;
 
 /* Return tdb context to use for this connection. */
@@ -100,7 +100,6 @@
 {
        struct transaction *trans = _transaction;
 
-       list_del(&trans->list);
        trace_destroy(trans, "transaction");
        if (trans->tdb)
                tdb_close(trans->tdb);
@@ -108,10 +107,26 @@
        return 0;
 }
 
+struct transaction *transaction_lookup(struct connection *conn, u32 id)
+{
+       struct transaction *trans;
+
+       if (id == 0)
+               return NULL;
+
+       list_for_each_entry(trans, &conn->transaction_list, list)
+               if (trans->id == id)
+                       return trans;
+
+       return ERR_PTR(-ENOENT);
+}
+
 void do_transaction_start(struct connection *conn, struct buffered_data *in)
 {
-       struct transaction *trans;
-
+       struct transaction *trans, *exists;
+       char id_str[20];
+
+       /* We don't support nested transactions. */
        if (conn->transaction) {
                send_error(conn, EBUSY);
                return;
@@ -120,7 +135,6 @@
        /* Attach transaction to input for autofree until it's complete */
        trans = talloc(in, struct transaction);
        INIT_LIST_HEAD(&trans->changes);
-       trans->conn = conn;
        trans->generation = generation;
        trans->tdb_name = talloc_asprintf(trans, "%s.%p",
                                          xs_daemon_tdb(), trans);
@@ -132,11 +146,19 @@
        /* Make it close if we go away. */
        talloc_steal(trans, trans->tdb);
 
+       /* Pick an unused transaction identifier. */
+       do {
+               trans->id = conn->next_transaction_id;
+               exists = transaction_lookup(conn, conn->next_transaction_id++);
+       } while (!IS_ERR(exists));
+
        /* Now we own it. */
-       conn->transaction = talloc_steal(conn, trans);
-       list_add_tail(&trans->list, &transactions);
+       list_add_tail(&trans->list, &conn->transaction_list);
+       talloc_steal(conn, trans);
        talloc_set_destructor(trans, destroy_transaction);
-       send_ack(conn, XS_TRANSACTION_START);
+
+       sprintf(id_str, "%u", trans->id);
+       send_reply(conn, XS_TRANSACTION_START, id_str, strlen(id_str)+1);
 }
 
 void do_transaction_end(struct connection *conn, const char *arg)
@@ -149,13 +171,13 @@
                return;
        }
 
-       if (!conn->transaction) {
+       if ((trans = conn->transaction) == NULL) {
                send_error(conn, ENOENT);
                return;
        }
 
-       trans = conn->transaction;
        conn->transaction = NULL;
+       list_del(&trans->list);
 
        /* Attach transaction to arg for auto-cleanup */
        talloc_steal(arg, trans);
@@ -181,3 +203,12 @@
        send_ack(conn, XS_TRANSACTION_END);
 }
 
+/*
+ * Local variables:
+ *  c-file-style: "linux"
+ *  indent-tabs-mode: t
+ *  c-indent-level: 8
+ *  c-basic-offset: 8
+ *  tab-width: 8
+ * End:
+ */
diff -r 1ac39c7a0435 -r 015f8ae81276 tools/xenstore/xenstored_transaction.h
--- a/tools/xenstore/xenstored_transaction.h    Mon Oct 10 13:46:53 2005
+++ b/tools/xenstore/xenstored_transaction.h    Mon Oct 10 14:38:01 2005
@@ -25,10 +25,11 @@
 void do_transaction_start(struct connection *conn, struct buffered_data *node);
 void do_transaction_end(struct connection *conn, const char *arg);
 
-bool transaction_block(struct connection *conn);
+struct transaction *transaction_lookup(struct connection *conn, u32 id);
 
 /* This node was changed: can fail and longjmp. */
-void add_change_node(struct transaction *trans, const char *node, bool recurse);
+void add_change_node(struct transaction *trans, const char *node,
+                     bool recurse);
 
 /* Return tdb context to use for this connection. */
 TDB_CONTEXT *tdb_transaction_context(struct transaction *trans);
diff -r 1ac39c7a0435 -r 015f8ae81276 tools/xenstore/xs.c
--- a/tools/xenstore/xs.c       Mon Oct 10 13:46:53 2005
+++ b/tools/xenstore/xs.c       Mon Oct 10 14:38:01 2005
@@ -75,36 +75,9 @@
 
        /* One request at a time. */
        pthread_mutex_t request_mutex;
-
-       /* One transaction at a time. */
-       pthread_mutex_t transaction_mutex;
-       pthread_t transaction_pthread;
 };
 
-struct xs_transaction_handle {
-       int id;
-};
-
 static void *read_thread(void *arg);
-
-static void request_mutex_acquire(struct xs_handle *h)
-{
-       /*
-        * We can't distinguish non-transactional from transactional
-        * requests right now. So temporarily acquire the transaction mutex
-        * if this task is outside transaction context.
-        */
-       if (h->transaction_pthread != pthread_self())
-               pthread_mutex_lock(&h->transaction_mutex);
-       pthread_mutex_lock(&h->request_mutex);
-}
-
-static void request_mutex_release(struct xs_handle *h)
-{
-       pthread_mutex_unlock(&h->request_mutex);
-       if (h->transaction_pthread != pthread_self())
-               pthread_mutex_unlock(&h->transaction_mutex);
-}
 
 int xs_fileno(struct xs_handle *h)
 {
@@ -186,8 +159,6 @@
        pthread_cond_init(&h->reply_condvar, NULL);
 
        pthread_mutex_init(&h->request_mutex, NULL);
-       pthread_mutex_init(&h->transaction_mutex, NULL);
-       h->transaction_pthread = -1;
 
        if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0)
                goto error;
@@ -223,7 +194,6 @@
 {
        struct xs_stored_msg *msg, *tmsg;
 
-       pthread_mutex_lock(&h->transaction_mutex);
        pthread_mutex_lock(&h->request_mutex);
        pthread_mutex_lock(&h->reply_mutex);
        pthread_mutex_lock(&h->watch_mutex);
@@ -242,7 +212,6 @@
                free(msg);
        }
 
-       pthread_mutex_unlock(&h->transaction_mutex);
        pthread_mutex_unlock(&h->request_mutex);
        pthread_mutex_unlock(&h->reply_mutex);
        pthread_mutex_unlock(&h->watch_mutex);
@@ -321,8 +290,10 @@
 }
 
 /* Send message to xs, get malloc'ed reply.  NULL and set errno on error. */
-static void *xs_talkv(struct xs_handle *h, enum xsd_sockmsg_type type,
-                     const struct iovec *iovec, unsigned int num_vecs,
+static void *xs_talkv(struct xs_handle *h, struct xs_transaction_handle *t,
+                     enum xsd_sockmsg_type type,
+                     const struct iovec *iovec,
+                     unsigned int num_vecs,
                      unsigned int *len)
 {
        struct xsd_sockmsg msg;
@@ -331,6 +302,7 @@
        unsigned int i;
        struct sigaction ignorepipe, oldact;
 
+       msg.tx_id = (u32)(unsigned long)t;
        msg.type = type;
        msg.len = 0;
        for (i = 0; i < num_vecs; i++)
@@ -341,7 +313,7 @@
        ignorepipe.sa_flags = 0;
        sigaction(SIGPIPE, &ignorepipe, &oldact);
 
-       request_mutex_acquire(h);
+       pthread_mutex_lock(&h->request_mutex);
 
        if (!xs_write_all(h->fd, &msg, sizeof(msg)))
                goto fail;
@@ -354,7 +326,7 @@
        if (!ret)
                goto fail;
 
-       request_mutex_release(h);
+       pthread_mutex_unlock(&h->request_mutex);
 
        sigaction(SIGPIPE, &oldact, NULL);
        if (msg.type == XS_ERROR) {
@@ -375,7 +347,7 @@
 fail:
        /* We're in a bad state, so close fd. */
        saved_errno = errno;
-       request_mutex_release(h);
+       pthread_mutex_unlock(&h->request_mutex);
        sigaction(SIGPIPE, &oldact, NULL);
 close_fd:
        close(h->fd);
@@ -393,14 +365,16 @@
 }
 
 /* Simplified version of xs_talkv: single message. */
-static void *xs_single(struct xs_handle *h, enum xsd_sockmsg_type type,
-                      const char *string, unsigned int *len)
+static void *xs_single(struct xs_handle *h, struct xs_transaction_handle *t,
+                      enum xsd_sockmsg_type type,
+                      const char *string,
+                      unsigned int *len)
 {
        struct iovec iovec;
 
        iovec.iov_base = (void *)string;
        iovec.iov_len = strlen(string) + 1;
-       return xs_talkv(h, type, &iovec, 1, len);
+       return xs_talkv(h, t, type, &iovec, 1, len);
 }
 
 static bool xs_bool(char *reply)
@@ -417,7 +391,7 @@
        char *strings, *p, **ret;
        unsigned int len;
 
-       strings = xs_single(h, XS_DIRECTORY, path, &len);
+       strings = xs_single(h, t, XS_DIRECTORY, path, &len);
        if (!strings)
                return NULL;
 
@@ -446,7 +420,7 @@
 void *xs_read(struct xs_handle *h, struct xs_transaction_handle *t,
              const char *path, unsigned int *len)
 {
-       return xs_single(h, XS_READ, path, len);
+       return xs_single(h, t, XS_READ, path, len);
 }
 
 /* Write the value of a single file.
@@ -462,7 +436,8 @@
        iovec[1].iov_base = (void *)data;
        iovec[1].iov_len = len;
 
-       return xs_bool(xs_talkv(h, XS_WRITE, iovec, ARRAY_SIZE(iovec), NULL));
+       return xs_bool(xs_talkv(h, t, XS_WRITE, iovec,
+                               ARRAY_SIZE(iovec), NULL));
 }
 
 /* Create a new directory.
@@ -471,7 +446,7 @@
 bool xs_mkdir(struct xs_handle *h, struct xs_transaction_handle *t,
              const char *path)
 {
-       return xs_bool(xs_single(h, XS_MKDIR, path, NULL));
+       return xs_bool(xs_single(h, t, XS_MKDIR, path, NULL));
 }
 
 /* Destroy a file or directory (directories must be empty).
@@ -480,7 +455,7 @@
 bool xs_rm(struct xs_handle *h, struct xs_transaction_handle *t,
           const char *path)
 {
-       return xs_bool(xs_single(h, XS_RM, path, NULL));
+       return xs_bool(xs_single(h, t, XS_RM, path, NULL));
 }
 
 /* Get permissions of node (first element is owner).
@@ -494,7 +469,7 @@
        unsigned int len;
        struct xs_permissions *ret;
 
-       strings = xs_single(h, XS_GET_PERMS, path, &len);
+       strings = xs_single(h, t, XS_GET_PERMS, path, &len);
        if (!strings)
                return NULL;
 
@@ -544,7 +519,7 @@
                        goto unwind;
        }
 
-       if (!xs_bool(xs_talkv(h, XS_SET_PERMS, iov, 1+num_perms, NULL)))
+       if (!xs_bool(xs_talkv(h, t, XS_SET_PERMS, iov, 1+num_perms, NULL)))
                goto unwind;
        for (i = 0; i < num_perms; i++)
                free(iov[i+1].iov_base);
@@ -571,7 +546,8 @@
        iov[1].iov_base = (void *)token;
        iov[1].iov_len = strlen(token) + 1;
 
-       return xs_bool(xs_talkv(h, XS_WATCH, iov, ARRAY_SIZE(iov), NULL));
+       return xs_bool(xs_talkv(h, NULL, XS_WATCH, iov,
+                               ARRAY_SIZE(iov), NULL));
 }
 
 /* Find out what node change was on (will block if nothing pending).
@@ -637,7 +613,8 @@
        iov[1].iov_base = (char *)token;
        iov[1].iov_len = strlen(token) + 1;
 
-       return xs_bool(xs_talkv(h, XS_UNWATCH, iov, ARRAY_SIZE(iov), NULL));
+       return xs_bool(xs_talkv(h, NULL, XS_UNWATCH, iov,
+                               ARRAY_SIZE(iov), NULL));
 }
 
 /* Start a transaction: changes by others will not be seen during this
@@ -647,18 +624,17 @@
  */
 struct xs_transaction_handle *xs_transaction_start(struct xs_handle *h)
 {
-       bool rc;
-
-       pthread_mutex_lock(&h->transaction_mutex);
-       h->transaction_pthread = pthread_self();
-
-       rc = xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL));
-       if (!rc) {
-               h->transaction_pthread = -1;
-               pthread_mutex_unlock(&h->transaction_mutex);
-       }
-
-       return (struct xs_transaction_handle *)rc;
+       char *id_str;
+       unsigned long id;
+
+       id_str = xs_single(h, NULL, XS_TRANSACTION_START, "", NULL);
+       if (id_str == NULL)
+               return NULL;
+
+       id = strtoul(id_str, NULL, 0);
+       free(id_str);
+
+       return (struct xs_transaction_handle *)id;
 }
 
 /* End a transaction.
@@ -670,22 +646,13 @@
                        bool abort)
 {
        char abortstr[2];
-       bool rc;
-
-       if (t == NULL)
-               return -EINVAL;
 
        if (abort)
                strcpy(abortstr, "F");
        else
                strcpy(abortstr, "T");
        
-       rc = xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL));
-
-       h->transaction_pthread = -1;
-       pthread_mutex_unlock(&h->transaction_mutex);
-
-       return rc;
+       return xs_bool(xs_single(h, t, XS_TRANSACTION_END, abortstr, NULL));
 }
 
 /* Introduce a new domain.
@@ -713,7 +680,8 @@
        iov[3].iov_base = (char *)path;
        iov[3].iov_len = strlen(path) + 1;
 
-       return xs_bool(xs_talkv(h, XS_INTRODUCE, iov, ARRAY_SIZE(iov), NULL));
+       return xs_bool(xs_talkv(h, NULL, XS_INTRODUCE, iov,
+                               ARRAY_SIZE(iov), NULL));
 }
 
 bool xs_release_domain(struct xs_handle *h, domid_t domid)
@@ -722,7 +690,7 @@
 
        sprintf(domid_str, "%u", domid);
 
-       return xs_bool(xs_single(h, XS_RELEASE, domid_str, NULL));
+       return xs_bool(xs_single(h, NULL, XS_RELEASE, domid_str, NULL));
 }
 
 char *xs_get_domain_path(struct xs_handle *h, domid_t domid)
@@ -731,7 +699,7 @@
 
        sprintf(domid_str, "%u", domid);
 
-       return xs_single(h, XS_GET_DOMAIN_PATH, domid_str, NULL);
+       return xs_single(h, NULL, XS_GET_DOMAIN_PATH, domid_str, NULL);
 }
 
 /* Only useful for DEBUG versions */
@@ -745,7 +713,8 @@
        iov[1].iov_base = data;
        iov[1].iov_len = len;
 
-       return xs_talkv(h, XS_DEBUG, iov, ARRAY_SIZE(iov), NULL);
+       return xs_talkv(h, NULL, XS_DEBUG, iov,
+                       ARRAY_SIZE(iov), NULL);
 }
 
 static void *read_thread(void *arg)

_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-changelog

<Prev in Thread] Current Thread [Next in Thread>
  • [Xen-changelog] xenstored now supports multiple concurrent transactions per, Xen patchbot -unstable <=