[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Xen-devel] [RFC Patch v3 15/22] blktap2: move async connect related codes to block-replication.c



On Sep 5, 2014 5:31 AM, "Wen Congyang" <wency@xxxxxxxxxxxxxx> wrote:
>
> COLO will reuse them.
>
> Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx>
> Cc: Shriram Rajagopalan <rshriram@xxxxxxxxx>
> ---
>  tools/blktap2/drivers/Makefile            |   2 +-
>  tools/blktap2/drivers/block-remus.c       | 494 +++---------------------------
>  tools/blktap2/drivers/block-replication.c | 468 ++++++++++++++++++++++++++++
>  tools/blktap2/drivers/block-replication.h | 113 +++++++
>  4 files changed, 630 insertions(+), 447 deletions(-)
>  create mode 100644 tools/blktap2/drivers/block-replication.c
>  create mode 100644 tools/blktap2/drivers/block-replication.h
>
> diff --git a/tools/blktap2/drivers/Makefile b/tools/blktap2/drivers/Makefile
> index 37c3485..3d8ed8a 100644
> --- a/tools/blktap2/drivers/Makefile
> +++ b/tools/blktap2/drivers/Makefile
> @@ -23,7 +23,7 @@ endif
>
> ÂVHDLIBSÂ Â := -L$(LIBVHDDIR) -lvhd
>
> -REMUS-OBJSÂ := block-remus.o
> +REMUS-OBJSÂ := block-remus.o block-replication.o
> ÂREMUS-OBJSÂ += hashtable.o
> ÂREMUS-OBJSÂ += hashtable_itr.o
> ÂREMUS-OBJSÂ += hashtable_utility.o
> diff --git a/tools/blktap2/drivers/block-remus.c b/tools/blktap2/drivers/block-remus.c
> index 5d27d41..8b6f157 100644
> --- a/tools/blktap2/drivers/block-remus.c
> +++ b/tools/blktap2/drivers/block-remus.c
> @@ -40,6 +40,7 @@
> Â#include "hashtable.h"
> Â#include "hashtable_itr.h"
> Â#include "hashtable_utility.h"
> +#include "block-replication.h"
>
> Â#include <errno.h>
> Â#include <inttypes.h>
> @@ -49,10 +50,7 @@
> Â#include <string.h>
> Â#include <sys/time.h>
> Â#include <sys/types.h>
> -#include <sys/socket.h>
> -#include <netdb.h>
> Â#include <netinet/in.h>
> -#include <arpa/inet.h>
> Â#include <sys/param.h>
> Â#include <sys/sysctl.h>
> Â#include <unistd.h>
> @@ -67,22 +65,6 @@
>
> Â#define RPRINTF(_f, _a...) syslog (LOG_DEBUG, "remus: " _f, ## _a)
>
> -#define UNREGISTER_EVENT(id)Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â\
> -Â Â Â Âdo {Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â \
> -Â Â Â Â Â Â Â Âif (id >= 0) {Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â \
> -Â Â Â Â Â Â Â Â Â Â Â Âtapdisk_server_unregister_event(id);Â Â \
> -Â Â Â Â Â Â Â Â Â Â Â Âid = -1;Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â \
> -Â Â Â Â Â Â Â Â}Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â\
> -Â Â Â Â} while (0)
> -
> -#define CLOSE_FD(fd)Â Â Â Â Â Â Â Â Â Â\
> -Â Â Â Âdo {Â Â Â Â Â Â Â Â Â Â Â Â Â Â \
> -Â Â Â Â Â Â Â Âif (fd >= 0) {Â Â Â Â Â \
> -Â Â Â Â Â Â Â Â Â Â Â Âclose(fd);Â Â Â \
> -Â Â Â Â Â Â Â Â Â Â Â Âfd = -1;Â Â Â Â \
> -Â Â Â Â Â Â Â Â}Â Â Â Â Â Â Â Â Â Â Â Â\
> -Â Â Â Â} while (0)
> -
> Â#define MAX_REMUS_REQUESTÂ Â Â ÂTAPDISK_DATA_REQUESTS
>
> Âenum tdremus_mode {
> @@ -92,13 +74,6 @@ enum tdremus_mode {
> Â Â Â Â mode_backup
> Â};
>
> -enum {
> -Â Â Â ÂERROR_INTERNAL = -1,
> -Â Â Â ÂERROR_IO = -2,
> -Â Â Â ÂERROR_CONNECTION = -3,
> -Â Â Â ÂERROR_CLOSE = -4,
> -};
> -
> Âstruct tdremus_req {
> Â Â Â Â td_request_t treq;
> Â};
> @@ -167,21 +142,9 @@ struct ramdisk_write_cbdata {
>
> Âtypedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq);
>
> -/*
> - * If cid, rid and wid are -1, fd must be -1. It means that
> - * we are in unpritected mode or we don't start to connect
> - * to backup.
> - * If fd is an valid fd:
> - *Â cid is valid, rid and wid must be invalid. It means that
> - *Â Â Â the connection is in progress.
> - *Â cid is invalid. rid or wid must be valid. It means that
> - *Â Â Â the connection is established.
> - */
> Âtypedef struct poll_fd {
>     int    fd;
> -Â Â Â Âevent_id_t cid;
> -Â Â Â Âevent_id_t rid;
> -Â Â Â Âevent_id_t wid;
> +Â Â Â Âevent_id_t id;
> Â} poll_fd_t;
>
> Âstruct tdremus_state {
> @@ -195,9 +158,7 @@ struct tdremus_state {
> Â Â Â Â char*Â Â Âmsg_path; /* output completion message here */
> Â Â Â Â poll_fd_t msg_fd;
>
> -Â /* replication host */
> -Â Â Â Âstruct sockaddr_in sa;
> -Â Â Â Âpoll_fd_t server_fd;Â Â /* server listen port */
> +Â Â Â Âtd_replication_connect_t t;
> Â Â Â Â poll_fd_t stream_fd;Â Â Â/* replication channel */
>
> Â Â Â Â /*
> @@ -777,28 +738,8 @@ static int mwrite(int fd, void* buf, size_t len)
> Â Â Â Â select(fd + 1, NULL, &wfds, NULL, &tv);
> Â}
>
> -
> -static void inline close_stream_fd(struct tdremus_state *s)
> -{
> -
> -Â Â Â ÂUNREGISTER_EVENT(s->stream_fd.cid);
> -Â Â Â ÂUNREGISTER_EVENT(s->stream_fd.rid);
> -Â Â Â ÂUNREGISTER_EVENT(s->stream_fd.wid);
> -
> -Â Â Â Â/* close the connection */
> -Â Â Â ÂCLOSE_FD(s->stream_fd.fd);
> -}
> -
> -static void close_server_fd(struct tdremus_state *s)
> -{
> -Â Â Â ÂUNREGISTER_EVENT(s->server_fd.cid);
> -Â Â Â ÂCLOSE_FD(s->server_fd.fd);
> -}
> -
> Â/* primary functions */
> -static void remus_client_event(event_id_t, char mode, void *private);
> -static void remus_connect_event(event_id_t id, char mode, void *private);
> -static void remus_retry_connect_event(event_id_t id, char mode, void *private);
> +static void remus_client_event(event_id_t id, char mode, void *private);
> Âstatic int primary_forward_request(struct tdremus_state *s,
> Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âconst td_request_t *treq);
>
> @@ -808,56 +749,15 @@ static int primary_forward_request(struct tdremus_state *s,
> Â */
> Âstatic void primary_failed(struct tdremus_state *s, int rc)
> Â{
> -Â Â Â Âclose_stream_fd(s);
> +Â Â Â Âtd_replication_connect_kill(&s->t);
> Â Â Â Â if (rc == ERROR_INTERNAL)
> Â Â Â Â Â Â Â Â RPRINTF("switch to unprotected mode due to internal error");
> Â Â Â Â if (rc == ERROR_CLOSE)
> Â Â Â Â Â Â Â Â RPRINTF("switch to unprotected mode before closing");
> +Â Â Â ÂUNREGISTER_EVENT(s->stream_fd.id);
> Â Â Â Â switch_mode(s->tdremus_driver, mode_unprotected);
> Â}
>
> -static int primary_do_connect(struct tdremus_state *state)
> -{
> -Â Â Â Âevent_id_t id;
> -Â Â Â Âint fd;
> -Â Â Â Âint rc;
> -Â Â Â Âint flags;
> -
> -Â Â Â ÂRPRINTF("client connecting to %s:%d...\n",
> -Â Â Â Â Â Â Â Âinet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
> -
> -Â Â Â Âif ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
> -Â Â Â Â Â Â Â ÂRPRINTF("could not create client socket: %d\n", errno);
> -Â Â Â Â Â Â Â Âreturn ERROR_INTERNAL;
> -Â Â Â Â}
> -Â Â Â Âstate->stream_fd.fd = fd;
> -
> -Â Â Â Â/* make socket nonblocking */
> -Â Â Â Âif ((flags = fcntl(fd, F_GETFL, 0)) == -1)
> -Â Â Â Â Â Â Â Âflags = 0;
> -Â Â Â Âif (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
> -Â Â Â Â Â Â Â ÂRPRINTF("error setting fd %d to non block mode\n", fd);
> -Â Â Â Â Â Â Â Âreturn ERROR_INTERNAL;
> -Â Â Â Â}
> -
> -Â Â Â Â/*
> -Â Â Â Â * once we have created the socket and populated the address,
> -Â Â Â Â * we can now start our non-blocking connect. rather than
> -Â Â Â Â * duplicating code we trigger a timeout on the socket fd,
> -Â Â Â Â * which calls out nonblocking connect code
> -Â Â Â Â */
> -Â Â Â Âif((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0,
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â remus_retry_connect_event,
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â state)) < 0) {
> -Â Â Â Â Â Â Â ÂRPRINTF("error registering timeout client connection event handler: %s\n",
> -Â Â Â Â Â Â Â Â Â Â Â Âstrerror(id));
> -Â Â Â Â Â Â Â Âreturn ERROR_INTERNAL;
> -Â Â Â Â}
> -
> -Â Â Â Âstate->stream_fd.cid = id;
> -Â Â Â Âreturn 0;
> -}
> -
> Âstatic int remus_handle_queued_io(struct tdremus_state *s)
> Â{
> Â Â Â Â struct req_ring *queued_io = &s->queued_io;
> @@ -882,184 +782,35 @@ static int remus_handle_queued_io(struct tdremus_state *s)
> Â Â Â Â return 0;
> Â}
>
> -static int remus_connection_done(struct tdremus_state *s)
> +static void remus_client_established(td_replication_connect_t *t, int rc)
> Â{
> +Â Â Â Âstruct tdremus_state *s = CONTAINER_OF(t, *s, t);
> Â Â Â Â event_id_t id;
>
> -Â Â Â Â/* the connect succeeded */
> -Â Â Â Â/* unregister this function and register a new event handler */
> -Â Â Â Âtapdisk_server_unregister_event(s->stream_fd.cid);
> -Â Â Â Âs->stream_fd.cid = -1;
> +Â Â Â Âif (rc) {
> +Â Â Â Â Â Â Â Âprimary_failed(s, rc);
> +Â Â Â Â Â Â Â Âreturn;
> +Â Â Â Â}
>
> -Â Â Â Âid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->stream_fd.fd,
> +Â Â Â Â/* the connect succeeded */
> +Â Â Â Âid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd,
> Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â0, remus_client_event, s);
> Â Â Â Â if(id < 0) {
> Â Â Â Â Â Â Â Â RPRINTF("error registering client event handler: %s\n",
> Â Â Â Â Â Â Â Â Â Â Â Â strerror(id));
> -Â Â Â Â Â Â Â Âreturn ERROR_INTERNAL;
> -Â Â Â Â}
> -Â Â Â Âs->stream_fd.rid = id;
> -
> -Â Â Â Â/* handle the queued requests */
> -Â Â Â Âreturn remus_handle_queued_io(s);
> -}
> -
> -static int remus_retry_connect(struct tdremus_state *s)
> -{
> -Â Â Â Âevent_id_t id;
> -
> -Â Â Â Âtapdisk_server_unregister_event(s->stream_fd.cid);
> -Â Â Â Âs->stream_fd.cid = -1;
> -
> -Â Â Â ÂRPRINTF("connect to backup 1 second later");
> -Â Â Â Âid = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â s->stream_fd.fd,
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â REMUS_CONNRETRY_TIMEOUT,
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â remus_retry_connect_event, s);
> -Â Â Â Âif (id < 0) {
> -Â Â Â Â Â Â Â ÂRPRINTF("error registering timeout client connection event handler: %s\n",
> -Â Â Â Â Â Â Â Â Â Â Â Âstrerror(id));
> -Â Â Â Â Â Â Â Âreturn ERROR_INTERNAL;
> -Â Â Â Â}
> -
> -Â Â Â Âs->stream_fd.cid = id;
> -Â Â Â Âreturn 0;
> -}
> -
> -static int remus_wait_connect_done(struct tdremus_state *s)
> -{
> -Â Â Â Âevent_id_t id;
> -
> -Â Â Â Âtapdisk_server_unregister_event(s->stream_fd.cid);
> -Â Â Â Âs->stream_fd.cid = -1;
> -
> -Â Â Â Âid = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD,
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â s->stream_fd.fd, 0,
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â remus_connect_event, s);
> -Â Â Â Âif (id < 0) {
> -Â Â Â Â Â Â Â ÂRPRINTF("error registering client connection event handler: %s\n",
> -Â Â Â Â Â Â Â Â Â Â Â Âstrerror(id));
> -Â Â Â Â Â Â Â Âreturn ERROR_INTERNAL;
> -Â Â Â Â}
> -Â Â Â Âs->stream_fd.cid = id;
> -
> -Â Â Â Âreturn 0;
> -}
> -
> -/* return 1 if we need to reconnect to backup */
> -static int check_connect_errno(int err)
> -{
> -Â Â Â Â/*
> -Â Â Â Â * The fd is non-block, so we will not get ETIMEDOUT
> -Â Â Â Â * after calling connect(). We only can get this errno
> -Â Â Â Â * by getsockopt().
> -Â Â Â Â */
> -Â Â Â Âif (err == ECONNREFUSED || err == ENETUNREACH ||
> -Â Â Â Â Â Âerr == EAGAIN || err == ECONNABORTED ||
> -Â Â Â Â Â Âerr == ETIMEDOUT)
> -Â Â Â Â Â Âreturn 1;
> -
> -Â Â Â Âreturn 0;
> -}
> -
> -static void remus_retry_connect_event(event_id_t id, char mode, void *private)
> -{
> -Â Â Â Âstruct tdremus_state *s = (struct tdremus_state *)private;
> -Â Â Â Âint rc, ret;
> -
> -Â Â Â Â/* do a non-blocking connect */
> -Â Â Â Âret = connect(s->stream_fd.fd,
> -Â Â Â Â Â Â Â Â Â Â Â(struct sockaddr *)&s->sa,
> -Â Â Â Â Â Â Â Â Â Â Âsizeof(s->sa));
> -Â Â Â Âif (ret) {
> -Â Â Â Â Â Â Â Âif (errno == EINPROGRESS) {
> -Â Â Â Â Â Â Â Â Â Â Â Â/*
> -Â Â Â Â Â Â Â Â Â Â Â Â * the connect returned EINPROGRESS (nonblocking
> -Â Â Â Â Â Â Â Â Â Â Â Â * connect) we must wait for the fd to be writeable
> -Â Â Â Â Â Â Â Â Â Â Â Â * to determine if the connect worked
> -Â Â Â Â Â Â Â Â Â Â Â Â */
> -Â Â Â Â Â Â Â Â Â Â Â Ârc = remus_wait_connect_done(s);
> -Â Â Â Â Â Â Â Â Â Â Â Âif (rc)
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âgoto fail;
> -Â Â Â Â Â Â Â Â Â Â Â Âreturn;
> -Â Â Â Â Â Â Â Â}
> -
> -Â Â Â Â Â Â Â Âif (check_connect_errno(errno)) {
> -Â Â Â Â Â Â Â Â Â Â Â Ârc = remus_retry_connect(s);
> -Â Â Â Â Â Â Â Â Â Â Â Âif (rc)
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âgoto fail;
> -Â Â Â Â Â Â Â Â Â Â Â Âreturn;
> -Â Â Â Â Â Â Â Â}
> -
> -Â Â Â Â Â Â Â Â/* not recoverable */
> -Â Â Â Â Â Â Â ÂRPRINTF("error connection to server %s\n", strerror(errno));
> -Â Â Â Â Â Â Â Ârc = ERROR_CONNECTION;
> -Â Â Â Â Â Â Â Âgoto fail;
> -Â Â Â Â}
> -
> -Â Â Â Â/* The connection is established unexpectedly */
> -Â Â Â Ârc = remus_connection_done(s);
> -Â Â Â Âif (rc)
> -Â Â Â Â Â Â Â Âgoto fail;
> -
> -Â Â Â Âreturn;
> -
> -fail:
> -Â Â Â Âprimary_failed(s, rc);
> -Â Â Â Âreturn;
> -}
> -
> -/* callback when nonblocking connect() is finished */
> -static void remus_connect_event(event_id_t id, char mode, void *private)
> -{
> -Â Â Â Âint socket_errno;
> -Â Â Â Âsocklen_t socket_errno_size;
> -Â Â Â Âstruct tdremus_state *s = (struct tdremus_state *)private;
> -Â Â Â Âint rc;
> -
> -Â Â Â Â/* check to see if the connect succeeded */
> -Â Â Â Âsocket_errno_size = sizeof(socket_errno);
> -Â Â Â Âif (getsockopt(s->stream_fd.fd, SOL_SOCKET, SO_ERROR,
> -Â Â Â Â Â Â Â Â Â Â Â &socket_errno, &socket_errno_size)) {
> -Â Â Â Â Â Â Â ÂRPRINTF("error getting socket errno\n");
> +Â Â Â Â Â Â Â Âprimary_failed(s, ERROR_INTERNAL);
> Â Â Â Â Â Â Â Â return;
> Â Â Â Â }
>
> -Â Â Â ÂRPRINTF("socket connect returned %d\n", socket_errno);
> +Â Â Â Âs->stream_fd.fd = t->fd;
> +Â Â Â Âs->stream_fd.id = id;
>
> -Â Â Â Âif (socket_errno) {
> -Â Â Â Â Â Â Â Â/* the connect did not succeed */
> -Â Â Â Â Â Â Â Âif (check_connect_errno(socket_errno)) {
> -Â Â Â Â Â Â Â Â Â Â Â Â/*
> -Â Â Â Â Â Â Â Â Â Â Â Â * we can probably assume that the backup is down.
> -Â Â Â Â Â Â Â Â Â Â Â Â * just try again later
> -Â Â Â Â Â Â Â Â Â Â Â Â */
> -Â Â Â Â Â Â Â Â Â Â Â Ârc = remus_retry_connect(s);
> -Â Â Â Â Â Â Â Â Â Â Â Âif (rc)
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âgoto fail;
> -
> -Â Â Â Â Â Â Â Â Â Â Â Âreturn;
> -Â Â Â Â Â Â Â Â} else {
> -Â Â Â Â Â Â Â Â Â Â Â ÂRPRINTF("socket connect returned %d, giving up\n",
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âsocket_errno);
> -Â Â Â Â Â Â Â Â Â Â Â Ârc = ERROR_CONNECTION;
> -Â Â Â Â Â Â Â Â Â Â Â Âgoto fail;
> -Â Â Â Â Â Â Â Â}
> -
> -Â Â Â Â Â Â Â Âreturn;
> -Â Â Â Â}
> -
> -Â Â Â Ârc = remus_connection_done(s);
> +Â Â Â Â/* handle the queued requests */
> +Â Â Â Ârc = remus_handle_queued_io(s);
> Â Â Â Â if (rc)
> -Â Â Â Â Â Â Â Âgoto fail;
> -
> -Â Â Â Âreturn;
> -
> -fail:
> -Â Â Â Âprimary_failed(s, rc);
> +Â Â Â Â Â Â Â Âprimary_failed(s, rc);
> Â}
>
> -
> Â/*
> Â * we install this event handler on the primary once we have
> Â * connected to the backup.
> @@ -1142,19 +893,21 @@ static int primary_forward_request(struct tdremus_state *s,
> Âstatic void primary_queue_write(td_driver_t *driver, td_request_t treq)
> Â{
> Â Â Â Â struct tdremus_state *s = (struct tdremus_state *)driver->data;
> -Â Â Â Âint rc;
> +Â Â Â Âint rc, ret;
>
> Â Â Â Â // RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd);
>
> -Â Â Â Âif(s->stream_fd.fd < 0) {
> +Â Â Â Âret = td_replication_connect_status(&s->t);
> +Â Â Â Âif(ret == -1) {
> Â Â Â Â Â Â Â Â RPRINTF("connecting to backup...\n");
> -Â Â Â Â Â Â Â Ârc = primary_do_connect(s);
> +Â Â Â Â Â Â Â Âs->t.callback = remus_client_established;
> +Â Â Â Â Â Â Â Ârc = td_replication_client_start(&s->t);
> Â Â Â Â Â Â Â Â if (rc)
> Â Â Â Â Â Â Â Â Â Â Â Â goto fail;
> Â Â Â Â }
>
> Â Â Â Â /* The connection is not established, just queue the request */
> -Â Â Â Âif (s->stream_fd.cid >= 0) {
> +Â Â Â Âif (ret != 1) {
> Â Â Â Â Â Â Â Â ring_add_request(&s->queued_io, &treq);
> Â Â Â Â Â Â Â Â return;
> Â Â Â Â }
> @@ -1227,9 +980,7 @@ static int primary_start(td_driver_t *driver)
> Â Â Â Â s->queue_flush = primary_flush;
>
> Â Â Â Â s->stream_fd.fd = -1;
> -Â Â Â Âs->stream_fd.cid = -1;
> -Â Â Â Âs->stream_fd.rid = -1;
> -Â Â Â Âs->stream_fd.wid = -1;
> +Â Â Â Âs->stream_fd.id = -1;
>
> Â Â Â Â return 0;
> Â}
> @@ -1240,100 +991,32 @@ static void remus_server_event(event_id_t id, char mode, void *private);
> Â/* It is called when we find some I/O error */
> Âstatic void backup_failed(struct tdremus_state *s, int rc)
> Â{
> -Â Â Â Âclose_stream_fd(s);
> -Â Â Â Âclose_server_fd(s);
> +Â Â Â Âtd_replication_connect_kill(&s->t);
> Â Â Â Â /* We will switch to unprotected mode in backup_queue_write() */
> Â}
>
> Â/* returns the socket that receives write requests */
> -static void remus_server_accept(event_id_t id, char mode, void* private)
> +static void remus_server_established(td_replication_connect_t *t, int rc)
> Â{
> -Â Â Â Âstruct tdremus_state* s = (struct tdremus_state *) private;
> -
> -Â Â Â Âint stream_fd;
> -
> -Â Â Â Â/* XXX: add address-based black/white list */
> -Â Â Â Âif ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) {
> -Â Â Â Â Â Â Â ÂRPRINTF("error accepting connection: %d\n", errno);
> -Â Â Â Â Â Â Â Âreturn;
> -Â Â Â Â}
> +Â Â Â Âstruct tdremus_state *s = CONTAINER_OF(t, *s, t);
> +Â Â Â Âevent_id_t id;
>
> -Â Â Â Â/*
> -Â Â Â Â * TODO: check to see if we are already replicating.
> -Â Â Â Â * if so just close the connection (or do something
> -Â Â Â Â * smarter)
> -Â Â Â Â */
> -Â Â Â ÂRPRINTF("server accepted connection\n");
> +Â Â Â Â/* rc is always 0 */
>
> Â Â Â Â /* add tapdisk event for replication stream */
> -Â Â Â Âid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 0,
> +Â Â Â Âid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd, 0,
> Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âremus_server_event, s);
>
> Â Â Â Â if (id < 0) {
> Â Â Â Â Â Â Â Â RPRINTF("error registering connection event handler: %s\n",
> Â Â Â Â Â Â Â Â Â Â Â Â strerror(errno));
> -Â Â Â Â Â Â Â Âclose(stream_fd);
> +Â Â Â Â Â Â Â Âtd_replication_server_restart(t);
> Â Â Â Â Â Â Â Â return;
> Â Â Â Â }
>
> Â Â Â Â /* store replication file descriptor */
> -Â Â Â Âs->stream_fd.fd = stream_fd;
> -Â Â Â Âs->stream_fd.rid = id;
> -}
> -
> -/* returns -2 if EADDRNOTAVAIL */
> -static int remus_bind(struct tdremus_state* s)
> -{
> -Â Â Â Âint opt;
> -Â Â Â Âint rc = -1;
> -Â Â Â Âevent_id_t id;
> -
> -Â Â Â Âif ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
> -Â Â Â Â Â Â Â ÂRPRINTF("could not create server socket: %d\n", errno);
> -Â Â Â Â Â Â Â Âreturn rc;
> -Â Â Â Â}
> -
> -Â Â Â Âopt = 1;
> -Â Â Â Âif (setsockopt(s->server_fd.fd, SOL_SOCKET,
> -Â Â Â Â Â Â Â Â Â Â Â SO_REUSEADDR, &opt, sizeof(opt)) < 0)
> -Â Â Â Â Â Â Â ÂRPRINTF("Error setting REUSEADDR on %d: %d\n",
> -Â Â Â Â Â Â Â Â Â Â Â Âs->server_fd.fd, errno);
> -
> -Â Â Â Âif (bind(s->server_fd.fd, (struct sockaddr *)&s->sa,
> -Â Â Â Â Â Â Â Â sizeof(s->sa)) < 0) {
> -Â Â Â Â Â Â Â ÂRPRINTF("could not bind server socket %d to %s:%d: %d %s\n",
> -Â Â Â Â Â Â Â Â Â Â Â Âs->server_fd.fd, inet_ntoa(s->sa.sin_addr),
> -Â Â Â Â Â Â Â Â Â Â Â Ântohs(s->sa.sin_port), errno, strerror(errno));
> -Â Â Â Â Â Â Â Âif (errno == EADDRNOTAVAIL)
> -Â Â Â Â Â Â Â Â Â Â Â Ârc = -2;
> -Â Â Â Â Â Â Â Âgoto err_sfd;
> -Â Â Â Â}
> -
> -Â Â Â Âif (listen(s->server_fd.fd, 10)) {
> -Â Â Â Â Â Â Â ÂRPRINTF("could not listen on socket: %d\n", errno);
> -Â Â Â Â Â Â Â Âgoto err_sfd;
> -Â Â Â Â}
> -
> -Â Â Â Â/*
> -Â Â Â Â * The socket s now bound to the address and listening so we
> -Â Â Â Â * may now register the fd with tapdisk
> -Â Â Â Â */
> -Â Â Â Âid =Â tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âs->server_fd.fd, 0,
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âremus_server_accept, s);
> -Â Â Â Âif (id < 0) {
> -Â Â Â Â Â Â Â ÂRPRINTF("error registering server connection event handler: %s",
> -Â Â Â Â Â Â Â Â Â Â Â Âstrerror(id));
> -Â Â Â Â Â Â Â Âgoto err_sfd;
> -Â Â Â Â}
> -Â Â Â Âs->server_fd.cid = id;
> -
> -Â Â Â Âreturn 0;
> -
> -err_sfd:
> -Â Â Â ÂCLOSE_FD(s->server_fd.fd);
> -
> -Â Â Â Âreturn rc;
> +Â Â Â Âs->stream_fd.fd = t->fd;
> +Â Â Â Âs->stream_fd.id = id;
> Â}
>
> Â/* wait for latest checkpoint to be applied */
> @@ -1566,90 +1249,6 @@ static int unprotected_start(td_driver_t *driver)
>
>
> Â/* control */
> -
> -static inline int resolve_address(const char* addr, struct in_addr* ia)
> -{
> -Â Â Â Âstruct hostent* he;
> -Â Â Â Âuint32_t ip;
> -
> -Â Â Â Âif (!(he = gethostbyname(addr))) {
> -Â Â Â Â Â Â Â ÂRPRINTF("error resolving %s: %d\n", addr, h_errno);
> -Â Â Â Â Â Â Â Âreturn -1;
> -Â Â Â Â}
> -
> -Â Â Â Âif (!he->h_addr_list[0]) {
> -Â Â Â Â Â Â Â ÂRPRINTF("no address found for %s\n", addr);
> -Â Â Â Â Â Â Â Âreturn -1;
> -Â Â Â Â}
> -
> -Â Â Â Â/* network byte order */
> -Â Â Â Âip = *((uint32_t**)he->h_addr_list)[0];
> -Â Â Â Âia->s_addr = ip;
> -
> -Â Â Â Âreturn 0;
> -}
> -
> -static int get_args(td_driver_t *driver, const char* name)
> -{
> -Â Â Â Âstruct tdremus_state *state = (struct tdremus_state *)driver->data;
> -Â Â Â Âchar* host;
> -Â Â Â Âchar* port;
> -//Â char* driver_str;
> -//Â char* parent;
> -//Â int type;
> -//Â char* path;
> -//Â unsigned long ulport;
> -//Â int i;
> -//Â struct sockaddr_in server_addr_in;
> -
> -Â Â Â Âint gai_status;
> -Â Â Â Âint valid_addr;
> -Â Â Â Âstruct addrinfo gai_hints;
> -Â Â Â Âstruct addrinfo *servinfo, *servinfo_itr;
> -
> -Â Â Â Âmemset(&gai_hints, 0, sizeof gai_hints);
> -Â Â Â Âgai_hints.ai_family = AF_UNSPEC;
> -Â Â Â Âgai_hints.ai_socktype = SOCK_STREAM;
> -
> -Â Â Â Âport = strchr(name, ':');
> -Â Â Â Âif (!port) {
> -Â Â Â Â Â Â Â ÂRPRINTF("missing host in %s\n", name);
> -Â Â Â Â Â Â Â Âreturn -ENOENT;
> -Â Â Â Â}
> -Â Â Â Âif (!(host = strndup(name, port - name))) {
> -Â Â Â Â Â Â Â ÂRPRINTF("unable to allocate host\n");
> -Â Â Â Â Â Â Â Âreturn -ENOMEM;
> -Â Â Â Â}
> -Â Â Â Âport++;
> -
> -Â Â Â Âif ((gai_status = getaddrinfo(host, port, &gai_hints, &servinfo)) != 0) {
> -Â Â Â Â Â Â Â ÂRPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status));
> -Â Â Â Â Â Â Â Âreturn -ENOENT;
> -Â Â Â Â}
> -
> -Â Â Â Â/* TODO: do something smarter here */
> -Â Â Â Âvalid_addr = 0;
> -Â Â Â Âfor(servinfo_itr = servinfo; servinfo_itr != NULL; servinfo_itr = servinfo_itr->ai_next) {
> -Â Â Â Â Â Â Â Âvoid *addr;
> -Â Â Â Â Â Â Â Âchar *ipver;
> -
> -Â Â Â Â Â Â Â Âif (servinfo_itr->ai_family == AF_INET) {
> -Â Â Â Â Â Â Â Â Â Â Â Âvalid_addr = 1;
> -Â Â Â Â Â Â Â Â Â Â Â Âmemset(&state->sa, 0, sizeof(state->sa));
> -Â Â Â Â Â Â Â Â Â Â Â Âstate->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr;
> -Â Â Â Â Â Â Â Â Â Â Â Âbreak;
> -Â Â Â Â Â Â Â Â}
> -Â Â Â Â}
> -Â Â Â Âfreeaddrinfo(servinfo);
> -
> -Â Â Â Âif (!valid_addr)
> -Â Â Â Â Â Â Â Âreturn -ENOENT;
> -
> -Â Â Â ÂRPRINTF("host: %s, port: %d\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
> -
> -Â Â Â Âreturn 0;
> -}
> -
> Âstatic int switch_mode(td_driver_t *driver, enum tdremus_mode mode)
> Â{
> Â Â Â Â struct tdremus_state *s = (struct tdremus_state *)driver->data;
> @@ -1844,11 +1443,11 @@ static int ctl_register(struct tdremus_state *s)
> Â Â Â Â RPRINTF("registering ctl fifo\n");
>
> Â Â Â Â /* register ctl fd */
> -Â Â Â Âs->ctl_fd.cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s);
> +Â Â Â Âs->ctl_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s);
>
> -Â Â Â Âif (s->ctl_fd.cid < 0) {
> +Â Â Â Âif (s->ctl_fd.id < 0) {
> Â Â Â Â Â Â Â Â RPRINTF("error registering ctrl FIFO %s: %d\n",
> -Â Â Â Â Â Â Â Â Â Â Â Âs->ctl_path, s->ctl_fd.cid);
> +Â Â Â Â Â Â Â Â Â Â Â Âs->ctl_path, s->ctl_fd.id);
> Â Â Â Â Â Â Â Â return -1;
> Â Â Â Â }
>
> @@ -1859,7 +1458,7 @@ static void ctl_unregister(struct tdremus_state *s)
> Â{
> Â Â Â Â RPRINTF("unregistering ctl fifo\n");
>
> -Â Â Â ÂUNREGISTER_EVENT(s->ctl_fd.cid);
> +Â Â Â ÂUNREGISTER_EVENT(s->ctl_fd.id);
> Â}
>
> Â/* interface */
> @@ -1867,6 +1466,7 @@ static void ctl_unregister(struct tdremus_state *s)
> Âstatic int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
> Â{
> Â Â Â Â struct tdremus_state *s = (struct tdremus_state *)driver->data;
> +Â Â Â Âtd_replication_connect_t *t = &s->t;
> Â Â Â Â int rc;
> Â Â Â Â const char *name = image->name;
> Â Â Â Â td_flag_t flags = image->flags;
> @@ -1877,7 +1477,6 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
> Â Â Â Â remus_image = image;
>
> Â Â Â Â memset(s, 0, sizeof(*s));
> -Â Â Â Âs->server_fd.fd = -1;
> Â Â Â Â s->stream_fd.fd = -1;
> Â Â Â Â s->ctl_fd.fd = -1;
> Â Â Â Â s->msg_fd.fd = -1;
> @@ -1886,8 +1485,12 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
> Â Â Â Â Â* the driver stack from the stream_fd event handler */
> Â Â Â Â s->tdremus_driver = driver;
>
> +Â Â Â Ât->log_prefix = "remus";
> +Â Â Â Ât->retry_timeout_s = REMUS_CONNRETRY_TIMEOUT;
> +Â Â Â Ât->max_connections = 10;
> +Â Â Â Ât->callback = remus_server_established;
> Â Â Â Â /* parse name to get info etc */
> -Â Â Â Âif ((rc = get_args(driver, name)))
> +Â Â Â Âif ((rc = td_replication_connect_init(t, name)))
> Â Â Â Â Â Â Â Â return rc;
>
> Â Â Â Â if ((rc = ctl_open(driver, name))) {
> @@ -1901,7 +1504,7 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
> Â Â Â Â Â Â Â Â return rc;
> Â Â Â Â }
>
> -Â Â Â Âif (!(rc = remus_bind(s)))
> +Â Â Â Âif (!(rc = td_replication_server_start(t)))
> Â Â Â Â Â Â Â Â rc = switch_mode(driver, mode_backup);
> Â Â Â Â else if (rc == -2)
> Â Â Â Â Â Â Â Â rc = switch_mode(driver, mode_primary);
> @@ -1932,8 +1535,7 @@ static int tdremus_close(td_driver_t *driver)
> Â Â Â Â if (s->ramdisk.inprogress)
> Â Â Â Â Â Â Â Â hashtable_destroy(s->ramdisk.inprogress, 0);
>
> -Â Â Â Âclose_server_fd(s);
> -Â Â Â Âclose_stream_fd(s);
> +Â Â Â Âtd_replication_connect_kill(&s->t);
> Â Â Â Â ctl_unregister(s);
> Â Â Â Â ctl_close(s);
>
> diff --git a/tools/blktap2/drivers/block-replication.c b/tools/blktap2/drivers/block-replication.c
> new file mode 100644
> index 0000000..e4b2679
> --- /dev/null
> +++ b/tools/blktap2/drivers/block-replication.c
> @@ -0,0 +1,468 @@
> +/*
> + * Copyright (C) 2014 FUJITSU LIMITED
> + * Author: Wen Congyang <wency@xxxxxxxxxxxxxx>
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU Lesser General Public License as published
> + * by the Free Software Foundation; version 2.1 only. with the special
> + * exception on linking described in file LICENSE.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU Lesser General Public License for more details.
> + */
> +
> +#include "tapdisk-server.h"
> +#include "block-replication.h"
> +
> +#include <string.h>
> +#include <errno.h>
> +#include <sys/types.h>
> +#include <unistd.h>
> +#include <fcntl.h>
> +#include <syslog.h>
> +#include <stdlib.h>
> +#include <arpa/inet.h>
> +
> +#undef DPRINTF
> +#undef EPRINTF
> +#define DPRINTF(_f, _a...) syslog (LOG_DEBUG, "%s: " _f, log_prefix, ## _a)
> +#define EPRINTF(_f, _a...) syslog (LOG_ERR, "%s: " _f, log_prefix, ## _a)
> +
> +/* connection status */
> +enum {
> +Â Â Â Âconnection_none,
> +Â Â Â Âconnection_in_progress,
> +Â Â Â Âconnection_established,
> +Â Â Â Âconnection_closed,
> +};
> +
> +/* common functions */
> +/* args should be host:port */
> +static int get_args(td_replication_connect_t *t, const char* name)
> +{
> +Â Â Â Âchar* host;
> +Â Â Â Âconst char* port;
> +Â Â Â Âint gai_status;
> +Â Â Â Âint valid_addr;
> +Â Â Â Âstruct addrinfo gai_hints;
> +Â Â Â Âstruct addrinfo *servinfo, *servinfo_itr;
> +Â Â Â Âconst char *log_prefix = t->log_prefix;
> +
> +Â Â Â Âmemset(&gai_hints, 0, sizeof gai_hints);
> +Â Â Â Âgai_hints.ai_family = AF_UNSPEC;
> +Â Â Â Âgai_hints.ai_socktype = SOCK_STREAM;
> +
> +Â Â Â Âport = strchr(name, ':');
> +Â Â Â Âif (!port) {
> +Â Â Â Â Â Â Â ÂEPRINTF("missing host in %s\n", name);
> +Â Â Â Â Â Â Â Âreturn -ENOENT;
> +Â Â Â Â}
> +Â Â Â Âif (!(host = strndup(name, port - name))) {
> +Â Â Â Â Â Â Â ÂEPRINTF("unable to allocate host\n");
> +Â Â Â Â Â Â Â Âreturn -ENOMEM;
> +Â Â Â Â}
> +Â Â Â Âport++;
> +Â Â Â Âif ((gai_status = getaddrinfo(host, port,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â&gai_hints, &servinfo)) != 0) {
> +Â Â Â Â Â Â Â ÂEPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status));
> +Â Â Â Â Â Â Â Âfree(host);
> +Â Â Â Â Â Â Â Âreturn -ENOENT;
> +Â Â Â Â}
> +Â Â Â Âfree(host);
> +
> +Â Â Â Â/* TODO: do something smarter here */
> +Â Â Â Âvalid_addr = 0;
> +Â Â Â Âfor (servinfo_itr = servinfo; servinfo_itr != NULL;
> +Â Â Â Â Â Â servinfo_itr = servinfo_itr->ai_next) {
> +Â Â Â Â Â Â Â Âif (servinfo_itr->ai_family == AF_INET) {
> +Â Â Â Â Â Â Â Â Â Â Â Âvalid_addr = 1;
> +Â Â Â Â Â Â Â Â Â Â Â Âmemset(&t->sa, 0, sizeof(t->sa));
> +Â Â Â Â Â Â Â Â Â Â Â Ât->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr;
> +Â Â Â Â Â Â Â Â Â Â Â Âbreak;
> +Â Â Â Â Â Â Â Â}
> +Â Â Â Â}
> +Â Â Â Âfreeaddrinfo(servinfo);
> +
> +Â Â Â Âif (!valid_addr)
> +Â Â Â Â Â Â Â Âreturn -ENOENT;
> +
> +Â Â Â ÂDPRINTF("host: %s, port: %d\n", inet_ntoa(t->sa.sin_addr),
> +Â Â Â Â Â Â Â Ântohs(t->sa.sin_port));
> +
> +Â Â Â Âreturn 0;
> +}
> +
> +int td_replication_connect_init(td_replication_connect_t *t, const char *name)
> +{
> +Â Â Â Âint rc;
> +
> +Â Â Â Ârc = get_args(t, name);
> +Â Â Â Âif (rc)
> +Â Â Â Â Â Â Â Âreturn rc;
> +
> +Â Â Â Ât->listen_fd = -1;
> +Â Â Â Ât->id = -1;
> +Â Â Â Ât->status = connection_none;
> +Â Â Â Âreturn 0;
> +}
> +
> +int td_replication_connect_status(td_replication_connect_t *t)
> +{
> +Â Â Â Âconst char *log_prefix = t->log_prefix;
> +
> +Â Â Â Âswitch (t->status) {
> +Â Â Â Âcase connection_none:
> +Â Â Â Âcase connection_closed:
> +Â Â Â Â Â Â Â Âreturn -1;
> +Â Â Â Âcase connection_in_progress:
> +Â Â Â Â Â Â Â Âreturn 0;
> +Â Â Â Âcase connection_established:
> +Â Â Â Â Â Â Â Âreturn 1;
> +Â Â Â Âdefault:
> +Â Â Â Â Â Â Â ÂEPRINTF("td_replication_connect is corruptted\n");
> +Â Â Â Â Â Â Â Âreturn -2;
> +Â Â Â Â}
> +}
> +
> +void td_replication_connect_kill(td_replication_connect_t *t)
> +{
> +Â Â Â Âif (t->status != connection_in_progress &&
> +Â Â Â Â Â Ât->status != connection_established)
> +Â Â Â Â Â Â Â Âreturn;
> +
> +Â Â Â ÂUNREGISTER_EVENT(t->id);
> +Â Â Â ÂCLOSE_FD(t->fd);
> +Â Â Â ÂCLOSE_FD(t->listen_fd);
> +Â Â Â Ât->status = connection_closed;
> +}
> +
> +/* server */
> +static void td_replication_server_accept(event_id_t id, char mode,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â void *private);
> +
> +int td_replication_server_start(td_replication_connect_t *t)
> +{
> +Â Â Â Âint opt;
> +Â Â Â Âint rc = -1;
> +Â Â Â Âevent_id_t id;
> +Â Â Â Âint fd;
> +Â Â Â Âconst char *log_prefix = t->log_prefix;
> +
> +Â Â Â Âif (t->status == connection_in_progress ||
> +Â Â Â Â Â Ât->status == connection_established)
> +Â Â Â Â Â Â Â Âreturn rc;
> +
> +Â Â Â Âfd = socket(AF_INET, SOCK_STREAM, 0);
> +Â Â Â Âif (fd < 0) {
> +Â Â Â Â Â Â Â ÂEPRINTF("could not create server socket: %d\n", errno);
> +Â Â Â Â Â Â Â Âreturn rc;
> +Â Â Â Â}
> +
> +Â Â Â Âopt = 1;
> +Â Â Â Âif (setsockopt(fd, SOL_SOCKET,
> +Â Â Â Â Â Â Â Â Â Â Â SO_REUSEADDR, &opt, sizeof(opt)) < 0)
> +Â Â Â Â Â Â Â ÂDPRINTF("Error setting REUSEADDR on %d: %d\n", fd, errno);
> +
> +Â Â Â Âif (bind(fd, (struct sockaddr *)&t->sa, sizeof(t->sa)) < 0) {
> +Â Â Â Â Â Â Â ÂDPRINTF("could not bind server socket %d to %s:%d: %d %s\n",
> +Â Â Â Â Â Â Â Â Â Â Â Âfd, inet_ntoa(t->sa.sin_addr),
> +Â Â Â Â Â Â Â Â Â Â Â Ântohs(t->sa.sin_port), errno, strerror(errno));
> +Â Â Â Â Â Â Â Âif (errno == EADDRNOTAVAIL)
> +Â Â Â Â Â Â Â Â Â Â Â Ârc = -2;
> +Â Â Â Â Â Â Â Âgoto err;
> +Â Â Â Â}
> +
> +Â Â Â Âif (listen(fd, t->max_connections)) {
> +Â Â Â Â Â Â Â ÂEPRINTF("could not listen on socket: %d\n", errno);
> +Â Â Â Â Â Â Â Âgoto err;
> +Â Â Â Â}
> +
> +Â Â Â Â/*
> +Â Â Â Â * The socket is now bound to the address and listening so we
> +Â Â Â Â * may now register the fd with tapdisk
> +Â Â Â Â */
> +Â Â Â Âid =Â tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âfd, 0,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âtd_replication_server_accept, t);
> +Â Â Â Âif (id < 0) {
> +Â Â Â Â Â Â Â ÂEPRINTF("error registering server connection event handler: %s",
> +Â Â Â Â Â Â Â Â Â Â Â Âstrerror(id));
> +Â Â Â Â Â Â Â Âgoto err;
> +Â Â Â Â}
> +Â Â Â Ât->listen_fd = fd;
> +Â Â Â Ât->id = id;
> +Â Â Â Ât->status = connection_in_progress;
> +
> +Â Â Â Âreturn 0;
> +
> +err:
> +Â Â Â Âclose(fd);
> +Â Â Â Âreturn rc;
> +}
> +
> +static void td_replication_server_accept(event_id_t id, char mode,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â void *private)
> +{
> +Â Â Â Âtd_replication_connect_t *t = private;
> +Â Â Â Âint fd;
> +Â Â Â Âconst char *log_prefix = t->log_prefix;
> +
> +Â Â Â Â/* XXX: add address-based black/white list */
> +Â Â Â Âfd = accept(t->listen_fd, NULL, NULL);
> +Â Â Â Âif (fd < 0) {
> +Â Â Â Â Â Â Â ÂEPRINTF("error accepting connection: %d\n", errno);
> +Â Â Â Â Â Â Â Âreturn;
> +Â Â Â Â}
> +
> +Â Â Â Âif (t->status == connection_established) {
> +Â Â Â Â Â Â Â ÂEPRINTF("connection is already established\n");
> +Â Â Â Â Â Â Â Âclose(fd);
> +Â Â Â Â Â Â Â Âreturn;
> +Â Â Â Â}
> +
> +Â Â Â ÂDPRINTF("server accepted connection\n");
> +Â Â Â Ât->fd = fd;
> +Â Â Â Ât->status = connection_established;
> +Â Â Â Ât->callback(t, 0);
> +}
> +
> +int td_replication_server_restart(td_replication_connect_t *t)
> +{
> +Â Â Â Âswitch (t->status) {
> +Â Â Â Âcase connection_in_progress:
> +Â Â Â Â Â Â Â Âreturn 0;
> +Â Â Â Âcase connection_established:
> +Â Â Â Â Â Â Â ÂCLOSE_FD(t->fd);
> +Â Â Â Â Â Â Â Ât->status = connection_in_progress;
> +Â Â Â Â Â Â Â Âreturn 0;
> +Â Â Â Âcase connection_none:
> +Â Â Â Âcase connection_closed:
> +Â Â Â Â Â Â Â Âreturn td_replication_server_start(t);
> +Â Â Â Âdefault:
> +Â Â Â Â Â Â Â Â/* not reached */
> +Â Â Â Â Â Â Â Âreturn -1;
> +Â Â Â Â}
> +}
> +
> +/* client */
> +static void td_replication_retry_connect_event(event_id_t id, char mode,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â void *private);
> +static void td_replication_connect_event(event_id_t id, char mode,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â void *private);
> +int td_replication_client_start(td_replication_connect_t *t)
> +{
> +Â Â Â Âevent_id_t id;
> +Â Â Â Âint fd;
> +Â Â Â Âint rc;
> +Â Â Â Âint flags;
> +Â Â Â Âconst char *log_prefix = t->log_prefix;
> +
> +Â Â Â Âif (t->status == connection_in_progress ||
> +Â Â Â Â Â Ât->status == connection_established)
> +Â Â Â Â Â Â Â Âreturn ERROR_INTERNAL;
> +
> +Â Â Â ÂDPRINTF("client connecting to %s:%d...\n",
> +Â Â Â Â Â Â Â Âinet_ntoa(t->sa.sin_addr), ntohs(t->sa.sin_port));
> +
> +Â Â Â Âif ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
> +Â Â Â Â Â Â Â ÂEPRINTF("could not create client socket: %d\n", errno);
> +Â Â Â Â Â Â Â Âreturn ERROR_INTERNAL;
> +Â Â Â Â}
> +
> +Â Â Â Â/* make socket nonblocking */
> +Â Â Â Âif ((flags = fcntl(fd, F_GETFL, 0)) == -1)
> +Â Â Â Â Â Â Â Âflags = 0;
> +Â Â Â Âif (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
> +Â Â Â Â Â Â Â ÂEPRINTF("error setting fd %d to non block mode\n", fd);
> +Â Â Â Â Â Â Â Âgoto err;
> +Â Â Â Â}
> +
> +Â Â Â Â/*
> +Â Â Â Â * once we have created the socket and populated the address,
> +Â Â Â Â * we can now start our non-blocking connect. rather than
> +Â Â Â Â * duplicating code we trigger a timeout on the socket fd,
> +Â Â Â Â * which calls out nonblocking connect code
> +Â Â Â Â */
> +Â Â Â Âid = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â td_replication_retry_connect_event,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â t);
> +Â Â Â Âif(id < 0) {
> +Â Â Â Â Â Â Â ÂEPRINTF("error registering timeout client connection event handler: %s\n",
> +Â Â Â Â Â Â Â Â Â Â Â Âstrerror(id));
> +Â Â Â Â Â Â Â Âgoto err;
> +Â Â Â Â}
> +
> +Â Â Â Ât->fd = fd;
> +Â Â Â Ât->id = id;
> +Â Â Â Ât->status = connection_in_progress;
> +Â Â Â Âreturn 0;
> +
> +err:
> +Â Â Â Âclose(fd);
> +Â Â Â Âreturn ERROR_INTERNAL;
> +}
> +
> +static void td_replication_client_failed(td_replication_connect_t *t, int rc)
> +{
> +Â Â Â Âtd_replication_connect_kill(t);
> +Â Â Â Ât->callback(t, rc);
> +}
> +
> +static void td_replication_client_done(td_replication_connect_t *t)
> +{
> +Â Â Â ÂUNREGISTER_EVENT(t->id);
> +Â Â Â Ât->status = connection_established;
> +Â Â Â Ât->callback(t, 0);
> +}
> +
> +static int td_replication_retry_connect(td_replication_connect_t *t)
> +{
> +Â Â Â Âevent_id_t id;
> +Â Â Â Âconst char *log_prefix = t->log_prefix;
> +
> +Â Â Â ÂUNREGISTER_EVENT(t->id);
> +
> +Â Â Â ÂDPRINTF("connect to server 1 second later");
> +Â Â Â Âid = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â t->fd, t->retry_timeout_s,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â td_replication_retry_connect_event,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â t);
> +Â Â Â Âif (id < 0) {
> +Â Â Â Â Â Â Â ÂEPRINTF("error registering timeout client connection event handler: %s\n",
> +Â Â Â Â Â Â Â Â Â Â Â Âstrerror(id));
> +Â Â Â Â Â Â Â Âreturn ERROR_INTERNAL;
> +Â Â Â Â}
> +
> +Â Â Â Ât->id = id;
> +Â Â Â Âreturn 0;
> +}
> +
> +static int td_replication_wait_connect_done(td_replication_connect_t *t)
> +{
> +Â Â Â Âevent_id_t id;
> +Â Â Â Âconst char *log_prefix = t->log_prefix;
> +
> +Â Â Â ÂUNREGISTER_EVENT(t->id);
> +
> +Â Â Â Âid = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â t->fd, 0,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â td_replication_connect_event, t);
> +Â Â Â Âif (id < 0) {
> +Â Â Â Â Â Â Â ÂEPRINTF("error registering client connection event handler: %s\n",
> +Â Â Â Â Â Â Â Â Â Â Â Âstrerror(id));
> +Â Â Â Â Â Â Â Âreturn ERROR_INTERNAL;
> +Â Â Â Â}
> +Â Â Â Ât->id = id;
> +
> +Â Â Â Âreturn 0;
> +}
> +
/*
 * check_connect_errno - classify a connect() failure.
 *
 * Returns 1 if @err indicates the backup server is just not reachable or
 * not listening yet, so the client should reconnect later. Returns 0 if
 * the failure is not recoverable.
 *
 * The fd is non-blocking, so connect() itself never returns ETIMEDOUT.
 * That errno can only be reported later through getsockopt(SO_ERROR).
 */
