[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-devel] [PATCH 11 of 11] blktap2: add remus driver



# HG changeset patch
# User Brendan Cully <brendan@xxxxxxxxx>
# Date 1252530408 25200
# Node ID aaf56934865afa3f5611cd891347953cdd1e5729
# Parent  5e0779189ef3b9382675b2e574b75936a6e9fb15
blktap2: add remus driver

Blktap2 port of the remus disk driver. Backwards compatible with the blktap1
implementation.

Signed-off-by: Ryan O'Connor <rjo@xxxxxxxxx>
Signed-off-by: Brendan Cully <brendan@xxxxxxxxx>

diff --git a/tools/blktap2/drivers/Makefile b/tools/blktap2/drivers/Makefile
--- a/tools/blktap2/drivers/Makefile
+++ b/tools/blktap2/drivers/Makefile
@@ -36,7 +36,7 @@
 CRYPT_LIB += -lcrypto
 endif
 
-LDFLAGS_img := $(CRYPT_LIB) -lpthread -lz
+LDFLAGS_img := $(CRYPT_LIB) -lpthread -lz -lm
 
 LIBS += -L$(LIBVHDDIR) -lvhd
 
@@ -44,6 +44,14 @@
 LIBS += -luuid
 endif
 
+REMUS-OBJS  := block-remus.o
+REMUS-OBJS  += hashtable.o
+REMUS-OBJS  += hashtable_itr.o
+REMUS-OBJS  += hashtable_utility.o
+
+$(REMUS-OBJS): CFLAGS += -fgnu89-inline -I$(XEN_XENSTORE)
+
+
 LIBAIO_DIR = $(XEN_ROOT)/tools/libaio/src
 tapdisk2 tapdisk-stream tapdisk-diff $(QCOW_UTIL): AIOLIBS := $(LIBAIO_DIR)/libaio.a
 tapdisk-client tapdisk-stream tapdisk-diff $(QCOW_UTIL): CFLAGS  += -I$(LIBAIO_DIR) -I$(XEN_LIBXC)
@@ -81,6 +89,7 @@
 BLK-OBJS-y  += block-qcow.o
 BLK-OBJS-y  += aes.o
 BLK-OBJS-y  += $(PORTABLE-OBJS-y)
+BLK-OBJS-y  += $(REMUS-OBJS)
 
 all: $(IBIN) lock-util qcow-util
 
