|
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Xen-devel] [PATCH 13/17] tools: block-remus: connect to backup asynchronously
Use the block-replication API (block-replication.h) to connect to the
backup asynchronously. Until the connection is established, all I/O
requests are queued; they are handled once the connection has been
established.
Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx>
Cc: Shriram Rajagopalan <rshriram@xxxxxxxxx>
---
tools/blktap2/drivers/block-remus.c | 508 +++++++++++++-----------------
tools/blktap2/drivers/block-replication.h | 1 +
2 files changed, 221 insertions(+), 288 deletions(-)
diff --git a/tools/blktap2/drivers/block-remus.c
b/tools/blktap2/drivers/block-remus.c
index e5ad782..a2b9f62 100644
--- a/tools/blktap2/drivers/block-remus.c
+++ b/tools/blktap2/drivers/block-remus.c
@@ -40,6 +40,7 @@
#include "hashtable.h"
#include "hashtable_itr.h"
#include "hashtable_utility.h"
+#include "block-replication.h"
#include <errno.h>
#include <inttypes.h>
@@ -49,10 +50,7 @@
#include <string.h>
#include <sys/time.h>
#include <sys/types.h>
-#include <sys/socket.h>
-#include <netdb.h>
#include <netinet/in.h>
-#include <arpa/inet.h>
#include <sys/param.h>
#include <sys/sysctl.h>
#include <unistd.h>
@@ -63,10 +61,12 @@
#define RAMDISK_HASHSIZE 128
/* connect retry timeout (seconds) */
-#define REMUS_CONNRETRY_TIMEOUT 10
+#define REMUS_CONNRETRY_TIMEOUT 1
#define RPRINTF(_f, _a...) syslog (LOG_DEBUG, "remus: " _f, ## _a)
+#define MAX_REMUS_REQUESTS TAPDISK_DATA_REQUESTS
+
enum tdremus_mode {
mode_invalid = 0,
mode_unprotected,
@@ -75,16 +75,14 @@ enum tdremus_mode {
};
struct tdremus_req {
- uint64_t sector;
- int nb_sectors;
- char buf[4096];
+ td_request_t treq;
};
struct req_ring {
/* waste one slot to distinguish between empty and full */
- struct tdremus_req requests[MAX_REQUESTS * 2 + 1];
- unsigned int head;
- unsigned int tail;
+ struct tdremus_req pending_requests[MAX_REMUS_REQUESTS + 1];
+ unsigned int prod;
+ unsigned int cons;
};
/* TODO: This isn't very pretty, but to properly generate our own treqs (needed
@@ -161,13 +159,14 @@ struct tdremus_state {
char* msg_path; /* output completion message here */
poll_fd_t msg_fd;
- /* replication host */
- struct sockaddr_in sa;
- poll_fd_t server_fd; /* server listen port */
+ td_replication_connect_t t;
poll_fd_t stream_fd; /* replication channel */
- /* queue write requests, batch-replicate at submit */
- struct req_ring write_ring;
+ /*
+ * queue I/O requests, batch-replicate when
+ * the connection is established.
+ */
+ struct req_ring queued_io;
/* ramdisk data*/
struct ramdisk ramdisk;
@@ -206,11 +205,13 @@ static int tdremus_close(td_driver_t *driver);
static int switch_mode(td_driver_t *driver, enum tdremus_mode mode);
static int ctl_respond(struct tdremus_state *s, const char *response);
+static int ctl_register(struct tdremus_state *s);
+static void ctl_unregister(struct tdremus_state *s);
/* ring functions */
-static inline unsigned int ring_next(struct req_ring* ring, unsigned int pos)
+static inline unsigned int ring_next(unsigned int pos)
{
- if (++pos >= MAX_REQUESTS * 2 + 1)
+ if (++pos >= MAX_REMUS_REQUESTS + 1)
return 0;
return pos;
@@ -218,13 +219,26 @@ static inline unsigned int ring_next(struct req_ring*
ring, unsigned int pos)
static inline int ring_isempty(struct req_ring* ring)
{
- return ring->head == ring->tail;
+ return ring->cons == ring->prod;
}
static inline int ring_isfull(struct req_ring* ring)
{
- return ring_next(ring, ring->tail) == ring->head;
+ return ring_next(ring->prod) == ring->cons;
}
+
+static void ring_add_request(struct req_ring *ring, const td_request_t *treq)
+{
+ /* If ring is full, it means that tapdisk2 has some bug */
+ if (ring_isfull(ring)) {
+ RPRINTF("OOPS, ring is full\n");
+ exit(1);
+ }
+
+ ring->pending_requests[ring->prod].treq = *treq;
+ ring->prod = ring_next(ring->prod);
+}
+
/* Prototype declarations */
static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s);
@@ -724,89 +738,113 @@ static int mwrite(int fd, void* buf, size_t len)
select(fd + 1, NULL, &wfds, NULL, &tv);
}
-
-static void inline close_stream_fd(struct tdremus_state *s)
-{
- if (s->stream_fd.fd < 0)
- return;
-
- /* XXX: -2 is magic. replace with macro perhaps? */
- tapdisk_server_unregister_event(s->stream_fd.id);
- close(s->stream_fd.fd);
- s->stream_fd.fd = -2;
-}
-
-static void close_server_fd(struct tdremus_state *s)
-{
- if (s->server_fd.fd < 0)
- return;
-
- tapdisk_server_unregister_event(s->server_fd.id);
- s->server_fd.id = -1;
- close(s->stream_fd.fd);
- s->stream_fd.fd = -1;
-}
-
/* primary functions */
static void remus_client_event(event_id_t, char mode, void *private);
+static int primary_forward_request(struct tdremus_state *s,
+ const td_request_t *treq);
-static int primary_blocking_connect(struct tdremus_state *state)
+/*
+ * It is called when we cannot connect to backup, or find I/O error when
+ * reading/writing.
+ */
+static void primary_failed(struct tdremus_state *s, int rc)
{
- int fd;
- int id;
+ td_replication_connect_kill(&s->t);
+ if (rc == ERROR_INTERNAL)
+ RPRINTF("switch to unprotected mode due to internal error");
+ UNREGISTER_EVENT(s->stream_fd.id);
+ switch_mode(s->tdremus_driver, mode_unprotected);
+}
+
+static int remus_handle_queued_io(struct tdremus_state *s)
+{
+ struct req_ring *queued_io = &s->queued_io;
+ unsigned int cons;
+ td_request_t *treq;
int rc;
- int flags;
- RPRINTF("client connecting to %s:%d...\n",
inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
+ while (!ring_isempty(queued_io)) {
+ cons = queued_io->cons;
+ treq = &queued_io->pending_requests[cons].treq;
- if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
- RPRINTF("could not create client socket: %d\n", errno);
- return -1;
- }
-
- do {
- if ((rc = connect(fd, (struct sockaddr *)&state->sa,
- sizeof(state->sa))) < 0)
- {
- if (errno == ECONNREFUSED) {
- RPRINTF("connection refused -- retrying in 1
second\n");
- sleep(1);
- } else {
- RPRINTF("connection failed: %d\n", errno);
- close(fd);
- return -1;
- }
+ if (treq->op == TD_OP_WRITE) {
+ rc = primary_forward_request(s, treq);
+ if (rc)
+ return rc;
}
- } while (rc < 0);
- RPRINTF("client connected\n");
-
- /* make socket nonblocking */
- if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
- flags = 0;
- if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
- {
- RPRINTF("error making socket nonblocking\n");
- close(fd);
- return -1;
+ td_forward_request(*treq);
+ queued_io->cons = ring_next(cons);
}
- if((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, fd, 0,
remus_client_event, state)) < 0) {
- RPRINTF("error registering client event handler: %s\n",
strerror(id));
- close(fd);
- return -1;
- }
-
- state->stream_fd.fd = fd;
- state->stream_fd.id = id;
return 0;
}
-/* on read, just pass request through */
+static void remus_client_established(td_replication_connect_t *t, int rc)
+{
+ struct tdremus_state *s = CONTAINER_OF(t, *s, t);
+ event_id_t id;
+
+ if (rc) {
+ primary_failed(s, rc);
+ return;
+ }
+
+ /* the connect succeeded */
+ id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd,
+ 0, remus_client_event, s);
+ if(id < 0) {
+ RPRINTF("error registering client event handler: %s\n",
+ strerror(id));
+ primary_failed(s, ERROR_INTERNAL);
+ return;
+ }
+
+ s->stream_fd.fd = t->fd;
+ s->stream_fd.id = id;
+
+ /* handle the queued requests */
+ rc = remus_handle_queued_io(s);
+ if (rc)
+ primary_failed(s, rc);
+}
+
static void primary_queue_read(td_driver_t *driver, td_request_t treq)
{
- /* just pass read through */
- td_forward_request(treq);
+ struct tdremus_state *s = (struct tdremus_state *)driver->data;
+ struct req_ring *ring = &s->queued_io;
+
+ if (ring_isempty(ring)) {
+ /* just pass read through */
+ td_forward_request(treq);
+ return;
+ }
+
+ ring_add_request(ring, &treq);
+}
+
+static int primary_forward_request(struct tdremus_state *s,
+ const td_request_t *treq)
+{
+ char header[sizeof(uint32_t) + sizeof(uint64_t)];
+ uint32_t *sectors = (uint32_t *)header;
+ uint64_t *sector = (uint64_t *)(header + sizeof(uint32_t));
+ td_driver_t *driver = s->tdremus_driver;
+
+ *sectors = treq->secs;
+ *sector = treq->sec;
+
+ if (mwrite(s->stream_fd.fd, TDREMUS_WRITE, strlen(TDREMUS_WRITE)) < 0)
+ return ERROR_IO;
+
+ if (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0)
+ return ERROR_IO;
+
+ if (mwrite(s->stream_fd.fd, treq->buf,
+ treq->secs * driver->info.sector_size) < 0)
+ return ERROR_IO;
+
+ return 0;
}
/* TODO:
@@ -819,28 +857,28 @@ static void primary_queue_read(td_driver_t *driver,
td_request_t treq)
static void primary_queue_write(td_driver_t *driver, td_request_t treq)
{
struct tdremus_state *s = (struct tdremus_state *)driver->data;
-
- char header[sizeof(uint32_t) + sizeof(uint64_t)];
- uint32_t *sectors = (uint32_t *)header;
- uint64_t *sector = (uint64_t *)(header + sizeof(uint32_t));
+ int rc, ret;
// RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd);
- /* -1 means we haven't connected yet, -2 means the connection was lost
*/
- if(s->stream_fd.fd == -1) {
+ ret = td_replication_connect_status(&s->t);
+ if(ret == -1) {
RPRINTF("connecting to backup...\n");
- primary_blocking_connect(s);
+ s->t.callback = remus_client_established;
+ rc = td_replication_client_start(&s->t);
+ if (rc)
+ goto fail;
}
- *sectors = treq.secs;
- *sector = treq.sec;
+ /* The connection is not established, just queue the request */
+ if (ret != 1) {
+ ring_add_request(&s->queued_io, &treq);
+ return;
+ }
- if (mwrite(s->stream_fd.fd, TDREMUS_WRITE, strlen(TDREMUS_WRITE)) < 0)
- goto fail;
- if (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0)
- goto fail;
-
- if (mwrite(s->stream_fd.fd, treq.buf, treq.secs *
driver->info.sector_size) < 0)
+ /* The connection is established */
+ rc = primary_forward_request(s, &treq);
+ if (rc)
goto fail;
td_forward_request(treq);
@@ -850,7 +888,7 @@ static void primary_queue_write(td_driver_t *driver,
td_request_t treq)
fail:
/* switch to unprotected mode and tell tapdisk to retry */
RPRINTF("write request replication failed, switching to unprotected
mode");
- switch_mode(s->tdremus_driver, mode_unprotected);
+ primary_failed(s, rc);
td_complete_request(treq, -EBUSY);
}
@@ -867,7 +905,7 @@ static int client_flush(td_driver_t *driver)
if (mwrite(s->stream_fd.fd, TDREMUS_COMMIT, strlen(TDREMUS_COMMIT)) <
0) {
RPRINTF("error flushing output");
- close_stream_fd(s);
+ primary_failed(s, ERROR_IO);
return -1;
}
@@ -886,6 +924,26 @@ static int server_flush(td_driver_t *driver)
return ramdisk_flush(driver, s);
}
+/* It is called when switching the mode from primary to unprotected */
+static int primary_flush(td_driver_t *driver)
+{
+ struct tdremus_state *s = driver->data;
+ struct req_ring *ring = &s->queued_io;
+ unsigned int cons;
+
+ if (ring_isempty(ring))
+ return 0;
+
+ while (!ring_isempty(ring)) {
+ cons = ring->cons;
+ ring->cons = ring_next(cons);
+
+ td_forward_request(ring->pending_requests[cons].treq);
+ }
+
+ return client_flush(driver);
+}
+
static int primary_start(td_driver_t *driver)
{
struct tdremus_state *s = (struct tdremus_state *)driver->data;
@@ -894,7 +952,7 @@ static int primary_start(td_driver_t *driver)
tapdisk_remus.td_queue_read = primary_queue_read;
tapdisk_remus.td_queue_write = primary_queue_write;
- s->queue_flush = client_flush;
+ s->queue_flush = primary_flush;
s->stream_fd.fd = -1;
s->stream_fd.id = -1;
@@ -913,7 +971,7 @@ static void remus_client_event(event_id_t id, char mode,
void *private)
if (mread(s->stream_fd.fd, req, sizeof(req) - 1) < 0) {
/* replication stream closed or otherwise broken (timeout,
reset, &c) */
RPRINTF("error reading from backup\n");
- close_stream_fd(s);
+ primary_failed(s, ERROR_IO);
return;
}
@@ -924,7 +982,7 @@ static void remus_client_event(event_id_t id, char mode,
void *private)
ctl_respond(s, TDREMUS_DONE);
else {
RPRINTF("received unknown message: %s\n", req);
- close_stream_fd(s);
+ primary_failed(s, ERROR_IO);
}
return;
@@ -933,84 +991,36 @@ static void remus_client_event(event_id_t id, char mode,
void *private)
/* backup functions */
static void remus_server_event(event_id_t id, char mode, void *private);
-/* returns the socket that receives write requests */
-static void remus_server_accept(event_id_t id, char mode, void* private)
+/* It is called when we find some I/O error */
+static void backup_failed(struct tdremus_state *s, int rc)
{
- struct tdremus_state* s = (struct tdremus_state *) private;
+ UNREGISTER_EVENT(s->stream_fd.id);
+ td_replication_connect_kill(&s->t);
+ /* We will switch to unprotected mode in backup_queue_write() */
+}
- int stream_fd;
- event_id_t cid;
+/* returns the socket that receives write requests */
+static void remus_server_established(td_replication_connect_t *t, int rc)
+{
+ struct tdremus_state *s = CONTAINER_OF(t, *s, t);
+ event_id_t id;
- /* XXX: add address-based black/white list */
- if ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) {
- RPRINTF("error accepting connection: %d\n", errno);
- return;
- }
-
- /* TODO: check to see if we are already replicating. if so just close
the
- * connection (or do something smarter) */
- RPRINTF("server accepted connection\n");
+ /* rc is always 0 */
/* add tapdisk event for replication stream */
- cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd,
0,
- remus_server_event, s);
+ id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd, 0,
+ remus_server_event, s);
- if(cid < 0) {
- RPRINTF("error registering connection event handler: %s\n",
strerror(errno));
- close(stream_fd);
+ if (id < 0) {
+ RPRINTF("error registering connection event handler: %s\n",
+ strerror(errno));
+ td_replication_server_restart(t);
return;
}
/* store replication file descriptor */
- s->stream_fd.fd = stream_fd;
- s->stream_fd.id = cid;
-}
-
-/* returns -2 if EADDRNOTAVAIL */
-static int remus_bind(struct tdremus_state* s)
-{
-// struct sockaddr_in sa;
- int opt;
- int rc = -1;
-
- if ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- RPRINTF("could not create server socket: %d\n", errno);
- return rc;
- }
- opt = 1;
- if (setsockopt(s->server_fd.fd, SOL_SOCKET, SO_REUSEADDR, &opt,
sizeof(opt)) < 0)
- RPRINTF("Error setting REUSEADDR on %d: %d\n", s->server_fd.fd,
errno);
-
- if (bind(s->server_fd.fd, (struct sockaddr *)&s->sa, sizeof(s->sa)) <
0) {
- RPRINTF("could not bind server socket %d to %s:%d: %d %s\n",
s->server_fd.fd,
- inet_ntoa(s->sa.sin_addr), ntohs(s->sa.sin_port),
errno, strerror(errno));
- if (errno != EADDRINUSE)
- rc = -2;
- goto err_sfd;
- }
- if (listen(s->server_fd.fd, 10)) {
- RPRINTF("could not listen on socket: %d\n", errno);
- goto err_sfd;
- }
-
- /* The socket s now bound to the address and listening so we may now
register
- * the fd with tapdisk */
-
- if((s->server_fd.id =
tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
- s->server_fd.fd, 0,
-
remus_server_accept, s)) < 0) {
- RPRINTF("error registering server connection event handler: %s",
- strerror(s->server_fd.id));
- goto err_sfd;
- }
-
- return 0;
-
- err_sfd:
- close(s->server_fd.fd);
- s->server_fd.fd = -1;
-
- return rc;
+ s->stream_fd.fd = t->fd;
+ s->stream_fd.id = id;
}
/* wait for latest checkpoint to be applied */
@@ -1053,6 +1063,8 @@ void backup_queue_write(td_driver_t *driver, td_request_t
treq)
* handle the write
*/
+ /* If we have called backup_failed, calling it again is harmless */
+ backup_failed(s, ERROR_INTERNAL);
switch_mode(driver, mode_unprotected);
/* TODO: call the appropriate write function rather than return EBUSY */
td_complete_request(treq, -EBUSY);
@@ -1061,7 +1073,6 @@ void backup_queue_write(td_driver_t *driver, td_request_t
treq)
static int backup_start(td_driver_t *driver)
{
struct tdremus_state *s = (struct tdremus_state *)driver->data;
- int fd;
if (ramdisk_start(driver) < 0)
return -1;
@@ -1073,12 +1084,12 @@ static int backup_start(td_driver_t *driver)
return 0;
}
-static int server_do_wreq(td_driver_t *driver)
+static void server_do_wreq(td_driver_t *driver)
{
struct tdremus_state *s = (struct tdremus_state *)driver->data;
static tdremus_wire_t twreq;
char buf[4096];
- int len, rc;
+ int len, rc = ERROR_IO;
char header[sizeof(uint32_t) + sizeof(uint64_t)];
uint32_t *sectors = (uint32_t *) header;
@@ -1097,28 +1108,28 @@ static int server_do_wreq(td_driver_t *driver)
if (len > sizeof(buf)) {
/* freak out! */
RPRINTF("write request too large: %d/%u\n", len,
(unsigned)sizeof(buf));
- return -1;
+ goto err;
}
if (mread(s->stream_fd.fd, buf, len) < 0)
goto err;
- if (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0)
+ if (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0) {
+ rc = ERROR_INTERNAL;
goto err;
+ }
- return 0;
+ return;
err:
/* should start failover */
RPRINTF("backup write request error\n");
- close_stream_fd(s);
-
- return -1;
+ backup_failed(s, rc);
}
/* at this point, the server can start applying the most recent
* ramdisk. */
-static int server_do_creq(td_driver_t *driver)
+static void server_do_creq(td_driver_t *driver)
{
struct tdremus_state *s = (struct tdremus_state *)driver->data;
@@ -1128,9 +1139,7 @@ static int server_do_creq(td_driver_t *driver)
/* XXX this message should not be sent until flush completes! */
if (write(s->stream_fd.fd, TDREMUS_DONE, strlen(TDREMUS_DONE)) != 4)
- return -1;
-
- return 0;
+ backup_failed(s, ERROR_IO);
}
@@ -1213,11 +1222,6 @@ static int unprotected_start(td_driver_t *driver)
RPRINTF("failure detected, activating passthrough\n");
- /* close the server socket */
- close_stream_fd(s);
-
- close_server_fd(s);
-
/* install the unprotected read/write handlers */
tapdisk_remus.td_queue_read = unprotected_queue_read;
tapdisk_remus.td_queue_write = unprotected_queue_write;
@@ -1227,90 +1231,6 @@ static int unprotected_start(td_driver_t *driver)
/* control */
-
-static inline int resolve_address(const char* addr, struct in_addr* ia)
-{
- struct hostent* he;
- uint32_t ip;
-
- if (!(he = gethostbyname(addr))) {
- RPRINTF("error resolving %s: %d\n", addr, h_errno);
- return -1;
- }
-
- if (!he->h_addr_list[0]) {
- RPRINTF("no address found for %s\n", addr);
- return -1;
- }
-
- /* network byte order */
- ip = *((uint32_t**)he->h_addr_list)[0];
- ia->s_addr = ip;
-
- return 0;
-}
-
-static int get_args(td_driver_t *driver, const char* name)
-{
- struct tdremus_state *state = (struct tdremus_state *)driver->data;
- char* host;
- char* port;
-// char* driver_str;
-// char* parent;
-// int type;
-// char* path;
-// unsigned long ulport;
-// int i;
-// struct sockaddr_in server_addr_in;
-
- int gai_status;
- int valid_addr;
- struct addrinfo gai_hints;
- struct addrinfo *servinfo, *servinfo_itr;
-
- memset(&gai_hints, 0, sizeof gai_hints);
- gai_hints.ai_family = AF_UNSPEC;
- gai_hints.ai_socktype = SOCK_STREAM;
-
- port = strchr(name, ':');
- if (!port) {
- RPRINTF("missing host in %s\n", name);
- return -ENOENT;
- }
- if (!(host = strndup(name, port - name))) {
- RPRINTF("unable to allocate host\n");
- return -ENOMEM;
- }
- port++;
-
- if ((gai_status = getaddrinfo(host, port, &gai_hints, &servinfo)) != 0)
{
- RPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status));
- return -ENOENT;
- }
-
- /* TODO: do something smarter here */
- valid_addr = 0;
- for(servinfo_itr = servinfo; servinfo_itr != NULL; servinfo_itr =
servinfo_itr->ai_next) {
- void *addr;
- char *ipver;
-
- if (servinfo_itr->ai_family == AF_INET) {
- valid_addr = 1;
- memset(&state->sa, 0, sizeof(state->sa));
- state->sa = *(struct sockaddr_in
*)servinfo_itr->ai_addr;
- break;
- }
- }
- freeaddrinfo(servinfo);
-
- if (!valid_addr)
- return -ENOENT;
-
- RPRINTF("host: %s, port: %d\n", inet_ntoa(state->sa.sin_addr),
ntohs(state->sa.sin_port));
-
- return 0;
-}
-
static int switch_mode(td_driver_t *driver, enum tdremus_mode mode)
{
struct tdremus_state *s = (struct tdremus_state *)driver->data;
@@ -1343,6 +1263,20 @@ static int switch_mode(td_driver_t *driver, enum
tdremus_mode mode)
return rc;
}
+static void ctl_reopen(struct tdremus_state *s)
+{
+ ctl_unregister(s);
+ CLOSE_FD(s->ctl_fd.fd);
+ RPRINTF("FIFO closed\n");
+
+ if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
+ RPRINTF("error reopening FIFO: %d\n", errno);
+ return;
+ }
+
+ ctl_register(s);
+}
+
static void ctl_request(event_id_t id, char mode, void *private)
{
struct tdremus_state *s = (struct tdremus_state *)private;
@@ -1355,11 +1289,7 @@ static void ctl_request(event_id_t id, char mode, void
*private)
if (!(rc = read(s->ctl_fd.fd, msg, sizeof(msg) - 1 /* append nul */))) {
RPRINTF("0-byte read received, reopening FIFO\n");
/*TODO: we may have to unregister/re-register with
tapdisk_server */
- close(s->ctl_fd.fd);
- RPRINTF("FIFO closed\n");
- if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
- RPRINTF("error reopening FIFO: %d\n", errno);
- }
+ ctl_reopen(s);
return;
}
@@ -1372,7 +1302,7 @@ static void ctl_request(event_id_t id, char mode, void
*private)
msg[rc] = '\0';
if (!strncmp(msg, "flush", 5)) {
if (s->mode == mode_primary) {
- if ((rc = s->queue_flush(driver))) {
+ if ((rc = client_flush(driver))) {
RPRINTF("error passing flush request to
backup");
ctl_respond(s, TDREMUS_FAIL);
}
@@ -1521,6 +1451,7 @@ static void ctl_unregister(struct tdremus_state *s)
static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
{
struct tdremus_state *s = (struct tdremus_state *)driver->data;
+ td_replication_connect_t *t = &s->t;
int rc;
const char *name = image->name;
td_flag_t flags = image->flags;
@@ -1531,7 +1462,6 @@ static int tdremus_open(td_driver_t *driver, td_image_t
*image, td_uuid_t uuid)
remus_image = image;
memset(s, 0, sizeof(*s));
- s->server_fd.fd = -1;
s->stream_fd.fd = -1;
s->ctl_fd.fd = -1;
s->msg_fd.fd = -1;
@@ -1540,8 +1470,11 @@ static int tdremus_open(td_driver_t *driver, td_image_t
*image, td_uuid_t uuid)
* the driver stack from the stream_fd event handler */
s->tdremus_driver = driver;
- /* parse name to get info etc */
- if ((rc = get_args(driver, name)))
+ t->log_prefix = "remus";
+ t->retry_timeout_s = REMUS_CONNRETRY_TIMEOUT;
+ t->max_connections = 10;
+ t->callback = remus_server_established;
+ if ((rc = td_replication_connect_init(t, name)))
return rc;
if ((rc = ctl_open(driver, name))) {
@@ -1555,7 +1488,7 @@ static int tdremus_open(td_driver_t *driver, td_image_t
*image, td_uuid_t uuid)
return rc;
}
- if (!(rc = remus_bind(s)))
+ if (!(rc = td_replication_server_start(t)))
rc = switch_mode(driver, mode_backup);
else if (rc == -2)
rc = switch_mode(driver, mode_primary);
@@ -1575,8 +1508,7 @@ static int tdremus_close(td_driver_t *driver)
if (s->ramdisk.inprogress)
hashtable_destroy(s->ramdisk.inprogress, 0);
- close_server_fd(s);
- close_stream_fd(s);
+ td_replication_connect_kill(&s->t);
ctl_unregister(s);
ctl_close(s);
diff --git a/tools/blktap2/drivers/block-replication.h
b/tools/blktap2/drivers/block-replication.h
index 9e051cc..07fd630 100644
--- a/tools/blktap2/drivers/block-replication.h
+++ b/tools/blktap2/drivers/block-replication.h
@@ -48,6 +48,7 @@
enum {
ERROR_INTERNAL = -1,
ERROR_CONNECTION = -2,
+ ERROR_IO = -3,
};
typedef struct td_replication_connect td_replication_connect_t;
--
1.9.3
_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel
|
![]() |
Lists.xenproject.org is hosted with RackSpace, monitoring our |