static int check_connect_errno(int err)
{
	switch (err) {
	case ECONNREFUSED:	/* nobody listening on the server side yet */
	case ENETUNREACH:
	case EHOSTUNREACH:	/* server host down or not routable (yet) */
	case EAGAIN:
	case ECONNABORTED:
	case ETIMEDOUT:
		return 1;
	default:
		return 0;
	}
}
> +
> +static void td_replication_retry_connect_event(event_id_t id, char mode,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â void *private)
> +{
> +Â Â Â Âtd_replication_connect_t *t = private;
> +Â Â Â Âint rc, ret;
> +Â Â Â Âconst char *log_prefix = t->log_prefix;
> +
> +Â Â Â Â/* do a non-blocking connect */
> +Â Â Â Âret = connect(t->fd, (struct sockaddr *)&t->sa, sizeof(t->sa));
> +Â Â Â Âif (ret) {
> +Â Â Â Â Â Â Â Âif (errno == EINPROGRESS) {
> +Â Â Â Â Â Â Â Â Â Â Â Â/*
> +Â Â Â Â Â Â Â Â Â Â Â Â * the connect returned EINPROGRESS (nonblocking
> +Â Â Â Â Â Â Â Â Â Â Â Â * connect) we must wait for the fd to be writeable
> +Â Â Â Â Â Â Â Â Â Â Â Â * to determine if the connect worked
> +Â Â Â Â Â Â Â Â Â Â Â Â */
> +Â Â Â Â Â Â Â Â Â Â Â Ârc = td_replication_wait_connect_done(t);
> +Â Â Â Â Â Â Â Â Â Â Â Âif (rc)
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âgoto fail;
> +Â Â Â Â Â Â Â Â Â Â Â Âreturn;
> +Â Â Â Â Â Â Â Â}
> +
> +Â Â Â Â Â Â Â Âif (check_connect_errno(errno)) {
> +Â Â Â Â Â Â Â Â Â Â Â Ârc = td_replication_retry_connect(t);
> +Â Â Â Â Â Â Â Â Â Â Â Âif (rc)
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âgoto fail;
> +Â Â Â Â Â Â Â Â Â Â Â Âreturn;
> +Â Â Â Â Â Â Â Â}
> +
> +Â Â Â Â Â Â Â Â/* not recoverable */
> +Â Â Â Â Â Â Â ÂEPRINTF("error connection to server %s\n", strerror(errno));
> +Â Â Â Â Â Â Â Ârc = ERROR_CONNECTION;
> +Â Â Â Â Â Â Â Âgoto fail;
> +Â Â Â Â}
> +
> +Â Â Â Â/* The connection is established unexpectedly */
> +Â Â Â Âtd_replication_client_done(t);
> +
> +Â Â Â Âreturn;
> +
> +fail:
> +Â Â Â Âtd_replication_client_failed(t, rc);
> +}
> +
> +/* callback when nonblocking connect() is finished */
> +static void td_replication_connect_event(event_id_t id, char mode,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â void *private)
> +{
> +Â Â Â Âint socket_errno;
> +Â Â Â Âsocklen_t socket_errno_size;
> +Â Â Â Âtd_replication_connect_t *t = private;
> +Â Â Â Âint rc;
> +Â Â Â Âconst char *log_prefix = t->log_prefix;
> +
> +Â Â Â Â/* check to see if the connect succeeded */
> +Â Â Â Âsocket_errno_size = sizeof(socket_errno);
> +Â Â Â Âif (getsockopt(t->fd, SOL_SOCKET, SO_ERROR,
> +Â Â Â Â Â Â Â Â Â Â Â &socket_errno, &socket_errno_size)) {
> +Â Â Â Â Â Â Â ÂEPRINTF("error getting socket errno\n");
> +Â Â Â Â Â Â Â Âreturn;
> +Â Â Â Â}
> +
> +Â Â Â ÂDPRINTF("socket connect returned %d\n", socket_errno);
> +
> +Â Â Â Âif (socket_errno) {
> +Â Â Â Â Â Â Â Â/* the connect did not succeed */
> +Â Â Â Â Â Â Â Âif (check_connect_errno(socket_errno)) {
> +Â Â Â Â Â Â Â Â Â Â Â Â/*
> +Â Â Â Â Â Â Â Â Â Â Â Â * we can probably assume that the backup is down.
> +Â Â Â Â Â Â Â Â Â Â Â Â * just try again later
> +Â Â Â Â Â Â Â Â Â Â Â Â */
> +Â Â Â Â Â Â Â Â Â Â Â Ârc = td_replication_retry_connect(t);
> +Â Â Â Â Â Â Â Â Â Â Â Âif (rc)
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âgoto fail;
> +
> +Â Â Â Â Â Â Â Â Â Â Â Âreturn;
> +Â Â Â Â Â Â Â Â} else {
> +Â Â Â Â Â Â Â Â Â Â Â ÂEPRINTF("socket connect returned %d, giving up\n",
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âsocket_errno);
> +Â Â Â Â Â Â Â Â Â Â Â Ârc = ERROR_CONNECTION;
> +Â Â Â Â Â Â Â Â Â Â Â Âgoto fail;
> +Â Â Â Â Â Â Â Â}
> +Â Â Â Â}
> +
> +Â Â Â Âtd_replication_client_done(t);
> +
> +Â Â Â Âreturn;
> +
> +fail:
> +Â Â Â Âtd_replication_client_failed(t, rc);
> +}
> diff --git a/tools/blktap2/drivers/block-replication.h b/tools/blktap2/drivers/block-replication.h
> new file mode 100644
> index 0000000..0bd6e71
> --- /dev/null
> +++ b/tools/blktap2/drivers/block-replication.h
> @@ -0,0 +1,113 @@
> +/*
> + * Copyright (C) 2014 FUJITSU LIMITED
> + * Author: Wen Congyang <wency@xxxxxxxxxxxxxx>
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU Lesser General Public License as published
> + * by the Free Software Foundation; version 2.1 only. with the special
> + * exception on linking described in file LICENSE.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU Lesser General Public License for more details.
> + */
> +
> +#ifndef BLOCK_REPLICATION_H
> +#define BLOCK_REPLICATION_H
> +
> +#include "scheduler.h"
> +#include <sys/socket.h>
> +#include <netdb.h>
> +
/*
 * CONTAINER_OF - get a pointer to the enclosing object from a member pointer.
 * @inner_ptr:   pointer to the member_name field inside the object
 * @outer:       an expression of the enclosing object's type (only its type
 *               is used, e.g. *s for a pointer s)
 * @member_name: name of the member inside the enclosing type
 *
 * The comparison against (__typeof__(inner_ptr))0 is never evaluated for
 * its value; it makes the compiler diagnose an @inner_ptr whose type does
 * not match the member's type.
 *
 * Uses __typeof__ rather than typeof so the header also compiles in strict
 * ISO C modes (-std=c99/c11), where the plain typeof keyword is not
 * available. Requires a compiler with statement expressions (GCC/Clang).
 */