diff --git a/tools/blktap2/drivers/block-remus.c b/tools/blktap2/drivers/block-remus.c
new file mode 100644
--- /dev/null
+++ b/tools/blktap2/drivers/block-remus.c
@@ -0,0 +1,1670 @@
+/* block-remus.c
+ *
+ * This disk sends all writes to a backup via a network interface before
+ * passing them to an underlying device.
+ * The backup is a bit more complicated:
+ *  1. It applies all incoming writes to a ramdisk.
+ *  2. When a checkpoint request arrives, it moves the ramdisk to
+ *     a committing state and uses a new ramdisk for subsequent writes.
+ *     It also acknowledges the request, to let the sender know it can
+ *     release output.
+ *  3. The ramdisk flushes its contents to the underlying driver.
+ *  4. At failover, the backup waits for the in-flight ramdisk (if any) to
+ *     drain before letting the domain be activated.
+ *
+ * The driver determines whether it is the client or server by attempting
+ * to bind to the replication address. If the address is not local,
+ * the driver acts as client.
+ *
+ * The following messages are defined for the replication stream:
+ * 1. write request
+ *    "wreq"      4
+ *    num_sectors 4
+ *    sector      8
+ *    buffer      (num_sectors * sector_size)
+ * 2. submit request (may be used as a barrier)
+ *    "sreq"      4
+ * 3. commit request
+ *    "creq"      4
+ * After a commit request, the client must wait for a completion message:
+ * 4. completion
+ *    "done"      4
+ */
+
+/* due to architectural choices in tapdisk, block-buffer is forced to
+ * reimplement some code which is meant to be private */
+#define TAPDISK
+#include "tapdisk.h"
+#include "tapdisk-server.h"
+#include "tapdisk-driver.h"
+#include "tapdisk-interface.h"
+#include "hashtable.h"
+#include "hashtable_itr.h"
+#include "hashtable_utility.h"
+
+#include <errno.h>
+#include <inttypes.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <sys/param.h>
+#include <sys/sysctl.h>
+#include <unistd.h>
+
+/* timeout for reads and writes in ms */
+#define NET_TIMEOUT 500
+#define RAMDISK_HASHSIZE 128 /* size argument handed to create_hashtable() for every ramdisk table */
+
+/* connect retry timeout (seconds) */
+#define REMUS_CONNRETRY_TIMEOUT 10
+
+#define RPRINTF(_f, _a...) syslog (LOG_DEBUG, "remus: " _f, ## _a) /* LOG_DEBUG syslog message with a "remus: " prefix */
+
+enum tdremus_mode { /* operating mode of the driver; changed through switch_mode() */
+  mode_invalid = 0,
+  mode_unprotected, /* entered when replication fails on the primary or when the backup receives a write */
+  mode_primary, /* writes are replicated to the backup before being forwarded */
+  mode_backup /* receives the replication stream */
+};
+
+struct tdremus_req { /* element type of struct req_ring */
+  uint64_t sector; /* presumably the first sector of the request -- not used in the part of the file visible here */
+  int nb_sectors;
+  char buf[4096];
+};
+
+struct req_ring { /* fixed-size circular queue of write requests */
+  /* waste one slot to distinguish between empty and full */
+  struct tdremus_req requests[MAX_REQUESTS * 2 + 1];
+  unsigned int head; /* ring is empty when head == tail (ring_isempty) */
+  unsigned int tail; /* ring is full when ring_next(tail) == head (ring_isfull) */
+};
+
+/* TODO: This isn't very pretty, but to properly generate our own treqs (needed
+ * by the backup) we need to know our td_vbd_t and td_image_t (blktap2
+ * internals). As a proper fix, we should consider extending the tapdisk
+ * interface with a td_create_request() function, or something similar.
+ *
+ * For now, we just grab the vbd in the td_open() command, and the td_image_t
+ * from the first read request.
+ */
+td_vbd_t *device_vbd = NULL; /* create_write_request() queues its vreqs on device_vbd->pending_requests */
+td_image_t *remus_image = NULL; /* set by backup_queue_read() from the first request it sees */
+
+struct ramdisk { /* in-memory store of replicated sectors, keyed by sector number */
+  size_t sector_size; /* bytes per sector; copied from driver->info.sector_size in ramdisk_start() */
+  struct hashtable* h; /* current table: malloc'd uint64_t sector key -> malloc'd sector data */
+  /* when a ramdisk is flushed, h is given a new empty hash for writes
+   * while the old ramdisk (prev) is drained asynchronously. To avoid
+   * a race where a read request points to a sector in prev which has
+   * not yet been flushed, check prev on a miss in h */
+  struct hashtable* prev; /* NULL when no flush is pending */
+  /* count of outstanding requests to the base driver */
+  size_t inflight;
+};
+
+/* the ramdisk intercepts the original callback for reads and writes.
+ * This holds the original data. */
+/* Might be worth making this a static array in struct ramdisk to avoid
+ * a malloc per request */
+
+struct tdremus_state;
+
+struct ramdisk_cbdata { /* NOTE(review): not referenced in the part of the file visible here */
+  td_callback_t cb;
+  void* private;
+  char* buf;
+  struct tdremus_state* state;
+};
+
+struct ramdisk_write_cbdata { /* private data received by ramdisk_write_cb() */
+  struct tdremus_state* state;
+  char* buf; /* freed by ramdisk_write_cb(), together with the struct itself */
+};
+
+typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq); /* same signature as the per-mode *_queue_read / *_queue_write handlers */
+
+/* poll_fd type for blktap2 fd system. taken from block_log.c */
+typedef struct poll_fd {
+  int        fd; /* file descriptor being watched */
+  event_id_t id; /* value returned by tapdisk_server_register_event() for that descriptor */
+} poll_fd_t;
+
+struct tdremus_state { /* per-driver state, reached through driver->data */
+//  struct tap_disk* driver;
+  void* driver_data;
+
+  /* XXX: this is needed so that the server can perform operations on
+   * the driver from the stream_fd event handler. fix this. */
+  td_driver_t *tdremus_driver;
+
+  /* TODO: we may wish to replace these two FIFOs with a unix socket */
+  char*     ctl_path; /* receive flush instruction here */
+  poll_fd_t ctl_fd;     /* io_fd slot for control FIFO */
+  char*     msg_path; /* output completion message here */
+  poll_fd_t msg_fd;
+
+  /* replication host */
+  struct sockaddr_in sa;
+  poll_fd_t server_fd;    /* server listen port */
+  poll_fd_t stream_fd;     /* replication channel; fd is -1 before the first connect and -2 after close_stream_fd() */
+
+  /* queue write requests, batch-replicate at submit */
+  struct req_ring write_ring;
+
+  /* ramdisk data*/
+  struct ramdisk ramdisk;
+
+  /* mode methods */
+  enum tdremus_mode mode;
+  int (*queue_flush)(td_driver_t *driver); /* primary_start() points this at client_flush() */
+};
+
+typedef struct tdremus_wire { /* NOTE(review): not referenced in the part of the file visible here; primary_queue_write() builds its own 12-byte header */
+  uint32_t op;
+  uint64_t id;
+  uint64_t sec;
+  uint32_t secs;
+} tdremus_wire_t;
+
+#define TDREMUS_READ "rreq" /* message tags: senders write strlen(tag) == 4 bytes, without the terminating NUL */
+#define TDREMUS_WRITE "wreq" /* sent by primary_queue_write() ahead of the header and data */
+#define TDREMUS_SUBMIT "sreq"
+#define TDREMUS_COMMIT "creq" /* sent by client_flush() */
+#define TDREMUS_DONE "done" /* expected by remus_client_event() */
+#define TDREMUS_FAIL "fail"
+
+/* primary read/write functions */
+static void primary_queue_read(td_driver_t *driver, td_request_t treq);
+static void primary_queue_write(td_driver_t *driver, td_request_t treq);
+
+/* backup read/write functions */
+static void backup_queue_read(td_driver_t *driver, td_request_t treq);
+static void backup_queue_write(td_driver_t *driver, td_request_t treq);
+
+/* unprotected read/write functions */
+static void unprotected_queue_read(td_driver_t *driver, td_request_t treq);
+static void unprotected_queue_write(td_driver_t *driver, td_request_t treq);
+
+static int tdremus_close(td_driver_t *driver);
+
+static int switch_mode(td_driver_t *driver, enum tdremus_mode mode);
+static int ctl_respond(struct tdremus_state *s, const char *response);
+
+/* ring functions */
+/* Return the ring index that follows pos, wrapping back to 0 after the last
+ * slot. The ring argument is not consulted. */
+static inline unsigned int ring_next(struct req_ring* ring, unsigned int pos)
+{
+  unsigned int following = pos + 1;
+
+  return (following >= MAX_REQUESTS * 2 + 1) ? 0 : following;
+}
+
+/* Non-zero when the ring holds no requests (tail has caught up with head). */
+static inline int ring_isempty(struct req_ring* ring)
+{
+  return ring->tail == ring->head;
+}
+
+/* Non-zero when advancing tail by one slot would collide with head. */
+static inline int ring_isfull(struct req_ring* ring)
+{
+  unsigned int after_tail = ring_next(ring, ring->tail);
+
+  return after_tail == ring->head;
+}
+
+/* functions to create and submit treq's */
+
+static void
+replicated_write_callback(td_request_t treq, int err) /* completion callback installed by create_write_request() */
+{
+  struct tdremus_state *s = (struct tdremus_state *) treq.cb_data;
+  td_vbd_request_t *vreq;
+
+  vreq = (td_vbd_request_t *) treq.private; /* the vreq calloc'd in create_write_request() */
+
+  /* the write failed for now, lets panic. this is very bad */
+  if (err) {
+    RPRINTF("ramdisk write failed, disk image is not consistent\n");
+    exit(-1);
+  }
+
+  /* The write succeeded. let's pull the vreq off whatever request list
+   * it is on and free() it */
+  list_del(&vreq->next);
+  free(vreq); /* NOTE(review): treq.buf is not released here -- confirm who owns the merged buffer */
+
+  s->ramdisk.inflight--; /* one fewer outstanding write to the base driver */
+  if (!s->ramdisk.inflight && !s->ramdisk.prev) {
+    /* TODO: the ramdisk has been flushed */
+  }
+}
+
+/* Build a write request for the underlying image and forward it down the
+ * driver chain.
+ *
+ * state: driver state; handed to replicated_write_callback() as cb_data
+ * sec:   first sector to write
+ * secs:  number of sectors to write
+ * buf:   data to be written
+ *
+ * Returns 0 once the request has been forwarded, or -1 when the bookkeeping
+ * vreq cannot be allocated. In the failure case nothing is forwarded and buf
+ * is untouched, so it still belongs to the caller.
+ *
+ * Uses the file-scope device_vbd and remus_image pointers. */
+static inline int
+create_write_request(struct tdremus_state *state, td_sector_t sec, int secs, char *buf)
+{
+  td_request_t treq;
+  td_vbd_request_t *vreq;
+
+  vreq = calloc(1, sizeof(td_vbd_request_t));
+  if (!vreq)
+    return -1;
+
+  /* treq is passed by value below: start from a zeroed request so that the
+   * fields which are not assigned here do not carry stack garbage */
+  memset(&treq, 0, sizeof(treq));
+  treq.op      = TD_OP_WRITE;
+  treq.buf     = buf;
+  treq.sec     = sec;
+  treq.secs    = secs;
+  treq.image   = remus_image;
+  treq.cb      = replicated_write_callback;
+  treq.cb_data = state;
+  treq.id      = 0;
+  treq.sidx    = 0;
+  treq.private = vreq;
+
+  vreq->submitting = 1;
+  INIT_LIST_HEAD(&vreq->next);
+  tapdisk_vbd_move_request(treq.private, &device_vbd->pending_requests);
+
+  /* TODO:
+   * we should probably leave it up to the caller to forward the request */
+  td_forward_request(treq);
+
+  /* NOTE(review): if the request can complete synchronously,
+   * replicated_write_callback() has already freed vreq at this point --
+   * confirm against td_forward_request() */
+  vreq->submitting--;
+
+  return 0;
+}
+
+
+/* ramdisk methods */
+static int ramdisk_flush(td_driver_t *driver, struct tdremus_state *s); /* declared early because ramdisk_write_cb() calls it before its definition */
+
+/* http://www.concentric.net/~Ttwang/tech/inthash.htm */
+static unsigned int uint64_hash(void* k) /* hash callback given to create_hashtable(); k points at a uint64_t sector number */
+{
+  uint64_t key = *(uint64_t*)k;
+
+  key = (~key) + (key << 18);
+  key = key ^ (key >> 31);
+  key = key * 21;
+  key = key ^ (key >> 11);
+  key = key + (key << 6);
+  key = key ^ (key >> 22);
+  
+  return (unsigned int)key; /* keeps only the low bits of the mixed 64-bit value */
+}
+
+/* Key-equality callback given to create_hashtable(): both arguments point
+ * at uint64_t sector numbers. Returns non-zero when they are equal. */
+static int rd_hash_equal(void* k1, void* k2)
+{
+  return *(uint64_t*)k1 == *(uint64_t*)k2;
+}
+
+/* Copy nb_sectors sectors starting at sector from the ramdisk into buf.
+ * Each sector is looked up in the current table first and, on a miss, in
+ * the table that is being flushed (if any). Returns 0 when every sector was
+ * found, -1 as soon as one is missing (buf may then be partially filled). */
+static int ramdisk_read(struct ramdisk* ramdisk, uint64_t sector,
+                       int nb_sectors, char* buf)
+{
+  int idx;
+
+  for (idx = 0; idx < nb_sectors; idx++) {
+    uint64_t key = sector + idx;
+    char* data = hashtable_search(ramdisk->h, &key);
+
+    /* check whether it is queued in a previous flush request */
+    if (!data && ramdisk->prev)
+      data = hashtable_search(ramdisk->prev, &key);
+    if (!data)
+      return -1;
+
+    memcpy(buf + idx * ramdisk->sector_size, data, ramdisk->sector_size);
+  }
+
+  return 0;
+}
+
+/* Store len bytes from buf under key sector in table h. An existing entry
+ * is overwritten in place; otherwise a private copy of the data and a
+ * heap-allocated key are inserted. Returns 0 on success, -1 on allocation
+ * or insertion failure. */
+static int ramdisk_write_hash(struct hashtable* h, uint64_t sector, char* buf,
+                             size_t len)
+{
+  char* data = hashtable_search(h, &sector);
+  uint64_t* new_key;
+
+  if (data) {
+    memcpy(data, buf, len);
+    return 0;
+  }
+
+  data = malloc(len);
+  if (!data) {
+    DPRINTF("ramdisk_write_hash: malloc failed\n");
+    return -1;
+  }
+  memcpy(data, buf, len);
+
+  new_key = malloc(sizeof(*new_key));
+  if (!new_key) {
+    DPRINTF("ramdisk_write_hash: error allocating key\n");
+    free(data);
+    return -1;
+  }
+  *new_key = sector;
+
+  if (hashtable_insert(h, new_key, data))
+    return 0;
+
+  DPRINTF("ramdisk_write_hash failed on sector %" PRIu64 "\n", sector);
+  free(new_key);
+  free(data);
+  return -1;
+}
+
+/* Write nb_sectors consecutive sectors from buf into the current ramdisk
+ * table, one sector per hash entry. Stops at the first failure and returns
+ * its code; returns 0 when all sectors were stored. */
+static inline int ramdisk_write(struct ramdisk* ramdisk, uint64_t sector,
+                               int nb_sectors, char* buf)
+{
+  char* cursor = buf;
+  int idx;
+
+  for (idx = 0; idx < nb_sectors; idx++) {
+    int rc = ramdisk_write_hash(ramdisk->h, sector + idx, cursor,
+                                ramdisk->sector_size);
+
+    if (rc)
+      return rc;
+    cursor += ramdisk->sector_size;
+  }
+
+  return 0;
+}
+
+/* Completion callback for one ramdisk write to the base driver.
+ *
+ * private is a struct ramdisk_write_cbdata; its buffer and the struct itself
+ * are freed here. The in-flight counter is decremented and, while a table is
+ * still waiting to be drained, ramdisk_flush() is called again to submit
+ * more of it.
+ *
+ * res, sector, nb_sectors and id are not used.
+ * Returns the result of ramdisk_flush(), or 0 when nothing is left to flush. */
+static int ramdisk_write_cb(td_driver_t *driver, int res, uint64_t sector,
+                           int nb_sectors, int id, void* private)
+{
+  struct ramdisk_write_cbdata *cbdata = (struct ramdisk_write_cbdata*)private;
+  struct tdremus_state *s = cbdata->state;
+
+  /*
+  RPRINTF("ramdisk write callback: rc %d, %d sectors @ %" PRIu64 "\n", res, nb_sectors,
+         sector);
+  */
+
+  free(cbdata->buf);
+  free(cbdata);
+
+  s->ramdisk.inflight--;
+  if (!s->ramdisk.inflight && !s->ramdisk.prev) {
+    /* when this reaches 0 and prev is empty, the disk is flushed. */
+    /*
+    RPRINTF("ramdisk flush complete\n");
+    */
+  }
+
+  if (s->ramdisk.prev) {
+    /* resubmit as much as possible in the remaining disk */
+    /*
+    RPRINTF("calling ramdisk_flush from write callback\n");
+    */
+    return ramdisk_flush(driver, s);
+  }
+
+  return 0;
+}
+
+/* qsort() comparator for uint64_t values, ascending. Compares explicitly
+ * rather than subtracting, because the difference would be unsigned. */
+static int uint64_compare(const void* k1, const void* k2)
+{
+  uint64_t lhs = *(uint64_t*)k1;
+  uint64_t rhs = *(uint64_t*)k2;
+
+  if (lhs < rhs)
+    return -1;
+  if (lhs > rhs)
+    return 1;
+  return 0;
+}
+
+/* set psectors to an array of the sector numbers in the hash, returning
+ * the number of entries (or -1 on error).
+ *
+ * The array is malloc'd and must be freed by the caller. When the table is
+ * empty (return 0) or an allocation fails (return -1), *psectors is set to
+ * NULL so that a caller may free() it unconditionally. */
+static int ramdisk_get_sectors(struct hashtable* h, uint64_t** psectors)
+{
+  struct hashtable_itr* itr;
+  uint64_t* sectors;
+  int count;
+
+  *psectors = NULL;
+
+  if (!(count = hashtable_count(h)))
+    return 0;
+
+  if (!(sectors = malloc(count * sizeof(uint64_t)))) {
+    DPRINTF("ramdisk_get_sectors: error allocating sector map\n");
+    return -1;
+  }
+
+  /* the iterator is heap-allocated and may fail as well */
+  if (!(itr = hashtable_iterator(h))) {
+    DPRINTF("ramdisk_get_sectors: error allocating iterator\n");
+    free(sectors);
+    return -1;
+  }
+
+  count = 0;
+  do {
+    sectors[count++] = *(uint64_t*)hashtable_iterator_key(itr);
+  } while (hashtable_iterator_advance(itr));
+  free(itr);
+
+  *psectors = sectors;
+  return count;
+}
+
+/* Gather count consecutive sectors, starting at start, from the table that
+ * is being flushed (ramdisk->prev) into one contiguous valloc'd buffer.
+ *
+ * On success the per-sector buffers stored in the table have been freed and
+ * the merged buffer is returned; the caller owns it and must remove the now
+ * stale entries from ramdisk->prev.
+ * On failure (allocation error or a missing sector) NULL is returned and the
+ * table is left untouched. */
+static char* merge_requests(struct ramdisk* ramdisk, uint64_t start,
+                           size_t count)
+{
+  char* buf;
+  char* sector;
+  uint64_t key;
+  size_t i;
+
+  if (!(buf = valloc(count * ramdisk->sector_size))) {
+    DPRINTF("merge_request: allocation failed\n");
+    return NULL;
+  }
+
+  /* first pass: copy only, so that a failed lookup leaves the table intact */
+  for (i = 0, key = start; i < count; i++, key++) {
+    if (!(sector = hashtable_search(ramdisk->prev, &key))) {
+      DPRINTF("merge_request: lookup failed on %"PRIu64"\n", key);
+      free(buf);
+      return NULL;
+    }
+
+    memcpy(buf + i * ramdisk->sector_size, sector, ramdisk->sector_size);
+  }
+
+  /* second pass: every sector has been copied, release the originals */
+  for (i = 0, key = start; i < count; i++, key++)
+    free(hashtable_search(ramdisk->prev, &key));
+
+  return buf;
+}
+
+/* The underlying driver may not handle having the whole ramdisk queued at
+ * once. We queue what we can and let the callbacks attempt to queue more. */
+/* NOTE: may be called from callback, while dd->private still belongs to
+ * the underlying driver */
+/* Submit the contents of s->ramdisk.prev to the underlying driver.
+ *
+ * The sector numbers in prev are sorted, runs of consecutive sectors are
+ * merged into one buffer each, and one write request is created per run.
+ * Entries are removed from prev as they are submitted; once prev is empty it
+ * is destroyed and set to NULL.
+ *
+ * Returns 0 on success (or when there is nothing to flush) and -1 on
+ * failure. If a write request cannot be created, the data of that run has
+ * already been taken out of the ramdisk and is lost. */
+static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s)
+{
+  uint64_t* sectors;
+  char* buf;
+  uint64_t base, batchlen, key, k;
+  int i, count = 0;
+
+  // RPRINTF("ramdisk flush\n");
+
+  if (!s->ramdisk.prev)
+    return 0;
+
+  if ((count = ramdisk_get_sectors(s->ramdisk.prev, &sectors)) <= 0)
+    return count;
+
+  /*
+  RPRINTF("ramdisk: flushing %d sectors\n", count);
+  */
+
+  /* sort and merge sectors to improve disk performance */
+  qsort(sectors, count, sizeof(*sectors), uint64_compare);
+
+  for (i = 0; i < count;) {
+    base = sectors[i++];
+    while (i < count && sectors[i] == sectors[i-1] + 1)
+      i++;
+    batchlen = sectors[i-1] - base + 1;
+
+    if (!(buf = merge_requests(&s->ramdisk, base, batchlen))) {
+      RPRINTF("ramdisk_flush: merge_requests failed\n");
+      free(sectors);
+      return -1;
+    }
+
+    /* merge_requests() freed the per-sector buffers; drop the stale entries
+     * before the request is handed to the driver chain */
+    for (k = 0, key = base; k < batchlen; k++, key++)
+      hashtable_remove(s->ramdisk.prev, &key);
+
+    /* account for the request before it is forwarded, so that a completion
+     * callback which runs right away sees a consistent counter */
+    s->ramdisk.inflight++;
+
+    /* NOTE: create_write_request() creates a treq AND forwards it down
+     * the driver chain */
+    // RPRINTF("forwarding write request at %" PRIu64 ", length: %" PRIu64 "\n", base, batchlen);
+    if (create_write_request(s, base, batchlen, buf) < 0) {
+      RPRINTF("ramdisk_flush: could not create write request at %" PRIu64 ", length: %" PRIu64 "\n", base, batchlen);
+      s->ramdisk.inflight--;
+      free(buf);
+      free(sectors);
+      return -1;
+    }
+    //RPRINTF("write request at %" PRIu64 ", length: %" PRIu64 " forwarded\n", base, batchlen);
+  }
+
+  if (!hashtable_count(s->ramdisk.prev)) {
+    /* everything is in flight */
+    hashtable_destroy(s->ramdisk.prev, 0);
+    s->ramdisk.prev = NULL;
+  }
+
+  free(sectors);
+
+  // RPRINTF("ramdisk flush done\n");
+  return 0;
+}
+
+/* flush ramdisk contents to disk */
+/* Retire the current ramdisk table and start writing it to the underlying
+ * driver.
+ *
+ * If no flush is pending, the current table simply becomes prev. If a
+ * previous flush is still draining, the current sectors are copied into
+ * prev (overwriting older data for the same sector) and the current table is
+ * destroyed together with its values. In both cases a fresh empty table
+ * takes over for new writes.
+ *
+ * Returns 0 when there is nothing to flush, -1 on allocation failure (the
+ * current table is kept intact in that case), otherwise the result of
+ * ramdisk_flush(). */
+static int ramdisk_start_flush(td_driver_t *driver)
+{
+  struct tdremus_state *s = (struct tdremus_state *)driver->data;
+  struct hashtable* fresh;
+  char* buf;
+  int i, count;
+  uint64_t* sectors;
+
+  if (!hashtable_count(s->ramdisk.h)) {
+    /*
+    RPRINTF("Nothing to flush\n");
+    */
+    return 0;
+  }
+
+  /* We create a new hashtable so that new writes can be performed before
+   * the old hashtable is completely drained. It is allocated up front so
+   * that a failure leaves the ramdisk state unchanged. */
+  fresh = create_hashtable(RAMDISK_HASHSIZE, uint64_hash, rd_hash_equal);
+  if (!fresh) {
+    RPRINTF("ramdisk_start_flush: error allocating hash table\n");
+    return -1;
+  }
+
+  if (s->ramdisk.prev) {
+    /* a flush request issued while a previous flush is still in progress
+     * will merge with the previous request. If you want the previous
+     * request to be consistent, wait for it to complete. */
+    if ((count = ramdisk_get_sectors(s->ramdisk.h, &sectors)) < 0) {
+      hashtable_destroy(fresh, 0);
+      return count;
+    }
+
+    for (i = 0; i < count; i++) {
+      buf = hashtable_search(s->ramdisk.h, sectors + i);
+      if (!buf || ramdisk_write_hash(s->ramdisk.prev, sectors[i], buf,
+                                     s->ramdisk.sector_size) < 0) {
+        /* h still holds every sector, so nothing is lost by giving up */
+        RPRINTF("ramdisk_start_flush: error merging into pending flush\n");
+        free(sectors);
+        hashtable_destroy(fresh, 0);
+        return -1;
+      }
+    }
+    free(sectors);
+
+    /* ramdisk_write_hash() copied the data, so the originals go with h */
+    hashtable_destroy (s->ramdisk.h, 1);
+  } else
+    s->ramdisk.prev = s->ramdisk.h;
+
+  s->ramdisk.h = fresh;
+
+  return ramdisk_flush(driver, s);
+}
+
+
+/* Allocate the ramdisk table for this driver, unless one already exists.
+ * The sector size is taken from driver->info.sector_size.
+ * Returns 0 on success (or when the ramdisk was already allocated) and -1
+ * when the hash table cannot be created. */
+static int ramdisk_start(td_driver_t *driver)
+{
+  struct tdremus_state *s = (struct tdremus_state *)driver->data;
+
+  if (s->ramdisk.h) {
+    RPRINTF("ramdisk already allocated\n");
+    return 0;
+  }
+
+  s->ramdisk.sector_size = driver->info.sector_size;
+  s->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash,
+                                 rd_hash_equal);
+  if (!s->ramdisk.h) {
+    RPRINTF("ramdisk_start: error allocating hash table\n");
+    return -1;
+  }
+
+  DPRINTF("Ramdisk started, %zu bytes/sector\n", s->ramdisk.sector_size);
+
+  return 0;
+}
+
+/* common client/server functions */
+/* mayberead: Time out after a certain interval. */
+static int mread(int fd, void* buf, size_t len) /* reads exactly len bytes; returns 0 on success, -1 on end-of-file, error or timeout */
+{
+  fd_set rfds;
+  int rc;
+  size_t cur = 0; /* bytes received by earlier read() calls */
+  struct timeval tv = {
+    .tv_sec = 0,
+    .tv_usec = NET_TIMEOUT * 1000
+  }; /* NOTE(review): never re-initialised between select() calls; where select() updates it, the timeout bounds the whole read rather than each wait -- confirm intent */
+
+  if (!len)
+    return 0;
+
+  /* read first. Only select if read is incomplete. */
+  rc = read(fd, buf, len);
+  while (rc < 0 || cur + rc < len) { /* loop until the last read() completes the buffer */
+    if (!rc) {
+      RPRINTF("end-of-file");
+      return -1;
+    }
+    if (rc < 0 && errno != EAGAIN) { /* EAGAIN only means no data yet on a nonblocking fd */
+       RPRINTF("error during read: %s\n", strerror(errno));
+       return -1;
+    }
+    if (rc > 0)
+      cur += rc;
+
+    FD_ZERO(&rfds);
+    FD_SET(fd, &rfds);
+    if (!(rc = select(fd + 1, &rfds, NULL, NULL, &tv))) {
+      RPRINTF("time out during read\n");
+      return -1;
+    } else if (rc < 0) { /* includes EINTR: an interrupted wait is reported as a failure */
+      RPRINTF("error during select: %d\n", errno);
+      return -1;
+    }
+    rc = read(fd, buf + cur, len - cur);
+  }
+  /*
+  RPRINTF("read %d bytes\n", cur + rc);
+  */
+
+  return 0;
+}
+
+/* maybewrite: write exactly len bytes from buf to fd.
+ *
+ * The write is attempted first; select() is only used to wait for the
+ * descriptor to become writable when the write is incomplete or would
+ * block. tv is initialised once to NET_TIMEOUT ms and shared by all waits.
+ *
+ * Returns 0 once everything has been written, -1 on a write or select
+ * error, on timeout, or when write() returns 0. */
+static int mwrite(int fd, void* buf, size_t len)
+{
+  fd_set wfds;
+  size_t cur = 0;
+  int rc;
+  struct timeval tv = {
+    .tv_sec = 0,
+    .tv_usec = NET_TIMEOUT * 1000
+  };
+
+  if (!len)
+    return 0;
+
+  /* write first. Only select if write is incomplete. */
+  rc = write(fd, buf, len);
+  while (rc < 0 || cur + rc < len) {
+    if (!rc) {
+      RPRINTF("end-of-file");
+      return -1;
+    }
+    if (rc < 0 && errno != EAGAIN) {
+       RPRINTF("error during write: %s\n", strerror(errno));
+       return -1;
+    }
+    if (rc > 0)
+      cur += rc;
+
+    FD_ZERO(&wfds);
+    FD_SET(fd, &wfds);
+    if (!(rc = select(fd + 1, NULL, &wfds, NULL, &tv))) {
+      RPRINTF("time out during write\n");
+      return -1;
+    } else if (rc < 0) {
+      RPRINTF("error during select: %d\n", errno);
+      return -1;
+    }
+    /* byte offsets need a char pointer; arithmetic on void* is an extension */
+    rc = write(fd, (char *)buf + cur, len - cur);
+  }
+  /*
+  RPRINTF("wrote %d bytes\n", cur + rc);
+  */
+
+  return 0;
+}
+
+
+/* Tear down the replication channel: unregister its event, close the
+ * descriptor and mark it as lost. */
+static inline void close_stream_fd(struct tdremus_state *s)
+{
+    tapdisk_server_unregister_event(s->stream_fd.id);
+    close(s->stream_fd.fd);
+
+    /* XXX: -2 is magic. replace with macro perhaps? */
+    s->stream_fd.fd = -2;
+}
+
+/* primary functions */
+static void remus_client_event(event_id_t, char mode, void *private); /* read handler installed once the connection is up */
+static void remus_connect_event(event_id_t id, char mode, void *private); /* write-ready handler that checks the outcome of a nonblocking connect */
+static void remus_retry_connect_event(event_id_t id, char mode, void *private); /* timeout handler that issues the nonblocking connect */
+
+/* Start an asynchronous connection to the backup.
+ *
+ * Creates a nonblocking TCP socket and registers a zero-length timeout event
+ * on it; remus_retry_connect_event() then performs the actual connect().
+ * On success the socket and event id are stored in state->stream_fd.
+ *
+ * Returns 0 on success, -1 on failure (the socket is closed again). */
+static int primary_do_connect(struct tdremus_state *state)
+{
+  event_id_t id;
+  int fd;
+  int flags;
+
+  RPRINTF("client connecting to %s:%d...\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
+
+  if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
+    RPRINTF("could not create client socket: %d\n", errno);
+    return -1;
+  }
+
+  /* make socket nonblocking */
+  if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
+    flags = 0;
+  if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
+    RPRINTF("error making socket nonblocking\n");
+    close(fd);
+    return -1;
+  }
+
+  /* once we have created the socket and populated the address, we can now start
+   * our non-blocking connect. rather than duplicating code we trigger a timeout
+   * on the socket fd, which calls out nonblocking connect code
+   */
+  if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0, remus_retry_connect_event, state)) < 0) {
+    /* NOTE(review): assumes a negative id is a negated errno value -- confirm */
+    RPRINTF("error registering timeout client connection event handler: %s\n", strerror(-id));
+    close(fd);
+    return -1;
+  }
+  state->stream_fd.fd = fd;
+  state->stream_fd.id = id;
+  return 0;
+}
+
+/* Connect to the backup synchronously.
+ *
+ * connect() is retried once per second for as long as the connection is
+ * refused, so this call blocks until the backup accepts or another error
+ * occurs. Afterwards the socket is made nonblocking and remus_client_event()
+ * is registered as its read handler. On success the socket and event id are
+ * stored in state->stream_fd.
+ *
+ * Returns 0 on success, -1 on failure (the socket is closed again). */
+static int primary_blocking_connect(struct tdremus_state *state)
+{
+  int fd;
+  event_id_t id;
+  int rc;
+  int flags;
+
+  RPRINTF("client connecting to %s:%d...\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
+
+  if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
+    RPRINTF("could not create client socket: %d\n", errno);
+    return -1;
+  }
+
+  do {
+    if ((rc = connect(fd, (struct sockaddr *)&state->sa, sizeof(state->sa))) < 0) {
+      if (errno == ECONNREFUSED) {
+        RPRINTF("connection refused -- retrying in 1 second\n");
+        sleep(1);
+      } else {
+        RPRINTF("connection failed: %d\n", errno);
+        close(fd);
+        return -1;
+      }
+    }
+  } while (rc < 0);
+
+  RPRINTF("client connected\n");
+
+  /* make socket nonblocking */
+  if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
+    flags = 0;
+  if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
+  {
+    RPRINTF("error making socket nonblocking\n");
+    close(fd);
+    return -1;
+  }
+
+  if((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, fd, 0, remus_client_event, state)) < 0) {
+    /* NOTE(review): assumes a negative id is a negated errno value -- confirm */
+    RPRINTF("error registering client event handler: %s\n", strerror(-id));
+    close(fd);
+    return -1;
+  }
+
+  state->stream_fd.fd = fd;
+  state->stream_fd.id = id;
+  return 0;
+}
+
+/* Read handler of the primary: reads are not replicated, so the request is
+ * handed straight to the next driver in the chain. */
+static void primary_queue_read(td_driver_t *driver, td_request_t treq)
+{
+  (void)driver;
+
+  td_forward_request(treq);
+}
+
+/* TODO:
+ * The primary uses mwrite() to write the contents of a write request to the
+ * backup. This effectively blocks until all data has been copied into a system
+ * buffer or a timeout has occurred. We may wish to instead use tapdisk's
+ * nonblocking i/o interface, tapdisk_server_register_event(), to set timeouts
+ * and write data in an asynchronous fashion.
+ */
+/* Write handler of the primary.
+ *
+ * Sends the request to the backup as "wreq", a 12-byte header (32-bit sector
+ * count followed by the 64-bit start sector, both in host byte order) and
+ * the sector data, then forwards the request to the underlying driver.
+ * If the backup cannot be reached or a send fails, the driver switches to
+ * unprotected mode and the request is completed with -EBUSY. */
+static void primary_queue_write(td_driver_t *driver, td_request_t treq)
+{
+    struct tdremus_state *s = (struct tdremus_state *)driver->data;
+
+    char header[sizeof(uint32_t) + sizeof(uint64_t)];
+    uint32_t sectors = treq.secs;
+    uint64_t sector = treq.sec;
+
+    // RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd);
+
+    /* -1 means we haven't connected yet, -2 means the connection was lost */
+    if(s->stream_fd.fd == -1) {
+       RPRINTF("connecting to backup...\n");
+       if (primary_blocking_connect(s) < 0)
+          goto fail;
+    }
+
+    /* fill the header with memcpy: the 64-bit field sits at offset 4, which
+     * is not suitably aligned for a store through a uint64_t pointer */
+    memcpy(header, &sectors, sizeof(sectors));
+    memcpy(header + sizeof(sectors), &sector, sizeof(sector));
+
+    if (mwrite(s->stream_fd.fd, TDREMUS_WRITE, strlen(TDREMUS_WRITE)) < 0)
+       goto fail;
+    if (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0)
+       goto fail;
+
+    if (mwrite(s->stream_fd.fd, treq.buf, treq.secs * driver->info.sector_size) < 0)
+       goto fail;
+
+    td_forward_request(treq);
+
+    return;
+
+  fail:
+    /* switch to unprotected mode and tell tapdisk to retry */
+    RPRINTF("write request replication failed, switching to unprotected mode");
+    switch_mode(s->tdremus_driver, mode_unprotected);
+    td_complete_request(treq, -EBUSY);
+}
+
+
+/* Flush method of the primary: send a "creq" commit request to the backup.
+ * Returns 0 when the request was sent or when no connection has been
+ * established yet; returns -1 (after closing the stream) when sending
+ * fails. */
+static int client_flush(td_driver_t *driver)
+{
+    struct tdremus_state *s = (struct tdremus_state *)driver->data;
+    int sent;
+
+    // RPRINTF("committing output\n");
+
+    /* connection not yet established, nothing to flush */
+    if (s->stream_fd.fd == -1)
+       return 0;
+
+    sent = mwrite(s->stream_fd.fd, TDREMUS_COMMIT, strlen(TDREMUS_COMMIT));
+    if (sent >= 0)
+       return 0;
+
+    RPRINTF("error flushing output");
+    close_stream_fd(s);
+    return -1;
+}
+
+/* Put the driver into primary (client) mode: install the primary read/write
+ * handlers and the flush method, and mark the replication channel as not
+ * yet connected. Always returns 0. */
+static int primary_start(td_driver_t *driver)
+{
+  struct tdremus_state *s = (struct tdremus_state *)driver->data;
+
+  RPRINTF("activating client mode\n");
+
+  /* -1: no connection has been attempted so far */
+  s->stream_fd.id = -1;
+  s->stream_fd.fd = -1;
+
+  s->queue_flush = client_flush;
+  tapdisk_remus.td_queue_write = primary_queue_write;
+  tapdisk_remus.td_queue_read = primary_queue_read;
+
+  return 0;
+}
+
+/* timeout callback */
+/* Issue a nonblocking connect() on s->stream_fd.fd.
+ *
+ * - If the connect fails with an error that suggests the backup is simply
+ *   not reachable yet, this handler is re-armed with a timeout of
+ *   REMUS_CONNRETRY_TIMEOUT seconds.
+ * - Any other failure is logged and no further attempt is scheduled.
+ * - If the connect succeeds or is in progress, remus_connect_event() is
+ *   registered to run once the socket becomes writable.
+ *
+ * private is the struct tdremus_state; id and mode are not used on entry. */
+static void remus_retry_connect_event(event_id_t id, char mode, void *private)
+{
+  struct tdremus_state *s = (struct tdremus_state *)private;
+
+  /* do a non-blocking connect */
+  if (connect(s->stream_fd.fd, (struct sockaddr *)&s->sa, sizeof(s->sa)) && errno != EINPROGRESS) {
+    if(errno == ECONNREFUSED || errno == ENETUNREACH || errno == EAGAIN || errno == ECONNABORTED)
+    {
+      /* try again after REMUS_CONNRETRY_TIMEOUT seconds */
+      tapdisk_server_unregister_event(s->stream_fd.id);
+      if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, s->stream_fd.fd, REMUS_CONNRETRY_TIMEOUT, remus_retry_connect_event, s)) < 0) {
+        /* NOTE(review): assumes a negative id is a negated errno value -- confirm */
+        RPRINTF("error registering timeout client connection event handler: %s\n", strerror(-id));
+        return;
+      }
+      s->stream_fd.id = id;
+    }
+    else
+    {
+      /* not recoverable */
+      RPRINTF("error connection to server %s\n", strerror(errno));
+      return;
+    }
+  }
+  else
+  {
+    /* the connect returned EINPROGRESS (nonblocking connect) we must wait for
+     * the fd to be writeable to determine if the connect worked */
+
+    tapdisk_server_unregister_event(s->stream_fd.id);
+    if((id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD, s->stream_fd.fd, 0, remus_connect_event, s)) < 0) {
+      RPRINTF("error registering client connection event handler: %s\n", strerror(-id));
+      return;
+    }
+    s->stream_fd.id = id;
+  }
+}
+
+/* callback when nonblocking connect() is finished */
+/* called only by primary in unprotected state */
+/* Reads the pending socket error (SO_ERROR) of s->stream_fd.fd:
+ * - 0: the connection is up; remus_client_event() becomes the read handler
+ *   and the driver switches to mode_primary.
+ * - an error that suggests the backup is down: the connect is retried after
+ *   REMUS_CONNRETRY_TIMEOUT seconds via remus_retry_connect_event().
+ * - anything else: the error is logged and no retry is scheduled.
+ *
+ * private is the struct tdremus_state; id and mode are not used on entry. */
+static void remus_connect_event(event_id_t id, char mode, void *private)
+{
+  int socket_errno;
+  socklen_t socket_errno_size;
+  struct tdremus_state *s = (struct tdremus_state *)private;
+
+  /* check to see if the connect succeeded */
+  socket_errno_size = sizeof(socket_errno);
+  if (getsockopt(s->stream_fd.fd, SOL_SOCKET, SO_ERROR, &socket_errno, &socket_errno_size)) {
+    RPRINTF("error getting socket errno\n");
+    return;
+  }
+
+  RPRINTF("socket connect returned %d\n", socket_errno);
+
+  if(socket_errno)
+  {
+    /* the connect did not succeed */
+
+    if(socket_errno == ECONNREFUSED || socket_errno == ENETUNREACH || socket_errno == ETIMEDOUT
+      || socket_errno == ECONNABORTED || socket_errno == EAGAIN)
+    {
+      /* we can probably assume that the backup is down. just try again later */
+      tapdisk_server_unregister_event(s->stream_fd.id);
+      if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, s->stream_fd.fd, REMUS_CONNRETRY_TIMEOUT, remus_retry_connect_event, s)) < 0) {
+        /* NOTE(review): assumes a negative id is a negated errno value -- confirm */
+        RPRINTF("error registering timeout client connection event handler: %s\n", strerror(-id));
+        return;
+      }
+      s->stream_fd.id = id;
+    }
+    else
+    {
+      RPRINTF("socket connect returned %d, giving up\n", socket_errno);
+    }
+  }
+  else
+  {
+    /* the connect succeeded */
+
+    /* unregister this function and register a new event handler */
+    tapdisk_server_unregister_event(s->stream_fd.id);
+    if((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->stream_fd.fd, 0, remus_client_event, s)) < 0) {
+      RPRINTF("error registering client event handler: %s\n", strerror(-id));
+      return;
+    }
+    s->stream_fd.id = id;
+
+    /* switch from unprotected to protected client */
+    switch_mode(s->tdremus_driver, mode_primary);
+  }
+}
+
+
+/* we install this event handler on the primary once we have connected to the
+ * backup */
+/* wait for "done" message to commit checkpoint */
+/* Reads one 4-byte message from the backup. "done" is relayed through
+ * ctl_respond(); a read failure or any other message closes the stream. */
+static void remus_client_event(event_id_t id, char mode, void *private)
+{
+  struct tdremus_state *s = (struct tdremus_state *)private;
+  char req[5];
+
+  if (mread(s->stream_fd.fd, req, sizeof(req) - 1) < 0) {
+      /* replication stream closed or otherwise broken (timeout, reset, &c) */
+      RPRINTF("error reading from backup\n");
+      close_stream_fd(s);
+      return;
+  }
+
+  /* terminate the tag so it can be compared and logged as a string */
+  req[4] = '\0';
+
+  if (strcmp(req, TDREMUS_DONE) != 0) {
+    RPRINTF("received unknown message: %s\n", req);
+    close_stream_fd(s);
+    return;
+  }
+
+  /* checkpoint committed, inform msg_fd */
+  ctl_respond(s, TDREMUS_DONE);
+}
+
+/* backup functions */
+static void remus_server_event(event_id_t id, char mode, void *private); /* read handler that remus_server_accept() attaches to each accepted stream */
+
+/* Event handler for the listening socket: accept one replication connection.
+ *
+ * The accepted socket is registered with remus_server_event() as its read
+ * handler and stored, together with the event id, in s->stream_fd. On any
+ * failure the error is logged and s->stream_fd is left unchanged.
+ *
+ * private is the struct tdremus_state; id and mode are not used. */
+static void remus_server_accept(event_id_t id, char mode, void* private)
+{
+  struct tdremus_state* s = (struct tdremus_state *) private;
+
+  int stream_fd;
+  event_id_t cid;
+
+  /* XXX: add address-based black/white list */
+  if ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) {
+    RPRINTF("error accepting connection: %d\n", errno);
+    return;
+  }
+
+  /* TODO: check to see if we are already replicating. if so just close the
+   * connection (or do something smarter) */
+  RPRINTF("server accepted connection\n");
+
+  /* add tapdisk event for replication stream */
+  cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 0,
+                                      remus_server_event, s);
+
+  if(cid < 0) {
+    /* report the code returned by the registration rather than a stale errno.
+     * NOTE(review): assumes a negative id is a negated errno value -- confirm */
+    RPRINTF("error registering connection event handler: %s\n", strerror(-cid));
+    close(stream_fd);
+    return;
+  }
+
+  /* store replication file descriptor */
+  s->stream_fd.fd = stream_fd;
+  s->stream_fd.id = cid;
+}
+
+/* Create the listening socket of the backup: bind it to s->sa, listen, and
+ * register remus_server_accept() as its read handler.
+ *
+ * Returns 0 on success.
+ * Returns -2 when bind() fails for any reason other than EADDRINUSE (for
+ * example EADDRNOTAVAIL when the address is not local).
+ * Returns -1 on every other failure.
+ * On failure the socket is closed and s->server_fd.fd is set to -1. */
+static int remus_bind(struct tdremus_state* s)
+{
+//  struct sockaddr_in sa;
+  int opt;
+  int rc = -1;
+  int bind_errno;
+
+  if ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+    RPRINTF("could not create server socket: %d\n", errno);
+    return rc;
+  }
+  opt = 1;
+  if (setsockopt(s->server_fd.fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0)
+    RPRINTF("Error setting REUSEADDR on %d: %d\n", s->server_fd.fd, errno);
+
+  if (bind(s->server_fd.fd, (struct sockaddr *)&s->sa, sizeof(s->sa)) < 0) {
+    /* keep the bind() error: the logging call below may overwrite errno */
+    bind_errno = errno;
+    RPRINTF("could not bind server socket %d to %s:%d: %d %s\n", s->server_fd.fd,
+           inet_ntoa(s->sa.sin_addr), ntohs(s->sa.sin_port), bind_errno, strerror(bind_errno));
+    if (bind_errno != EADDRINUSE)
+      rc = -2;
+    goto err_sfd;
+  }
+  if (listen(s->server_fd.fd, 10)) {
+    RPRINTF("could not listen on socket: %d\n", errno);
+    goto err_sfd;
+  }
+
+  /* The socket is now bound to the address and listening so we may now register
+   * the fd with tapdisk */
+
+  if((s->server_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
+                                                      s->server_fd.fd, 0,
+                                                      remus_server_accept, s)) < 0) {
+    /* NOTE(review): assumes a negative id is a negated errno value -- confirm */
+    RPRINTF("error registering server connection event handler: %s",
+             strerror(-s->server_fd.id));
+    goto err_sfd;
+  }
+
+  return 0;
+
+  err_sfd:
+  close(s->server_fd.fd);
+  s->server_fd.fd = -1;
+
+  return rc;
+}
+
+/* Report whether ramdisk writes are still outstanding: returns 1 while
+ * either ramdisk.inflight or ramdisk.prev is set, 0 once both are clear. */
+static inline int server_writes_inflight(td_driver_t *driver)
+{
+  struct tdremus_state *state = (struct tdremus_state *)driver->data;
+
+  return (state->ramdisk.inflight || state->ramdisk.prev) ? 1 : 0;
+}
+
+/* Read handler installed while in backup mode.
+ *
+ * Block device prefetching means reads can reach the backup during normal
+ * replication.  Completing them with -EBUSY (so the domain can never observe
+ * stale data) is the variant that is currently compiled out; as built, the
+ * request is simply forwarded.  The image of the first request seen is
+ * remembered in remus_image.
+ */
+void backup_queue_read(td_driver_t *driver, td_request_t treq)
+{
+  if (!remus_image) {
+    remus_image = treq.image;
+  }
+
+#if 1
+  /* what exactly is the race that requires the response below? */
+  td_forward_request(treq);
+#else
+  /* due to prefetching, we must return EBUSY on server reads. This
+   * maintains a consistent disk image */
+  td_complete_request(treq, -EBUSY);
+#endif
+}
+
+/* Write handler installed while in backup mode.
+ *
+ * A write arriving at the backup means the domain has failed over, so the
+ * driver is switched to unprotected mode.  The triggering request itself is
+ * still completed with -EBUSY.
+ */
+void backup_queue_write(td_driver_t *driver, td_request_t treq)
+{
+  /* leave backup mode first ... */
+  switch_mode(driver, mode_unprotected);
+
+  /* ... then bounce this request.
+   * TODO: call the appropriate write function rather than return EBUSY */
+  td_complete_request(treq, -EBUSY);
+}
+
+/* Enter backup mode: start the ramdisk and install the backup read/write
+ * handlers in tapdisk_remus.  Returns 0 on success, -1 if ramdisk_start()
+ * reports an error. */
+static int backup_start(td_driver_t *driver)
+{
+  const int started = ramdisk_start(driver);
+
+  if (started < 0) {
+    return -1;
+  }
+
+  tapdisk_remus.td_queue_read = backup_queue_read;
+  tapdisk_remus.td_queue_write = backup_queue_write;
+  /* TODO set flush function */
+
+  return 0;
+}
+
+/*
+ * Handle a write request from the replication stream: read the header
+ * (a uint32_t sector count followed by a uint64_t start sector), read the
+ * payload and store it in the ramdisk.
+ *
+ * Returns 0 on success and -1 on failure.  A read or ramdisk error also
+ * closes the replication stream.
+ * NOTE(review): an oversized request returns -1 without consuming its payload
+ * or closing the stream (unchanged from the previous behaviour); the stream
+ * is presumably out of sync afterwards -- confirm the intended handling.
+ */
+static int server_do_wreq(td_driver_t *driver)
+{
+  struct tdremus_state *s = (struct tdremus_state *)driver->data;
+  char buf[4096];
+  char header[sizeof(uint32_t) + sizeof(uint64_t)];
+  uint32_t sectors;
+  uint64_t sector;
+  uint64_t bytes;
+  int len;
+
+  // RPRINTF("received write request\n");
+
+  if (mread(s->stream_fd.fd, header, sizeof(header)) < 0)
+    goto err;
+
+  /* header is a plain char array: copy the fields out instead of casting,
+   * since the uint64_t sits at offset 4 and would be a misaligned load */
+  memcpy(&sectors, header, sizeof(sectors));
+  memcpy(&sector, header + sizeof(sectors), sizeof(sector));
+
+  /* size the request in 64 bits so a hostile sector count cannot wrap */
+  bytes = (uint64_t)sectors * driver->info.sector_size;
+
+  //RPRINTF("writing %d sectors (%d bytes) starting at %" PRIu64 "\n", sectors, len,
+  // sector);
+
+  if (bytes > sizeof(buf)) {
+    /* freak out! */
+    RPRINTF("write request too large: %llu/%u\n", (unsigned long long)bytes,
+            (unsigned)sizeof(buf));
+    return -1;
+  }
+  len = (int)bytes; /* bounded by sizeof(buf) above */
+
+  if (mread(s->stream_fd.fd, buf, len) < 0)
+    goto err;
+
+  if (ramdisk_write(&s->ramdisk, sector, sectors, buf) < 0)
+      goto err;
+
+  return 0;
+
+  err:
+  /* should start failover */
+  RPRINTF("backup write request error\n");
+  close_stream_fd(s);
+
+  return -1;
+}
+
+/* Handle a submit request from the replication stream.  Nothing needs doing
+ * at present, so this simply reports success (0). */
+static int server_do_sreq(td_driver_t *driver)
+{
+  (void)driver;
+
+  // RPRINTF("submit request received\n");
+
+  return 0;
+}
+
+/*
+ * Handle a commit request: the server can start applying the most recent
+ * ramdisk, and TDREMUS_DONE is written back on the replication stream.
+ *
+ * Returns 0 if the whole acknowledgement was written, -1 otherwise.
+ */
+static int server_do_creq(td_driver_t *driver)
+{
+  struct tdremus_state *s = (struct tdremus_state *)driver->data;
+  /* derive the expected length from the message itself, not a literal 4 */
+  const size_t done_len = strlen(TDREMUS_DONE);
+
+  // RPRINTF("committing buffer\n");
+
+  ramdisk_start_flush(driver);
+
+  /* XXX this message should not be sent until flush completes! */
+  if (write(s->stream_fd.fd, TDREMUS_DONE, done_len) != (ssize_t)done_len)
+    return -1;
+
+  return 0;
+}
+
+
+/* Scheduler callback, called when data is pending on s->stream_fd.
+ *
+ * Reads a four-byte request tag and dispatches to the matching handler.
+ * A failed read switches the driver to unprotected mode.  The handlers'
+ * return values are not inspected.
+ */
+static void remus_server_event(event_id_t id, char mode, void *private)
+{
+  struct tdremus_state *state = (struct tdremus_state *)private;
+  td_driver_t *driver = state->tdremus_driver;
+  char req[5];
+
+  // RPRINTF("replication data waiting\n");
+
+  /* TODO: add a get_connection_by_event_id() function.
+   * for now we can assume that the fd is s->stream_fd */
+
+  if (mread(state->stream_fd.fd, req, sizeof(req) - 1) < 0) {
+      RPRINTF("error reading server event, activating backup\n");
+      switch_mode(driver, mode_unprotected);
+      return;
+  }
+
+  /* terminate so the tag can be compared as a string */
+  req[sizeof(req) - 1] = '\0';
+
+  if (strcmp(req, TDREMUS_WRITE) == 0) {
+    server_do_wreq(driver);
+    return;
+  }
+  if (strcmp(req, TDREMUS_SUBMIT) == 0) {
+    server_do_sreq(driver);
+    return;
+  }
+  if (strcmp(req, TDREMUS_COMMIT) == 0) {
+    server_do_creq(driver);
+    return;
+  }
+
+  RPRINTF("unknown request received: %s\n", req);
+}
+
+/* unprotected */
+
+/* Read handler for unprotected mode: reads pass straight through, except
+ * that they are completed with -EBUSY while server_writes_inflight() still
+ * reports outstanding ramdisk writes. */
+void unprotected_queue_read(td_driver_t *driver, td_request_t treq)
+{
+  if (!server_writes_inflight(driver)) {
+    /* nothing pending: just pass the read through */
+    td_forward_request(treq);
+    return;
+  }
+
+  /* previous ramdisk not flushed yet. for now lets just return EBUSY; if
+   * this becomes an issue we can do something smarter */
+  td_complete_request(treq, -EBUSY);
+}
+
+/* Write handler for unprotected mode: writes are forwarded, except that they
+ * are completed with -EBUSY while server_writes_inflight() still reports
+ * outstanding ramdisk writes.
+ * For a recoverable remus solution we need to log unprotected writes here. */
+void unprotected_queue_write(td_driver_t *driver, td_request_t treq)
+{
+  if (!server_writes_inflight(driver)) {
+    // RPRINTF("servicing write request on backup\n");
+    td_forward_request(treq);
+    return;
+  }
+
+  /* previous ramdisk still flushing */
+  RPRINTF("queue_write: waiting for queue to drain");
+  td_complete_request(treq, -EBUSY);
+}
+
+/*
+ * Enter unprotected (passthrough) mode: tear down the replication stream and
+ * the listening socket, then install the unprotected read/write handlers.
+ * Always returns 0.
+ */
+static int unprotected_start(td_driver_t *driver)
+{
+  struct tdremus_state *s = (struct tdremus_state *)driver->data;
+
+  RPRINTF("failure detected, activating passthrough\n");
+
+  /* close the replication stream */
+  close_stream_fd(s);
+
+  /* The listening socket only exists if remus_bind() succeeded.  When it did
+   * not, server_fd.fd is -1 and server_fd.id does not name an event of ours,
+   * so neither may be unregistered or closed. */
+  if (s->server_fd.fd >= 0) {
+    /* unregister the server socket's accept event */
+    tapdisk_server_unregister_event(s->server_fd.id);
+
+    /* close the server socket */
+    close(s->server_fd.fd);
+    s->server_fd.fd = -1;
+  }
+
+  /* install the unprotected read/write handlers */
+  tapdisk_remus.td_queue_read = unprotected_queue_read;
+  tapdisk_remus.td_queue_write = unprotected_queue_write;
+
+  return 0;
+}
+
+
+/* control */
+
+/*
+ * Resolve addr with gethostbyname() and store its first IPv4 address, in
+ * network byte order, in ia->s_addr.
+ *
+ * Returns 0 on success, -1 if the lookup fails or yields no usable IPv4
+ * address.
+ */
+static inline int resolve_address(const char* addr, struct in_addr* ia)
+{
+  struct hostent* he;
+
+  if (!(he = gethostbyname(addr))) {
+    RPRINTF("error resolving %s: %d\n", addr, h_errno);
+    return -1;
+  }
+
+  /* ia can only hold an IPv4 address: reject anything else before copying */
+  if (he->h_addrtype != AF_INET || he->h_length != (int)sizeof(ia->s_addr) ||
+      !he->h_addr_list[0]) {
+    RPRINTF("no address found for %s\n", addr);
+    return -1;
+  }
+
+  /* already network byte order; memcpy avoids an aliased/misaligned load */
+  memcpy(&ia->s_addr, he->h_addr_list[0], sizeof(ia->s_addr));
+
+  return 0;
+}
+
+/*
+ * Parse a "host:port" string and store the first IPv4 address it resolves
+ * to in state->sa.
+ *
+ * Returns 0 on success, -ENOMEM if the host part cannot be duplicated, and
+ * -ENOENT if name has no ':' separator, the lookup fails, or no AF_INET
+ * result is returned.
+ */
+static int get_args(td_driver_t *driver, const char* name)
+{
+  struct tdremus_state *state = (struct tdremus_state *)driver->data;
+  char* host;
+  const char* port;
+  int gai_status;
+  int valid_addr;
+  struct addrinfo gai_hints;
+  struct addrinfo *servinfo, *servinfo_itr;
+
+  memset(&gai_hints, 0, sizeof gai_hints);
+  gai_hints.ai_family = AF_UNSPEC;
+  gai_hints.ai_socktype = SOCK_STREAM;
+
+  port = strchr(name, ':');
+  if (!port) {
+    RPRINTF("missing host in %s\n", name);
+    return -ENOENT;
+  }
+  if (!(host = strndup(name, port - name))) {
+    RPRINTF("unable to allocate host\n");
+    return -ENOMEM;
+  }
+  port++; /* step over the ':' to the service string */
+
+  gai_status = getaddrinfo(host, port, &gai_hints, &servinfo);
+  /* host was only needed for the lookup: release it on every path */
+  free(host);
+  if (gai_status != 0) {
+    RPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status));
+    return -ENOENT;
+  }
+
+  /* TODO: do something smarter here */
+  valid_addr = 0;
+  for (servinfo_itr = servinfo; servinfo_itr != NULL; servinfo_itr = servinfo_itr->ai_next) {
+    /* state->sa is a sockaddr_in, so only an IPv4 result can be stored */
+    if (servinfo_itr->ai_family == AF_INET) {
+      valid_addr = 1;
+      state->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr;
+      break;
+    }
+  }
+  freeaddrinfo(servinfo);
+
+  if (!valid_addr)
+    return -ENOENT;
+
+  RPRINTF("host: %s, port: %d\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
+
+  return 0;
+}
+
+/* Move the driver to the requested mode.
+ *
+ * Does nothing (returns 0) if that mode is already active.  Otherwise the
+ * current queue_flush hook, if any, runs first; should it fail, the target is
+ * forced to mode_unprotected.  The matching *_start() routine is then called
+ * and s->mode is updated only if it returns 0.  Returns that routine's
+ * result, or -1 for an unknown mode.
+ */
+static int switch_mode(td_driver_t *driver, enum tdremus_mode mode)
+{
+  struct tdremus_state *state = (struct tdremus_state *)driver->data;
+  int status;
+
+  if (state->mode == mode)
+    return 0;
+
+  if (state->queue_flush && state->queue_flush(driver) < 0) {
+    // fall back to unprotected mode on error
+    RPRINTF("switch_mode: error flushing queue (old: %d, new: %d)", state->mode, mode);
+    mode = mode_unprotected;
+  }
+
+  switch (mode) {
+  case mode_unprotected:
+    status = unprotected_start(driver);
+    break;
+  case mode_primary:
+    status = primary_start(driver);
+    break;
+  case mode_backup:
+    status = backup_start(driver);
+    break;
+  default:
+    RPRINTF("unknown mode requested: %d\n", mode);
+    status = -1;
+    break;
+  }
+
+  if (status == 0)
+    state->mode = mode;
+
+  return status;
+}
+
+/* Scheduler callback for the control FIFO.
+ *
+ * Reads one message.  A zero-byte read closes and reopens the FIFO.  The
+ * only command understood is "flush", which invokes the current queue_flush
+ * hook (if set) and sends TDREMUS_FAIL through ctl_respond() when the hook
+ * returns non-zero.  Anything else is logged as unknown.
+ */
+static void ctl_request(event_id_t id, char mode, void *private)
+{
+  struct tdremus_state *state = (struct tdremus_state *)private;
+  td_driver_t *driver = state->tdremus_driver;
+  char msg[80];
+  int nread;
+
+  // RPRINTF("data waiting on control fifo\n");
+
+  nread = read(state->ctl_fd.fd, msg, sizeof(msg) - 1 /* append nul */);
+
+  if (nread == 0) {
+    RPRINTF("0-byte read received, reopening FIFO\n");
+    /*TODO: we may have to unregister/re-register with tapdisk_server */
+    close(state->ctl_fd.fd);
+    RPRINTF("FIFO closed\n");
+    state->ctl_fd.fd = open(state->ctl_path, O_RDWR);
+    if (state->ctl_fd.fd < 0) {
+       RPRINTF("error reopening FIFO: %d\n", errno);
+    }
+    return;
+  }
+
+  if (nread < 0) {
+    RPRINTF("error reading from FIFO: %d\n", errno);
+    return;
+  }
+
+  /* TODO: need to get driver somehow */
+  msg[nread] = '\0';
+
+  if (strncmp(msg, "flush", 5) != 0) {
+    RPRINTF("unknown command: %s\n", msg);
+    return;
+  }
+
+  if (state->queue_flush && state->queue_flush(driver)) {
+    RPRINTF("error passing flush request to backup");
+    ctl_respond(state, TDREMUS_FAIL);
+  }
+}
+
+/* Write response to the message FIFO.
+ *
+ * Returns the result of write().  If the write fails, the FIFO is closed
+ * and reopened before the (negative) result is returned.
+ */
+static int ctl_respond(struct tdremus_state *s, const char *response)
+{
+    int written = write(s->msg_fd.fd, response, strlen(response));
+
+    if (written >= 0)
+       return written;
+
+    RPRINTF("error writing notification: %d\n", errno);
+    close(s->msg_fd.fd);
+    s->msg_fd.fd = open(s->msg_path, O_RDWR);
+    if (s->msg_fd.fd < 0)
+       RPRINTF("error reopening FIFO: %d\n", errno);
+
+    return written;
+}
+
+/*
+ * Create and open the control and message FIFOs for this device.
+ * Must be called after the underlying driver has been initialized.
+ *
+ * The control path is BLKTAP_CTRL_DIR "/remus_<name>" with every ':' and '/'
+ * in the name part replaced by '_'; the message path is that plus ".msg".
+ *
+ * Returns 0 on success, -1 on failure.  After the paths have been allocated,
+ * any failure frees and NULLs both of them and, if the control FIFO had been
+ * opened, closes it and resets s->ctl_fd.fd to -1.
+ * NOTE(review): FIFOs already created are not unlinked on the error path.
+ */
+static int ctl_open(td_driver_t *driver, const char* name)
+{
+  struct tdremus_state *s = (struct tdremus_state *)driver->data;
+  int i, l;
+
+  /* first we must ensure that BLKTAP_CTRL_DIR exists */
+  if (mkdir(BLKTAP_CTRL_DIR, 0755) && errno != EEXIST)
+  {
+    DPRINTF("error creating directory %s: %d\n", BLKTAP_CTRL_DIR, errno);
+    return -1;
+  }
+
+  /* use the device name to create the control fifo path */
+  if (asprintf(&s->ctl_path, BLKTAP_CTRL_DIR "/remus_%s", name) < 0)
+    return -1;
+  /* scrub fifo pathname: start just past the directory's trailing '/' */
+  for (i = strlen(BLKTAP_CTRL_DIR) + 1, l = strlen(s->ctl_path); i < l; i++) {
+    if (strchr(":/", s->ctl_path[i]))
+      s->ctl_path[i] = '_';
+  }
+  if (asprintf(&s->msg_path, "%s.msg", s->ctl_path) < 0)
+    goto err_ctlfifo;
+
+  if (mkfifo(s->ctl_path, S_IRWXU|S_IRWXG|S_IRWXO) && errno != EEXIST) {
+    RPRINTF("error creating control FIFO %s: %d\n", s->ctl_path, errno);
+    goto err_msgfifo;
+  }
+
+  if (mkfifo(s->msg_path, S_IRWXU|S_IRWXG|S_IRWXO) && errno != EEXIST) {
+    RPRINTF("error creating message FIFO %s: %d\n", s->msg_path, errno);
+    goto err_msgfifo;
+  }
+
+  /* RDWR so that fd doesn't block select when no writer is present */
+  if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
+    RPRINTF("error opening control FIFO %s: %d\n", s->ctl_path, errno);
+    goto err_msgfifo;
+  }
+
+  if ((s->msg_fd.fd = open(s->msg_path, O_RDWR)) < 0) {
+    RPRINTF("error opening message FIFO %s: %d\n", s->msg_path, errno);
+    goto err_openctlfifo;
+  }
+
+  RPRINTF("control FIFO %s\n", s->ctl_path);
+  RPRINTF("message FIFO %s\n", s->msg_path);
+
+  return 0;
+
+  err_openctlfifo:
+  close(s->ctl_fd.fd);
+  /* do not leave a closed descriptor number behind for later cleanup */
+  s->ctl_fd.fd = -1;
+  err_msgfifo:
+  free(s->msg_path);
+  s->msg_path = NULL;
+  err_ctlfifo:
+  free(s->ctl_path);
+  s->ctl_path = NULL;
+  return -1;
+}
+
+/*
+ * Tear down the control channel: close the control and message FIFO
+ * descriptors if they are open, then unlink and free both FIFO paths.
+ *
+ * Descriptors use -1 as the "not open" value (see tdremus_open()), so they
+ * are tested with ">= 0"; a plain truth test would call close(-1) and would
+ * skip a legitimately open descriptor 0.
+ * NOTE(review): the event registered for ctl_fd by ctl_register() is not
+ * unregistered here -- confirm whether the caller is expected to do so.
+ */
+static void ctl_close(td_driver_t *driver)
+{
+  struct tdremus_state *s = (struct tdremus_state *)driver->data;
+
+  /* TODO: close *all* connections */
+
+  if (s->ctl_fd.fd >= 0) {
+    close(s->ctl_fd.fd);
+    s->ctl_fd.fd = -1;
+  }
+  /* the message FIFO is opened alongside the control FIFO in ctl_open() */
+  if (s->msg_fd.fd >= 0) {
+    close(s->msg_fd.fd);
+    s->msg_fd.fd = -1;
+  }
+
+  if (s->ctl_path) {
+    unlink(s->ctl_path);
+    free(s->ctl_path);
+    s->ctl_path = NULL;
+  }
+  if (s->msg_path) {
+    unlink(s->msg_path);
+    free(s->msg_path);
+    s->msg_path = NULL;
+  }
+}
+
+/* Register the control FIFO descriptor with the tapdisk scheduler so that
+ * ctl_request() runs when it becomes readable.  The event id is stored in
+ * s->ctl_fd.id.  Returns 0 on success, -1 if registration fails. */
+static int ctl_register(struct tdremus_state *s)
+{
+  event_id_t event;
+
+  RPRINTF("registering ctl fifo\n");
+
+  /* register ctl fd */
+  event = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s);
+  s->ctl_fd.id = event;
+
+  if (s->ctl_fd.id >= 0)
+    return 0;
+
+  RPRINTF("error registering ctrl FIFO %s: %d\n", s->ctl_path, s->ctl_fd.id);
+  return -1;
+}
+
+/* interface */
+
+/*
+ * td_open hook: initialise the driver state, parse "host:port" from name,
+ * set up the control FIFOs and choose the initial mode.
+ *
+ * If remus_bind() succeeds the driver starts as backup; if it returns -2 the
+ * driver starts as primary.
+ *
+ * Returns 0 on success.  Failures in get_args(), ctl_open() or
+ * ctl_register() return that function's result; any later failure runs
+ * tdremus_close() and returns -EIO.
+ */
+static int tdremus_open(td_driver_t *driver, const char *name,
+                         td_flag_t flags)
+{
+  struct tdremus_state *s = (struct tdremus_state *)driver->data;
+  int rc;
+
+  RPRINTF("opening %s\n", name);
+
+  /* first we need to get the underlying vbd for this driver stack. To do so we
+   * need to know the vbd's id. Fortunately, for tapdisk2 this is hard-coded as
+   * 0 (see tapdisk2.c)
+   */
+  device_vbd = tapdisk_server_get_vbd(0);
+
+  memset(s, 0, sizeof(*s));
+  /* -1 marks "not open" for every descriptor */
+  s->server_fd.fd = -1;
+  s->stream_fd.fd = -1;
+  s->ctl_fd.fd = -1;
+  s->msg_fd.fd = -1;
+
+  /* TODO: this is only needed so that the server can send writes down
+   * the driver stack from the stream_fd event handler */
+  s->tdremus_driver = driver;
+
+  /* parse name to get info etc */
+  if ((rc = get_args(driver, name)))
+    return rc;
+
+  if ((rc = ctl_open(driver, name))) {
+    RPRINTF("error setting up control channel\n");
+    free(s->driver_data);
+    s->driver_data = NULL;
+    return rc;
+  }
+
+  if ((rc = ctl_register(s))) {
+    RPRINTF("error registering control channel\n");
+    /* undo ctl_open(): release the FIFO descriptor and remove the FIFOs */
+    ctl_close(driver);
+    free(s->driver_data);
+    s->driver_data = NULL;
+    return rc;
+  }
+
+  if (!(rc = remus_bind(s)))
+      rc = switch_mode(driver, mode_backup);
+  else if (rc == -2)
+      rc = switch_mode(driver, mode_primary);
+
+  if (!rc)
+      return 0;
+
+  tdremus_close(driver);
+  return -EIO;
+}
+
+/*
+ * td_close hook: free driver_data, close the listening socket and the
+ * replication stream if they are open, and tear down the control channel.
+ *
+ * Always returns 0.  (The previous version returned an uninitialised local.)
+ */
+static int tdremus_close(td_driver_t *driver)
+{
+  struct tdremus_state *s = (struct tdremus_state *)driver->data;
+
+  RPRINTF("closing\n");
+
+  if (s->driver_data) {
+    free(s->driver_data);
+    s->driver_data = NULL;
+  }
+  if (s->server_fd.fd >= 0) {
+    close(s->server_fd.fd);
+    s->server_fd.fd = -1;
+  }
+  if (s->stream_fd.fd >= 0)
+    close_stream_fd(s);
+
+  ctl_close(driver);
+
+  return 0;
+}
+
+/* td_get_parent_id hook: this driver reports no parent (for now), so the
+ * answer is always -EINVAL and id is left untouched. */
+static int tdremus_get_parent_id(td_driver_t *driver, td_disk_id_t *id)
+{
+  (void)driver;
+  (void)id;
+
+  return -EINVAL;
+}
+
+/* td_validate_parent hook: performs no checks and always returns 0. */
+static int tdremus_validate_parent(td_driver_t *driver,
+                            td_driver_t *pdriver, td_flag_t flags)
+{
+  (void)driver;
+  (void)pdriver;
+  (void)flags;
+
+  return 0;
+}
+
+struct tap_disk tapdisk_remus = { /* driver operations table for the remus disk type */
+  .disk_type          = "tapdisk_remus",
+  .private_data_size  = sizeof(struct tdremus_state), /* tdremus_open() uses driver->data as a struct tdremus_state */
+  .td_open            = tdremus_open,
+  .td_queue_read      = unprotected_queue_read, /* initial handler; backup_start() and unprotected_start() reassign it */
+  .td_queue_write     = unprotected_queue_write, /* initial handler; backup_start() and unprotected_start() reassign it */
+  .td_close           = tdremus_close,
+  .td_get_parent_id   = tdremus_get_parent_id, /* always -EINVAL */
+  .td_validate_parent = tdremus_validate_parent, /* always 0 */
+  .td_debug           = NULL, /* no debug hook */
+};
diff --git a/tools/blktap2/drivers/disktypes.h b/tools/blktap2/drivers/disktypes.h
--- a/tools/blktap2/drivers/disktypes.h
+++ b/tools/blktap2/drivers/disktypes.h
@@ -49,6 +49,7 @@
  extern struct tap_disk tapdisk_qcow; 
 extern struct tap_disk tapdisk_block_cache;
 extern struct tap_disk tapdisk_log;
