# HG changeset patch
# User kaf24@xxxxxxxxxxxxxxxxxxxx
# Node ID 015f8ae8127649f0c69904fd063ca45d304d4e0c
# Parent 1ac39c7a043541cfa94655f0e9ab98d4503c29a2
xenstored now supports multiple concurrent transactions per
connection, plus interleaving of transactional and
non-transactional accesses. A transaction identifier is added
to the xsd_sockmsg header structure (0 means 'not in context
of a transaction'). The user and kernel xs interfaces accept
a pointer to a transaction handle where appropriate --
currently this is directly cast to an integer identifier in
the client library / kernel driver, but the pointer type will
allow for keeping extra dynamic client-side state in future if
we need to.
The transaction mutex is now gone. In the kernel driver it is
replaced with a read-write mutex (suspend_mutex), which is held
shared for the lifetime of each transaction and is only acquired
for exclusive access during suspend/resume, to ensure there are
no in-progress transactions.
Signed-off-by: Keir Fraser <keir@xxxxxxxxxxxxx>
diff -r 1ac39c7a0435 -r 015f8ae81276 linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c
--- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c Mon Oct 10 13:46:53 2005
+++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c Mon Oct 10 14:38:01 2005
@@ -45,8 +45,14 @@
#include <asm-xen/xen_proc.h>
#include <asm/hypervisor.h>
+struct xenbus_dev_transaction {
+ struct list_head list;
+ struct xenbus_transaction *handle;
+};
+
struct xenbus_dev_data {
- int in_transaction;
+ /* In-progress transactions. */
+ struct list_head transactions;
/* Partial request. */
unsigned int len;
@@ -103,6 +109,7 @@
size_t len, loff_t *ppos)
{
struct xenbus_dev_data *u = filp->private_data;
+ struct xenbus_dev_transaction *trans;
void *reply;
int err = 0;
@@ -129,13 +136,24 @@
case XS_RM:
case XS_SET_PERMS:
reply = xenbus_dev_request_and_reply(&u->u.msg);
- if (IS_ERR(reply))
+ if (IS_ERR(reply)) {
err = PTR_ERR(reply);
- else {
- if (u->u.msg.type == XS_TRANSACTION_START)
- u->in_transaction = 1;
- if (u->u.msg.type == XS_TRANSACTION_END)
- u->in_transaction = 0;
+ } else {
+ if (u->u.msg.type == XS_TRANSACTION_START) {
+ trans = kmalloc(sizeof(*trans), GFP_KERNEL);
+ trans->handle = (struct xenbus_transaction *)
+ simple_strtoul(reply, NULL, 0);
+ list_add(&trans->list, &u->transactions);
+ } else if (u->u.msg.type == XS_TRANSACTION_END) {
+ list_for_each_entry(trans, &u->transactions,
+ list)
+ if ((unsigned long)trans->handle ==
+ (unsigned long)u->u.msg.tx_id)
+ break;
+ BUG_ON(&trans->list == &u->transactions);
+ list_del(&trans->list);
+ kfree(trans);
+ }
queue_reply(u, (char *)&u->u.msg, sizeof(u->u.msg));
queue_reply(u, (char *)reply, u->u.msg.len);
kfree(reply);
@@ -169,6 +187,7 @@
return -ENOMEM;
memset(u, 0, sizeof(*u));
+ INIT_LIST_HEAD(&u->transactions);
init_waitqueue_head(&u->read_waitq);
filp->private_data = u;
@@ -179,9 +198,13 @@
static int xenbus_dev_release(struct inode *inode, struct file *filp)
{
struct xenbus_dev_data *u = filp->private_data;
-
- if (u->in_transaction)
- xenbus_transaction_end((struct xenbus_transaction *)1, 1);
+ struct xenbus_dev_transaction *trans, *tmp;
+
+ list_for_each_entry_safe(trans, tmp, &u->transactions, list) {
+ xenbus_transaction_end(trans->handle, 1);
+ list_del(&trans->list);
+ kfree(trans);
+ }
kfree(u);
diff -r 1ac39c7a0435 -r 015f8ae81276 linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c
--- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c Mon Oct 10 13:46:53 2005
+++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c Mon Oct 10 14:38:01 2005
@@ -71,38 +71,14 @@
/* One request at a time. */
struct semaphore request_mutex;
- /* One transaction at a time. */
- struct semaphore transaction_mutex;
- int transaction_pid;
+ /* Protect transactions against save/restore. */
+ struct rw_semaphore suspend_mutex;
};
static struct xs_handle xs_state;
static LIST_HEAD(watches);
static DEFINE_SPINLOCK(watches_lock);
-
-/* Can wait on !xs_resuming for suspend/resume cycle to complete. */
-static int xs_resuming;
-static DECLARE_WAIT_QUEUE_HEAD(xs_resuming_waitq);
-
-static void request_mutex_acquire(void)
-{
- /*
- * We can't distinguish non-transactional from transactional
- * requests right now. So temporarily acquire the transaction mutex
- * if this task is outside transaction context.
- */
- if (xs_state.transaction_pid != current->pid)
- down(&xs_state.transaction_mutex);
- down(&xs_state.request_mutex);
-}
-
-static void request_mutex_release(void)
-{
- up(&xs_state.request_mutex);
- if (xs_state.transaction_pid != current->pid)
- up(&xs_state.transaction_mutex);
-}
static int get_error(const char *errorstring)
{
@@ -152,17 +128,17 @@
/* Emergency write. */
void xenbus_debug_write(const char *str, unsigned int count)
{
- struct xsd_sockmsg msg;
+ struct xsd_sockmsg msg = { 0 };
msg.type = XS_DEBUG;
msg.len = sizeof("print") + count + 1;
- request_mutex_acquire();
+ down(&xs_state.request_mutex);
xb_write(&msg, sizeof(msg));
xb_write("print", sizeof("print"));
xb_write(str, count);
xb_write("", 1);
- request_mutex_release();
+ up(&xs_state.request_mutex);
}
void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg)
@@ -171,12 +147,10 @@
struct xsd_sockmsg req_msg = *msg;
int err;
- if (req_msg.type == XS_TRANSACTION_START) {
- down(&xs_state.transaction_mutex);
- xs_state.transaction_pid = current->pid;
- }
-
- request_mutex_acquire();
+ if (req_msg.type == XS_TRANSACTION_START)
+ down_read(&xs_state.suspend_mutex);
+
+ down(&xs_state.request_mutex);
err = xb_write(msg, sizeof(*msg) + msg->len);
if (err) {
@@ -186,20 +160,19 @@
ret = read_reply(&msg->type, &msg->len);
}
- request_mutex_release();
+ up(&xs_state.request_mutex);
if ((msg->type == XS_TRANSACTION_END) ||
((req_msg.type == XS_TRANSACTION_START) &&
- (msg->type == XS_ERROR))) {
- xs_state.transaction_pid = -1;
- up(&xs_state.transaction_mutex);
- }
+ (msg->type == XS_ERROR)))
+ up_read(&xs_state.suspend_mutex);
return ret;
}
/* Send message to xs, get kmalloc'ed reply. ERR_PTR() on error. */
-static void *xs_talkv(enum xsd_sockmsg_type type,
+static void *xs_talkv(struct xenbus_transaction *t,
+ enum xsd_sockmsg_type type,
const struct kvec *iovec,
unsigned int num_vecs,
unsigned int *len)
@@ -209,12 +182,13 @@
unsigned int i;
int err;
+ msg.tx_id = (u32)(unsigned long)t;
msg.type = type;
msg.len = 0;
for (i = 0; i < num_vecs; i++)
msg.len += iovec[i].iov_len;
- request_mutex_acquire();
+ down(&xs_state.request_mutex);
err = xb_write(&msg, sizeof(msg));
if (err) {
@@ -225,14 +199,14 @@
for (i = 0; i < num_vecs; i++) {
err = xb_write(iovec[i].iov_base, iovec[i].iov_len);;
if (err) {
- request_mutex_release();
+ up(&xs_state.request_mutex);
return ERR_PTR(err);
}
}
ret = read_reply(&msg.type, len);
- request_mutex_release();
+ up(&xs_state.request_mutex);
if (IS_ERR(ret))
return ret;
@@ -248,14 +222,16 @@
}
/* Simplified version of xs_talkv: single message. */
-static void *xs_single(enum xsd_sockmsg_type type,
- const char *string, unsigned int *len)
+static void *xs_single(struct xenbus_transaction *t,
+ enum xsd_sockmsg_type type,
+ const char *string,
+ unsigned int *len)
{
struct kvec iovec;
iovec.iov_base = (void *)string;
iovec.iov_len = strlen(string) + 1;
- return xs_talkv(type, &iovec, 1, len);
+ return xs_talkv(t, type, &iovec, 1, len);
}
/* Many commands only need an ack, don't care what it says. */
@@ -322,7 +298,7 @@
char *strings;
unsigned int len;
- strings = xs_single(XS_DIRECTORY, join(dir, node), &len);
+ strings = xs_single(t, XS_DIRECTORY, join(dir, node), &len);
if (IS_ERR(strings))
return (char **)strings;
@@ -352,7 +328,7 @@
void *xenbus_read(struct xenbus_transaction *t,
const char *dir, const char *node, unsigned int *len)
{
- return xs_single(XS_READ, join(dir, node), len);
+ return xs_single(t, XS_READ, join(dir, node), len);
}
EXPORT_SYMBOL(xenbus_read);
@@ -372,7 +348,7 @@
iovec[1].iov_base = (void *)string;
iovec[1].iov_len = strlen(string);
- return xs_error(xs_talkv(XS_WRITE, iovec, ARRAY_SIZE(iovec), NULL));
+ return xs_error(xs_talkv(t, XS_WRITE, iovec, ARRAY_SIZE(iovec), NULL));
}
EXPORT_SYMBOL(xenbus_write);
@@ -380,14 +356,14 @@
int xenbus_mkdir(struct xenbus_transaction *t,
const char *dir, const char *node)
{
- return xs_error(xs_single(XS_MKDIR, join(dir, node), NULL));
+ return xs_error(xs_single(t, XS_MKDIR, join(dir, node), NULL));
}
EXPORT_SYMBOL(xenbus_mkdir);
/* Destroy a file or directory (directories must be empty). */
int xenbus_rm(struct xenbus_transaction *t, const char *dir, const char *node)
{
- return xs_error(xs_single(XS_RM, join(dir, node), NULL));
+ return xs_error(xs_single(t, XS_RM, join(dir, node), NULL));
}
EXPORT_SYMBOL(xenbus_rm);
@@ -396,18 +372,21 @@
*/
struct xenbus_transaction *xenbus_transaction_start(void)
{
- int err;
-
- down(&xs_state.transaction_mutex);
- xs_state.transaction_pid = current->pid;
-
- err = xs_error(xs_single(XS_TRANSACTION_START, "", NULL));
- if (err) {
- xs_state.transaction_pid = -1;
- up(&xs_state.transaction_mutex);
- }
-
- return err ? ERR_PTR(err) : (struct xenbus_transaction *)1;
+ char *id_str;
+ unsigned long id;
+
+ down_read(&xs_state.suspend_mutex);
+
+ id_str = xs_single(NULL, XS_TRANSACTION_START, "", NULL);
+ if (IS_ERR(id_str)) {
+ up_read(&xs_state.suspend_mutex);
+ return (struct xenbus_transaction *)id_str;
+ }
+
+ id = simple_strtoul(id_str, NULL, 0);
+ kfree(id_str);
+
+ return (struct xenbus_transaction *)id;
}
EXPORT_SYMBOL(xenbus_transaction_start);
@@ -419,17 +398,14 @@
char abortstr[2];
int err;
- BUG_ON(t == NULL);
-
if (abort)
strcpy(abortstr, "F");
else
strcpy(abortstr, "T");
- err = xs_error(xs_single(XS_TRANSACTION_END, abortstr, NULL));
-
- xs_state.transaction_pid = -1;
- up(&xs_state.transaction_mutex);
+ err = xs_error(xs_single(t, XS_TRANSACTION_END, abortstr, NULL));
+
+ up_read(&xs_state.suspend_mutex);
return err;
}
@@ -567,7 +543,8 @@
iov[1].iov_base = (void *)token;
iov[1].iov_len = strlen(token) + 1;
- return xs_error(xs_talkv(XS_WATCH, iov, ARRAY_SIZE(iov), NULL));
+ return xs_error(xs_talkv(NULL, XS_WATCH, iov,
+ ARRAY_SIZE(iov), NULL));
}
static int xs_unwatch(const char *path, const char *token)
@@ -579,7 +556,8 @@
iov[1].iov_base = (char *)token;
iov[1].iov_len = strlen(token) + 1;
- return xs_error(xs_talkv(XS_UNWATCH, iov, ARRAY_SIZE(iov), NULL));
+ return xs_error(xs_talkv(NULL, XS_UNWATCH, iov,
+ ARRAY_SIZE(iov), NULL));
}
static struct xenbus_watch *find_watch(const char *token)
@@ -603,6 +581,8 @@
int err;
sprintf(token, "%lX", (long)watch);
+
+ down_read(&xs_state.suspend_mutex);
spin_lock(&watches_lock);
BUG_ON(find_watch(token));
@@ -617,6 +597,8 @@
spin_unlock(&watches_lock);
}
+ up_read(&xs_state.suspend_mutex);
+
return err;
}
EXPORT_SYMBOL(register_xenbus_watch);
@@ -627,14 +609,13 @@
int err;
sprintf(token, "%lX", (long)watch);
+
+ down_read(&xs_state.suspend_mutex);
spin_lock(&watches_lock);
BUG_ON(!find_watch(token));
list_del(&watch->list);
spin_unlock(&watches_lock);
-
- /* Ensure xs_resume() is not in progress (see comments there). */
- wait_event(xs_resuming_waitq, !xs_resuming);
err = xs_unwatch(watch->node, token);
if (err)
@@ -642,6 +623,8 @@
"XENBUS Failed to release watch %s: %i\n",
watch->node, err);
+ up_read(&xs_state.suspend_mutex);
+
/* Make sure watch is not in use. */
flush_scheduled_work();
}
@@ -649,58 +632,24 @@
void xs_suspend(void)
{
- down(&xs_state.transaction_mutex);
+ down_write(&xs_state.suspend_mutex);
down(&xs_state.request_mutex);
}
void xs_resume(void)
{
- struct list_head *ent, *prev_ent = &watches;
struct xenbus_watch *watch;
char token[sizeof(watch) * 2 + 1];
- /* Protect against concurrent unregistration and freeing of watches. */
- BUG_ON(xs_resuming);
- xs_resuming = 1;
-
up(&xs_state.request_mutex);
- up(&xs_state.transaction_mutex);
-
- /*
- * Iterate over the watch list re-registering each node. We must
- * be careful about concurrent registrations and unregistrations.
- * We search for the node immediately following the previously
- * re-registered node. If we get no match then either we are done
- * (previous node is last in list) or the node was unregistered, in
- * which case we restart from the beginning of the list.
- * register_xenbus_watch() + unregister_xenbus_watch() is safe because
- * it will only ever move a watch node earlier in the list, so it
- * cannot cause us to skip nodes.
- */
- for (;;) {
- spin_lock(&watches_lock);
- list_for_each(ent, &watches)
- if (ent->prev == prev_ent)
- break;
- spin_unlock(&watches_lock);
-
- /* No match because prev_ent is at the end of the list? */
- if ((ent == &watches) && (watches.prev == prev_ent))
- break; /* We're done! */
-
- if ((prev_ent = ent) != &watches) {
- /*
- * Safe even with watch_lock not held. We are saved by
- * (xs_resumed==1) check in unregister_xenbus_watch.
- */
- watch = list_entry(ent, struct xenbus_watch, list);
- sprintf(token, "%lX", (long)watch);
- xs_watch(watch->node, token);
- }
- }
-
- xs_resuming = 0;
- wake_up(&xs_resuming_waitq);
+
+ /* No need for watches_lock: the suspend_mutex is sufficient. */
+ list_for_each_entry(watch, &watches, list) {
+ sprintf(token, "%lX", (long)watch);
+ xs_watch(watch->node, token);
+ }
+
+ up_write(&xs_state.suspend_mutex);
}
static void xenbus_fire_watch(void *arg)
@@ -801,8 +750,7 @@
init_waitqueue_head(&xs_state.reply_waitq);
init_MUTEX(&xs_state.request_mutex);
- init_MUTEX(&xs_state.transaction_mutex);
- xs_state.transaction_pid = -1;
+ init_rwsem(&xs_state.suspend_mutex);
/* Initialize the shared memory rings to talk to xenstored */
err = xb_init_comms();
diff -r 1ac39c7a0435 -r 015f8ae81276 tools/xenstore/utils.h
--- a/tools/xenstore/utils.h Mon Oct 10 13:46:53 2005
+++ b/tools/xenstore/utils.h Mon Oct 10 14:38:01 2005
@@ -55,4 +55,34 @@
#define dprintf(_fmt, _args...) ((void)0)
#endif
+/*
+ * Mux errno values onto returned pointers.
+ */
+
+static inline void *ERR_PTR(long error)
+{
+ return (void *)error;
+}
+
+static inline long PTR_ERR(const void *ptr)
+{
+ return (long)ptr;
+}
+
+static inline long IS_ERR(const void *ptr)
+{
+ return ((unsigned long)ptr > (unsigned long)-1000L);
+}
+
+
#endif /* _UTILS_H */
+
+/*
+ * Local variables:
+ * c-file-style: "linux"
+ * indent-tabs-mode: t
+ * c-indent-level: 8
+ * c-basic-offset: 8
+ * tab-width: 8
+ * End:
+ */
diff -r 1ac39c7a0435 -r 015f8ae81276 tools/xenstore/xenstore_client.c
--- a/tools/xenstore/xenstore_client.c Mon Oct 10 13:46:53 2005
+++ b/tools/xenstore/xenstore_client.c Mon Oct 10 14:38:01 2005
@@ -34,14 +34,10 @@
struct xs_handle *xsh;
struct xs_transaction_handle *xth;
bool success;
- int ret = 0;
+ int ret = 0, socket = 0;
#if defined(CLIENT_read) || defined(CLIENT_list)
int prefix = 0;
#endif
-
- xsh = xs_domain_open();
- if (xsh == NULL)
- err(1, "xs_domain_open");
while (1) {
int c, index = 0;
@@ -50,10 +46,11 @@
#if defined(CLIENT_read) || defined(CLIENT_list)
{"prefix", 0, 0, 'p'},
#endif
+ {"socket", 0, 0, 's'},
{0, 0, 0, 0}
};
- c = getopt_long(argc, argv, "h"
+ c = getopt_long(argc, argv, "hs"
#if defined(CLIENT_read) || defined(CLIENT_list)
"p"
#endif
@@ -65,6 +62,9 @@
case 'h':
usage(argv[0]);
/* NOTREACHED */
+ case 's':
+ socket = 1;
+ break;
#if defined(CLIENT_read) || defined(CLIENT_list)
case 'p':
prefix = 1;
@@ -83,6 +83,10 @@
/* NOTREACHED */
}
#endif
+
+ xsh = socket ? xs_daemon_open() : xs_domain_open();
+ if (xsh == NULL)
+ err(1, socket ? "xs_daemon_open" : "xs_domain_open");
again:
xth = xs_transaction_start(xsh);
diff -r 1ac39c7a0435 -r 015f8ae81276 tools/xenstore/xenstored_core.c
--- a/tools/xenstore/xenstored_core.c Mon Oct 10 13:46:53 2005
+++ b/tools/xenstore/xenstored_core.c Mon Oct 10 14:38:01 2005
@@ -238,46 +238,47 @@
static bool write_messages(struct connection *conn)
{
int ret;
- struct buffered_data *out, *tmp;
-
- list_for_each_entry_safe(out, tmp, &conn->out_list, list) {
- if (out->inhdr) {
- if (verbose)
- xprintf("Writing msg %s (%s) out to %p\n",
- sockmsg_string(out->hdr.msg.type),
- out->buffer, conn);
- ret = conn->write(conn, out->hdr.raw + out->used,
- sizeof(out->hdr) - out->used);
- if (ret < 0)
- return false;
-
- out->used += ret;
- if (out->used < sizeof(out->hdr))
- return true;
-
- out->inhdr = false;
- out->used = 0;
-
- /* Second write might block if non-zero. */
- if (out->hdr.msg.len && !conn->domain)
- return true;
- }
-
- ret = conn->write(conn, out->buffer + out->used,
- out->hdr.msg.len - out->used);
-
+ struct buffered_data *out;
+
+ out = list_top(&conn->out_list, struct buffered_data, list);
+ if (out == NULL)
+ return true;
+
+ if (out->inhdr) {
+ if (verbose)
+ xprintf("Writing msg %s (%s) out to %p\n",
+ sockmsg_string(out->hdr.msg.type),
+ out->buffer, conn);
+ ret = conn->write(conn, out->hdr.raw + out->used,
+ sizeof(out->hdr) - out->used);
if (ret < 0)
return false;
out->used += ret;
- if (out->used != out->hdr.msg.len)
+ if (out->used < sizeof(out->hdr))
return true;
- trace_io(conn, "OUT", out);
-
- list_del(&out->list);
- talloc_free(out);
- }
+ out->inhdr = false;
+ out->used = 0;
+
+ /* Second write might block if non-zero. */
+ if (out->hdr.msg.len && !conn->domain)
+ return true;
+ }
+
+ ret = conn->write(conn, out->buffer + out->used,
+ out->hdr.msg.len - out->used);
+ if (ret < 0)
+ return false;
+
+ out->used += ret;
+ if (out->used != out->hdr.msg.len)
+ return true;
+
+ trace_io(conn, "OUT", out);
+
+ list_del(&out->list);
+ talloc_free(out);
return true;
}
@@ -1042,6 +1043,17 @@
*/
static void process_message(struct connection *conn, struct buffered_data *in)
{
+ struct transaction *trans;
+
+ trans = transaction_lookup(conn, in->hdr.msg.tx_id);
+ if (IS_ERR(trans)) {
+ send_error(conn, -PTR_ERR(trans));
+ return;
+ }
+
+ assert(conn->transaction == NULL);
+ conn->transaction = trans;
+
switch (in->hdr.msg.type) {
case XS_DIRECTORY:
send_directory(conn, onearg(in));
@@ -1116,11 +1128,13 @@
do_get_domain_path(conn, onearg(in));
break;
- case XS_WATCH_EVENT:
default:
eprintf("Client unknown operation %i", in->hdr.msg.type);
send_error(conn, ENOSYS);
- }
+ break;
+ }
+
+ conn->transaction = NULL;
}
static int out_of_mem(void *data)
@@ -1239,15 +1253,14 @@
if (!new)
return NULL;
+ memset(new, 0, sizeof(*new));
new->fd = -1;
- new->id = 0;
- new->domain = NULL;
- new->transaction = NULL;
new->write = write;
new->read = read;
new->can_write = true;
INIT_LIST_HEAD(&new->out_list);
INIT_LIST_HEAD(&new->watches);
+ INIT_LIST_HEAD(&new->transaction_list);
talloc_set_fail_handler(out_of_mem, &talloc_fail);
if (setjmp(talloc_fail)) {
@@ -1410,6 +1423,7 @@
static struct option options[] = {
+ { "no-domain-init", 0, NULL, 'D' },
{ "pid-file", 1, NULL, 'F' },
{ "no-fork", 0, NULL, 'N' },
{ "output-pid", 0, NULL, 'P' },
@@ -1424,11 +1438,15 @@
fd_set inset, outset;
bool dofork = true;
bool outputpid = false;
+ bool no_domain_init = false;
const char *pidfile = NULL;
- while ((opt = getopt_long(argc, argv, "F:NPT:V", options,
+ while ((opt = getopt_long(argc, argv, "DF:NPT:V", options,
NULL)) != -1) {
switch (opt) {
+ case 'D':
+ no_domain_init = true;
+ break;
case 'F':
pidfile = optarg;
break;
@@ -1501,7 +1519,8 @@
setup_structure();
/* Listen to hypervisor. */
- event_fd = domain_init();
+ if (!no_domain_init)
+ event_fd = domain_init();
/* Restore existing connections. */
restore_existing_connections();
diff -r 1ac39c7a0435 -r 015f8ae81276 tools/xenstore/xenstored_core.h
--- a/tools/xenstore/xenstored_core.h Mon Oct 10 13:46:53 2005
+++ b/tools/xenstore/xenstored_core.h Mon Oct 10 14:38:01 2005
@@ -71,8 +71,12 @@
/* Buffered output data */
struct list_head out_list;
- /* My transaction, if any. */
+ /* Transaction context for current request (NULL if none). */
struct transaction *transaction;
+
+ /* List of in-progress transactions. */
+ struct list_head transaction_list;
+ u32 next_transaction_id;
/* The domain I'm associated with, if any. */
struct domain *domain;
diff -r 1ac39c7a0435 -r 015f8ae81276 tools/xenstore/xenstored_transaction.c
--- a/tools/xenstore/xenstored_transaction.c Mon Oct 10 13:46:53 2005
+++ b/tools/xenstore/xenstored_transaction.c Mon Oct 10 14:38:01 2005
@@ -37,7 +37,7 @@
struct changed_node
{
- /* The list within this transaction. */
+ /* List of all changed nodes in the context of this transaction. */
struct list_head list;
/* The name of the node. */
@@ -49,14 +49,14 @@
struct transaction
{
- /* Global list of transactions. */
+ /* List of all transactions active on this connection. */
struct list_head list;
+
+ /* Connection-local identifier for this transaction. */
+ u32 id;
/* Generation when transaction started. */
unsigned int generation;
-
- /* My owner (conn->transaction == me). */
- struct connection *conn;
/* TDB to work on, and filename */
TDB_CONTEXT *tdb;
@@ -65,7 +65,7 @@
/* List of changed nodes. */
struct list_head changes;
};
-static LIST_HEAD(transactions);
+
static unsigned int generation;
/* Return tdb context to use for this connection. */
@@ -100,7 +100,6 @@
{
struct transaction *trans = _transaction;
- list_del(&trans->list);
trace_destroy(trans, "transaction");
if (trans->tdb)
tdb_close(trans->tdb);
@@ -108,10 +107,26 @@
return 0;
}
+struct transaction *transaction_lookup(struct connection *conn, u32 id)
+{
+ struct transaction *trans;
+
+ if (id == 0)
+ return NULL;
+
+ list_for_each_entry(trans, &conn->transaction_list, list)
+ if (trans->id == id)
+ return trans;
+
+ return ERR_PTR(-ENOENT);
+}
+
void do_transaction_start(struct connection *conn, struct buffered_data *in)
{
- struct transaction *trans;
-
+ struct transaction *trans, *exists;
+ char id_str[20];
+
+ /* We don't support nested transactions. */
if (conn->transaction) {
send_error(conn, EBUSY);
return;
@@ -120,7 +135,6 @@
/* Attach transaction to input for autofree until it's complete */
trans = talloc(in, struct transaction);
INIT_LIST_HEAD(&trans->changes);
- trans->conn = conn;
trans->generation = generation;
trans->tdb_name = talloc_asprintf(trans, "%s.%p",
xs_daemon_tdb(), trans);
@@ -132,11 +146,19 @@
/* Make it close if we go away. */
talloc_steal(trans, trans->tdb);
+ /* Pick an unused transaction identifier. */
+ do {
+ trans->id = conn->next_transaction_id;
+ exists = transaction_lookup(conn, conn->next_transaction_id++);
+ } while (!IS_ERR(exists));
+
/* Now we own it. */
- conn->transaction = talloc_steal(conn, trans);
- list_add_tail(&trans->list, &transactions);
+ list_add_tail(&trans->list, &conn->transaction_list);
+ talloc_steal(conn, trans);
talloc_set_destructor(trans, destroy_transaction);
- send_ack(conn, XS_TRANSACTION_START);
+
+ sprintf(id_str, "%u", trans->id);
+ send_reply(conn, XS_TRANSACTION_START, id_str, strlen(id_str)+1);
}
void do_transaction_end(struct connection *conn, const char *arg)
@@ -149,13 +171,13 @@
return;
}
- if (!conn->transaction) {
+ if ((trans = conn->transaction) == NULL) {
send_error(conn, ENOENT);
return;
}
- trans = conn->transaction;
conn->transaction = NULL;
+ list_del(&trans->list);
/* Attach transaction to arg for auto-cleanup */
talloc_steal(arg, trans);
@@ -181,3 +203,12 @@
send_ack(conn, XS_TRANSACTION_END);
}
+/*
+ * Local variables:
+ * c-file-style: "linux"
+ * indent-tabs-mode: t
+ * c-indent-level: 8
+ * c-basic-offset: 8
+ * tab-width: 8
+ * End:
+ */
diff -r 1ac39c7a0435 -r 015f8ae81276 tools/xenstore/xenstored_transaction.h
--- a/tools/xenstore/xenstored_transaction.h Mon Oct 10 13:46:53 2005
+++ b/tools/xenstore/xenstored_transaction.h Mon Oct 10 14:38:01 2005
@@ -25,10 +25,11 @@
void do_transaction_start(struct connection *conn, struct buffered_data *node);
void do_transaction_end(struct connection *conn, const char *arg);
-bool transaction_block(struct connection *conn);
+struct transaction *transaction_lookup(struct connection *conn, u32 id);
/* This node was changed: can fail and longjmp. */
-void add_change_node(struct transaction *trans, const char *node, bool
recurse);
+void add_change_node(struct transaction *trans, const char *node,
+ bool recurse);
/* Return tdb context to use for this connection. */
TDB_CONTEXT *tdb_transaction_context(struct transaction *trans);
diff -r 1ac39c7a0435 -r 015f8ae81276 tools/xenstore/xs.c
--- a/tools/xenstore/xs.c Mon Oct 10 13:46:53 2005
+++ b/tools/xenstore/xs.c Mon Oct 10 14:38:01 2005
@@ -75,36 +75,9 @@
/* One request at a time. */
pthread_mutex_t request_mutex;
-
- /* One transaction at a time. */
- pthread_mutex_t transaction_mutex;
- pthread_t transaction_pthread;
};
-struct xs_transaction_handle {
- int id;
-};
-
static void *read_thread(void *arg);
-
-static void request_mutex_acquire(struct xs_handle *h)
-{
- /*
- * We can't distinguish non-transactional from transactional
- * requests right now. So temporarily acquire the transaction mutex
- * if this task is outside transaction context.
- */
- if (h->transaction_pthread != pthread_self())
- pthread_mutex_lock(&h->transaction_mutex);
- pthread_mutex_lock(&h->request_mutex);
-}
-
-static void request_mutex_release(struct xs_handle *h)
-{
- pthread_mutex_unlock(&h->request_mutex);
- if (h->transaction_pthread != pthread_self())
- pthread_mutex_unlock(&h->transaction_mutex);
-}
int xs_fileno(struct xs_handle *h)
{
@@ -186,8 +159,6 @@
pthread_cond_init(&h->reply_condvar, NULL);
pthread_mutex_init(&h->request_mutex, NULL);
- pthread_mutex_init(&h->transaction_mutex, NULL);
- h->transaction_pthread = -1;
if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0)
goto error;
@@ -223,7 +194,6 @@
{
struct xs_stored_msg *msg, *tmsg;
- pthread_mutex_lock(&h->transaction_mutex);
pthread_mutex_lock(&h->request_mutex);
pthread_mutex_lock(&h->reply_mutex);
pthread_mutex_lock(&h->watch_mutex);
@@ -242,7 +212,6 @@
free(msg);
}
- pthread_mutex_unlock(&h->transaction_mutex);
pthread_mutex_unlock(&h->request_mutex);
pthread_mutex_unlock(&h->reply_mutex);
pthread_mutex_unlock(&h->watch_mutex);
@@ -321,8 +290,10 @@
}
/* Send message to xs, get malloc'ed reply. NULL and set errno on error. */
-static void *xs_talkv(struct xs_handle *h, enum xsd_sockmsg_type type,
- const struct iovec *iovec, unsigned int num_vecs,
+static void *xs_talkv(struct xs_handle *h, struct xs_transaction_handle *t,
+ enum xsd_sockmsg_type type,
+ const struct iovec *iovec,
+ unsigned int num_vecs,
unsigned int *len)
{
struct xsd_sockmsg msg;
@@ -331,6 +302,7 @@
unsigned int i;
struct sigaction ignorepipe, oldact;
+ msg.tx_id = (u32)(unsigned long)t;
msg.type = type;
msg.len = 0;
for (i = 0; i < num_vecs; i++)
@@ -341,7 +313,7 @@
ignorepipe.sa_flags = 0;
sigaction(SIGPIPE, &ignorepipe, &oldact);
- request_mutex_acquire(h);
+ pthread_mutex_lock(&h->request_mutex);
if (!xs_write_all(h->fd, &msg, sizeof(msg)))
goto fail;
@@ -354,7 +326,7 @@
if (!ret)
goto fail;
- request_mutex_release(h);
+ pthread_mutex_unlock(&h->request_mutex);
sigaction(SIGPIPE, &oldact, NULL);
if (msg.type == XS_ERROR) {
@@ -375,7 +347,7 @@
fail:
/* We're in a bad state, so close fd. */
saved_errno = errno;
- request_mutex_release(h);
+ pthread_mutex_unlock(&h->request_mutex);
sigaction(SIGPIPE, &oldact, NULL);
close_fd:
close(h->fd);
@@ -393,14 +365,16 @@
}
/* Simplified version of xs_talkv: single message. */
-static void *xs_single(struct xs_handle *h, enum xsd_sockmsg_type type,
- const char *string, unsigned int *len)
+static void *xs_single(struct xs_handle *h, struct xs_transaction_handle *t,
+ enum xsd_sockmsg_type type,
+ const char *string,
+ unsigned int *len)
{
struct iovec iovec;
iovec.iov_base = (void *)string;
iovec.iov_len = strlen(string) + 1;
- return xs_talkv(h, type, &iovec, 1, len);
+ return xs_talkv(h, t, type, &iovec, 1, len);
}
static bool xs_bool(char *reply)
@@ -417,7 +391,7 @@
char *strings, *p, **ret;
unsigned int len;
- strings = xs_single(h, XS_DIRECTORY, path, &len);
+ strings = xs_single(h, t, XS_DIRECTORY, path, &len);
if (!strings)
return NULL;
@@ -446,7 +420,7 @@
void *xs_read(struct xs_handle *h, struct xs_transaction_handle *t,
const char *path, unsigned int *len)
{
- return xs_single(h, XS_READ, path, len);
+ return xs_single(h, t, XS_READ, path, len);
}
/* Write the value of a single file.
@@ -462,7 +436,8 @@
iovec[1].iov_base = (void *)data;
iovec[1].iov_len = len;
- return xs_bool(xs_talkv(h, XS_WRITE, iovec, ARRAY_SIZE(iovec), NULL));
+ return xs_bool(xs_talkv(h, t, XS_WRITE, iovec,
+ ARRAY_SIZE(iovec), NULL));
}
/* Create a new directory.
@@ -471,7 +446,7 @@
bool xs_mkdir(struct xs_handle *h, struct xs_transaction_handle *t,
const char *path)
{
- return xs_bool(xs_single(h, XS_MKDIR, path, NULL));
+ return xs_bool(xs_single(h, t, XS_MKDIR, path, NULL));
}
/* Destroy a file or directory (directories must be empty).
@@ -480,7 +455,7 @@
bool xs_rm(struct xs_handle *h, struct xs_transaction_handle *t,
const char *path)
{
- return xs_bool(xs_single(h, XS_RM, path, NULL));
+ return xs_bool(xs_single(h, t, XS_RM, path, NULL));
}
/* Get permissions of node (first element is owner).
@@ -494,7 +469,7 @@
unsigned int len;
struct xs_permissions *ret;
- strings = xs_single(h, XS_GET_PERMS, path, &len);
+ strings = xs_single(h, t, XS_GET_PERMS, path, &len);
if (!strings)
return NULL;
@@ -544,7 +519,7 @@
goto unwind;
}
- if (!xs_bool(xs_talkv(h, XS_SET_PERMS, iov, 1+num_perms, NULL)))
+ if (!xs_bool(xs_talkv(h, t, XS_SET_PERMS, iov, 1+num_perms, NULL)))
goto unwind;
for (i = 0; i < num_perms; i++)
free(iov[i+1].iov_base);
@@ -571,7 +546,8 @@
iov[1].iov_base = (void *)token;
iov[1].iov_len = strlen(token) + 1;
- return xs_bool(xs_talkv(h, XS_WATCH, iov, ARRAY_SIZE(iov), NULL));
+ return xs_bool(xs_talkv(h, NULL, XS_WATCH, iov,
+ ARRAY_SIZE(iov), NULL));
}
/* Find out what node change was on (will block if nothing pending).
@@ -637,7 +613,8 @@
iov[1].iov_base = (char *)token;
iov[1].iov_len = strlen(token) + 1;
- return xs_bool(xs_talkv(h, XS_UNWATCH, iov, ARRAY_SIZE(iov), NULL));
+ return xs_bool(xs_talkv(h, NULL, XS_UNWATCH, iov,
+ ARRAY_SIZE(iov), NULL));
}
/* Start a transaction: changes by others will not be seen during this
@@ -647,18 +624,17 @@
*/
struct xs_transaction_handle *xs_transaction_start(struct xs_handle *h)
{
- bool rc;
-
- pthread_mutex_lock(&h->transaction_mutex);
- h->transaction_pthread = pthread_self();
-
- rc = xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL));
- if (!rc) {
- h->transaction_pthread = -1;
- pthread_mutex_unlock(&h->transaction_mutex);
- }
-
- return (struct xs_transaction_handle *)rc;
+ char *id_str;
+ unsigned long id;
+
+ id_str = xs_single(h, NULL, XS_TRANSACTION_START, "", NULL);
+ if (id_str == NULL)
+ return NULL;
+
+ id = strtoul(id_str, NULL, 0);
+ free(id_str);
+
+ return (struct xs_transaction_handle *)id;
}
/* End a transaction.
@@ -670,22 +646,13 @@
bool abort)
{
char abortstr[2];
- bool rc;
-
- if (t == NULL)
- return -EINVAL;
if (abort)
strcpy(abortstr, "F");
else
strcpy(abortstr, "T");
- rc = xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL));
-
- h->transaction_pthread = -1;
- pthread_mutex_unlock(&h->transaction_mutex);
-
- return rc;
+ return xs_bool(xs_single(h, t, XS_TRANSACTION_END, abortstr, NULL));
}
/* Introduce a new domain.
@@ -713,7 +680,8 @@
iov[3].iov_base = (char *)path;
iov[3].iov_len = strlen(path) + 1;
- return xs_bool(xs_talkv(h, XS_INTRODUCE, iov, ARRAY_SIZE(iov), NULL));
+ return xs_bool(xs_talkv(h, NULL, XS_INTRODUCE, iov,
+ ARRAY_SIZE(iov), NULL));
}
bool xs_release_domain(struct xs_handle *h, domid_t domid)
@@ -722,7 +690,7 @@
sprintf(domid_str, "%u", domid);
- return xs_bool(xs_single(h, XS_RELEASE, domid_str, NULL));
+ return xs_bool(xs_single(h, NULL, XS_RELEASE, domid_str, NULL));
}
char *xs_get_domain_path(struct xs_handle *h, domid_t domid)
@@ -731,7 +699,7 @@
sprintf(domid_str, "%u", domid);
- return xs_single(h, XS_GET_DOMAIN_PATH, domid_str, NULL);
+ return xs_single(h, NULL, XS_GET_DOMAIN_PATH, domid_str, NULL);
}
/* Only useful for DEBUG versions */
@@ -745,7 +713,8 @@
iov[1].iov_base = data;
iov[1].iov_len = len;
- return xs_talkv(h, XS_DEBUG, iov, ARRAY_SIZE(iov), NULL);
+ return xs_talkv(h, NULL, XS_DEBUG, iov,
+ ARRAY_SIZE(iov), NULL);
}
static void *read_thread(void *arg)
_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-changelog
|