|
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [PATCH 12/17] tools: blktap2: implement an API to create a connection asynchronously
tapdisk2 is a single-threaded process. If we use Remus,
it blocks in primary_blocking_connect(), and while it is
blocked the user has no chance to talk to tapdisk2.
So we should connect to the backup asynchronously. This patch
only implements an API to create a connection asynchronously.
Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx>
---
tools/blktap2/drivers/Makefile | 1 +
tools/blktap2/drivers/block-replication.c | 468 ++++++++++++++++++++++++++++++
tools/blktap2/drivers/block-replication.h | 111 +++++++
3 files changed, 580 insertions(+)
create mode 100644 tools/blktap2/drivers/block-replication.c
create mode 100644 tools/blktap2/drivers/block-replication.h
diff --git a/tools/blktap2/drivers/Makefile b/tools/blktap2/drivers/Makefile
index 3476fc1..a7f45c7 100644
--- a/tools/blktap2/drivers/Makefile
+++ b/tools/blktap2/drivers/Makefile
@@ -29,6 +29,7 @@ REMUS-OBJS := block-remus.o
REMUS-OBJS += hashtable.o
REMUS-OBJS += hashtable_itr.o
REMUS-OBJS += hashtable_utility.o
+REMUS-OBJS += block-replication.o
tapdisk2 tapdisk-stream tapdisk-diff $(QCOW_UTIL): AIOLIBS := -laio
diff --git a/tools/blktap2/drivers/block-replication.c
b/tools/blktap2/drivers/block-replication.c
new file mode 100644
index 0000000..e4b2679
--- /dev/null
+++ b/tools/blktap2/drivers/block-replication.c
@@ -0,0 +1,468 @@
+/*
+ * Copyright (C) 2014 FUJITSU LIMITED
+ * Author: Wen Congyang <wency@xxxxxxxxxxxxxx>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; version 2.1 only. with the special
+ * exception on linking described in file LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ */
+
+#include "tapdisk-server.h"
+#include "block-replication.h"
+
+#include <string.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <syslog.h>
+#include <stdlib.h>
+#include <arpa/inet.h>
+
+/*
+ * Drop any earlier DPRINTF/EPRINTF definitions and route both through
+ * syslog(). Each expansion prepends `log_prefix`, so a string variable
+ * of that name must be in scope wherever these macros are used.
+ */
+#undef DPRINTF
+#undef EPRINTF
+#define DPRINTF(fmt, ...) \
+    syslog(LOG_DEBUG, "%s: " fmt, log_prefix, ##__VA_ARGS__)
+#define EPRINTF(fmt, ...) \
+    syslog(LOG_ERR, "%s: " fmt, log_prefix, ##__VA_ARGS__)
+
+/*
+ * Connection status, stored in td_replication_connect::status.
+ * The numeric values are the same ones the compiler would assign
+ * implicitly; they are spelled out only for readability.
+ */
+enum {
+    connection_none        = 0, /* initialised, never started */
+    connection_in_progress = 1, /* listening, or connect() pending */
+    connection_established = 2, /* t->fd is usable for I/O */
+    connection_closed      = 3, /* torn down by td_replication_connect_kill() */
+};
+
+/* common functions */
+/*
+ * Parse a "host:port" string and resolve it to an IPv4 socket address.
+ *
+ * t:    connection descriptor. t->log_prefix must already be set (it is
+ *       used for logging). On success t->sa holds the resolved address.
+ * name: endpoint in "host:port" form. Everything before the first ':'
+ *       is the host; everything after it is handed to getaddrinfo() as
+ *       the service.
+ *
+ * Returns 0 on success, -ENOENT if the ':' separator is missing or no
+ * IPv4 address could be resolved, and -ENOMEM if the temporary copy of
+ * the host name cannot be allocated.
+ */
+static int get_args(td_replication_connect_t *t, const char *name)
+{
+    const char *log_prefix = t->log_prefix;
+    struct addrinfo hints;
+    struct addrinfo *res = NULL;
+    struct addrinfo *ai;
+    const char *sep;
+    char *host;
+    int gai_status;
+
+    sep = strchr(name, ':');
+    if (!sep) {
+        /* without a ':' there is no port part to resolve */
+        EPRINTF("missing port in %s\n", name);
+        return -ENOENT;
+    }
+
+    host = strndup(name, sep - name);
+    if (!host) {
+        EPRINTF("unable to allocate host\n");
+        return -ENOMEM;
+    }
+
+    memset(&hints, 0, sizeof(hints));
+    hints.ai_family = AF_UNSPEC;
+    hints.ai_socktype = SOCK_STREAM;
+
+    gai_status = getaddrinfo(host, sep + 1, &hints, &res);
+    free(host);
+    if (gai_status != 0) {
+        EPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status));
+        return -ENOENT;
+    }
+
+    /*
+     * TODO: do something smarter here.
+     * t->sa is a struct sockaddr_in, so only an IPv4 result can be kept;
+     * take the first one.
+     */
+    for (ai = res; ai != NULL; ai = ai->ai_next) {
+        if (ai->ai_family == AF_INET) {
+            break;
+        }
+    }
+
+    if (!ai) {
+        EPRINTF("no IPv4 address found for %s\n", name);
+        freeaddrinfo(res);
+        return -ENOENT;
+    }
+
+    /* memcpy avoids reading ai_addr through a differently typed pointer */
+    memcpy(&t->sa, ai->ai_addr, sizeof(t->sa));
+    freeaddrinfo(res);
+
+    DPRINTF("host: %s, port: %d\n", inet_ntoa(t->sa.sin_addr),
+            ntohs(t->sa.sin_port));
+
+    return 0;
+}
+
+/*
+ * Initialise a connection descriptor from a "host:port" string.
+ *
+ * The caller must have filled in log_prefix, callback, retry_timeout_s
+ * and max_connections beforehand. Returns 0 on success or the negative
+ * errno value reported by get_args().
+ */
+int td_replication_connect_init(td_replication_connect_t *t, const char *name)
+{
+    /*
+     * Reset the private state before parsing, so the descriptor is in a
+     * well-defined "not connected" state even when parsing fails.
+     *
+     * t->fd must start at -1 as well: td_replication_connect_kill()
+     * runs CLOSE_FD(t->fd) while a server is still only listening, and
+     * would otherwise close whatever descriptor number happened to be
+     * stored there.
+     */
+    t->fd = -1;
+    t->listen_fd = -1;
+    t->id = -1;
+    t->status = connection_none;
+
+    return get_args(t, name);
+}
+
+/*
+ * Map the internal status to the public tri-state result:
+ *   1  connection established
+ *   0  connection in progress
+ *  -1  never started, or closed
+ *  -2  status holds a value outside the known set (logged as an error)
+ */
+int td_replication_connect_status(td_replication_connect_t *t)
+{
+    const char *log_prefix = t->log_prefix;
+    const int state = t->status;
+
+    if (state == connection_established) {
+        return 1;
+    }
+    if (state == connection_in_progress) {
+        return 0;
+    }
+    if (state == connection_none || state == connection_closed) {
+        return -1;
+    }
+
+    EPRINTF("td_replication_connect is corruptted\n");
+    return -2;
+}
+
+/*
+ * Tear down an active connection: unregister the pending event, close
+ * both the data and the listening descriptor, and mark the descriptor
+ * closed. Does nothing unless the connection is in progress or
+ * established.
+ */
+void td_replication_connect_kill(td_replication_connect_t *t)
+{
+    switch (t->status) {
+    case connection_in_progress:
+    case connection_established:
+        break;
+    default:
+        /* nothing was started, or it was already torn down */
+        return;
+    }
+
+    UNREGISTER_EVENT(t->id);
+    CLOSE_FD(t->fd);
+    CLOSE_FD(t->listen_fd);
+    t->status = connection_closed;
+}
+
+/* server */
+static void td_replication_server_accept(event_id_t id, char mode,
+                                         void *private);
+
+/*
+ * Start listening on t->sa and register an accept handler with tapdisk.
+ *
+ * Returns 0 once the socket is listening (status becomes
+ * connection_in_progress), -2 when bind() fails with EADDRNOTAVAIL,
+ * and -1 on any other failure or when a connection is already in
+ * progress or established.
+ */
+int td_replication_server_start(td_replication_connect_t *t)
+{
+    const char *log_prefix = t->log_prefix;
+    event_id_t id;
+    int saved_errno;
+    int opt = 1;
+    int rc = -1;
+    int fd;
+
+    if (t->status == connection_in_progress ||
+        t->status == connection_established) {
+        return rc;
+    }
+
+    fd = socket(AF_INET, SOCK_STREAM, 0);
+    if (fd < 0) {
+        EPRINTF("could not create server socket: %d\n", errno);
+        return rc;
+    }
+
+    /* best effort: failing to set SO_REUSEADDR is only logged */
+    if (setsockopt(fd, SOL_SOCKET,
+                   SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
+        DPRINTF("Error setting REUSEADDR on %d: %d\n", fd, errno);
+    }
+
+    if (bind(fd, (struct sockaddr *)&t->sa, sizeof(t->sa)) < 0) {
+        /*
+         * Capture errno before logging: syslog() is allowed to change
+         * it, and the EADDRNOTAVAIL test below must see bind()'s value.
+         */
+        saved_errno = errno;
+        DPRINTF("could not bind server socket %d to %s:%d: %d %s\n",
+                fd, inet_ntoa(t->sa.sin_addr),
+                ntohs(t->sa.sin_port), saved_errno,
+                strerror(saved_errno));
+        if (saved_errno == EADDRNOTAVAIL) {
+            rc = -2;
+        }
+        goto err;
+    }
+
+    if (listen(fd, t->max_connections)) {
+        EPRINTF("could not listen on socket: %d\n", errno);
+        goto err;
+    }
+
+    /*
+     * The socket is now bound to the address and listening so we
+     * may now register the fd with tapdisk
+     */
+    id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
+                                       fd, 0,
+                                       td_replication_server_accept, t);
+    if (id < 0) {
+        /* id is a negative error code; strerror() wants it positive */
+        EPRINTF("error registering server connection event handler: %s\n",
+                strerror(-id));
+        goto err;
+    }
+
+    t->listen_fd = fd;
+    t->id = id;
+    t->status = connection_in_progress;
+
+    return 0;
+
+err:
+    close(fd);
+    return rc;
+}
+
+/*
+ * Event handler for the listening socket: accept one pending connection.
+ *
+ * The connection is always accepted first. If one is already
+ * established, the new descriptor is closed again straight away;
+ * otherwise it becomes t->fd and the user callback is invoked with
+ * rc == 0.
+ */
+static void td_replication_server_accept(event_id_t id, char mode,
+                                         void *private)
+{
+    td_replication_connect_t *t = private;
+    const char *log_prefix = t->log_prefix;
+    int conn_fd;
+
+    /* XXX: add address-based black/white list */
+    conn_fd = accept(t->listen_fd, NULL, NULL);
+    if (conn_fd < 0) {
+        EPRINTF("error accepting connection: %d\n", errno);
+        return;
+    }
+
+    if (t->status != connection_established) {
+        DPRINTF("server accepted connection\n");
+        t->fd = conn_fd;
+        t->status = connection_established;
+        t->callback(t, 0);
+        return;
+    }
+
+    /* only one peer at a time: drop the extra connection */
+    EPRINTF("connection is already established\n");
+    close(conn_fd);
+}
+
+/*
+ * Get the server back to the "waiting for a peer" state.
+ *
+ * - already waiting: nothing to do, returns 0
+ * - established: close the data descriptor, keep listening, returns 0
+ * - never started / closed: same as td_replication_server_start()
+ * - anything else: -1
+ */
+int td_replication_server_restart(td_replication_connect_t *t)
+{
+    if (t->status == connection_in_progress) {
+        return 0;
+    }
+
+    if (t->status == connection_established) {
+        CLOSE_FD(t->fd);
+        t->status = connection_in_progress;
+        return 0;
+    }
+
+    if (t->status == connection_none || t->status == connection_closed) {
+        return td_replication_server_start(t);
+    }
+
+    /* not reached */
+    return -1;
+}
+
+/* client */
+static void td_replication_retry_connect_event(event_id_t id, char mode,
+                                               void *private);
+static void td_replication_connect_event(event_id_t id, char mode,
+                                         void *private);
+
+/*
+ * Begin an asynchronous connect to t->sa.
+ *
+ * Creates a non-blocking socket and schedules a zero-delay timeout event
+ * whose handler issues the actual connect(). Returns 0 when the attempt
+ * has been scheduled (status becomes connection_in_progress) and
+ * ERROR_INTERNAL on any failure, or when a connection is already in
+ * progress or established.
+ */
+int td_replication_client_start(td_replication_connect_t *t)
+{
+    const char *log_prefix = t->log_prefix;
+    event_id_t id;
+    int flags;
+    int fd;
+
+    if (t->status == connection_in_progress ||
+        t->status == connection_established) {
+        return ERROR_INTERNAL;
+    }
+
+    DPRINTF("client connecting to %s:%d...\n",
+            inet_ntoa(t->sa.sin_addr), ntohs(t->sa.sin_port));
+
+    fd = socket(PF_INET, SOCK_STREAM, 0);
+    if (fd < 0) {
+        EPRINTF("could not create client socket: %d\n", errno);
+        return ERROR_INTERNAL;
+    }
+
+    /* make socket nonblocking */
+    flags = fcntl(fd, F_GETFL, 0);
+    if (flags == -1) {
+        flags = 0;
+    }
+    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
+        EPRINTF("error setting fd %d to non block mode\n", fd);
+        goto err;
+    }
+
+    /*
+     * once we have created the socket and populated the address,
+     * we can now start our non-blocking connect. rather than
+     * duplicating code we trigger a timeout on the socket fd,
+     * which calls our nonblocking connect code
+     */
+    id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0,
+                                       td_replication_retry_connect_event,
+                                       t);
+    if (id < 0) {
+        /* id is a negative error code; strerror() wants it positive */
+        EPRINTF("error registering timeout client connection event handler: %s\n",
+                strerror(-id));
+        goto err;
+    }
+
+    t->fd = fd;
+    t->id = id;
+    t->status = connection_in_progress;
+    return 0;
+
+err:
+    close(fd);
+    return ERROR_INTERNAL;
+}
+
+/*
+ * Give up on the client connection: release the event and descriptors
+ * first, then hand the error code `rc` to the user callback.
+ */
+static void td_replication_client_failed(td_replication_connect_t *t, int rc)
+{
+    td_replication_connect_kill(t);
+
+    /* the callback runs after teardown, so it sees the closed state */
+    t->callback(t, rc);
+}
+
+/*
+ * The connect completed: drop the pending event, mark the connection
+ * established and notify the user callback with rc == 0.
+ */
+static void td_replication_client_done(td_replication_connect_t *t)
+{
+    /* no further connect-related events are needed on this fd */
+    UNREGISTER_EVENT(t->id);
+
+    t->status = connection_established;
+    t->callback(t, 0);
+}
+
+/*
+ * Schedule another connect() attempt after t->retry_timeout_s seconds.
+ *
+ * Replaces the currently registered event with a timeout event whose
+ * handler is td_replication_retry_connect_event(). Returns 0 on success
+ * and ERROR_INTERNAL if the timeout event cannot be registered (t->id is
+ * then left at -1 by UNREGISTER_EVENT).
+ */
+static int td_replication_retry_connect(td_replication_connect_t *t)
+{
+    const char *log_prefix = t->log_prefix;
+    event_id_t id;
+
+    UNREGISTER_EVENT(t->id);
+
+    /* report the configured delay rather than a hard-coded "1 second" */
+    DPRINTF("connect to server %d second(s) later\n", t->retry_timeout_s);
+    id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
+                                       t->fd, t->retry_timeout_s,
+                                       td_replication_retry_connect_event,
+                                       t);
+    if (id < 0) {
+        /* id is a negative error code; strerror() wants it positive */
+        EPRINTF("error registering timeout client connection event handler: %s\n",
+                strerror(-id));
+        return ERROR_INTERNAL;
+    }
+
+    t->id = id;
+    return 0;
+}
+
+/*
+ * Wait for a pending non-blocking connect() to finish.
+ *
+ * Replaces the currently registered event with a write-readiness event
+ * on t->fd, handled by td_replication_connect_event(). Returns 0 on
+ * success and ERROR_INTERNAL if the event cannot be registered.
+ */
+static int td_replication_wait_connect_done(td_replication_connect_t *t)
+{
+    const char *log_prefix = t->log_prefix;
+    event_id_t id;
+
+    UNREGISTER_EVENT(t->id);
+
+    id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD,
+                                       t->fd, 0,
+                                       td_replication_connect_event, t);
+    if (id < 0) {
+        /* id is a negative error code; strerror() wants it positive */
+        EPRINTF("error registering client connection event handler: %s\n",
+                strerror(-id));
+        return ERROR_INTERNAL;
+    }
+
+    t->id = id;
+    return 0;
+}
+
+/*
+ * Classify a connect() error: returns 1 when the error is treated as
+ * transient, i.e. we should try to reconnect to the backup server,
+ * and 0 otherwise.
+ */
+static int check_connect_errno(int err)
+{
+    switch (err) {
+    case ECONNREFUSED:
+    case ENETUNREACH:
+    case EAGAIN:
+    case ECONNABORTED:
+    /*
+     * The fd is non-blocking, so connect() itself never reports
+     * ETIMEDOUT; it can only show up through getsockopt().
+     */
+    case ETIMEDOUT:
+        return 1;
+    default:
+        return 0;
+    }
+}
+
+/*
+ * Timeout handler that issues the non-blocking connect().
+ *
+ * Outcomes:
+ *  - connect() succeeds at once: the connection is reported established
+ *  - EINPROGRESS: wait for the fd to become writable
+ *  - an error check_connect_errno() accepts: schedule a retry
+ *  - any other error: fail with ERROR_CONNECTION
+ * If scheduling the follow-up event fails, the connection is failed
+ * with that error code.
+ */
+static void td_replication_retry_connect_event(event_id_t id, char mode,
+                                               void *private)
+{
+    td_replication_connect_t *t = private;
+    const char *log_prefix = t->log_prefix;
+    int connect_errno;
+    int rc;
+
+    /* do a non-blocking connect */
+    if (connect(t->fd, (struct sockaddr *)&t->sa, sizeof(t->sa)) == 0) {
+        /* The connection is established unexpectedly */
+        td_replication_client_done(t);
+        return;
+    }
+
+    connect_errno = errno;
+
+    if (connect_errno == EINPROGRESS) {
+        /*
+         * the usual result for a non-blocking connect: the outcome
+         * is only known once the fd becomes writable
+         */
+        rc = td_replication_wait_connect_done(t);
+    } else if (check_connect_errno(connect_errno)) {
+        rc = td_replication_retry_connect(t);
+    } else {
+        /* not recoverable */
+        EPRINTF("error connection to server %s\n",
+                strerror(connect_errno));
+        rc = ERROR_CONNECTION;
+    }
+
+    if (rc) {
+        td_replication_client_failed(t, rc);
+    }
+}
+
+/*
+ * Callback run when the non-blocking connect() has finished, i.e. when
+ * the socket becomes writable.
+ *
+ * Reads the outcome from SO_ERROR:
+ *  - 0: the connection is reported established
+ *  - an error check_connect_errno() accepts: schedule a retry
+ *  - any other error: fail with ERROR_CONNECTION
+ * If SO_ERROR cannot be read at all, the connection is failed with
+ * ERROR_INTERNAL. Simply returning would leave the write event
+ * registered and the user callback never informed.
+ */
+static void td_replication_connect_event(event_id_t id, char mode,
+                                         void *private)
+{
+    td_replication_connect_t *t = private;
+    const char *log_prefix = t->log_prefix;
+    int socket_errno = 0;
+    socklen_t socket_errno_size = sizeof(socket_errno);
+    int rc;
+
+    /* check to see if the connect succeeded */
+    if (getsockopt(t->fd, SOL_SOCKET, SO_ERROR,
+                   &socket_errno, &socket_errno_size)) {
+        EPRINTF("error getting socket errno\n");
+        rc = ERROR_INTERNAL;
+        goto fail;
+    }
+
+    DPRINTF("socket connect returned %d\n", socket_errno);
+
+    if (!socket_errno) {
+        td_replication_client_done(t);
+        return;
+    }
+
+    /* the connect did not succeed */
+    if (check_connect_errno(socket_errno)) {
+        /*
+         * we can probably assume that the backup is down.
+         * just try again later
+         */
+        rc = td_replication_retry_connect(t);
+        if (rc) {
+            goto fail;
+        }
+        return;
+    }
+
+    EPRINTF("socket connect returned %d, giving up\n", socket_errno);
+    rc = ERROR_CONNECTION;
+
+fail:
+    td_replication_client_failed(t, rc);
+}
diff --git a/tools/blktap2/drivers/block-replication.h
b/tools/blktap2/drivers/block-replication.h
new file mode 100644
index 0000000..9e051cc
--- /dev/null
+++ b/tools/blktap2/drivers/block-replication.h
@@ -0,0 +1,111 @@
+/*
+ * Copyright (C) 2014 FUJITSU LIMITED
+ * Author: Wen Congyang <wency@xxxxxxxxxxxxxx>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; version 2.1 only. with the special
+ * exception on linking described in file LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ */
+
+#ifndef BLOCK_REPLICATION_H
+#define BLOCK_REPLICATION_H
+
+#include "scheduler.h"
+#include <sys/socket.h>
+#include <netdb.h>
+
+/*
+ * Given a pointer to a member embedded in a structure, yield a pointer
+ * to the enclosing structure.
+ *
+ *   inner_ptr:   pointer to the embedded member
+ *   outer:       the enclosing structure (anything typeof() accepts)
+ *   member_name: name of the member inside `outer`
+ *
+ * The pointer comparison exists only so the compiler checks that
+ * inner_ptr has the member's pointer type; its result is discarded.
+ * Uses the GNU statement-expression and typeof extensions.
+ * NOTE(review): offsetof comes from <stddef.h>, which this header does
+ * not include directly -- confirm it is pulled in indirectly.
+ */
+#define CONTAINER_OF(inner_ptr, outer, member_name)                     \
+    ({                                                                  \
+        typeof(outer) *outer_ptr_ =                                     \
+            (void *)((char *)(inner_ptr) -                              \
+                     offsetof(typeof(outer), member_name));             \
+        (void)(&outer_ptr_->member_name == (typeof(inner_ptr))0);       \
+        outer_ptr_;                                                     \
+    })
+
+/*
+ * Unregister a tapdisk event if `id` holds a valid (non-negative) event
+ * id, then reset `id` to -1 so that a second call is a no-op.
+ *
+ * `id` is evaluated more than once and is assigned to, so it must be a
+ * side-effect-free lvalue (e.g. t->id). The argument is parenthesised
+ * so that any lvalue expression expands safely.
+ */
+#define UNREGISTER_EVENT(id)                                    \
+    do {                                                        \
+        if ((id) >= 0) {                                        \
+            tapdisk_server_unregister_event(id);                \
+            (id) = -1;                                          \
+        }                                                       \
+    } while (0)
+/*
+ * Close a file descriptor if `fd` is valid (non-negative), then reset
+ * `fd` to -1 so that a second call is a no-op.
+ *
+ * `fd` is evaluated more than once and is assigned to, so it must be a
+ * side-effect-free lvalue (e.g. t->fd). The argument is parenthesised
+ * so that any lvalue expression expands safely.
+ */
+#define CLOSE_FD(fd)                                            \
+    do {                                                        \
+        if ((fd) >= 0) {                                        \
+            close(fd);                                          \
+            (fd) = -1;                                          \
+        }                                                       \
+    } while (0)
+
+enum {
+ ERROR_INTERNAL = -1,
+ ERROR_CONNECTION = -2,
+};
+
+typedef struct td_replication_connect td_replication_connect_t;
+
+/*
+ * Notification callback. `rc` is 0 when the connection has been
+ * established, or a negative ERROR_* code when a client connection
+ * attempt has failed.
+ */
+typedef void td_replication_callback(td_replication_connect_t *r, int rc);
+
+struct td_replication_connect {
+    /*
+     * Configuration: the caller must fill these in before calling
+     * td_replication_connect_init()
+     */
+    const char *log_prefix;            /* prepended to every log line */
+    td_replication_callback *callback; /* connection notification */
+    int retry_timeout_s;               /* delay before a connect retry */
+    int max_connections;               /* backlog passed to listen() */
+
+    /*
+     * Data descriptor: the caller uses this fd to read/write after
+     * the connection is established
+     */
+    int fd;
+
+    /* private */
+    struct sockaddr_in sa;  /* resolved peer / bind address */
+    int listen_fd;          /* server side: listening socket */
+    event_id_t id;          /* currently registered tapdisk event */
+
+    int status;             /* one of the connection_* states */
+};
+
+/* return -errno if failure happened, otherwise return 0 */
+int td_replication_connect_init(td_replication_connect_t *t, const char *name);
+/*
+ * Return value:
+ * -2: the connection state is corrupted (unknown status value)
+ * -1: connection is closed or not connected
+ * 0: connection is in progress
+ * 1: connection is established
+ */
+int td_replication_connect_status(td_replication_connect_t *t);
+void td_replication_connect_kill(td_replication_connect_t *t);
+
+/*
+ * Return value:
+ * -2: this caller should be client
+ * -1: error
+ * 0: connection is in progress
+ */
+int td_replication_server_start(td_replication_connect_t *t);
+/*
+ * Return value:
+ * -2: this caller should be client
+ * -1: error
+ * 0: connection is in progress
+ */
+int td_replication_server_restart(td_replication_connect_t *t);
+/*
+ * Return value:
+ * -1: error
+ * 0: connection is in progress
+ */
+int td_replication_client_start(td_replication_connect_t *t);
+
+#endif
--
1.9.3
_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel
|
![]() |
Lists.xenproject.org is hosted with RackSpace, monitoring our |