+extern struct tap_disk tapdisk_remus;
 
 #define MAX_DISK_TYPES        20
 
@@ -61,6 +62,7 @@
 #define DISK_TYPE_QCOW        6
 #define DISK_TYPE_BLOCK_CACHE 7
 #define DISK_TYPE_LOG         9
+#define DISK_TYPE_REMUS       10
 
 /*Define Individual Disk Parameters here */
 static disk_info_t null_disk = {
@@ -167,6 +169,16 @@
 #endif
 };
 
+static disk_info_t remus_disk = { /* disk-type table entry for the remus driver; listed in dtypes[] */
+       DISK_TYPE_REMUS, /* numeric type id (10) */
+       "remus disk replicator (remus)", /* presumably the human-readable description -- confirm against disk_info_t */
+       "remus", /* presumably the short type name -- confirm against disk_info_t */
+       0, /* NOTE(review): meaning of this field is not visible here -- confirm against disk_info_t */
+#ifdef TAPDISK
+       &tapdisk_remus, /* driver operations; only present in TAPDISK builds */
+#endif
+};
+
 /*Main disk info array */
 static disk_info_t *dtypes[] = {
        &aio_disk,
@@ -179,6 +191,7 @@
        &block_cache_disk,
        &null_disk,
        &log_disk,
+       &remus_disk,
 };
 
 #endif