#define CONTAINER_OF(inner_ptr, outer, member_name)			\
	({								\
		__typeof__(outer) *container_of_;			\
		container_of_ = (void *)((char *)(inner_ptr) -		\
				offsetof(__typeof__(outer), member_name)); \
		(void)(&container_of_->member_name ==			\
		       (__typeof__(inner_ptr))0) /* type check */;	\
		container_of_;						\
	})
> +
/*
 * UNREGISTER_EVENT - unregister the scheduler event stored in @id, if any.
 *
 * Does nothing when @id is negative. Otherwise it unregisters the event
 * and resets @id to -1, so calling it twice is harmless. @id is evaluated
 * several times, so it must be a side-effect-free lvalue such as t->id.
 */
#define UNREGISTER_EVENT(id)						\
	do {								\
		if ((id) >= 0) {					\
			tapdisk_server_unregister_event((id));		\
			(id) = -1;					\
		}							\
	} while (0)
/*
 * CLOSE_FD - close the file descriptor stored in @fd, if any.
 *
 * Does nothing when @fd is negative. Otherwise it closes the descriptor
 * and resets @fd to -1, which guards against a double close. @fd is
 * evaluated several times, so it must be a side-effect-free lvalue such
 * as t->fd.
 */
#define CLOSE_FD(fd)							\
	do {								\
		if ((fd) >= 0) {					\
			close((fd));					\
			(fd) = -1;					\
		}							\
	} while (0)
