
[Xen-devel] [RFC Patch v3 17/22] block-colo: implement colo disk replication



 TODO:
 update block-remus to use async I/O instead
 of mread/mwrite.
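
The primary forwards each guest write to the backup before executing it
locally. The user triggers a disk checkpoint by writing "flush" to the
control socket; tdcolo answers "done" on success or "fail" on error.

A minimal sketch of a control client follows. The socket path used here
is hypothetical; the real path is derived from BLKTAP_CTRL_DIR and the
device name (see colo_create_control_socket()):

    #include <stdio.h>
    #include <string.h>
    #include <sys/socket.h>
    #include <sys/un.h>
    #include <unistd.h>

    int main(void)
    {
            struct sockaddr_un saddr;
            char reply[5];
            int fd = socket(AF_UNIX, SOCK_STREAM, 0);

            if (fd < 0)
                    return 1;

            memset(&saddr, 0, sizeof(saddr));
            saddr.sun_family = AF_UNIX;
            /* hypothetical path; derived from the device name */
            strncpy(saddr.sun_path, "/var/run/blktap-control/colo_vm1",
                    sizeof(saddr.sun_path) - 1);
            if (connect(fd, (struct sockaddr *)&saddr, sizeof(saddr))) {
                    close(fd);
                    return 1;
            }

            /* request a disk checkpoint */
            if (write(fd, "flush", 5) != 5) {
                    close(fd);
                    return 1;
            }

            /* tdcolo answers "done" on success, "fail" on error */
            if (read(fd, reply, 4) == 4) {
                    reply[4] = '\0';
                    printf("tdcolo: %s\n", reply);
            }

            close(fd);
            return 0;
    }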

Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx>
Cc: Shriram Rajagopalan <rshriram@xxxxxxxxx>
---
 tools/blktap2/drivers/Makefile            |    3 +
 tools/blktap2/drivers/block-colo.c        | 1151 +++++++++++++++++++++++++++++
 tools/blktap2/drivers/block-replication.c |  196 +++++
 tools/blktap2/drivers/block-replication.h |   56 ++
 tools/blktap2/drivers/tapdisk-disktype.c  |    9 +
 tools/blktap2/drivers/tapdisk-disktype.h  |    3 +-
 6 files changed, 1417 insertions(+), 1 deletion(-)
 create mode 100644 tools/blktap2/drivers/block-colo.c

diff --git a/tools/blktap2/drivers/Makefile b/tools/blktap2/drivers/Makefile
index 3d8ed8a..21d21b9 100644
--- a/tools/blktap2/drivers/Makefile
+++ b/tools/blktap2/drivers/Makefile
@@ -28,6 +28,8 @@ REMUS-OBJS  += hashtable.o
 REMUS-OBJS  += hashtable_itr.o
 REMUS-OBJS  += hashtable_utility.o
 
+COLO-OBJS += block-colo.o
+
 tapdisk2 tapdisk-stream tapdisk-diff $(QCOW_UTIL): AIOLIBS := -laio
 
 MEMSHRLIBS :=
@@ -74,6 +76,7 @@ BLK-OBJS-y  += aes.o
 BLK-OBJS-y  += md5.o
 BLK-OBJS-y  += $(PORTABLE-OBJS-y)
 BLK-OBJS-y  += $(REMUS-OBJS)
+BLK-OBJS-y  += $(COLO-OBJS)
 
 all: $(IBIN) lock-util qcow-util
 
diff --git a/tools/blktap2/drivers/block-colo.c b/tools/blktap2/drivers/block-colo.c
new file mode 100644
index 0000000..565a386
--- /dev/null
+++ b/tools/blktap2/drivers/block-colo.c
@@ -0,0 +1,1151 @@
+/*
+ * Copyright (C) 2014 FUJITSU LIMITED
+ * Author: Wen Congyang <wency@xxxxxxxxxxxxxx>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; version 2.1 only, with the special
+ * exception on linking described in file LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Lesser General Public License for more details.
+ */
+
+#include "tapdisk.h"
+#include "tapdisk-server.h"
+#include "tapdisk-driver.h"
+#include "tapdisk-interface.h"
+#include "block-replication.h"
+
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <syslog.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+/* connect retry timeout (seconds) */
+#define COLO_CONNRETRY_TIMEOUT  1
+
+/* timeout for reads and writes, in seconds */
+#define HEARTBEAT_S 1
+
+/* TAPDISK_DATA_REQUESTS I/O requests + commit flag */
+#define MAX_COLO_REQUEST        (TAPDISK_DATA_REQUESTS + 1)
+
+#undef DPRINTF
+#undef EPRINTF
+#define DPRINTF(_f, _a...) syslog (LOG_DEBUG, "COLO: " _f, ## _a)
+#define EPRINTF(_f, _a...) syslog (LOG_ERR, "COLO: " _f, ## _a)
+
+#define TDCOLO_WRITE "wreq"
+#define TDCOLO_COMMIT "creq"
+#define TDCOLO_DONE "done"
+#define TDCOLO_FAIL "fail"
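+
+/*
+ * Messages on the replication channel:
+ *   primary -> backup: "wreq", then the write header (number of
+ *                      sectors + start sector), then the data
+ *   primary -> backup: "creq" to commit the current checkpoint
+ *   backup -> primary: "done" once the checkpoint is committed
+ * "fail" is only written to the local control socket, never to the
+ * replication channel.
+ */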
+
+enum tdcolo_mode {
+       mode_invalid = 0,
+       mode_unprotected,
+       mode_primary,
+       mode_backup,
+
+       /*
+        * If an internal error occurs in backup mode, we cannot
+        * switch to unprotected mode.
+        */
+       mode_failed,
+};
+
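+/* type of a queued I/O request */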
+enum {
+       colo_io,
+       colo_commit,
+};
+
+typedef struct queued_io {
+       int type;
+       union {
+               td_request_t treq;
+               char *buff; /* points to TDCOLO_COMMIT */
+       };
+} queued_io_t;
+
+struct queued_io_ring {
+       /* waste one slot to distinguish between empty and full */
+       queued_io_t qio[MAX_COLO_REQUEST + 1];
+       unsigned int prod;
+       unsigned int cons;
+};
+
+typedef struct colo_control {
+       /*
+        * control socket: the user writes "flush" to it, and we
+        * write the result ("done" or "fail") back to it.
+        */
+       char *path;
+       int listen_fd;
+       event_id_t listen_id;
+
+       int io_fd;
+       event_id_t io_id;
+} colo_control_t;
+
+struct tdcolo_state {
+       colo_control_t ctl;
+
+       /* async connection */
+       td_replication_connect_t t;
+       /* replication channel */
+       td_async_io_t rio, wio;
+
+       /*
+        * queued I/O requests; they will be forwarded to the backup
+        * asynchronously.
+        */
+       struct queued_io_ring qio_ring;
+
+       /* ramdisk data */
+       struct ramdisk ramdisk;
+       /*
+        * Primary write requests are queued in this hashtable and
+        * flushed to the ramdisk when the checkpoint finishes.
+        */
+       struct hashtable *h;
+       /*
+        * Secondary VM write requests are queued in this hashtable;
+        * they are dropped when the checkpoint finishes, or flushed
+        * to the ramdisk after failover.
+        */
+       struct hashtable *local;
+
+       /* current mode */
+       enum tdcolo_mode mode;
+       /* called when switching modes */
+       int (*queue_flush)(struct tdcolo_state *c);
+
+       char request[5];
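+       /*
+        * write header: number of sectors (uint32_t) followed by the
+        * start sector (uint64_t)
+        */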
+       char header[sizeof(uint32_t) + sizeof(uint64_t)];
+       int commit;
+       void *buff;
+       int bsize;
+       int sector_size;
+};
+
+struct tap_disk tapdisk_colo;
+
+static void colo_control_respond(colo_control_t *ctl, const char *response);
+static int switch_mode(struct tdcolo_state *c, enum tdcolo_mode mode);
+
+/* ======== common functions ======== */
+static int check_read_result(td_async_io_t *rio, int realsize,
+                            const char *target)
+{
+       if (realsize < 0) {
+               /* internal error */
+               EPRINTF("error reading from %s\n", target);
+               return ERROR_INTERNAL;
+       } else if (realsize < rio->size) {
+               /* timeout or I/O error */
+               EPRINTF("error reading from %s\n", target);
+               return ERROR_IO;
+       }
+
+       return 0;
+}
+
+static int check_write_result(td_async_io_t *wio, int realsize,
+                             const char * target)
+{
+       if (realsize < 0) {
+               /* internal error */
+               EPRINTF("error writing to %s\n", target);
+               return ERROR_INTERNAL;
+       } else if (realsize == 0) {
+               /* timeout or I/O error */
+               EPRINTF("error writing to %s\n", target);
+               return ERROR_IO;
+       }
+
+       return 0;
+}
+
+/* ======= ring functions ======== */
+static inline unsigned int ring_next(unsigned int pos)
+{
+       if (++pos > MAX_COLO_REQUEST)
+               return 0;
+
+       return pos;
+}
+
+static inline int ring_isempty(struct queued_io_ring* ring)
+{
+       return ring->cons == ring->prod;
+}
+
+static inline int ring_isfull(struct queued_io_ring* ring)
+{
+       return ring_next(ring->prod) == ring->cons;
+}
+
+static void ring_add_request(struct queued_io_ring *ring,
+                            const td_request_t *treq)
+{
+       /* If the ring is full, tapdisk2 has a bug */
+       if (ring_isfull(ring)) {
+               EPRINTF("OOPS, ring is full\n");
+               exit(1);
+       }
+
+       ring->qio[ring->prod].type = colo_io;
+       ring->qio[ring->prod].treq = *treq;
+       ring->prod = ring_next(ring->prod);
+}
+
+static void ring_add_commit_flag(struct queued_io_ring *ring)
+{
+       /* If the ring is full, tapdisk2 has a bug */
+       if (ring_isfull(ring)) {
+               EPRINTF("OOPS, ring is full\n");
+               exit(1);
+       }
+
+       ring->qio[ring->prod].type = colo_commit;
+       ring->qio[ring->prod].buff = TDCOLO_COMMIT;
+       ring->prod = ring_next(ring->prod);
+}
+
+/* return the first queued I/O request */
+static queued_io_t *ring_peek(struct queued_io_ring *ring)
+{
+       queued_io_t *qio;
+
+       if (ring_isempty(ring))
+               return NULL;
+
+       qio = &ring->qio[ring->cons];
+       return qio;
+}
+
+/* consume the first queued I/O request, and return it */
+static queued_io_t *ring_get(struct queued_io_ring *ring)
+{
+       queued_io_t *qio;
+
+       if (ring_isempty(ring))
+               return NULL;
+
+       qio = &ring->qio[ring->cons];
+       ring->cons = ring_next(ring->cons);
+       return qio;
+}
+
+/* ======== primary read/write functions ======== */
+static void primary_write_header(td_async_io_t *wio, int realsize,
+                                 int errnoval);
+static void primary_write_data(td_async_io_t *wio, int realsize, int errnoval);
+static void primary_forward_done(td_async_io_t *wio, int realsize,
+                                 int errnoval);
+static void primary_read_done(td_async_io_t *rio, int realsize, int errnoval);
+
+/*
+ * Called when we cannot connect to the backup, or when an I/O error
+ * occurs while reading or writing.
+ */
+static void primary_failed(struct tdcolo_state *c, int rc)
+{
+       td_replication_connect_kill(&c->t);
+       td_async_io_kill(&c->rio);
+       td_async_io_kill(&c->wio);
+       if (rc == ERROR_INTERNAL)
+               EPRINTF("switch to unprotected mode due to internal error");
+       if (rc == ERROR_CLOSE)
+               DPRINTF("switch to unprotected mode before closing");
+       switch_mode(c, mode_unprotected);
+}
+
+static void primary_waio(struct tdcolo_state *c, void *buff, size_t size,
+                        taio_callback *callback)
+{
+       td_async_io_t *wio = &c->wio;
+
+       wio->fd = c->t.fd;
+       wio->timeout_s = HEARTBEAT_S;
+       wio->mode = td_async_write;
+       wio->buff = buff;
+       wio->size = size;
+       wio->callback = callback;
+
+       if (td_async_io_start(wio))
+               primary_failed(c, ERROR_INTERNAL);
+}
+
+static void primary_raio(struct tdcolo_state *c)
+{
+       td_async_io_t *rio = &c->rio;
+
+       if (c->t.fd < 0)
+               return;
+
+       rio->fd = c->t.fd;
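+       /* no timeout: the backup only sends "done" after a commit request */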
+       rio->timeout_s = 0;
+       rio->mode = td_async_read;
+       rio->buff = c->request;
+       rio->size = sizeof(c->request) - 1;
+       rio->callback = primary_read_done;
+
+       if (td_async_io_start(rio))
+               primary_failed(c, ERROR_INTERNAL);
+}
+
+static void primary_handle_queued_io(struct tdcolo_state *c)
+{
+       struct queued_io_ring *qring = &c->qio_ring;
+       queued_io_t *qio;
+
+       while (!ring_isempty(qring)) {
+               qio = ring_peek(qring);
+               if (qio->type == colo_commit) {
+                       primary_waio(c, qio->buff, strlen(qio->buff),
+                                    primary_forward_done);
+                       return;
+               }
+
+               if (qio->treq.op == TD_OP_WRITE) {
+                       primary_waio(c, TDCOLO_WRITE, strlen(TDCOLO_WRITE),
+                                    primary_write_header);
+                       return;
+               }
+
+               td_forward_request(qio->treq);
+               ring_get(qring);
+       }
+}
+
+/* wait for "done" message to commit checkpoint */
+static void primary_read_done(td_async_io_t *rio, int realsize, int errnoval)
+{
+       struct tdcolo_state *c = CONTAINER_OF(rio, *c, rio);
+       char *req = c->request;
+       int rc;
+
+       rc = check_read_result(rio, realsize, "backup");
+       if (rc)
+               goto err;
+
+       rc = ERROR_INTERNAL;
+       req[4] = '\0';
+
+       if (c->commit != 1) {
+               EPRINTF("received unexpected message: %s\n", req);
+               goto err;
+       }
+
+       c->commit--;
+
+       if (strcmp(req, TDCOLO_DONE)) {
+               EPRINTF("received unknown message: %s\n", req);
+               goto err;
+       }
+
+       /* checkpoint committed, notify the control socket */
+       colo_control_respond(&c->ctl, TDCOLO_DONE);
+       primary_raio(c);
+
+       return;
+err:
+       colo_control_respond(&c->ctl, TDCOLO_FAIL);
+       primary_failed(c, rc);
+}
+
+static void primary_write_header(td_async_io_t *wio, int realsize,
+                                 int errnoval)
+{
+       struct tdcolo_state *c = CONTAINER_OF(wio, *c, wio);
+       queued_io_t *qio = ring_peek(&c->qio_ring);
+       uint32_t *sectors = (uint32_t *)c->header;
+       uint64_t *sector = (uint64_t *)(c->header + sizeof(uint32_t));
+       int rc;
+
+       rc = check_write_result(wio, realsize, "backup");
+       if (rc) {
+               primary_failed(c, rc);
+               return;
+       }
+
+       *sectors = qio->treq.secs;
+       *sector = qio->treq.sec;
+
+       primary_waio(c, c->header, sizeof(c->header), primary_write_data);
+}
+
+static void primary_write_data(td_async_io_t *wio, int realsize, int errnoval)
+{
+       struct tdcolo_state *c = CONTAINER_OF(wio, *c, wio);
+       queued_io_t *qio = ring_peek(&c->qio_ring);
+       int rc;
+
+       rc = check_write_result(wio, realsize, "backup");
+       if (rc) {
+               primary_failed(c, rc);
+               return;
+       }
+
+       primary_waio(c, qio->treq.buf, qio->treq.secs * c->sector_size,
+                    primary_forward_done);
+}
+
+static void primary_forward_done(td_async_io_t *wio, int realsize,
+                                 int errnoval)
+{
+       struct tdcolo_state *c = CONTAINER_OF(wio, *c, wio);
+       queued_io_t *qio;
+       int rc;
+
+       rc = check_write_result(wio, realsize, "backup");
+       if (rc) {
+               primary_failed(c, rc);
+               return;
+       }
+
+       qio = ring_get(&c->qio_ring);
+       if (qio->type == colo_io)
+               td_forward_request(qio->treq);
+       else
+               c->commit--;
+
+       primary_handle_queued_io(c);
+}
+
+static void primary_queue_read(td_driver_t *driver, td_request_t treq)
+{
+       struct tdcolo_state *c = driver->data;
+       struct queued_io_ring *ring = &c->qio_ring;
+
+       if (ring_isempty(ring)) {
+               /* just pass read through */
+               td_forward_request(treq);
+               return;
+       }
+
+       ring_add_request(ring, &treq);
+       if (td_replication_connect_status(&c->t) != 1)
+               return;
+
+       if (!td_async_io_is_running(&c->wio))
+               primary_handle_queued_io(c);
+}
+
+static void primary_queue_write(td_driver_t *driver, td_request_t treq)
+{
+       struct tdcolo_state *c = driver->data;
+       struct queued_io_ring *ring = &c->qio_ring;
+
+       ring_add_request(ring, &treq);
+       if (td_replication_connect_status(&c->t) != 1)
+               return;
+
+       if (!td_async_io_is_running(&c->wio))
+               primary_handle_queued_io(c);
+}
+
+/* It is called when the user writes "flush" to the control socket. */
+static int client_flush(struct tdcolo_state *c)
+{
+       if (td_replication_connect_status(&c->t) != 1)
+               return 0;
+
+       if (c->commit > 0) {
+               EPRINTF("the last commit is not finished\n");
+               colo_control_respond(&c->ctl, TDCOLO_FAIL);
+               primary_failed(c, ERROR_INTERNAL);
+               return -1;
+       }
+
+       ring_add_commit_flag(&c->qio_ring);
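+       /*
+        * commit is decremented twice: once when "creq" has been sent
+        * to the backup (primary_forward_done), and once when "done"
+        * is received from it (primary_read_done).
+        */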
+       c->commit = 2;
+       if (!td_async_io_is_running(&c->wio))
+               primary_handle_queued_io(c);
+
+       return 0;
+}
+
+/* It is called when switching the mode from primary to unprotected */
+static int primary_flush(struct tdcolo_state *c)
+{
+       struct queued_io_ring *qring = &c->qio_ring;
+       queued_io_t *qio;
+
+       if (ring_isempty(qring))
+               return 0;
+
+       while (!ring_isempty(qring)) {
+               qio = ring_get(qring);
+
+               if (qio->type == colo_commit) {
+                       colo_control_respond(&c->ctl, TDCOLO_FAIL);
+                       c->commit = 0;
+                       continue;
+               }
+
+               td_forward_request(qio->treq);
+       }
+
+       return 0;
+}
+
+static void colo_client_established(td_replication_connect_t *t, int rc)
+{
+       struct tdcolo_state *c = CONTAINER_OF(t, *c, t);
+
+       if (rc) {
+               primary_failed(c, rc);
+               return;
+       }
+
+       /* the connection succeeded; handle the queued requests */
+       primary_handle_queued_io(c);
+
+       primary_raio(c);
+}
+
+static int primary_start(struct tdcolo_state *c)
+{
+       DPRINTF("activating client mode\n");
+
+       tapdisk_colo.td_queue_read = primary_queue_read;
+       tapdisk_colo.td_queue_write = primary_queue_write;
+       c->queue_flush = primary_flush;
+
+       c->t.callback = colo_client_established;
+       return td_replication_client_start(&c->t);
+}
+
+/* ======== backup read/write functions ======== */
+static void backup_read_header_done(td_async_io_t *rio, int realsize,
+                                   int errnoval);
+static void backup_read_data_done(td_async_io_t *rio, int realsize,
+                                 int errnoval);
+static void backup_write_done(td_async_io_t *wio, int realsize, int errnoval);
+
+static void backup_failed(struct tdcolo_state *c, int rc)
+{
+       td_replication_connect_kill(&c->t);
+       td_async_io_kill(&c->rio);
+       td_async_io_kill(&c->wio);
+
+       if (rc == ERROR_INTERNAL) {
+               EPRINTF("switch to failed mode due to internal error");
+               switch_mode(c, mode_failed);
+               return;
+       }
+
+       if (rc == ERROR_CLOSE)
+               DPRINTF("switch to unprotected mode before closing");
+
+       switch_mode(c, mode_unprotected);
+}
+
+static void backup_raio(struct tdcolo_state *c, void *buff, int size,
+                       int timeout_s, taio_callback *callback)
+{
+       td_async_io_t *rio = &c->rio;
+
+       rio->fd = c->t.fd;
+       rio->timeout_s = timeout_s;
+       rio->mode = td_async_read;
+       rio->buff = buff;
+       rio->size = size;
+       rio->callback = callback;
+
+       if (td_async_io_start(rio)) {
+               EPRINTF("cannot start read aio\n");
+               backup_failed(c, ERROR_INTERNAL);
+       }
+}
+
+static void backup_waio(struct tdcolo_state *c)
+{
+       td_async_io_t *wio = &c->wio;
+
+       wio->fd = c->t.fd;
+       wio->timeout_s = HEARTBEAT_S;
+       wio->mode = td_async_write;
+       wio->buff = TDCOLO_DONE;
+       wio->size = strlen(TDCOLO_DONE);
+       wio->callback = backup_write_done;
+
+       if (td_async_io_start(wio)) {
+               EPRINTF("cannot start write aio\n");
+               backup_failed(c, ERROR_INTERNAL);
+       }
+}
+
+static void backup_read_req_done(td_async_io_t *rio, int realsize,
+                                int errnoval)
+{
+       struct tdcolo_state *c = CONTAINER_OF(rio, *c, rio);
+       char *req = c->request;
+       int rc;
+
+       rc = check_read_result(rio, realsize, "primary");
+       if (rc)
+               goto err;
+
+       rc = ERROR_INTERNAL;
+       req[4] = '\0';
+
+       if (!strcmp(req, TDCOLO_WRITE)) {
+               backup_raio(c, c->header, sizeof(c->header), HEARTBEAT_S,
+                           backup_read_header_done);
+               return;
+       } else if (!strcmp(req, TDCOLO_COMMIT)) {
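+               /*
+                * Checkpoint: drop the secondary vm's queued writes and
+                * flush the primary's queued writes to the ramdisk.
+                */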
+               ramdisk_destroy_hashtable(c->local);
+               c->local = ramdisk_new_hashtable();
+               if (!c->local) {
+                       EPRINTF("error creating local hashtable\n");
+                       goto err;
+               }
+               rc = ramdisk_start_flush(&c->ramdisk, &c->h);
+               if (rc) {
+                       EPRINTF("error flushing queued I/O\n");
+                       rc = ERROR_INTERNAL;
+                       goto err;
+               }
+
+               backup_waio(c);
+       } else {
+               EPRINTF("unsupported request: %s\n", req);
+               goto err;
+       }
+
+       return;
+
+err:
+       backup_failed(c, rc);
+       return;
+}
+
+static void backup_read_header_done(td_async_io_t *rio, int realsize,
+                                   int errnoval)
+{
+       struct tdcolo_state *c = CONTAINER_OF(rio, *c, rio);
+       uint32_t *sectors = (uint32_t *)c->header;
+       int rc;
+
+       rc = check_read_result(rio, realsize, "primary");
+       if (rc)
+               goto err;
+
+       rc = ERROR_INTERNAL;
+       if (*sectors * c->sector_size > c->bsize) {
+               EPRINTF("write request is too large: %d/%d\n",
+                       *sectors * c->sector_size, c->bsize);
+               goto err;
+       }
+
+       backup_raio(c, c->buff, *sectors * c->sector_size, HEARTBEAT_S,
+                   backup_read_data_done);
+
+       return;
+err:
+       backup_failed(c, rc);
+}
+
+static void backup_read_data_done(td_async_io_t *rio, int realsize,
+                                 int errnoval)
+{
+       struct tdcolo_state *c = CONTAINER_OF(rio, *c, rio);
+       uint32_t *sectors = (uint32_t *)c->header;
+       uint64_t *sector = (uint64_t *)(c->header + sizeof(uint32_t));
+       int rc;
+
+       rc = check_read_result(rio, realsize, "primary");
+       if (rc)
+               goto err;
+
+       rc = ramdisk_write_to_hashtable(c->h, *sector, *sectors,
+                                       c->sector_size, c->buff, "COLO");
+       if (rc) {
+               EPRINTF("cannot write primary data to hashtable\n");
+               rc = ERROR_INTERNAL;
+               goto err;
+       }
+
+       backup_raio(c, c->request, sizeof(c->request) - 1, 0,
+                   backup_read_req_done);
+
+       return;
+err:
+       backup_failed(c, rc);
+}
+
+static void backup_write_done(td_async_io_t *wio, int realsize, int errnoval)
+{
+       struct tdcolo_state *c = CONTAINER_OF(wio, *c, wio);
+       int rc;
+
+       rc = check_write_result(wio, realsize, "primary");
+       if (rc) {
+               backup_failed(c, rc);
+               return;
+       }
+
+       backup_raio(c, c->request, sizeof(c->request) - 1, 0,
+                   backup_read_req_done);
+}
+
+static void colo_server_established(td_replication_connect_t *t, int rc)
+{
+       struct tdcolo_state *c = CONTAINER_OF(t, *c, t);
+
+       if (rc) {
+               backup_failed(c, rc);
+               return;
+       }
+
+       backup_raio(c, c->request, sizeof(c->request) - 1, 0,
+                   backup_read_req_done);
+}
+
+/* It is called when switching the mode from backup to unprotected */
+static int backup_flush(struct tdcolo_state *c)
+{
+       int rc;
+
+       rc = ramdisk_start_flush(&c->ramdisk, &c->local);
+       if (rc)
+               EPRINTF("error flushing local queued I/O\n");
+
+       return 0;
+}
+
+static void backup_queue_read(td_driver_t *driver, td_request_t treq)
+{
+       struct tdcolo_state *c = driver->data;
+
+       if (ramdisk_read_from_hashtable(c->local, treq.sec, treq.secs,
+                                       c->sector_size, treq.buf))
+               /* FIXME */
+               td_forward_request(treq);
+       else
+               /* complete the request */
+               td_complete_request(treq, 0);
+}
+
+static void backup_queue_write(td_driver_t *driver, td_request_t treq)
+{
+       struct tdcolo_state *c = driver->data;
+       int rc;
+
+       rc = ramdisk_write_to_hashtable(c->local, treq.sec, treq.secs,
+                                       c->sector_size, treq.buf,
+                                       "COLO");
+       if (rc)
+               td_complete_request(treq, -EBUSY);
+       else
+               td_complete_request(treq, 0);
+}
+
+static int backup_start(struct tdcolo_state *c)
+{
+       tapdisk_colo.td_queue_read = backup_queue_read;
+       tapdisk_colo.td_queue_write = backup_queue_write;
+       c->queue_flush = backup_flush;
+
+       c->h = ramdisk_new_hashtable();
+       c->local = ramdisk_new_hashtable();
+       if (!c->h || !c->local)
+               return -1;
+
+       c->bsize = sysconf(_SC_PAGESIZE);
+       c->buff = malloc(c->bsize);
+       if (!c->buff)
+               return -1;
+
+       return 0;
+}
+
+/* ======== unprotected read/write functions ======== */
+void unprotected_queue_io(td_driver_t *driver, td_request_t treq)
+{
+       struct tdcolo_state *c = driver->data;
+
+       /* wait for the previous ramdisk flush to finish before servicing I/O */
+       if (ramdisk_writes_inflight(&c->ramdisk)) {
+               ramdisk_flush(&c->ramdisk);
+               td_complete_request(treq, -EBUSY);
+       } else {
+               /* here we just pass I/O through */
+               td_forward_request(treq);
+       }
+}
+
+static int unprotected_start(struct tdcolo_state *c)
+{
+       DPRINTF("failure detected, activating passthrough\n");
+
+       /* install the unprotected read/write handlers */
+       tapdisk_colo.td_queue_read = unprotected_queue_io;
+       tapdisk_colo.td_queue_write = unprotected_queue_io;
+       c->queue_flush = NULL;
+
+       return 0;
+}
+
+/* ======== failed read/write functions ======== */
+static void failed_queue_io(td_driver_t *driver, td_request_t treq)
+{
+       td_complete_request(treq, -EIO);
+}
+
+static int failed_start(struct tdcolo_state *c)
+{
+       tapdisk_colo.td_queue_read = failed_queue_io;
+       tapdisk_colo.td_queue_write = failed_queue_io;
+       c->queue_flush = NULL;
+
+       return 0;
+}
+
+/* ======== control ======== */
+static void colo_control_accept(event_id_t id, char mode, void *private);
+static void colo_control_handle_request(event_id_t id, char mode,
+                                       void *private);
+static void colo_control_close(colo_control_t *ctl);
+
+static void colo_control_init(colo_control_t *ctl)
+{
+       ctl->listen_fd = -1;
+       ctl->listen_id = -1;
+       ctl->io_fd = -1;
+       ctl->io_id = -1;
+}
+
+static int colo_create_control_socket(colo_control_t *ctl, const char *name)
+{
+       int i, l;
+       struct sockaddr_un saddr;
+       event_id_t id;
+       int rc;
+
+       /* first we must ensure that BLKTAP_CTRL_DIR exists */
+       if (mkdir(BLKTAP_CTRL_DIR, 0755) && errno != EEXIST) {
+               rc = -errno;
+               EPRINTF("error creating directory %s: %d\n",
+                       BLKTAP_CTRL_DIR, errno);
+               goto fail;
+       }
+
+       /* use the device name to create the control socket path */
+       if (asprintf(&ctl->path, BLKTAP_CTRL_DIR "/colo_%s", name) < 0) {
+               rc = -errno;
+               goto fail;
+       }
+
+       /* scrub the socket pathname */
+       l = strlen(ctl->path);
+       for (i = strlen(BLKTAP_CTRL_DIR) + 1; i < l; i++) {
+               if (strchr(":/", ctl->path[i]))
+                       ctl->path[i] = '_';
+       }
+
+       if (unlink(ctl->path) && errno != ENOENT) {
+               rc = -errno;
+               EPRINTF("failed to unlink %s: %d\n", ctl->path, errno);
+               goto fail;
+       }
+
+       ctl->listen_fd = socket(AF_UNIX, SOCK_STREAM, 0);
+       if (ctl->listen_fd == -1) {
+               rc = -errno;
+               EPRINTF("failed to create control socket: %d\n", errno);
+               goto fail;
+       }
+
+       memset(&saddr, 0, sizeof(saddr));
+       strncpy(saddr.sun_path, ctl->path, sizeof(saddr.sun_path) - 1);
+       saddr.sun_family = AF_UNIX;
+
+       rc = bind(ctl->listen_fd, (const struct sockaddr *)&saddr,
+                 sizeof(saddr));
+       if (rc == -1) {
+               rc = -errno;
+               EPRINTF("failed to bind to %s: %d\n", saddr.sun_path, errno);
+               goto fail;
+       }
+
+       rc = listen(ctl->listen_fd, 10);
+       if (rc == -1) {
+               rc = -errno;
+               EPRINTF("failed to listen: %d\n", errno);
+               goto fail;
+       }
+
+       id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
+                                          ctl->listen_fd, 0,
+                                          colo_control_accept, ctl);
+       if (id < 0) {
+               EPRINTF("failed to add watch: %d\n", id);
+               rc = id;
+               goto fail;
+       }
+
+       ctl->listen_id = id;
+       return 0;
+
+fail:
+       colo_control_close(ctl);
+       return rc;
+}
+
+static void colo_control_accept(event_id_t id, char mode, void *private)
+{
+       colo_control_t *ctl = private;
+       int fd;
+
+       fd = accept(ctl->listen_fd, NULL, NULL);
+       if (fd == -1) {
+               EPRINTF("failed to accept new control connection: %d\n", errno);
+               return;
+       }
+
+       if (ctl->io_fd >= 0) {
+               EPRINTF("cannot accept two control connections\n");
+               close(fd);
+               return;
+       }
+
+       id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
+                                          fd, 0,
+                                          colo_control_handle_request,
+                                          ctl);
+       if (id < 0) {
+               close(fd);
+               EPRINTF("failed to register new control event: %d\n", id);
+               return;
+       }
+
+       ctl->io_fd = fd;
+       ctl->io_id = id;
+}
+
+static void colo_control_handle_request(event_id_t id, char mode,
+                                        void *private)
+{
+       colo_control_t *ctl = private;
+       struct tdcolo_state *c = CONTAINER_OF(ctl, *c, ctl);
+       char req[6];
+       int rc;
+
+       rc = read(ctl->io_fd, req, sizeof(req) - 1);
+       if (!rc) {
+               EPRINTF("0-byte read received, close control socket\n");
+               goto err;
+       }
+
+       if (rc < 0) {
+               EPRINTF("error reading from control socket: %d\n", errno);
+               goto err;
+       }
+
+       req[rc] = '\0';
+       if (strncmp(req, "flush", 5)) {
+               EPRINTF("unknown command: %s\n", req);
+               colo_control_respond(ctl, TDCOLO_FAIL);
+               return;
+       }
+
+       if (c->mode != mode_primary) {
+               EPRINTF("invalid mode: %d\n", c->mode);
+               colo_control_respond(ctl, TDCOLO_FAIL);
+               return;
+       }
+
+       client_flush(c);
+       return;
+
+err:
+       UNREGISTER_EVENT(ctl->io_id);
+       CLOSE_FD(ctl->io_fd);
+       return;
+}
+
+static void colo_control_respond(colo_control_t *ctl, const char *response)
+{
+       int rc;
+
+       if (ctl->io_fd < 0)
+               return;
+
+       rc = write(ctl->io_fd, response, strlen(response));
+       if (rc < 0) {
+               EPRINTF("error writing notification: %d\n", errno);
+               CLOSE_FD(ctl->io_fd);
+       }
+}
+
+static void colo_control_close(colo_control_t *ctl)
+{
+       UNREGISTER_EVENT(ctl->listen_id);
+       UNREGISTER_EVENT(ctl->io_id);
+       CLOSE_FD(ctl->listen_fd);
+       CLOSE_FD(ctl->io_fd);
+
+       if (ctl->path) {
+               unlink(ctl->path);
+               free(ctl->path);
+               ctl->path = NULL;
+       }
+}
+
+/* ======== interface ======== */
+static int tdcolo_close(td_driver_t *driver);
+
+static int switch_mode(struct tdcolo_state *c, enum tdcolo_mode mode)
+{
+       int rc;
+
+       if (mode == c->mode)
+               return 0;
+
+       if (c->queue_flush)
+               if ((rc = c->queue_flush(c)) < 0) {
+                       /* fall back to unprotected mode on error */
+                       EPRINTF("switch_mode: error flushing queue (old: %d, new: %d)",
+                               c->mode, mode);
+                       mode = mode_unprotected;
+               }
+
+       if (mode == mode_unprotected)
+               rc = unprotected_start(c);
+       else if (mode == mode_primary)
+               rc = primary_start(c);
+       else if (mode == mode_backup)
+               rc = backup_start(c);
+       else if (mode == mode_failed)
+               rc = failed_start(c);
+       else {
+               EPRINTF("unknown mode requested: %d\n", mode);
+               rc = -1;
+       }
+
+       if (!rc)
+               c->mode = mode;
+
+       return rc;
+}
+
+static int tdcolo_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
+{
+       struct tdcolo_state *c = driver->data;
+       td_replication_connect_t *t = &c->t;
+       colo_control_t *ctl = &c->ctl;
+       ramdisk_t *ramdisk = &c->ramdisk;
+       int rc;
+       const char *name = image->name;
+       td_flag_t flags = image->flags;
+
+       DPRINTF("opening %s\n", name);
+
+       memset(c, 0, sizeof(*c));
+
+       /* init ramdisk */
+       ramdisk->log_prefix = "COLO";
+       ramdisk->sector_size = driver->info.sector_size;
+       ramdisk->image = image;
+       ramdisk_init(&c->ramdisk);
+
+       /* init async I/O */
+       td_async_io_init(&c->rio);
+       td_async_io_init(&c->wio);
+
+       c->sector_size = driver->info.sector_size;
+
+       /* init control socket */
+       colo_control_init(ctl);
+       rc = colo_create_control_socket(ctl, name);
+       if (rc)
+               return rc;
+
+       /* init async connection */
+       t->log_prefix = "COLO";
+       t->retry_timeout_s = COLO_CONNRETRY_TIMEOUT;
+       t->max_connections = 1;
+       t->callback = colo_server_established;
+       rc = td_replication_connect_init(t, name);
+       if (rc) {
+               colo_control_close(ctl);
+               return rc;
+       }
+
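+       /*
+        * Start as the replication server (backup side) if possible; a
+        * return value of -2 means this node cannot act as the server,
+        * so we run as the primary and connect to the backup instead.
+        */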
+       rc = td_replication_server_start(t);
+       if (!rc)
+               rc = switch_mode(c, mode_backup);
+       else if (rc == -2)
+               rc = switch_mode(c, mode_primary);
+
+       if (!rc)
+               return 0;
+
+       tdcolo_close(driver);
+       return -EIO;
+}
+
+static int tdcolo_pre_close(td_driver_t *driver)
+{
+       struct tdcolo_state *c = driver->data;
+
+       if (c->mode != mode_primary)
+               return 0;
+
+       if (td_replication_connect_status(&c->t))
+               return 0;
+
+       /*
+        * The connection is still in progress, and some I/O requests
+        * may be queued; fall back to unprotected mode before closing.
+        */
+       primary_failed(c, ERROR_CLOSE);
+       return 0;
+}
+
+static int tdcolo_close(td_driver_t *driver)
+{
+       struct tdcolo_state *c = driver->data;
+
+       DPRINTF("closing\n");
+       ramdisk_destroy(&c->ramdisk);
+       ramdisk_destroy_hashtable(c->h);
+       ramdisk_destroy_hashtable(c->local);
+       td_replication_connect_kill(&c->t);
+       td_async_io_kill(&c->rio);
+       td_async_io_kill(&c->wio);
+       colo_control_close(&c->ctl);
+       free(c->buff);
+
+       return 0;
+}
+
+static int tdcolo_get_parent_id(td_driver_t *driver, td_disk_id_t *id)
+{
+       /* we shouldn't have a parent... for now */
+       return -EINVAL;
+}
+
+static int tdcolo_validate_parent(td_driver_t *driver,
+                                 td_driver_t *pdriver, td_flag_t flags)
+{
+       return 0;
+}
+
+struct tap_disk tapdisk_colo = {
+       .disk_type          = "tapdisk_colo",
+       .private_data_size  = sizeof(struct tdcolo_state),
+       .td_open            = tdcolo_open,
+       .td_queue_read      = unprotected_queue_io,
+       .td_queue_write     = unprotected_queue_io,
+       .td_pre_close       = tdcolo_pre_close,
+       .td_close           = tdcolo_close,
+       .td_get_parent_id   = tdcolo_get_parent_id,
+       .td_validate_parent = tdcolo_validate_parent,
+};
diff --git a/tools/blktap2/drivers/block-replication.c b/tools/blktap2/drivers/block-replication.c
index 30eba8f..0992d19 100644
--- a/tools/blktap2/drivers/block-replication.c
+++ b/tools/blktap2/drivers/block-replication.c
@@ -911,6 +911,25 @@ int ramdisk_write_to_hashtable(struct hashtable *h, uint64_t sector,
        return 0;
 }
 
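+/*
+ * Copy nb_sectors sectors, starting at sector, from the hashtable
+ * into buf. Return -1 if any sector is not present; the caller must
+ * then read from the underlying image instead.
+ */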
+int ramdisk_read_from_hashtable(struct hashtable *h, uint64_t sector,
+                               int nb_sectors, int sector_size,
+                               char *buf)
+{
+       int i;
+       uint64_t key;
+       char *v;
+
+       for (i = 0; i < nb_sectors; i++) {
+               key = sector + i;
+               v = hashtable_search(h, &key);
+               if (!v)
+                       return -1;
+               memcpy(buf + i * sector_size, v, sector_size);
+       }
+
+       return 0;
+}
+
 void ramdisk_destroy_hashtable(struct hashtable *h)
 {
        if (!h)
@@ -918,3 +937,180 @@ void ramdisk_destroy_hashtable(struct hashtable *h)
 
        hashtable_destroy(h, 1);
 }
+
+/* async I/O */
+static void td_async_io_readable(event_id_t id, char mode, void *private);
+static void td_async_io_writeable(event_id_t id, char mode, void *private);
+static void td_async_io_timeout(event_id_t id, char mode, void *private);
+
+void td_async_io_init(td_async_io_t *taio)
+{
+       memset(taio, 0, sizeof(*taio));
+       taio->fd = -1;
+       taio->timeout_id = -1;
+       taio->io_id = -1;
+}
+
+int td_async_io_start(td_async_io_t *taio)
+{
+       event_id_t id;
+
+       if (taio->running)
+               return -1;
+
+       if (taio->size <= 0 || taio->fd < 0)
+               return -1;
+
+       taio->running = 1;
+
+       if (taio->mode == td_async_read)
+               id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
+                                                  taio->fd, 0,
+                                                  td_async_io_readable,
+                                                  taio);
+       else if (taio->mode == td_async_write)
+               id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD,
+                                                  taio->fd, 0,
+                                                  td_async_io_writeable,
+                                                  taio);
+       else
+               id = -1;
+       if (id < 0)
+               goto err;
+       taio->io_id = id;
+
+       if (taio->timeout_s) {
+               id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
+                                                  -1, taio->timeout_s,
+                                                  td_async_io_timeout, taio);
+               if (id < 0)
+                       goto err;
+               taio->timeout_id = id;
+       }
+
+       taio->used = 0;
+       return 0;
+
+err:
+       td_async_io_kill(taio);
+       return -1;
+}
+
+static void td_async_io_callback(td_async_io_t *taio, int realsize,
+                                int errnoval)
+{
+       td_async_io_kill(taio);
+       taio->callback(taio, realsize, errnoval);
+}
+
+static void td_async_io_update_timeout(td_async_io_t *taio)
+{
+       event_id_t id;
+
+       if (!taio->timeout_s)
+               return;
+
+       tapdisk_server_unregister_event(taio->timeout_id);
+       taio->timeout_id = -1;
+
+       id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
+                                          -1, taio->timeout_s,
+                                          td_async_io_timeout, taio);
+       if (id < 0)
+               td_async_io_callback(taio, -1, id);
+       else
+               taio->timeout_id = id;
+}
+
+static void td_async_io_readable(event_id_t id, char mode, void *private)
+{
+       td_async_io_t *taio = private;
+       int rc;
+
+       while (1) {
+               rc = read(taio->fd, taio->buff + taio->used,
+                         taio->size - taio->used);
+               if (rc < 0) {
+                       if (errno == EINTR)
+                               continue;
+                       if (errno == EWOULDBLOCK || errno == EAGAIN)
+                               break;
+
+                       td_async_io_callback(taio, 0, errno);
+                       return;
+               }
+
+               if (rc == 0) {
+                       td_async_io_callback(taio, taio->used, 0);
+                       return;
+               }
+
+               taio->used += rc;
+               if (taio->used == taio->size) {
+                       td_async_io_callback(taio, taio->used, 0);
+                       return;
+               }
+       }
+
+       td_async_io_update_timeout(taio);
+}
+
+static void td_async_io_writeable(event_id_t id, char mode, void *private)
+{
+       td_async_io_t *taio = private;
+       int rc;
+
+       while (1) {
+               rc = write(taio->fd, taio->buff + taio->used,
+                          taio->size - taio->used);
+
+               if (rc < 0) {
+                       if (errno == EINTR)
+                               continue;
+                       if (errno == EWOULDBLOCK || errno == EAGAIN)
+                               break;
+
+                       td_async_io_callback(taio, 0, errno);
+                       return;
+               }
+
+               taio->used += rc;
+               if (taio->used == taio->size) {
+                       td_async_io_callback(taio, taio->used, 0);
+                       return;
+               }
+       }
+
+       td_async_io_update_timeout(taio);
+}
+
+static void td_async_io_timeout(event_id_t id, char mode, void *private)
+{
+       td_async_io_t *taio = private;
+
+       td_async_io_kill(taio);
+       taio->callback(taio, 0, ETIME);
+}
+
+int td_async_io_is_running(td_async_io_t *taio)
+{
+       return taio->running;
+}
+
+void td_async_io_kill(td_async_io_t *taio)
+{
+       if (!taio->running)
+               return;
+
+       if (taio->timeout_id >= 0) {
+               tapdisk_server_unregister_event(taio->timeout_id);
+               taio->timeout_id = -1;
+       }
+
+       if (taio->io_id >= 0) {
+               tapdisk_server_unregister_event(taio->io_id);
+               taio->io_id = -1;
+       }
+
+       taio->running = 0;
+}
diff --git a/tools/blktap2/drivers/block-replication.h b/tools/blktap2/drivers/block-replication.h
index fdc216e..d39c530 100644
--- a/tools/blktap2/drivers/block-replication.h
+++ b/tools/blktap2/drivers/block-replication.h
@@ -156,6 +156,62 @@ struct hashtable *ramdisk_new_hashtable(void);
 int ramdisk_write_to_hashtable(struct hashtable *h, uint64_t sector,
                               int nb_sectors, size_t sector_size, char* buf,
                               const char *log_prefix);
+int ramdisk_read_from_hashtable(struct hashtable *h, uint64_t sector,
+                               int nb_sectors, int sector_size,
+                               char *buf);
 void ramdisk_destroy_hashtable(struct hashtable *h);
 
+/* async I/O; a single td_async_io_t cannot read and write at the same time */
+typedef struct td_async_io td_async_io_t;
+enum {
+       td_async_read,
+       td_async_write,
+};
+
+/*
+ * realsize >= 1 means the data was read/written (but see the NOTE
+ * below for the EOF case)
+ * realsize == 0 means a failure happened while reading/writing, and
+ * errnoval is valid
+ * realsize == -1 means some other internal failure happened, and
+ * errnoval is also valid
+ * In all cases the async I/O is killed before this callback is called.
+ *
+ * If no more data is read/written within timeout_s seconds, realsize
+ * is 0 and errnoval is ETIME.
+ *
+ * If timeout_s is 0, the timeout is disabled.
+ *
+ * NOTE: realsize may be less than taio->size if EOF is reached while
+ * reading.
+ */
+typedef void taio_callback(td_async_io_t *taio, int realsize,
+                          int errnoval);
+
+struct td_async_io {
+       /* caller must fill these in, and they must all remain valid */
+       int fd;
+       int timeout_s;
+       int mode;
+       /*
+        * read: store the data to buff
+        * write: point to the data to be written
+        */
+       void *buff;
+       int size;
+       taio_callback *callback;
+
+       /* private */
+       event_id_t timeout_id, io_id;
+       int used;
+       int running;
+};
+
+/* Do not call this while the td_async_io is running */
+void td_async_io_init(td_async_io_t *taio);
+/* return -1 on error, 0 otherwise */
+int td_async_io_start(td_async_io_t *taio);
+/* return 1 if the td_async_io is running, 0 otherwise */
+int td_async_io_is_running(td_async_io_t *taio);
+/* stop the async I/O; the callback will not be called */
+void td_async_io_kill(td_async_io_t *taio);
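+
+/*
+ * Typical usage (sketch; fd, buf and read_done are placeholders):
+ *
+ *     td_async_io_t rio;
+ *
+ *     td_async_io_init(&rio);
+ *     rio.fd = fd;
+ *     rio.mode = td_async_read;
+ *     rio.buff = buf;
+ *     rio.size = sizeof(buf);
+ *     rio.timeout_s = 1;
+ *     rio.callback = read_done;
+ *     if (td_async_io_start(&rio))
+ *             handle_error();
+ */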
+
 #endif
diff --git a/tools/blktap2/drivers/tapdisk-disktype.c b/tools/blktap2/drivers/tapdisk-disktype.c
index 8d1383b..aa2afab 100644
--- a/tools/blktap2/drivers/tapdisk-disktype.c
+++ b/tools/blktap2/drivers/tapdisk-disktype.c
@@ -94,6 +94,12 @@ static const disk_info_t remus_disk = {
        0,
 };
 
+static const disk_info_t colo_disk = {
+       "colo",
+       "colo disk replicator (COLO)",
+       0,
+};
+
 const disk_info_t *tapdisk_disk_types[] = {
        [DISK_TYPE_AIO] = &aio_disk,
        [DISK_TYPE_SYNC]        = &sync_disk,
@@ -105,6 +111,7 @@ const disk_info_t *tapdisk_disk_types[] = {
        [DISK_TYPE_BLOCK_CACHE] = &block_cache_disk,
        [DISK_TYPE_LOG] = &log_disk,
        [DISK_TYPE_REMUS]       = &remus_disk,
+       [DISK_TYPE_COLO]        = &colo_disk,
        [DISK_TYPE_MAX]         = NULL,
 };
 
@@ -119,6 +126,7 @@ extern struct tap_disk tapdisk_block_cache;
 extern struct tap_disk tapdisk_vhd_index;
 extern struct tap_disk tapdisk_log;
 extern struct tap_disk tapdisk_remus;
+extern struct tap_disk tapdisk_colo;
 
 const struct tap_disk *tapdisk_disk_drivers[] = {
        [DISK_TYPE_AIO]         = &tapdisk_aio,
@@ -132,6 +140,7 @@ const struct tap_disk *tapdisk_disk_drivers[] = {
        [DISK_TYPE_BLOCK_CACHE] = &tapdisk_block_cache,
        [DISK_TYPE_LOG]         = &tapdisk_log,
        [DISK_TYPE_REMUS]       = &tapdisk_remus,
+       [DISK_TYPE_COLO]        = &tapdisk_colo,
        [DISK_TYPE_MAX]         = NULL,
 };
 
diff --git a/tools/blktap2/drivers/tapdisk-disktype.h b/tools/blktap2/drivers/tapdisk-disktype.h
index c574990..ee8cb02 100644
--- a/tools/blktap2/drivers/tapdisk-disktype.h
+++ b/tools/blktap2/drivers/tapdisk-disktype.h
@@ -39,7 +39,8 @@
 #define DISK_TYPE_BLOCK_CACHE 7
 #define DISK_TYPE_LOG         8
 #define DISK_TYPE_REMUS       9
-#define DISK_TYPE_MAX         10
+#define DISK_TYPE_COLO        10
+#define DISK_TYPE_MAX         11
 
 #define DISK_TYPE_NAME_MAX    32
 
-- 
1.9.3