diff --git a/tools/blktap2/drivers/hashtable.c b/tools/blktap2/drivers/hashtable.c
new file mode 100644
--- /dev/null
+++ b/tools/blktap2/drivers/hashtable.c
@@ -0,0 +1,274 @@
+/* Copyright (C) 2004 Christopher Clark <firstname.lastname@xxxxxxxxxxxx> */
+
+#include "hashtable.h"
+#include "hashtable_private.h"
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <math.h>
+
+/*
+Credit for primes table: Aaron Krowne
+ http://br.endernet.org/~akrowne/
+ http://planetmath.org/encyclopedia/GoodHashTablePrimes.html
+*/
+static const unsigned int primes[] = { /* permitted bucket counts, ascending; each roughly double the previous */
+53, 97, 193, 389,
+769, 1543, 3079, 6151,
+12289, 24593, 49157, 98317,
+196613, 393241, 786433, 1572869,
+3145739, 6291469, 12582917, 25165843,
+50331653, 100663319, 201326611, 402653189,
+805306457, 1610612741
+};
+const unsigned int prime_table_length = sizeof(primes)/sizeof(primes[0]); /* number of entries in primes[] */
+const float max_load_factor = 0.65; /* loadlimit = ceil(tablelength * max_load_factor); hashtable_insert() expands once entrycount exceeds it */
+
+/*****************************************************************************/
+/* Allocate an empty hashtable.
+ *
+ * minsize: requested minimum size; the bucket count is the first entry of
+ *          primes[] strictly greater than it.
+ * hashf:   hash function applied to keys.
+ * eqf:     key equality function (non-zero means equal).
+ *
+ * Returns the new table, or NULL if minsize exceeds 2^30 or an allocation
+ * fails.
+ */
+struct hashtable *
+create_hashtable(unsigned int minsize,
+                 unsigned int (*hashf) (void*),
+                 int (*eqf) (void*,void*))
+{
+    struct hashtable *table;
+    unsigned int pindex = 0;
+    unsigned int size = primes[0];
+
+    /* Check requested hashtable isn't too large */
+    if (minsize > (1u << 30))
+        return NULL;
+
+    /* Enforce size as prime */
+    while (pindex < prime_table_length) {
+        if (primes[pindex] > minsize) {
+            size = primes[pindex];
+            break;
+        }
+        pindex++;
+    }
+
+    table = (struct hashtable *)malloc(sizeof(struct hashtable));
+    if (table == NULL)
+        return NULL; /*oom*/
+
+    table->table = (struct entry **)malloc(sizeof(struct entry*) * size);
+    if (table->table == NULL) {
+        free(table);
+        return NULL; /*oom*/
+    }
+    memset(table->table, 0, size * sizeof(struct entry *));
+
+    table->hashfn       = hashf;
+    table->eqfn         = eqf;
+    table->entrycount   = 0;
+    table->primeindex   = pindex;
+    table->tablelength  = size;
+    table->loadlimit    = (unsigned int) ceil(size * max_load_factor);
+    return table;
+}
+
+/*****************************************************************************/
+/* Compute the table's hash of key k: the user hash function's result is
+ * passed through extra mixing steps to protect against poor hash functions
+ * (logic taken from the java 1.4 hashtable source). */
+unsigned int
+hash(struct hashtable *h, void *k)
+{
+    unsigned int v = h->hashfn(k);
+
+    v = v + ~(v << 9);
+    v = v ^ ((v >> 14) | (v << 18)); /* >>> */
+    v = v + (v << 4);
+    v = v ^ ((v >> 10) | (v << 22)); /* >>> */
+    return v;
+}
+
+/*****************************************************************************/
+/*
+ * Grow the table to the next size in primes[] and redistribute every entry.
+ *
+ * A fresh bucket array is tried first; if that allocation fails the existing
+ * array is grown in place with realloc() and entries are moved bucket by
+ * bucket.
+ *
+ * Returns -1 (non-zero) on success, 0 if the table is already at its
+ * largest size or no memory could be obtained; the table is left unchanged
+ * in that case.
+ */
+static int
+hashtable_expand(struct hashtable *h)
+{
+    /* Double the size of the table to accommodate more entries */
+    struct entry **newtable;
+    struct entry *e;
+    struct entry **pE;
+    unsigned int newsize, i, index;
+    /* Check we're not hitting max capacity */
+    if (h->primeindex == (prime_table_length - 1)) return 0;
+    newsize = primes[++(h->primeindex)];
+
+    newtable = (struct entry **)malloc(sizeof(struct entry*) * newsize);
+    if (NULL != newtable)
+    {
+        memset(newtable, 0, newsize * sizeof(struct entry *));
+        /* This algorithm is not 'stable'. ie. it reverses the list
+         * when it transfers entries between the tables */
+        for (i = 0; i < h->tablelength; i++) {
+            while (NULL != (e = h->table[i])) {
+                h->table[i] = e->next;
+                index = indexFor(newsize,e->h);
+                e->next = newtable[index];
+                newtable[index] = e;
+            }
+        }
+        free(h->table);
+        h->table = newtable;
+    }
+    /* Plan B: realloc instead */
+    else 
+    {
+        newtable = (struct entry **)
+                   realloc(h->table, newsize * sizeof(struct entry *));
+        if (NULL == newtable) { (h->primeindex)--; return 0; }
+        h->table = newtable;
+        /* Zero only the newly added buckets.  The start must be the ADDRESS
+         * of the first new slot (its contents are indeterminate after
+         * realloc) and the length must be in bytes, not in slots. */
+        memset(&newtable[h->tablelength], 0,
+               (newsize - h->tablelength) * sizeof(struct entry *));
+        for (i = 0; i < h->tablelength; i++) {
+            for (pE = &(newtable[i]), e = *pE; e != NULL; e = *pE) {
+                index = indexFor(newsize,e->h);
+                if (index == i)
+                {
+                    /* entry stays in this bucket: step past it */
+                    pE = &(e->next);
+                }
+                else
+                {
+                    /* unlink and push onto the head of its new bucket */
+                    *pE = e->next;
+                    e->next = newtable[index];
+                    newtable[index] = e;
+                }
+            }
+        }
+    }
+    h->tablelength = newsize;
+    h->loadlimit   = (unsigned int) ceil(newsize * max_load_factor);
+    return -1;
+}
+
+/*****************************************************************************/
+/* Return the number of entries currently stored in the table. */
+unsigned int
+hashtable_count(struct hashtable *h)
+{
+    const unsigned int stored = h->entrycount;
+
+    return stored;
+}
+
+/*****************************************************************************/
+/* Insert the pair (k, v) at the head of its bucket chain.
+ *
+ * Duplicate keys are allowed by this method, but they shouldn't be used.
+ * Returns -1 (non-zero) on success, 0 if the entry could not be allocated.
+ */
+int
+hashtable_insert(struct hashtable *h, void *k, void *v)
+{
+    struct entry *node;
+    unsigned int bucket;
+
+    h->entrycount++;
+    if (h->entrycount > h->loadlimit)
+    {
+        /* The result is deliberately ignored.  If expansion fails we still
+         * try cramming this one value into the existing table -- there may
+         * be no memory for a larger table, but one more element may be ok.
+         * The next insert will try expanding again. */
+        hashtable_expand(h);
+    }
+
+    node = (struct entry *)malloc(sizeof(struct entry));
+    if (node == NULL)
+    {
+        h->entrycount--;
+        return 0; /*oom*/
+    }
+
+    node->k = k;
+    node->v = v;
+    node->h = hash(h,k);
+
+    bucket = indexFor(h->tablelength,node->h);
+    node->next = h->table[bucket];
+    h->table[bucket] = node;
+    return -1;
+}
+
+/*****************************************************************************/
+/* Look up key k.  Returns the value of the first matching entry in its
+ * bucket chain, or NULL if no entry matches. */
+void * /* returns value associated with key */
+hashtable_search(struct hashtable *h, void *k)
+{
+    const unsigned int hashvalue = hash(h,k);
+    const unsigned int bucket = indexFor(h->tablelength,hashvalue);
+    struct entry *cur;
+
+    for (cur = h->table[bucket]; cur != NULL; cur = cur->next)
+    {
+        /* Compare hash values first to short circuit the heavier eqfn */
+        if (cur->h != hashvalue) continue;
+        if (h->eqfn(k, cur->k)) return cur->v;
+    }
+    return NULL;
+}
+
+/*****************************************************************************/
+/*
+ * Remove the first entry whose key matches k.
+ *
+ * The entry's key is released with freekey() and the entry itself is freed;
+ * the value is handed back to the caller.  Returns NULL if no entry matches.
+ */
+void * /* returns value associated with key */
+hashtable_remove(struct hashtable *h, void *k)
+{
+    /* TODO: consider compacting the table when the load factor drops enough,
+     *       or provide a 'compact' method. */
+
+    struct entry *e;
+    struct entry **pE;
+    void *v;
+    unsigned int hashvalue, index;
+
+    /* hash once and reuse the result for both the bucket and the compare */
+    hashvalue = hash(h,k);
+    index = indexFor(h->tablelength,hashvalue);
+    pE = &(h->table[index]);
+    e = *pE;
+    while (NULL != e)
+    {
+        /* Check hash value to short circuit heavier comparison */
+        if ((hashvalue == e->h) && (h->eqfn(k, e->k)))
+        {
+            /* pE is the link that points at e: bypass e through it */
+            *pE = e->next;
+            h->entrycount--;
+            v = e->v;
+            freekey(e->k);
+            free(e);
+            return v;
+        }
+        pE = &(e->next);
+        e = e->next;
+    }
+    return NULL;
+}
+
+/*****************************************************************************/
+/* destroy - release every entry, the bucket array and the table itself.
+ * Keys are always released with freekey(); values are passed to free() only
+ * when free_values is non-zero. */
+void
+hashtable_destroy(struct hashtable *h, int free_values)
+{
+    struct entry **buckets = h->table;
+    struct entry *cur, *doomed;
+    unsigned int b;
+
+    for (b = 0; b < h->tablelength; b++)
+    {
+        cur = buckets[b];
+        while (cur != NULL)
+        {
+            doomed = cur;
+            cur = cur->next;
+            freekey(doomed->k);
+            if (free_values)
+            {
+                free(doomed->v);
+            }
+            free(doomed);
+        }
+    }
+    free(h->table);
+    free(h);
+}
+
+/*
+ * Copyright (c) 2002, Christopher Clark
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ * 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
diff --git a/tools/blktap2/drivers/hashtable_itr.c b/tools/blktap2/drivers/hashtable_itr.c
new file mode 100644
--- /dev/null
+++ b/tools/blktap2/drivers/hashtable_itr.c
@@ -0,0 +1,188 @@
+/* Copyright (C) 2002, 2004 Christopher Clark  <firstname.lastname@xxxxxxxxxxxx> */
+
+#include "hashtable.h"
+#include "hashtable_private.h"
+#include "hashtable_itr.h"
+#include <stdlib.h> /* defines NULL */
+
+/*****************************************************************************/
+/* hashtable_iterator    - iterator constructor */
+
+/* Allocate an iterator positioned on the first entry of h.
+ * For an empty table the iterator has e == NULL and index == tablelength.
+ * Returns NULL if the iterator cannot be allocated. */
+struct hashtable_itr *
+hashtable_iterator(struct hashtable *h)
+{
+    const unsigned int nbuckets = h->tablelength;
+    unsigned int b = 0;
+    struct hashtable_itr *it;
+
+    it = (struct hashtable_itr *)malloc(sizeof(struct hashtable_itr));
+    if (it == NULL)
+        return NULL;
+
+    /* start in the "past the end" state */
+    it->h = h;
+    it->e = NULL;
+    it->parent = NULL;
+    it->index = nbuckets;
+
+    if (h->entrycount == 0)
+        return it;
+
+    /* settle on the head of the first non-empty bucket */
+    while (b < nbuckets)
+    {
+        if (h->table[b] != NULL)
+        {
+            it->e = h->table[b];
+            it->index = b;
+            break;
+        }
+        b++;
+    }
+    return it;
+}
+
+/*****************************************************************************/
+/* key      - return the key of the (key,value) pair at the current position */
+/* value    - return the value of the (key,value) pair at the current position */
+
+void *
+hashtable_iterator_key(struct hashtable_itr *i)
+{
+    /* key of the entry the iterator currently points at */
+    struct entry *current = i->e;
+
+    return current->k;
+}
+
+void *
+hashtable_iterator_value(struct hashtable_itr *i)
+{
+    /* value of the entry the iterator currently points at */
+    struct entry *current = i->e;
+
+    return current->v;
+}
+
+/*****************************************************************************/
+/* advance - move the iterator to the next element.
+ *           Returns -1 (non-zero) if it now rests on an element, zero if it
+ *           has reached the end of the table (itr->e is then NULL). */
+
+int
+hashtable_iterator_advance(struct hashtable_itr *itr)
+{
+    struct entry *successor;
+    struct entry **buckets;
+    unsigned int nbuckets, b;
+
+    if (itr->e == NULL)
+        return 0; /* stupidity check */
+
+    /* easy case: more entries in the current chain */
+    successor = itr->e->next;
+    if (successor != NULL)
+    {
+        itr->parent = itr->e;
+        itr->e = successor;
+        return -1;
+    }
+
+    /* chain exhausted: continue from the next bucket */
+    nbuckets = itr->h->tablelength;
+    itr->parent = NULL;
+    b = ++(itr->index);
+    if (b >= nbuckets)
+    {
+        itr->e = NULL;
+        return 0;
+    }
+
+    buckets = itr->h->table;
+    for (; b < nbuckets; b++)
+    {
+        if (buckets[b] != NULL)
+        {
+            itr->index = b;
+            itr->e = buckets[b];
+            return -1;
+        }
+    }
+
+    /* nothing but empty buckets remained */
+    itr->index = nbuckets;
+    itr->e = NULL;
+    return 0;
+}
+
+/*****************************************************************************/
+/* remove - unlink the entry at the current iterator position, release its
+ *          key with freekey(), free the entry and advance the iterator to
+ *          the following element, if there is one.
+ *          The value is NOT freed: read it before removing, or it leaks.
+ *          Returns zero if end of iteration. */
+
+int
+hashtable_iterator_remove(struct hashtable_itr *itr)
+{
+    struct entry *doomed = itr->e;
+    struct entry *old_parent = itr->parent;
+    int more;
+
+    /* Do the removal */
+    if (old_parent != NULL)
+    {
+        /* element is mid-chain */
+        old_parent->next = doomed->next;
+    } else {
+        /* element is head of a chain */
+        itr->h->table[itr->index] = doomed->next;
+    }
+    /* doomed is now outside the hashtable */
+    itr->h->entrycount--;
+    freekey(doomed->k);
+
+    /* Advance the iterator; if it stayed in the same chain it recorded the
+     * removed entry as parent, so put the real predecessor back */
+    more = hashtable_iterator_advance(itr);
+    if (itr->parent == doomed)
+    {
+        itr->parent = old_parent;
+    }
+    free(doomed);
+    return more;
+}
+
+/*****************************************************************************/
+/* Position itr on the first entry of h whose key matches k.
+ * On a match itr's h, e, parent and index are all set and -1 is returned;
+ * otherwise itr is left untouched and zero is returned. */
+int /* returns zero if not found */
+hashtable_iterator_search(struct hashtable_itr *itr,
+                          struct hashtable *h, void *k)
+{
+    const unsigned int hashvalue = hash(h,k);
+    const unsigned int bucket = indexFor(h->tablelength,hashvalue);
+    struct entry *prev = NULL;
+    struct entry *cur;
+
+    for (cur = h->table[bucket]; cur != NULL; prev = cur, cur = cur->next)
+    {
+        /* Compare hash values first to short circuit the heavier eqfn */
+        if (cur->h != hashvalue) continue;
+        if (!h->eqfn(k, cur->k)) continue;
+
+        itr->h = h;
+        itr->e = cur;
+        itr->parent = prev;
+        itr->index = bucket;
+        return -1;
+    }
+    return 0;
+}
+
+
+/*
+ * Copyright (c) 2002, 2004, Christopher Clark
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ * 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
diff --git a/tools/blktap2/drivers/hashtable_itr.h b/tools/blktap2/drivers/hashtable_itr.h
new file mode 100644
--- /dev/null
+++ b/tools/blktap2/drivers/hashtable_itr.h
@@ -0,0 +1,112 @@
+/* Copyright (C) 2002, 2004 Christopher Clark <firstname.lastname@xxxxxxxxxxxx> */
+
+#ifndef __HASHTABLE_ITR_CWC22__
+#define __HASHTABLE_ITR_CWC22__
+#include "hashtable.h"
+#include "hashtable_private.h" /* needed to enable inlining */
+
+/*****************************************************************************/
+/* This struct is only concrete here to allow the inlining of two of the
+ * accessor functions (hashtable_iterator_key and hashtable_iterator_value). */
+struct hashtable_itr
+{
+    struct hashtable *h;  /* hashtable this iterator refers to */
+    struct entry *e;      /* current entry; read by the key/value accessors */
+    struct entry *parent; /* entry visited just before e in its chain (see search) */
+    unsigned int index;   /* NOTE(review): presumably e's bucket in h->table - confirm */
+};
+
+
+/*****************************************************************************/
+/* hashtable_iterator - return a struct hashtable_itr pointer for table h.
+ * NOTE(review): defined outside this header; who frees the result - confirm. */
+
+struct hashtable_itr *
+hashtable_iterator(struct hashtable *h);
+
+/*****************************************************************************/
+/* hashtable_iterator_key
+ * - return the key of the (key,value) pair at the current position */
+
+extern inline void *
+hashtable_iterator_key(struct hashtable_itr *i)
+{
+    return i->e->k; /* i->e is dereferenced without a NULL check */
+}
+
+/*****************************************************************************/
+/* hashtable_iterator_value - return the value of the (key,value) pair at the current position */
+
+extern inline void *
+hashtable_iterator_value(struct hashtable_itr *i)
+{
+    return i->e->v; /* i->e is dereferenced without a NULL check */
+}
+
+/*****************************************************************************/
+/* advance - advance the iterator to the next element
+ *           returns zero if advanced to end of table */
+
+int
+hashtable_iterator_advance(struct hashtable_itr *itr);
+
+/*****************************************************************************/
+/* remove - remove current element and advance the iterator to the next element
+ *          NB: if you need the value to free it, read it before
+ *          removing. ie: beware memory leaks!
+ *          returns zero if advanced to end of table */
+
+int
+hashtable_iterator_remove(struct hashtable_itr *itr);
+
+/*****************************************************************************/
+/* search - overwrite the supplied iterator, to point to the entry matching the
+ *          supplied key; h points to the hashtable to be searched.
+ *          returns zero if not found, non-zero if a matching entry was found.
+ *          The macro below defines fnname as a wrapper taking keytype *k. */
+int
+hashtable_iterator_search(struct hashtable_itr *itr,
+                          struct hashtable *h, void *k);
+
+#define DEFINE_HASHTABLE_ITERATOR_SEARCH(fnname, keytype) \
+int fnname (struct hashtable_itr *i, struct hashtable *h, keytype *k) \
+{ \
+    return (hashtable_iterator_search(i,h,k)); \
+}
+
+
+
+#endif /* __HASHTABLE_ITR_CWC22__*/
+
+/*
+ * Copyright (c) 2002, 2004, Christopher Clark
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ * 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
diff --git a/tools/blktap2/drivers/hashtable_utility.c b/tools/blktap2/drivers/hashtable_utility.c
new file mode 100644
--- /dev/null
+++ b/tools/blktap2/drivers/hashtable_utility.c
@@ -0,0 +1,71 @@
+/* Copyright (C) 2002 Christopher Clark <firstname.lastname@xxxxxxxxxxxx> */
+
+#include "hashtable.h"
+#include "hashtable_private.h"
+#include "hashtable_utility.h"
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+/*****************************************************************************/
+/* hashtable_change
+ *
+ * Rebind an existing key: look k up in h and, if it is present, free() the
+ * value currently stored for it and store v in its place.  Returns -1 when
+ * the key was found and updated, 0 when k is not in the table (nothing is
+ * inserted and v is left untouched).  Passing the value that is already
+ * bound to k leaves the entry as it is.  Source due to Holger Schemel. */
+int
+hashtable_change(struct hashtable *h, void *k, void *v)
+{
+    struct entry *e;
+    unsigned int hashvalue, index;
+    hashvalue = hash(h,k);
+    index = indexFor(h->tablelength,hashvalue);
+    for (e = h->table[index]; NULL != e; e = e->next)
+    {
+        /* Check hash value to short circuit heavier comparison */
+        if ((hashvalue == e->h) && (h->eqfn(k, e->k)))
+        {
+            /* Freeing a value that is being re-bound would leave e->v dangling */
+            if (e->v != v)
+                free(e->v);
+            e->v = v;
+            return -1;
+        }
+    }
+    return 0;
+}
+
+/*
+ * Copyright (c) 2002, Christopher Clark
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ * 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
diff --git a/tools/blktap2/drivers/hashtable_utility.h b/tools/blktap2/drivers/hashtable_utility.h
new file mode 100644
--- /dev/null
+++ b/tools/blktap2/drivers/hashtable_utility.h
@@ -0,0 +1,55 @@
+/* Copyright (C) 2002 Christopher Clark <firstname.lastname@xxxxxxxxxxxx> */
+
+#ifndef __HASHTABLE_CWC22_UTILITY_H__
+#define __HASHTABLE_CWC22_UTILITY_H__
+
+/*****************************************************************************
+ * hashtable_change
+ *
+ * function to change the value associated with a key, where there already
+ * exists a value bound to the key in the hashtable.
+ * Source due to Holger Schemel.
+ *
+ * @name        hashtable_change
+ * @param   h   the hashtable
+ * @param   k   the key whose binding is changed
+ * @param   v   the new value; the value previously bound is passed to free()
+ * @return      non-zero (-1) if the key was found and updated, zero if not
+ */
+int
+hashtable_change(struct hashtable *h, void *k, void *v);
+
+#endif /* __HASHTABLE_CWC22_UTILITY_H__ */
+
+/*
+ * Copyright (c) 2002, Christopher Clark
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ * 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
diff --git a/tools/python/xen/xend/server/BlktapController.py b/tools/python/xen/xend/server/BlktapController.py
--- a/tools/python/xen/xend/server/BlktapController.py
+++ b/tools/python/xen/xend/server/BlktapController.py
@@ -28,6 +28,7 @@
     'ram',
     'qcow',
     'vhd',
+    'remus',
     ]
 
 blktap_disk_types = blktap1_disk_types + blktap2_disk_types

_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-devel


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.