> +
/*
 * Negative error codes of the replication connection helpers. They are
 * returned by the client functions and passed as rc to the
 * td_replication_callback when a connection attempt fails.
 */
enum {
	/* local failure: socket/fcntl setup or event registration */
	ERROR_INTERNAL = -1,
	/* presumably read/write failures; not used in this chunk */
	ERROR_IO = -2,
	/* connect() failed with an error check_connect_errno() deems fatal */
	ERROR_CONNECTION = -3,
	/* presumably peer closed the connection; not used in this chunk */
	ERROR_CLOSE = -4,
};
> +
typedef struct td_replication_connect td_replication_connect_t;
/*
 * Completion callback. rc == 0 means the connection is established. A
 * negative ERROR_* code means a client connection attempt failed; by then
 * the connection has already been killed.
 */
typedef void td_replication_callback(td_replication_connect_t *r, int rc);

/* State of one replication connection (server or client side). */
struct td_replication_connect {
	/*
	 * caller must fill these in before calling
	 * td_replication_connect_init()
	 */
	/* copied into a local log_prefix by every helper that logs */
	const char *log_prefix;
	td_replication_callback *callback;
	/* client: seconds to wait before retrying a failed connect() */
	int retry_timeout_s;
	/* server: backlog passed to listen() */
	int max_connections;
	/*
	 * The caller uses this fd to read/write after
	 * the connection is established
	 */
	int fd;

	/* private */
	/* server: address bound to; client: address connected to */
	struct sockaddr_in sa;
	/* server: the listening socket */
	int listen_fd;
	/* currently registered scheduler event; negative when none */
	event_id_t id;

	/* one of the connection_* states */
	int status;
};
> +
/* return -errno if failure happened, otherwise return 0 */
int td_replication_connect_init(td_replication_connect_t *t, const char *name);
/*
 * Return value:
 *   -1: connection is closed or not connected
 *    0: connection is in progress
 *    1: connection is established
 */
int td_replication_connect_status(td_replication_connect_t *t);
/*
 * Shut the connection down. The client code calls this before reporting a
 * failed connection attempt through the callback.
 */
void td_replication_connect_kill(td_replication_connect_t *t);

/*
 * Listen on t->sa and wait for the peer. t->callback(t, 0) is invoked once
 * a peer has been accepted.
 *
 * Return value:
 *   -2: this caller should be client (bind() failed with EADDRNOTAVAIL)
 *   -1: error (including: already in progress or established)
 *    0: connection is in progress
 */
int td_replication_server_start(td_replication_connect_t *t);
/*
 * Drop an established peer and wait for a new one. If the server is not
 * running, start it instead.
 *
 * Return value:
 *   -2: this caller should be client
 *   -1: error
 *    0: connection is in progress
 */
int td_replication_server_restart(td_replication_connect_t *t);
/*
 * Start a non-blocking connect to t->sa. Transient failures are retried
 * every t->retry_timeout_s seconds. t->callback receives 0 on success or a
 * negative ERROR_* code on failure.
 *
 * Return value:
 *   -1: error
 *    0: connection is in progress
 */
int td_replication_client_start(td_replication_connect_t *t);
> +
> +#endif
> --
> 1.9.3
>

Acked-by: Shriram Rajagopalan <rshriram@xxxxxxxxx>

_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel

 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.