ChangeSet 1.1336.1.1, 2005/03/22 15:50:43+00:00, jrb44@xxxxxxxxxxxxxxxxxx
Added asynchronous support to the blockstore.
blockstore.c | 531 ++++++++++++++++++++++++++++++++++++++++++++++++++---------
blockstore.h | 4
2 files changed, 455 insertions(+), 80 deletions(-)
diff -Nru a/tools/blktap/blockstore.c b/tools/blktap/blockstore.c
--- a/tools/blktap/blockstore.c 2005-03-22 15:03:49 -05:00
+++ b/tools/blktap/blockstore.c 2005-03-22 15:03:49 -05:00
@@ -13,31 +13,73 @@
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
+#include <stdarg.h>
#include "blockstore.h"
#define BLOCKSTORE_REMOTE
+//#define BSDEBUG
-#ifdef BLOCKSTORE_REMOTE
+/*****************************************************************************
+ * Debugging
+ */
+#ifdef BSDEBUG
+void DB(char *format, ...)
+{
+ va_list args;
+
+ va_start(args, format);
+ vfprintf(stderr, format, args);
+ va_end(args);
+}
+#else
+#define DB(format, ...) (void)0
+#endif
-//#define BSDEBUG
+#ifdef BLOCKSTORE_REMOTE
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <netdb.h>
-#define ENTER_QUEUE_CR (void)0
-#define LEAVE_QUEUE_CR (void)0
+/*****************************************************************************
+ * *
+ *****************************************************************************/
+
+/*****************************************************************************
+ * Network state *
+ *****************************************************************************/
+/* The individual disk servers we talks to. These will be referenced by
+ * an integer index into bsservers[].
+ */
bsserver_t bsservers[MAX_SERVERS];
+
+/* The cluster map. This is indexed by an integer cluster number.
+ */
bscluster_t bsclusters[MAX_CLUSTERS];
+/* Local socket.
+ */
struct sockaddr_in sin_local;
int bssock = 0;
+/*****************************************************************************
+ * Message queue management *
+ *****************************************************************************/
+
+/* Protects the queue manipulation critcal regions.
+ */
+#define ENTER_QUEUE_CR (void)0
+#define LEAVE_QUEUE_CR (void)0
+
+/* A message queue entry. We allocate one of these for every request we send.
+ * Asynchronous reply reception also used one of these.
+ */
typedef struct bsq_t_struct {
struct bsq_t_struct *prev;
struct bsq_t_struct *next;
+ int status;
int server;
int length;
struct msghdr msghdr;
@@ -46,8 +88,134 @@
void *block;
} bsq_t;
+#define BSQ_STATUS_MATCHED 1
+
+#define ENTER_LUID_CR (void)0
+#define LEAVE_LUID_CR (void)0
+
+static u64 luid_cnt = 0x1000ULL;
+u64 new_luid(void) {
+ u64 luid;
+ ENTER_LUID_CR;
+ luid = luid_cnt++;
+ LEAVE_LUID_CR;
+ return luid;
+}
+
+/* Queue of outstanding requests.
+ */
bsq_t *bs_head = NULL;
bsq_t *bs_tail = NULL;
+int bs_qlen = 0;
+
+/*
+ */
+void queuedebug(char *msg) {
+ bsq_t *q;
+ ENTER_QUEUE_CR;
+ fprintf(stderr, "Q: %s len=%u\n", msg, bs_qlen);
+ for (q = bs_head; q; q = q->next) {
+ fprintf(stderr, " luid=%016llx server=%u\n",
+ q->message.luid, q->server);
+ }
+ LEAVE_QUEUE_CR;
+}
+
+int enqueue(bsq_t *qe) {
+ ENTER_QUEUE_CR;
+ qe->next = NULL;
+ qe->prev = bs_tail;
+ if (!bs_head)
+ bs_head = qe;
+ else
+ bs_tail->next = qe;
+ bs_tail = qe;
+ bs_qlen++;
+ LEAVE_QUEUE_CR;
+#ifdef BSDEBUG
+ queuedebug("enqueue");
+#endif
+ return 0;
+}
+
+int dequeue(bsq_t *qe) {
+ bsq_t *q;
+ ENTER_QUEUE_CR;
+ for (q = bs_head; q; q = q->next) {
+ if (q == qe) {
+ if (q->prev)
+ q->prev->next = q->next;
+ else
+ bs_head = q->next;
+ if (q->next)
+ q->next->prev = q->prev;
+ else
+ bs_tail = q->prev;
+ bs_qlen--;
+ goto found;
+ }
+ }
+
+ LEAVE_QUEUE_CR;
+#ifdef BSDEBUG
+ queuedebug("dequeue not found");
+#endif
+ return 0;
+
+ found:
+ LEAVE_QUEUE_CR;
+#ifdef BSDEBUG
+ queuedebug("dequeue not found");
+#endif
+ return 1;
+}
+
+bsq_t *queuesearch(bsq_t *qe) {
+ bsq_t *q;
+ ENTER_QUEUE_CR;
+ for (q = bs_head; q; q = q->next) {
+ if ((qe->server == q->server) &&
+ (qe->message.operation == q->message.operation) &&
+ (qe->message.luid == q->message.luid)) {
+
+ if ((q->message.operation == BSOP_READBLOCK) &&
+ ((q->message.flags & BSOP_FLAG_ERROR) == 0)) {
+ q->block = qe->block;
+ qe->block = NULL;
+ }
+ q->length = qe->length;
+ q->message.flags = qe->message.flags;
+ q->message.id = qe->message.id;
+ q->status |= BSQ_STATUS_MATCHED;
+
+ if (q->prev)
+ q->prev->next = q->next;
+ else
+ bs_head = q->next;
+ if (q->next)
+ q->next->prev = q->prev;
+ else
+ bs_tail = q->prev;
+ q->next = NULL;
+ q->prev = NULL;
+ bs_qlen--;
+ goto found;
+ }
+ }
+
+ LEAVE_QUEUE_CR;
+#ifdef BSDEBUG
+ queuedebug("queuesearch not found");
+#endif
+ return NULL;
+
+ found:
+ LEAVE_QUEUE_CR;
+#ifdef BSDEBUG
+ queuedebug("queuesearch found");
+#endif
+ return q;
+}
int send_message(bsq_t *qe) {
int rc;
@@ -71,16 +239,21 @@
qe->iov[1].iov_len = BLOCK_SIZE;
}
- rc = sendmsg(bssock, &(qe->msghdr), 0);
+ qe->message.luid = new_luid();
+
+ qe->status = 0;
+ if (enqueue(qe) < 0) {
+ fprintf(stderr, "Error enqueuing request.\n");
+ return -1;
+ }
+
+ DB("send_message to %d luid=%016llx\n", qe->server, qe->message.luid);
+ rc = sendmsg(bssock, &(qe->msghdr), MSG_DONTWAIT);
//rc = sendto(bssock, (void *)&(qe->message), qe->length, 0,
// (struct sockaddr *)&(bsservers[qe->server].sin),
// sizeof(struct sockaddr_in));
if (rc < 0)
return rc;
-
- ENTER_QUEUE_CR;
-
- LEAVE_QUEUE_CR;
return rc;
}
@@ -115,22 +288,148 @@
return rc;
}
+int get_server_number(struct sockaddr_in *sin) {
+ int i;
+
-------------------------------------------------------
This SF.net email is sponsored by: 2005 Windows Mobile Application Contest
Submit applications for Windows Mobile(tm)-based Pocket PCs or Smartphones
for the chance to win $25,000 and application distribution. Enter today at
http://ads.osdn.com/?ad_id=6882&alloc_id=15148&op=click
_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxxxx
https://lists.sourceforge.net/lists/listinfo/xen-changelog
|