[Xen-devel] [RFC Patch v3 17/22] block-colo: implement colo disk replication
TODO:
  update block-remus to use async I/O instead of mread/mwrite.
Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx>
Cc: Shriram Rajagopalan <rshriram@xxxxxxxxx>
---
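Note for reviewers: the wire framing implemented below is a 4-byte
ASCII tag per message ("wreq" for a write request, "creq" to commit a
checkpoint); each "wreq" is followed by a 12-byte header and the sector
payload, and the backup answers each "creq" with "done" once the
checkpoint has been flushed to its ramdisk. A sketch of the framing --
the struct is hypothetical; the code builds the same 12 bytes by
casting offsets into tdcolo_state.header:

    #include <stdint.h>

    /* illustration only, not part of this patch */
    struct colo_write_header {
        uint32_t secs;   /* number of sectors in the payload */
        uint64_t sector; /* first sector to be written */
    } __attribute__((packed));

    /*
     * primary -> backup:  "wreq" | header | secs * sector_size bytes
     * primary -> backup:  "creq"                 (commit checkpoint)
     * backup  -> primary: "done"                 (checkpoint flushed)
     */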
tools/blktap2/drivers/Makefile | 3 +
tools/blktap2/drivers/block-colo.c | 1151 +++++++++++++++++++++++++++++
tools/blktap2/drivers/block-replication.c | 196 +++++
tools/blktap2/drivers/block-replication.h | 56 ++
tools/blktap2/drivers/tapdisk-disktype.c | 9 +
tools/blktap2/drivers/tapdisk-disktype.h | 3 +-
6 files changed, 1417 insertions(+), 1 deletion(-)
create mode 100644 tools/blktap2/drivers/block-colo.c
diff --git a/tools/blktap2/drivers/Makefile b/tools/blktap2/drivers/Makefile
index 3d8ed8a..21d21b9 100644
--- a/tools/blktap2/drivers/Makefile
+++ b/tools/blktap2/drivers/Makefile
@@ -28,6 +28,8 @@ REMUS-OBJS += hashtable.o
REMUS-OBJS += hashtable_itr.o
REMUS-OBJS += hashtable_utility.o
+COLO-OBJS += block-colo.o
+
tapdisk2 tapdisk-stream tapdisk-diff $(QCOW_UTIL): AIOLIBS := -laio
MEMSHRLIBS :=
@@ -74,6 +76,7 @@ BLK-OBJS-y += aes.o
BLK-OBJS-y += md5.o
BLK-OBJS-y += $(PORTABLE-OBJS-y)
BLK-OBJS-y += $(REMUS-OBJS)
+BLK-OBJS-y += $(COLO-OBJS)
all: $(IBIN) lock-util qcow-util
diff --git a/tools/blktap2/drivers/block-colo.c b/tools/blktap2/drivers/block-colo.c
new file mode 100644
index 0000000..565a386
--- /dev/null
+++ b/tools/blktap2/drivers/block-colo.c
@@ -0,0 +1,1151 @@
+/*
+ * Copyright (C) 2014 FUJITSU LIMITED
+ * Author: Wen Congyang <wency@xxxxxxxxxxxxxx>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; version 2.1 only, with the special
+ * exception on linking described in file LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ */
+
+#include "tapdisk.h"
+#include "tapdisk-server.h"
+#include "tapdisk-driver.h"
+#include "tapdisk-interface.h"
+#include "block-replication.h"
+
+#include <errno.h>
+#include <stdlib.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+/* connect retry timeout (seconds) */
+#define COLO_CONNRETRY_TIMEOUT 1
+
+/* timeout for reads and writes, in seconds */
+#define HEARTBEAT_S 1
+
+/* TAPDISK_DATA_REQUESTS I/O requests + commit flag */
+#define MAX_COLO_REQUEST (TAPDISK_DATA_REQUESTS + 1)
+
+#undef DPRINTF
+#undef EPRINTF
+#define DPRINTF(_f, _a...) syslog (LOG_DEBUG, "COLO: " _f, ## _a)
+#define EPRINTF(_f, _a...) syslog (LOG_ERR, "COLO: " _f, ## _a)
+
+#define TDCOLO_WRITE "wreq"
+#define TDCOLO_COMMIT "creq"
+#define TDCOLO_DONE "done"
+#define TDCOLO_FAIL "fail"
+
+enum tdcolo_mode {
+ mode_invalid = 0,
+ mode_unprotected,
+ mode_primary,
+ mode_backup,
+
+ /*
+ * If we find some internal error in backup mode, we cannot
+ * switch to unprotected mode.
+ */
+ mode_failed,
+};
+
+enum {
+ colo_io,
+ colo_commit,
+};
+
+typedef struct queued_io {
+ int type;
+ union {
+ td_request_t treq;
+ char *buff; /* TDCOLO_COMMIT */
+ };
+} queued_io_t;
+
+struct queued_io_ring {
+ /* waste one slot to distinguish between empty and full */
+ queued_io_t qio[MAX_COLO_REQUEST + 1];
+ unsigned int prod;
+ unsigned int cons;
+};
+
+typedef struct colo_control {
+ /*
+ * control socket: the user writes "flush" to it, and we write
+ * the result back.
+ */
+ char *path;
+ int listen_fd;
+ event_id_t listen_id;
+
+ int io_fd;
+ event_id_t io_id;
+} colo_control_t;
+
+struct tdcolo_state {
+ colo_control_t ctl;
+
+ /* async connection */
+ td_replication_connect_t t;
+ /* replication channel */
+ td_async_io_t rio, wio;
+
+ /*
+ * queued I/O requests; they are forwarded to the backup
+ * asynchronously.
+ */
+ struct queued_io_ring qio_ring;
+
+ /* ramdisk data */
+ struct ramdisk ramdisk;
+ /*
+ * Primary write requests are queued in this hashtable
+ * and flushed to the ramdisk when the checkpoint
+ * finishes.
+ */
+ struct hashtable *h;
+ /*
+ * Secondary VM write requests are queued in this
+ * hashtable; they are dropped when the checkpoint
+ * finishes, or flushed to the ramdisk after failover.
+ */
+ struct hashtable *local;
+
+ /* mode methods */
+ enum tdcolo_mode mode;
+ /* called when switching modes */
+ int (*queue_flush)(struct tdcolo_state *c);
+
+ char request[5];
+ char header[sizeof(uint32_t) + sizeof(uint64_t)];
+ int commit;
+ void *buff;
+ int bsize;
+ int sector_size;
+};
+
+struct tap_disk tapdisk_colo;
+
+static void colo_control_respond(colo_control_t *ctl, const char *response);
+static int switch_mode(struct tdcolo_state *c, enum tdcolo_mode mode);
+
+/* ======== common functions ======== */
+static int check_read_result(td_async_io_t *rio, int realsize,
+ const char *target)
+{
+ if (realsize < 0) {
+ /* internal error */
+ EPRINTF("error reading from %s\n", target);
+ return ERROR_INTERNAL;
+ } else if (realsize < rio->size) {
+ /* timeout or I/O error */
+ EPRINTF("error reading from %s\n", target);
+ return ERROR_IO;
+ }
+
+ return 0;
+}
+
+static int check_write_result(td_async_io_t *wio, int realsize,
+ const char * target)
+{
+ if (realsize < 0) {
+ /* internal error */
+ EPRINTF("error writing to %s\n", target);
+ return ERROR_INTERNAL;
+ } else if (realsize == 0) {
+ /* timeout or I/O error */
+ EPRINTF("error writing to %s\n", target);
+ return ERROR_IO;
+ }
+
+ return 0;
+}
+
+/* ======= ring functions ======== */
+static inline unsigned int ring_next(unsigned int pos)
+{
+ if (++pos > MAX_COLO_REQUEST)
+ return 0;
+
+ return pos;
+}
+
+static inline int ring_isempty(struct queued_io_ring* ring)
+{
+ return ring->cons == ring->prod;
+}
+
+static inline int ring_isfull(struct queued_io_ring* ring)
+{
+ return ring_next(ring->prod) == ring->cons;
+}
+
+static void ring_add_request(struct queued_io_ring *ring,
+ const td_request_t *treq)
+{
+ /* If the ring is full, tapdisk2 has a bug */
+ if (ring_isfull(ring)) {
+ EPRINTF("OOPS, ring is full\n");
+ exit(1);
+ }
+
+ ring->qio[ring->prod].type = colo_io;
+ ring->qio[ring->prod].treq = *treq;
+ ring->prod = ring_next(ring->prod);
+}
+
+static void ring_add_commit_flag(struct queued_io_ring *ring)
+{
+ /* If the ring is full, tapdisk2 has a bug */
+ if (ring_isfull(ring)) {
+ EPRINTF("OOPS, ring is full\n");
+ exit(1);
+ }
+
+ ring->qio[ring->prod].type = colo_commit;
+ ring->qio[ring->prod].buff = TDCOLO_COMMIT;
+ ring->prod = ring_next(ring->prod);
+}
+
+/* return the first queued I/O request */
+static queued_io_t *ring_peek(struct queued_io_ring *ring)
+{
+ queued_io_t *qio;
+
+ if (ring_isempty(ring))
+ return NULL;
+
+ qio = &ring->qio[ring->cons];
+ return qio;
+}
+
+/* consume the first queued I/O request, and return it */
+static queued_io_t *ring_get(struct queued_io_ring *ring)
+{
+ queued_io_t *qio;
+
+ if (ring_isempty(ring))
+ return NULL;
+
+ qio = &ring->qio[ring->cons];
+ ring->cons = ring_next(ring->cons);
+ return qio;
+}
+
+/* ======== primary read/write functions ======== */
+static void primary_write_header(td_async_io_t *wio, int realsize, int errnoval);
+static void primary_write_data(td_async_io_t *wio, int realsize, int errnoval);
+static void primary_forward_done(td_async_io_t *wio, int realsize, int errnoval);
+static void primary_read_done(td_async_io_t *rio, int realsize, int errnoval);
+
+/*
+ * Called when we cannot connect to the backup, or when we hit an
+ * I/O error while reading or writing.
+ */
+static void primary_failed(struct tdcolo_state *c, int rc)
+{
+ td_replication_connect_kill(&c->t);
+ td_async_io_kill(&c->rio);
+ td_async_io_kill(&c->wio);
+ if (rc == ERROR_INTERNAL)
+ EPRINTF("switch to unprotected mode due to internal error");
+ if (rc == ERROR_CLOSE)
+ DPRINTF("switch to unprotected mode before closing");
+ switch_mode(c, mode_unprotected);
+}
+
+static void primary_waio(struct tdcolo_state *c, void *buff, size_t size,
+ taio_callback *callback)
+{
+ td_async_io_t *wio = &c->wio;
+
+ wio->fd = c->t.fd;
+ wio->timeout_s = HEARTBEAT_S;
+ wio->mode = td_async_write;
+ wio->buff = buff;
+ wio->size = size;
+ wio->callback = callback;
+
+ if (td_async_io_start(wio))
+ primary_failed(c, ERROR_INTERNAL);
+}
+
+static void primary_raio(struct tdcolo_state *c)
+{
+ td_async_io_t *rio = &c->rio;
+
+ if (c->t.fd < 0)
+ return;
+
+ rio->fd = c->t.fd;
+ rio->timeout_s = 0;
+ rio->mode = td_async_read;
+ rio->buff = c->request;
+ rio->size = sizeof(c->request) - 1;
+ rio->callback = primary_read_done;
+
+ if (td_async_io_start(rio))
+ primary_failed(c, ERROR_INTERNAL);
+}
+
+static void primary_handle_queued_io(struct tdcolo_state *c)
+{
+ struct queued_io_ring *qring = &c->qio_ring;
+ queued_io_t *qio;
+
+ while (!ring_isempty(qring)) {
+ qio = ring_peek(qring);
+ if (qio->type == colo_commit) {
+ primary_waio(c, qio->buff, strlen(qio->buff),
+ primary_forward_done);
+ return;
+ }
+
+ if (qio->treq.op == TD_OP_WRITE) {
+ primary_waio(c, TDCOLO_WRITE, strlen(TDCOLO_WRITE),
+ primary_write_header);
+ return;
+ }
+
+ td_forward_request(qio->treq);
+ ring_get(qring);
+ }
+}
+
+/* wait for "done" message to commit checkpoint */
+static void primary_read_done(td_async_io_t *rio, int realsize, int errnoval)
+{
+ struct tdcolo_state *c = CONTAINER_OF(rio, *c, rio);
+ char *req = c->request;
+ int rc;
+
+ rc = check_read_result(rio, realsize, "backup");
+ if (rc)
+ goto err;
+
+ rc = ERROR_INTERNAL;
+ req[4] = '\0';
+
+ if (c->commit != 1) {
+ EPRINTF("received unexpected message: %s\n", req);
+ goto err;
+ }
+
+ c->commit--;
+
+ if (strcmp(req, TDCOLO_DONE)) {
+ EPRINTF("received unknown message: %s\n", req);
+ goto err;
+ }
+
+ /* checkpoint committed, inform msg_fd */
+ colo_control_respond(&c->ctl, TDCOLO_DONE);
+ primary_raio(c);
+
+ return;
+err:
+ colo_control_respond(&c->ctl, TDCOLO_FAIL);
+ primary_failed(c, rc);
+}
+
+static void primary_write_header(td_async_io_t *wio, int realsize, int errnoval)
+{
+ struct tdcolo_state *c = CONTAINER_OF(wio, *c, wio);
+ queued_io_t *qio = ring_peek(&c->qio_ring);
+ uint32_t *sectors = (uint32_t *)c->header;
+ uint64_t *sector = (uint64_t *)(c->header + sizeof(uint32_t));
+ int rc;
+
+ rc = check_write_result(wio, realsize, "backup");
+ if (rc) {
+ primary_failed(c, rc);
+ return;
+ }
+
+ *sectors = qio->treq.secs;
+ *sector = qio->treq.sec;
+
+ primary_waio(c, c->header, sizeof(c->header), primary_write_data);
+}
+
+static void primary_write_data(td_async_io_t *wio, int realsize, int errnoval)
+{
+ struct tdcolo_state *c = CONTAINER_OF(wio, *c, wio);
+ queued_io_t *qio = ring_peek(&c->qio_ring);
+ int rc;
+
+ rc = check_write_result(wio, realsize, "backup");
+ if (rc) {
+ primary_failed(c, rc);
+ return;
+ }
+
+ primary_waio(c, qio->treq.buf, qio->treq.secs * c->sector_size,
+ primary_forward_done);
+}
+
+static void primary_forward_done(td_async_io_t *wio, int realsize, int errnoval)
+{
+ struct tdcolo_state *c = CONTAINER_OF(wio, *c, wio);
+ queued_io_t *qio;
+ int rc;
+
+ rc = check_write_result(wio, realsize, "backup");
+ if (rc) {
+ primary_failed(c, rc);
+ return;
+ }
+
+ qio = ring_get(&c->qio_ring);
+ if (qio->type == colo_io)
+ td_forward_request(qio->treq);
+ else
+ c->commit--;
+
+ primary_handle_queued_io(c);
+}
+
+static void primary_queue_read(td_driver_t *driver, td_request_t treq)
+{
+ struct tdcolo_state *c = driver->data;
+ struct queued_io_ring *ring = &c->qio_ring;
+
+ if (ring_isempty(ring)) {
+ /* just pass read through */
+ td_forward_request(treq);
+ return;
+ }
+
+ ring_add_request(ring, &treq);
+ if (td_replication_connect_status(&c->t) != 1)
+ return;
+
+ if (!td_async_io_is_running(&c->wio))
+ primary_handle_queued_io(c);
+}
+
+static void primary_queue_write(td_driver_t *driver, td_request_t treq)
+{
+ struct tdcolo_state *c = driver->data;
+ struct queued_io_ring *ring = &c->qio_ring;
+
+ ring_add_request(ring, &treq);
+ if (td_replication_connect_status(&c->t) != 1)
+ return;
+
+ if (!td_async_io_is_running(&c->wio))
+ primary_handle_queued_io(c);
+}
+
+/* Called when the user writes "flush" to the control socket. */
+static int client_flush(struct tdcolo_state *c)
+{
+ if (td_replication_connect_status(&c->t) != 1)
+ return 0;
+
+ if (c->commit > 0) {
+ EPRINTF("the last commit is not finished\n");
+ colo_control_respond(&c->ctl, TDCOLO_FAIL);
+ primary_failed(c, ERROR_INTERNAL);
+ return -1;
+ }
+
+ ring_add_commit_flag(&c->qio_ring);
+ c->commit = 2;
+ if (!td_async_io_is_running(&c->wio))
+ primary_handle_queued_io(c);
+
+ return 0;
+}
+
+/* Called when switching from primary to unprotected mode */
+static int primary_flush(struct tdcolo_state *c)
+{
+ struct queued_io_ring *qring = &c->qio_ring;
+ queued_io_t *qio;
+
+ if (ring_isempty(qring))
+ return 0;
+
+ while (!ring_isempty(qring)) {
+ qio = ring_get(qring);
+
+ if (qio->type == colo_commit) {
+ colo_control_respond(&c->ctl, TDCOLO_FAIL);
+ c->commit = 0;
+ continue;
+ }
+
+ td_forward_request(qio->treq);
+ }
+
+ return 0;
+}
+
+static void colo_client_established(td_replication_connect_t *t, int rc)
+{
+ struct tdcolo_state *c = CONTAINER_OF(t, *c, t);
+
+ if (rc) {
+ primary_failed(c, rc);
+ return;
+ }
+
+ /* the connection succeeded; handle the queued requests */
+ primary_handle_queued_io(c);
+
+ primary_raio(c);
+}
+
+static int primary_start(struct tdcolo_state *c)
+{
+ DPRINTF("activating client mode\n");
+
+ tapdisk_colo.td_queue_read = primary_queue_read;
+ tapdisk_colo.td_queue_write = primary_queue_write;
+ c->queue_flush = primary_flush;
+
+ c->t.callback = colo_client_established;
+ return td_replication_client_start(&c->t);
+}
+
+/* ======== backup read/write functions ======== */
+static void backup_read_header_done(td_async_io_t *rio, int realsize,
+ int errnoval);
+static void backup_read_data_done(td_async_io_t *rio, int realsize,
+ int errnoval);
+static void backup_write_done(td_async_io_t *wio, int realsize, int errnoval);
+
+static void backup_failed(struct tdcolo_state *c, int rc)
+{
+ td_replication_connect_kill(&c->t);
+ td_async_io_kill(&c->rio);
+ td_async_io_kill(&c->wio);
+
+ if (rc == ERROR_INTERNAL) {
+ EPRINTF("switch to failed mode due to internal error");
+ switch_mode(c, mode_failed);
+ return;
+ }
+
+ if (rc == ERROR_CLOSE)
+ DPRINTF("switch to unprotected mode before closing");
+
+ switch_mode(c, mode_unprotected);
+}
+
+static void backup_raio(struct tdcolo_state *c, void *buff, int size,
+ int timeout_s, taio_callback *callback)
+{
+ td_async_io_t *rio = &c->rio;
+
+ rio->fd = c->t.fd;
+ rio->timeout_s = timeout_s;
+ rio->mode = td_async_read;
+ rio->buff = buff;
+ rio->size = size;
+ rio->callback = callback;
+
+ if (td_async_io_start(rio)) {
+ EPRINTF("cannot start read aio\n");
+ backup_failed(c, ERROR_INTERNAL);
+ }
+}
+
+static void backup_waio(struct tdcolo_state *c)
+{
+ td_async_io_t *wio = &c->wio;
+
+ wio->fd = c->t.fd;
+ wio->timeout_s = HEARTBEAT_S;
+ wio->mode = td_async_write;
+ wio->buff = TDCOLO_DONE;
+ wio->size = strlen(TDCOLO_DONE);
+ wio->callback = backup_write_done;
+
+ if (td_async_io_start(wio)) {
+ EPRINTF("cannot start write aio\n");
+ backup_failed(c, ERROR_INTERNAL);
+ }
+}
+
+static void backup_read_req_done(td_async_io_t *rio, int realsize,
+ int errnoval)
+{
+ struct tdcolo_state *c = CONTAINER_OF(rio, *c, rio);
+ char *req = c->request;
+ int rc;
+
+ rc = check_read_result(rio, realsize, "primary");
+ if (rc)
+ goto err;
+
+ rc = ERROR_INTERNAL;
+ req[4] = '\0';
+
+ if (!strcmp(req, TDCOLO_WRITE)) {
+ backup_raio(c, c->header, sizeof(c->header), HEARTBEAT_S,
+ backup_read_header_done);
+ return;
+ } else if (!strcmp(req, TDCOLO_COMMIT)) {
+ ramdisk_destroy_hashtable(c->local);
+ c->local = ramdisk_new_hashtable();
+ if (!c->local) {
+ EPRINTF("error creating local hashtable\n");
+ goto err;
+ }
+ rc = ramdisk_start_flush(&c->ramdisk, &c->h);
+ if (rc) {
+ EPRINTF("error flushing queued I/O\n");
+ goto err;
+ }
+
+ backup_waio(c);
+ } else {
+ EPRINTF("unsupported request: %s\n", req);
+ goto err;
+ }
+
+ return;
+
+err:
+ backup_failed(c, rc);
+ return;
+}
+
+static void backup_read_header_done(td_async_io_t *rio, int realsize,
+ int errnoval)
+{
+ struct tdcolo_state *c = CONTAINER_OF(rio, *c, rio);
+ uint32_t *sectors = (uint32_t *)c->header;
+ int rc;
+
+ rc = check_read_result(rio, realsize, "primary");
+ if (rc)
+ goto err;
+
+ rc = ERROR_INTERNAL;
+ if (*sectors * c->sector_size > c->bsize) {
+ EPRINTF("write request is too large: %d/%d\n",
+ *sectors * c->sector_size, c->bsize);
+ goto err;
+ }
+
+ backup_raio(c, c->buff, *sectors * c->sector_size, HEARTBEAT_S,
+ backup_read_data_done);
+
+ return;
+err:
+ backup_failed(c, rc);
+}
+
+static void backup_read_data_done(td_async_io_t *rio, int realsize,
+ int errnoval)
+{
+ struct tdcolo_state *c = CONTAINER_OF(rio, *c, rio);
+ uint32_t *sectors = (uint32_t *)c->header;
+ uint64_t *sector = (uint64_t *)(c->header + sizeof(uint32_t));
+ int rc;
+
+ rc = check_read_result(rio, realsize, "primary");
+ if (rc)
+ goto err;
+
+ rc = ramdisk_write_to_hashtable(c->h, *sector, *sectors,
+ c->sector_size, c->buff, "COLO");
+ if (rc) {
+ EPRINTF("cannot write primary data to hashtable\n");
+ rc = ERROR_INTERNAL;
+ goto err;
+ }
+
+ backup_raio(c, c->request, sizeof(c->request) - 1, 0,
+ backup_read_req_done);
+
+ return;
+err:
+ backup_failed(c, rc);
+}
+
+static void backup_write_done(td_async_io_t *wio, int realsize, int errnoval)
+{
+ struct tdcolo_state *c = CONTAINER_OF(wio, *c, wio);
+ int rc;
+
+ rc = check_write_result(wio, realsize, "primary");
+ if (rc) {
+ backup_failed(c, rc);
+ return;
+ }
+
+ backup_raio(c, c->request, sizeof(c->request) - 1, 0,
+ backup_read_req_done);
+}
+
+static void colo_server_established(td_replication_connect_t *t, int rc)
+{
+ struct tdcolo_state *c = CONTAINER_OF(t, *c, t);
+
+ if (rc) {
+ backup_failed(c, rc);
+ return;
+ }
+
+ backup_raio(c, c->request, sizeof(c->request) - 1, 0,
+ backup_read_req_done);
+}
+
+/* Called when switching from backup to unprotected mode */
+static int backup_flush(struct tdcolo_state *c)
+{
+ int rc;
+
+ rc = ramdisk_start_flush(&c->ramdisk, &c->local);
+ if (rc)
+ EPRINTF("error flushing local queued I/O\n");
+
+ return 0;
+}
+
+static void backup_queue_read(td_driver_t *driver, td_request_t treq)
+{
+ struct tdcolo_state *c = driver->data;
+
+ if (ramdisk_read_from_hashtable(c->local, treq.sec, treq.secs,
+ c->sector_size, treq.buf))
+ /* FIXME */
+ td_forward_request(treq);
+ else
+ /* complete the request */
+ td_complete_request(treq, 0);
+}
+
+static void backup_queue_write(td_driver_t *driver, td_request_t treq)
+{
+ struct tdcolo_state *c = driver->data;
+ int rc;
+
+ rc = ramdisk_write_to_hashtable(c->local, treq.sec, treq.secs,
+ c->sector_size, treq.buf,
+ "COLO");
+ if (rc)
+ td_complete_request(treq, -EBUSY);
+ else
+ td_complete_request(treq, 0);
+}
+
+static int backup_start(struct tdcolo_state *c)
+{
+ tapdisk_colo.td_queue_read = backup_queue_read;
+ tapdisk_colo.td_queue_write = backup_queue_write;
+ c->queue_flush = backup_flush;
+
+ c->h = ramdisk_new_hashtable();
+ c->local = ramdisk_new_hashtable();
+ if (!c->h || !c->local)
+ return -1;
+
+ c->bsize = sysconf(_SC_PAGESIZE);
+ c->buff = malloc(c->bsize);
+ if (!c->buff)
+ return -1;
+
+ return 0;
+}
+
+/* ======== unprotected read/write functions ======== */
+static void unprotected_queue_io(td_driver_t *driver, td_request_t treq)
+{
+ struct tdcolo_state *c = driver->data;
+
+ /* wait for previous ramdisk to flush before servicing I/O */
+ if (ramdisk_writes_inflight(&c->ramdisk)) {
+ ramdisk_flush(&c->ramdisk);
+ td_complete_request(treq, -EBUSY);
+ } else {
+ /* here we just pass I/O through */
+ td_forward_request(treq);
+ }
+}
+
+static int unprotected_start(struct tdcolo_state *c)
+{
+ DPRINTF("failure detected, activating passthrough\n");
+
+ /* install the unprotected read/write handlers */
+ tapdisk_colo.td_queue_read = unprotected_queue_io;
+ tapdisk_colo.td_queue_write = unprotected_queue_io;
+ c->queue_flush = NULL;
+
+ return 0;
+}
+
+/* ======== failed read/write functions ======== */
+static void failed_queue_io(td_driver_t *driver, td_request_t treq)
+{
+ td_complete_request(treq, -EIO);
+}
+
+static int failed_start(struct tdcolo_state *c)
+{
+ tapdisk_colo.td_queue_read = failed_queue_io;
+ tapdisk_colo.td_queue_write = failed_queue_io;
+ c->queue_flush = NULL;
+
+ return 0;
+}
+
+/* ======== control ======== */
+static void colo_control_accept(event_id_t id, char mode, void *private);
+static void colo_control_handle_request(event_id_t id, char mode,
+ void *private);
+static void colo_control_close(colo_control_t *ctl);
+
+static void colo_control_init(colo_control_t *ctl)
+{
+ ctl->listen_fd = -1;
+ ctl->listen_id = -1;
+ ctl->io_fd = -1;
+ ctl->io_id = -1;
+}
+
+static int colo_create_control_socket(colo_control_t *ctl, const char *name)
+{
+ int i, l;
+ struct sockaddr_un saddr;
+ event_id_t id;
+ int rc;
+
+ /* first we must ensure that BLKTAP_CTRL_DIR exists */
+ if (mkdir(BLKTAP_CTRL_DIR, 0755) && errno != EEXIST) {
+ rc = -errno;
+ EPRINTF("error creating directory %s: %d\n",
+ BLKTAP_CTRL_DIR, errno);
+ goto fail;
+ }
+
+ /* use the device name to create the control socket path */
+ if (asprintf(&ctl->path, BLKTAP_CTRL_DIR "/colo_%s", name) < 0) {
+ rc = -errno;
+ goto fail;
+ }
+
+ /* scrub socket pathname */
+ l = strlen(ctl->path);
+ for (i = strlen(BLKTAP_CTRL_DIR) + 1; i < l; i++) {
+ if (strchr(":/", ctl->path[i]))
+ ctl->path[i] = '_';
+ }
+
+ if (unlink(ctl->path) && errno != ENOENT) {
+ rc = -errno;
+ EPRINTF("failed to unlink %s: %d\n", ctl->path, errno);
+ goto fail;
+ }
+
+ ctl->listen_fd = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (ctl->listen_fd == -1) {
+ rc = -errno;
+ EPRINTF("failed to create control socket: %d\n", errno);
+ goto fail;
+ }
+
+ memset(&saddr, 0, sizeof(saddr));
+ strncpy(saddr.sun_path, ctl->path, sizeof(saddr.sun_path) - 1);
+ saddr.sun_family = AF_UNIX;
+
+ rc = bind(ctl->listen_fd, (const struct sockaddr *)&saddr,
+ sizeof(saddr));
+ if (rc == -1) {
+ rc = -errno;
+ EPRINTF("failed to bind to %s: %d\n", saddr.sun_path, errno);
+ goto fail;
+ }
+
+ rc = listen(ctl->listen_fd, 10);
+ if (rc == -1) {
+ rc = -errno;
+ EPRINTF("failed to listen: %d\n", errno);
+ goto fail;
+ }
+
+ id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
+ ctl->listen_fd, 0,
+ colo_control_accept, ctl);
+ if (id < 0) {
+ EPRINTF("failed to add watch: %d\n", id);
+ rc = id;
+ goto fail;
+ }
+
+ ctl->listen_id = id;
+ return 0;
+
+fail:
+ colo_control_close(ctl);
+ return rc;
+}
+
+static void colo_control_accept(event_id_t id, char mode, void *private)
+{
+ colo_control_t *ctl = private;
+ int fd;
+
+ fd = accept(ctl->listen_fd, NULL, NULL);
+ if (fd == -1) {
+ EPRINTF("failed to accept new control connection: %d\n", errno);
+ return;
+ }
+
+ if (ctl->io_fd >= 0) {
+ EPRINTF("cannot accept two control connections\n");
+ close(fd);
+ return;
+ }
+
+ id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
+ fd, 0,
+ colo_control_handle_request,
+ ctl);
+ if (id < 0) {
+ close(fd);
+ EPRINTF("failed to register new control event: %d\n", id);
+ return;
+ }
+
+ ctl->io_fd = fd;
+ ctl->io_id = id;
+}
+
+static void colo_control_handle_request(event_id_t id, char mode, void *private)
+{
+ colo_control_t *ctl = private;
+ struct tdcolo_state *c = CONTAINER_OF(ctl, *c, ctl);
+ char req[6];
+ int rc;
+
+ rc = read(ctl->io_fd, req, sizeof(req) - 1);
+ if (!rc) {
+ EPRINTF("0-byte read received, close control socket\n");
+ goto err;
+ }
+
+ if (rc < 0) {
+ EPRINTF("error reading from control socket: %d\n", errno);
+ goto err;
+ }
+
+ req[rc] = '\0';
+ if (strncmp(req, "flush", 5)) {
+ EPRINTF("unknown command: %s\n", req);
+ colo_control_respond(ctl, TDCOLO_FAIL);
+ return;
+ }
+
+ if (c->mode != mode_primary) {
+ EPRINTF("invalid mode: %d\n", c->mode);
+ colo_control_respond(ctl, TDCOLO_FAIL);
+ return;
+ }
+
+ client_flush(c);
+ return;
+
+err:
+ UNREGISTER_EVENT(ctl->io_id);
+ CLOSE_FD(ctl->io_fd);
+ return;
+}
+
+static void colo_control_respond(colo_control_t *ctl, const char *response)
+{
+ int rc;
+
+ if (ctl->io_fd < 0)
+ return;
+
+ rc = write(ctl->io_fd, response, strlen(response));
+ if (rc < 0) {
+ EPRINTF("error writing notification: %d\n", errno);
+ CLOSE_FD(ctl->io_fd);
+ }
+}
+
+static void colo_control_close(colo_control_t *ctl)
+{
+ UNREGISTER_EVENT(ctl->listen_id);
+ UNREGISTER_EVENT(ctl->io_id);
+ CLOSE_FD(ctl->listen_fd);
+ CLOSE_FD(ctl->io_fd);
+
+ if (ctl->path) {
+ unlink(ctl->path);
+ free(ctl->path);
+ ctl->path = NULL;
+ }
+}
+
+/* ======== interface ======== */
+static int tdcolo_close(td_driver_t *driver);
+
+static int switch_mode(struct tdcolo_state *c, enum tdcolo_mode mode)
+{
+ int rc;
+
+ if (mode == c->mode)
+ return 0;
+
+ if (c->queue_flush)
+ if ((rc = c->queue_flush(c)) < 0) {
+ /* fall back to unprotected mode on error */
+ EPRINTF("switch_mode: error flushing queue (old: %d,
new: %d)",
+ c->mode, mode);
+ mode = mode_unprotected;
+ }
+
+ if (mode == mode_unprotected)
+ rc = unprotected_start(c);
+ else if (mode == mode_primary)
+ rc = primary_start(c);
+ else if (mode == mode_backup)
+ rc = backup_start(c);
+ else if (mode == mode_failed)
+ rc = failed_start(c);
+ else {
+ EPRINTF("unknown mode requested: %d\n", mode);
+ rc = -1;
+ }
+
+ if (!rc)
+ c->mode = mode;
+
+ return rc;
+}
+
+static int tdcolo_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
+{
+ struct tdcolo_state *c = driver->data;
+ td_replication_connect_t *t = &c->t;
+ colo_control_t *ctl = &c->ctl;
+ ramdisk_t *ramdisk = &c->ramdisk;
+ int rc;
+ const char *name = image->name;
+ td_flag_t flags = image->flags;
+
+ DPRINTF("opening %s\n", name);
+
+ memset(c, 0, sizeof(*c));
+
+ /* init ramdisk */
+ ramdisk->log_prefix = "COLO";
+ ramdisk->sector_size = driver->info.sector_size;
+ ramdisk->image = image;
+ ramdisk_init(&c->ramdisk);
+
+ /* init async I/O */
+ td_async_io_init(&c->rio);
+ td_async_io_init(&c->wio);
+
+ c->sector_size = driver->info.sector_size;
+
+ /* init control socket */
+ colo_control_init(ctl);
+ rc = colo_create_control_socket(ctl, name);
+ if (rc)
+ return rc;
+
+ /* init async connection */
+ t->log_prefix = "COLO";
+ t->retry_timeout_s = COLO_CONNRETRY_TIMEOUT;
+ t->max_connections = 1;
+ t->callback = colo_server_established;
+ rc = td_replication_connect_init(t, name);
+ if (rc) {
+ colo_control_close(ctl);
+ return rc;
+ }
+
+ rc = td_replication_server_start(t);
+ if (!rc)
+ rc = switch_mode(c, mode_backup);
+ else if (rc == -2)
+ rc = switch_mode(c, mode_primary);
+
+ if (!rc)
+ return 0;
+
+ tdcolo_close(driver);
+ return -EIO;
+}
+
+static int tdcolo_pre_close(td_driver_t *driver)
+{
+ struct tdcolo_state *c = driver->data;
+
+ if (c->mode != mode_primary)
+ return 0;
+
+ if (td_replication_connect_status(&c->t))
+ return 0;
+
+ /*
+ * The connection is still in progress, and some I/O
+ * requests may already be queued.
+ */
+ primary_failed(c, ERROR_CLOSE);
+ return 0;
+}
+
+static int tdcolo_close(td_driver_t *driver)
+{
+ struct tdcolo_state *c = driver->data;
+
+ DPRINTF("closing\n");
+ ramdisk_destroy(&c->ramdisk);
+ ramdisk_destroy_hashtable(c->h);
+ ramdisk_destroy_hashtable(c->local);
+ td_replication_connect_kill(&c->t);
+ td_async_io_kill(&c->rio);
+ td_async_io_kill(&c->wio);
+ colo_control_close(&c->ctl);
+ free(c->buff);
+
+ return 0;
+}
+
+static int tdcolo_get_parent_id(td_driver_t *driver, td_disk_id_t *id)
+{
+ /* we shouldn't have a parent... for now */
+ return -EINVAL;
+}
+
+static int tdcolo_validate_parent(td_driver_t *driver,
+ td_driver_t *pdriver, td_flag_t flags)
+{
+ return 0;
+}
+
+struct tap_disk tapdisk_colo = {
+ .disk_type = "tapdisk_colo",
+ .private_data_size = sizeof(struct tdcolo_state),
+ .td_open = tdcolo_open,
+ .td_queue_read = unprotected_queue_io,
+ .td_queue_write = unprotected_queue_io,
+ .td_pre_close = tdcolo_pre_close,
+ .td_close = tdcolo_close,
+ .td_get_parent_id = tdcolo_get_parent_id,
+ .td_validate_parent = tdcolo_validate_parent,
+};
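(A side note on the queued_io_ring above: it is the classic ring that
wastes one slot to distinguish empty from full, so its MAX_COLO_REQUEST
+ 1 slots hold at most MAX_COLO_REQUEST entries. A standalone sketch of
the invariant, with a hypothetical slot count:)

    #include <assert.h>
    #include <stdio.h>

    #define RING_SLOTS 8 /* holds at most RING_SLOTS - 1 entries */

    static unsigned int ring_next(unsigned int pos)
    {
        return (pos + 1) % RING_SLOTS;
    }

    int main(void)
    {
        unsigned int prod = 0, cons = 0, entries = 0;

        /* empty: prod == cons; full: ring_next(prod) == cons */
        while (ring_next(prod) != cons) {
            prod = ring_next(prod);
            entries++;
        }

        assert(entries == RING_SLOTS - 1);
        printf("capacity: %u of %u slots\n", entries, RING_SLOTS);
        return 0;
    }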
diff --git a/tools/blktap2/drivers/block-replication.c b/tools/blktap2/drivers/block-replication.c
index 30eba8f..0992d19 100644
--- a/tools/blktap2/drivers/block-replication.c
+++ b/tools/blktap2/drivers/block-replication.c
@@ -911,6 +911,25 @@ int ramdisk_write_to_hashtable(struct hashtable *h, uint64_t sector,
return 0;
}
+int ramdisk_read_from_hashtable(struct hashtable *h, uint64_t sector,
+ int nb_sectors, int sector_size,
+ char *buf)
+{
+ int i;
+ uint64_t key;
+ char *v;
+
+ for (i = 0; i < nb_sectors; i++) {
+ key = sector + i;
+ v = hashtable_search(h, &key);
+ if (!v)
+ return -1;
+ memcpy(buf + i * sector_size, v, sector_size);
+ }
+
+ return 0;
+}
+
void ramdisk_destroy_hashtable(struct hashtable *h)
{
if (!h)
@@ -918,3 +937,180 @@ void ramdisk_destroy_hashtable(struct hashtable *h)
hashtable_destroy(h, 1);
}
+
+/* async I/O */
+static void td_async_io_readable(event_id_t id, char mode, void *private);
+static void td_async_io_writeable(event_id_t id, char mode, void *private);
+static void td_async_io_timeout(event_id_t id, char mode, void *private);
+
+void td_async_io_init(td_async_io_t *taio)
+{
+ memset(taio, 0, sizeof(*taio));
+ taio->fd = -1;
+ taio->timeout_id = -1;
+ taio->io_id = -1;
+}
+
+int td_async_io_start(td_async_io_t *taio)
+{
+ event_id_t id;
+
+ if (taio->running)
+ return -1;
+
+ if (taio->size <= 0 || taio->fd < 0)
+ return -1;
+
+ taio->running = 1;
+
+ if (taio->mode == td_async_read)
+ id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
+ taio->fd, 0,
+ td_async_io_readable,
+ taio);
+ else if (taio->mode == td_async_write)
+ id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD,
+ taio->fd, 0,
+ td_async_io_writeable,
+ taio);
+ else
+ id = -1;
+ if (id < 0)
+ goto err;
+ taio->io_id = id;
+
+ if (taio->timeout_s) {
+ id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
+ -1, taio->timeout_s,
+ td_async_io_timeout, taio);
+ if (id < 0)
+ goto err;
+ taio->timeout_id = id;
+ }
+
+ taio->used = 0;
+ return 0;
+
+err:
+ td_async_io_kill(taio);
+ return -1;
+}
+
+static void td_async_io_callback(td_async_io_t *taio, int realsize,
+ int errnoval)
+{
+ td_async_io_kill(taio);
+ taio->callback(taio, realsize, errnoval);
+}
+
+static void td_async_io_update_timeout(td_async_io_t *taio)
+{
+ event_id_t id;
+
+ if (!taio->timeout_s)
+ return;
+
+ tapdisk_server_unregister_event(taio->timeout_id);
+ taio->timeout_id = -1;
+
+ id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
+ -1, taio->timeout_s,
+ td_async_io_timeout, taio);
+ if (id < 0)
+ td_async_io_callback(taio, -1, id);
+ else
+ taio->timeout_id = id;
+}
+
+static void td_async_io_readable(event_id_t id, char mode, void *private)
+{
+ td_async_io_t *taio = private;
+ int rc;
+
+ while (1) {
+ rc = read(taio->fd, taio->buff + taio->used,
+ taio->size - taio->used);
+ if (rc < 0) {
+ if (errno == EINTR)
+ continue;
+ if (errno == EWOULDBLOCK || errno == EAGAIN)
+ break;
+
+ td_async_io_callback(taio, 0, errno);
+ return;
+ }
+
+ if (rc == 0) {
+ td_async_io_callback(taio, taio->used, 0);
+ return;
+ }
+
+ taio->used += rc;
+ if (taio->used == taio->size) {
+ td_async_io_callback(taio, taio->used, 0);
+ return;
+ }
+ }
+
+ td_async_io_update_timeout(taio);
+}
+
+static void td_async_io_writeable(event_id_t id, char mode, void *private)
+{
+ td_async_io_t *taio = private;
+ int rc;
+
+ while (1) {
+ rc = write(taio->fd, taio->buff + taio->used,
+ taio->size - taio->used);
+
+ if (rc < 0) {
+ if (errno == EINTR)
+ continue;
+ if (errno == EWOULDBLOCK || errno == EAGAIN)
+ break;
+
+ td_async_io_callback(taio, 0, errno);
+ return;
+ }
+
+ taio->used += rc;
+ if (taio->used == taio->size) {
+ td_async_io_callback(taio, taio->used, 0);
+ return;
+ }
+ }
+
+ td_async_io_update_timeout(taio);
+}
+
+static void td_async_io_timeout(event_id_t id, char mode, void *private)
+{
+ td_async_io_t *taio = private;
+
+ td_async_io_kill(taio);
+ taio->callback(taio, 0, ETIME);
+}
+
+int td_async_io_is_running(td_async_io_t *taio)
+{
+ return taio->running;
+}
+
+void td_async_io_kill(td_async_io_t *taio)
+{
+ if (!taio->running)
+ return;
+
+ if (taio->timeout_id >= 0) {
+ tapdisk_server_unregister_event(taio->timeout_id);
+ taio->timeout_id = -1;
+ }
+
+ if (taio->io_id >= 0) {
+ tapdisk_server_unregister_event(taio->io_id);
+ taio->io_id = -1;
+ }
+
+ taio->running = 0;
+}
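To illustrate the API added above: callers fill in a td_async_io_t and
start it; completion, EOF, errors and timeouts are all reported through
the callback. Note that td_async_io_update_timeout() re-arms the timer
whenever the fd makes progress, so timeout_s bounds inactivity rather
than total transfer time. A sketch that mirrors how block-colo.c arms a
read with a 1-second heartbeat (the demo_* names are hypothetical):

    #include "block-replication.h"
    #include <stdio.h>

    static void demo_read_done(td_async_io_t *taio, int realsize,
                               int errnoval)
    {
        if (realsize == taio->size)
            printf("read %d bytes\n", realsize);
        else
            /* EOF, I/O error, or ETIME from the heartbeat timeout */
            printf("read failed: realsize %d, errno %d\n",
                   realsize, errnoval);
    }

    static int demo_start_read(td_async_io_t *taio, int fd,
                               void *buff, int size)
    {
        td_async_io_init(taio); /* must not currently be running */
        taio->fd = fd;
        taio->timeout_s = 1;    /* heartbeat-style inactivity limit */
        taio->mode = td_async_read;
        taio->buff = buff;
        taio->size = size;
        taio->callback = demo_read_done;

        /* returns -1 on error, 0 once the fd is registered */
        return td_async_io_start(taio);
    }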
diff --git a/tools/blktap2/drivers/block-replication.h b/tools/blktap2/drivers/block-replication.h
index fdc216e..d39c530 100644
--- a/tools/blktap2/drivers/block-replication.h
+++ b/tools/blktap2/drivers/block-replication.h
@@ -156,6 +156,62 @@ struct hashtable *ramdisk_new_hashtable(void);
int ramdisk_write_to_hashtable(struct hashtable *h, uint64_t sector,
int nb_sectors, size_t sector_size, char* buf,
const char *log_prefix);
+int ramdisk_read_from_hashtable(struct hashtable *h, uint64_t sector,
+ int nb_sectors, int sector_size,
+ char *buf);
void ramdisk_destroy_hashtable(struct hashtable *h);
+/* async I/O; a single td_async_io cannot read and write at the same time */
+typedef struct td_async_io td_async_io_t;
+enum {
+ td_async_read,
+ td_async_write,
+};
+
+/*
+ * realsize >= 1 means the data was read/written without error
+ * realsize == 0 means a failure happened while reading/writing, and
+ * errnoval is valid
+ * realsize == -1 means some other internal failure happened, and
+ * errnoval is also valid
+ * In all cases the async I/O is killed before this callback is called.
+ *
+ * If no more data is read/written within timeout_s seconds, realsize
+ * is 0 and errnoval is ETIME.
+ *
+ * If timeout_s is 0, the timeout is disabled.
+ *
+ * NOTE: realsize is less than taio->size if we hit EOF while reading.
+ */
+typedef void taio_callback(td_async_io_t *taio, int realsize,
+ int errnoval);
+
+struct td_async_io {
+ /* caller must fill these in, and they must all remain valid */
+ int fd;
+ int timeout_s;
+ int mode;
+ /*
+ * read: store the data to buff
+ * write: point to the data to be written
+ */
+ void *buff;
+ int size;
+ taio_callback *callback;
+
+ /* private */
+ event_id_t timeout_id, io_id;
+ int used;
+ int running;
+};
+
+/* Don't call it when td_async_io is running */
+void td_async_io_init(td_async_io_t *taio);
+/* returns -1 on error, 0 on success */
+int td_async_io_start(td_async_io_t *taio);
+/* return 1 if td_async_io is running, otherwise return 0 */
+int td_async_io_is_running(td_async_io_t *taio);
+/* stops the I/O; the callback will not be called */
+void td_async_io_kill(td_async_io_t *taio);
+
#endif
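These hashtable helpers give the backup a sector-granular overlay:
ramdisk_read_from_hashtable() succeeds only if every requested sector
is present, which is why backup_queue_read() above falls back to
forwarding the whole request. A usage sketch, with error handling
abbreviated and the "DEMO" log prefix purely illustrative:

    #include "block-replication.h"
    #include <stdint.h>

    /* stage one write in the overlay, then read it back */
    static int demo_overlay(char *buf, uint64_t sector, int secs,
                            int sector_size)
    {
        struct hashtable *h = ramdisk_new_hashtable();
        int rc;

        if (!h)
            return -1;

        rc = ramdisk_write_to_hashtable(h, sector, secs, sector_size,
                                        buf, "DEMO");
        if (!rc)
            /* returns -1 if any sector in the range is missing */
            rc = ramdisk_read_from_hashtable(h, sector, secs,
                                             sector_size, buf);

        ramdisk_destroy_hashtable(h);
        return rc;
    }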
diff --git a/tools/blktap2/drivers/tapdisk-disktype.c b/tools/blktap2/drivers/tapdisk-disktype.c
index 8d1383b..aa2afab 100644
--- a/tools/blktap2/drivers/tapdisk-disktype.c
+++ b/tools/blktap2/drivers/tapdisk-disktype.c
@@ -94,6 +94,12 @@ static const disk_info_t remus_disk = {
0,
};
+static const disk_info_t colo_disk = {
+ "colo",
+ "colo disk replicator (COLO)",
+ 0,
+};
+
const disk_info_t *tapdisk_disk_types[] = {
[DISK_TYPE_AIO] = &aio_disk,
[DISK_TYPE_SYNC] = &sync_disk,
@@ -105,6 +111,7 @@ const disk_info_t *tapdisk_disk_types[] = {
[DISK_TYPE_BLOCK_CACHE] = &block_cache_disk,
[DISK_TYPE_LOG] = &log_disk,
[DISK_TYPE_REMUS] = &remus_disk,
+ [DISK_TYPE_COLO] = &colo_disk,
[DISK_TYPE_MAX] = NULL,
};
@@ -119,6 +126,7 @@ extern struct tap_disk tapdisk_block_cache;
extern struct tap_disk tapdisk_vhd_index;
extern struct tap_disk tapdisk_log;
extern struct tap_disk tapdisk_remus;
+extern struct tap_disk tapdisk_colo;
const struct tap_disk *tapdisk_disk_drivers[] = {
[DISK_TYPE_AIO] = &tapdisk_aio,
@@ -132,6 +140,7 @@ const struct tap_disk *tapdisk_disk_drivers[] = {
[DISK_TYPE_BLOCK_CACHE] = &tapdisk_block_cache,
[DISK_TYPE_LOG] = &tapdisk_log,
[DISK_TYPE_REMUS] = &tapdisk_remus,
+ [DISK_TYPE_COLO] = &tapdisk_colo,
[DISK_TYPE_MAX] = NULL,
};
diff --git a/tools/blktap2/drivers/tapdisk-disktype.h b/tools/blktap2/drivers/tapdisk-disktype.h
index c574990..ee8cb02 100644
--- a/tools/blktap2/drivers/tapdisk-disktype.h
+++ b/tools/blktap2/drivers/tapdisk-disktype.h
@@ -39,7 +39,8 @@
#define DISK_TYPE_BLOCK_CACHE 7
#define DISK_TYPE_LOG 8
#define DISK_TYPE_REMUS 9
-#define DISK_TYPE_MAX 10
+#define DISK_TYPE_COLO 10
+#define DISK_TYPE_MAX 11
#define DISK_TYPE_NAME_MAX 32
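For testing, the control socket can be driven by hand: write "flush"
and expect "done" or "fail" back. A minimal client sketch -- the socket
path (BLKTAP_CTRL_DIR "/colo_<name>") must be taken from the running
tapdisk, and error handling is abbreviated:

    #include <stdio.h>
    #include <string.h>
    #include <sys/socket.h>
    #include <sys/un.h>
    #include <unistd.h>

    static int colo_flush(const char *path)
    {
        struct sockaddr_un saddr;
        char resp[5];
        int fd, n;

        fd = socket(AF_UNIX, SOCK_STREAM, 0);
        if (fd < 0)
            return -1;

        memset(&saddr, 0, sizeof(saddr));
        saddr.sun_family = AF_UNIX;
        strncpy(saddr.sun_path, path, sizeof(saddr.sun_path) - 1);

        if (connect(fd, (struct sockaddr *)&saddr, sizeof(saddr)) < 0)
            goto fail;

        /* ask tapdisk to commit a disk checkpoint */
        if (write(fd, "flush", 5) != 5)
            goto fail;

        /* tapdisk answers "done" on success, "fail" otherwise */
        n = read(fd, resp, 4);
        if (n != 4)
            goto fail;
        resp[4] = '\0';

        close(fd);
        return strcmp(resp, "done") ? -1 : 0;

    fail:
        close(fd);
        return -1;
    }

    int main(int argc, char **argv)
    {
        if (argc != 2) {
            fprintf(stderr, "usage: %s <control-socket>\n", argv[0]);
            return 1;
        }
        return colo_flush(argv[1]) ? 1 : 0;
    }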
--
1.9.3
_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel