To: xen-devel@xxxxxxxxxxxxxxxxxxx
Subject: [Xen-devel] [PATCH 2 of 3] Remus: add control script to activate remus on a VM
From: Brendan Cully <brendan@xxxxxxxxx>
Date: Thu, 12 Nov 2009 17:10:23 -0800
Cc: andy@xxxxxxxxx
Delivery-date: Thu, 12 Nov 2009 17:11:57 -0800
Envelope-to: www-data@xxxxxxxxxxxxxxxxxxx
In-reply-to: <patchbomb.1258074621@xxxxxxxxxxxx>
List-help: <mailto:xen-devel-request@lists.xensource.com?subject=help>
List-id: Xen developer discussion <xen-devel.lists.xensource.com>
List-post: <mailto:xen-devel@lists.xensource.com>
List-subscribe: <http://lists.xensource.com/mailman/listinfo/xen-devel>, <mailto:xen-devel-request@lists.xensource.com?subject=subscribe>
List-unsubscribe: <http://lists.xensource.com/mailman/listinfo/xen-devel>, <mailto:xen-devel-request@lists.xensource.com?subject=unsubscribe>
References: <patchbomb.1258074621@xxxxxxxxxxxx>
Sender: xen-devel-bounces@xxxxxxxxxxxxxxxxxxx
User-agent: Mercurial-patchbomb/1.3.1+375-2dee9a359262
# HG changeset patch
# User Brendan Cully <brendan@xxxxxxxxx>
# Date 1258074147 28800
# Node ID 4e36da19dc8f433910be8adabd8a3e4e5cead5d6
# Parent  213fb814acf431d2a382e8f9c09b4cea106c0958
Remus: add control script to activate remus on a VM

Signed-off-by: Brendan Cully <brendan@xxxxxxxxx>

diff --git a/tools/Makefile b/tools/Makefile
--- a/tools/Makefile
+++ b/tools/Makefile
@@ -33,6 +33,7 @@
 SUBDIRS-$(CONFIG_IOEMU) += ioemu-dir
 SUBDIRS-y += xenpmd
 SUBDIRS-y += libxl
+SUBDIRS-y += remus
 
 # These don't cross-compile
 ifeq ($(XEN_COMPILE_ARCH),$(XEN_TARGET_ARCH))
diff --git a/tools/remus/Makefile b/tools/remus/Makefile
new file mode 100644
--- /dev/null
+++ b/tools/remus/Makefile
@@ -0,0 +1,20 @@
+XEN_ROOT=../..
+include $(XEN_ROOT)/tools/Rules.mk
+
+SCRIPTS = remus
+
+.PHONY: all
+all: build
+
+.PHONY: build
+build:
+	echo "Nothing to do"
+
+.PHONY: install
+install:
+	$(INSTALL_DIR) $(DESTDIR)$(BINDIR)
+	$(INSTALL_PYTHON_PROG) $(SCRIPTS) $(DESTDIR)$(BINDIR)
+
+.PHONY: clean
+clean:
+	echo "Nothing to do"
diff --git a/tools/remus/README b/tools/remus/README
new file mode 100644
--- /dev/null
+++ b/tools/remus/README
@@ -0,0 +1,4 @@
+Remus provides fault tolerance for virtual machines by sending continuous
+checkpoints to a backup, which will activate if the target VM fails.
+
+See the website at http://nss.cs.ubc.ca/remus/ for details.
diff --git a/tools/remus/remus b/tools/remus/remus
new file mode 100755
--- /dev/null
+++ b/tools/remus/remus
@@ -0,0 +1,362 @@
+#!/usr/bin/env python
+#
+# This is a save process which also buffers outgoing I/O between
+# rounds, so that external viewers never see anything that hasn't
+# been committed at the backup
+#
+# TODO: fencing.
+
+import optparse, os, re, select, signal, sys, time
+from xen.remus import save, vm
+from xen.xend import XendOptions
+from xen.remus import netlink, qdisc, util
+
+class CfgException(Exception): pass
+
+class Cfg(object):
+    def __init__(self):
+        # must be set
+        self.domid = 0
+
+        self.host = 'localhost'
+        self.port = XendOptions.instance().get_xend_relocation_port()
+        self.interval = 200
+        self.netbuffer = True
+        self.nobackup = False
+        self.timer = False
+
+        parser = optparse.OptionParser()
+        parser.usage = '%prog [options] domain [destination]'
+        parser.add_option('-i', '--interval', dest='interval', type='int',
+                          metavar='MS',
+                          help='checkpoint every MS milliseconds')
+        parser.add_option('-p', '--port', dest='port', type='int',
+                          help='send stream to port PORT', metavar='PORT')
+        parser.add_option('', '--no-net', dest='nonet', action='store_true',
+                          help='run without net buffering (benchmark option)')
+        parser.add_option('', '--timer', dest='timer', action='store_true',
+                          help='force pause at checkpoint interval (experimental)')
+        parser.add_option('', '--no-backup', dest='nobackup',
+                          action='store_true',
+                          help='prevent backup from starting up (benchmark '
+                          'option)')
+        self.parser = parser
+
+    def usage(self):
+        self.parser.print_help()
+
+    def getargs(self):
+        opts, args = self.parser.parse_args()
+
+        if opts.interval:
+            self.interval = opts.interval
+        if opts.port:
+            self.port = opts.port
+        if opts.nonet:
+            self.netbuffer = False
+        if opts.timer:
+            self.timer = True
+
+        if not args:
+            raise CfgException('Missing domain')
+        self.domid = args[0]
+        if (len(args) > 1):
+            self.host = args[1]
+
+class ReplicatedDiskException(Exception): pass
+
+class BufferedDevice(object):
+    'Base class for buffered devices'
+
+    def postsuspend(self):
+        'called after guest has suspended'
+        pass
+
+    def preresume(self):
+        'called before guest resumes'
+        pass
+
+    def commit(self):
+        'called when backup has acknowledged checkpoint reception'
+        pass
+
+class ReplicatedDisk(BufferedDevice):
+    """
+    Send a checkpoint message to a replicated disk while the domain
+    is paused between epochs.
+    """
+    FIFODIR = '/var/run/tap'
+
+    def __init__(self, disk):
+        # look up disk, make sure it is tap:buffer, and set up socket
+        # to request commits.
+        self.ctlfd = None
+
+        if not disk.uname.startswith('tap:remus:') and not disk.uname.startswith('tap:tapdisk:remus:'):
+            raise ReplicatedDiskException('Disk is not replicated: %s' %
+                                        str(disk))
+        fifo = re.match("tap:.*(remus.*)\|", disk.uname).group(1).replace(':', '_')
+        absfifo = os.path.join(self.FIFODIR, fifo)
+        absmsgfifo = absfifo + '.msg'
+
+        self.installed = False
+        self.ctlfd = open(absfifo, 'w+b')
+        self.msgfd = open(absmsgfifo, 'r+b')
+
+    def __del__(self):
+        self.uninstall()
+
+    def setup(self):
+        #self.ctlfd.write('buffer')
+        #self.ctlfd.flush()
+        self.installed = True
+
+    def uninstall(self):
+        if self.ctlfd:
+            self.ctlfd.close()
+            self.ctlfd = None
+
+    def postsuspend(self):
+        if not self.installed:
+            self.setup()
+
+        os.write(self.ctlfd.fileno(), 'flush')
+
+    def commit(self):
+        msg = os.read(self.msgfd.fileno(), 4)
+        if msg != 'done':
+            print 'Unknown message: %s' % msg
+
+class NetbufferException(Exception): pass
+
+class Netbuffer(BufferedDevice):
+    """
+    Buffer a protected domain's network output between rounds so that
+    nothing is issued that a failover might not know about.
+    """
+    # shared rtnetlink handle
+    rth = None
+
+    def __init__(self, domid):
+        self.installed = False
+
+        if not self.rth:
+            self.rth = netlink.rtnl()
+
+        self.devname = self._startimq(domid)
+        dev = self.rth.getlink(self.devname)
+        if not dev:
+            raise NetbufferException('could not find device %s' % self.devname)
+        self.dev = dev['index']
+        self.handle = qdisc.TC_H_ROOT
+        self.q = qdisc.QueueQdisc()
+
+    def __del__(self):
+        self.uninstall()
+
+    def postsuspend(self):
+        if not self.installed:
+            self._setup()
+
+        self._sendqmsg(qdisc.TC_QUEUE_CHECKPOINT)
+
+    def commit(self):
+        '''Called when checkpoint has been acknowledged by
+        the backup'''
+        self._sendqmsg(qdisc.TC_QUEUE_RELEASE)
+
+    def _sendqmsg(self, action):
+        self.q.action = action
+        req = qdisc.changerequest(self.dev, self.handle, self.q)
+        self.rth.talk(req.pack())
+
+    def _setup(self):
+        q = self.rth.getqdisc(self.dev)
+        if q:
+            if q['kind'] == 'queue':
+                self.installed = True
+                return
+            if q['kind'] != 'pfifo_fast':
+                raise NetbufferException('there is already a queueing '
+                                         'discipline on %s' % self.devname)
+
+        print 'installing buffer on %s' % self.devname
+        req = qdisc.addrequest(self.dev, self.handle, self.q)
+        self.rth.talk(req.pack())
+        self.installed = True
+
+    def uninstall(self):
+        if self.installed:
+            req = qdisc.delrequest(self.dev, self.handle)
+            self.rth.talk(req.pack())
+            self.installed = False
+
+    def _startimq(self, domid):
+        # stopgap hack to set up IMQ for an interface. Wrong in many ways.
+        imqebt = '/usr/lib/xen/bin/imqebt'
+        imqdev = 'imq0'
+        vid = 'vif%d.0' % domid
+        for mod in ['sch_queue', 'imq', 'ebt_imq']:
+            util.runcmd(['modprobe', mod])
+        util.runcmd("ip link set %s up" % (imqdev))
+        util.runcmd("ebtables -F FORWARD")
+        util.runcmd("ebtables -A FORWARD -i %s -j imq --todev %s" % (vid, imqdev))
+
+        return imqdev
+
+class SignalException(Exception): pass
+
+def run(cfg):
+    closure = lambda: None
+    closure.cmd = None
+
+    def sigexception(signo, frame):
+        raise SignalException(signo)
+
+    def die():
+        # I am not sure what the best way to die is. xm destroy is another option,
+        # or we could attempt to trigger some instant reboot.
+        print "dying..."
+        print util.runcmd(['sudo', 'ifdown', 'eth2'])
+        # dangling imq0 handle on vif locks up the system
+        for buf in bufs:
+            buf.uninstall()
+        print util.runcmd(['sudo', 'xm', 'destroy', cfg.domid])
+        print util.runcmd(['sudo', 'ifup', 'eth2'])
+
+    def getcommand():
+        """Get a command to execute while running.
+        Commands include:
+          s: die prior to postsuspend hook
+          s2: die after postsuspend hook
+          r: die prior to preresume hook
+          r2: die after preresume hook
+          c: die prior to commit hook
+          c2: die after commit hook
+          """
+        r, w, x = select.select([sys.stdin], [], [], 0)
+        if sys.stdin not in r:
+            return
+
+        cmd = sys.stdin.readline().strip()
+        if cmd not in ('s', 's2', 'r', 'r2', 'c', 'c2'):
+            print "unknown command: %s" % cmd
+        closure.cmd = cmd
+
+    signal.signal(signal.SIGTERM, sigexception)
+
+    dom = vm.VM(cfg.domid)
+
+    # set up I/O buffers
+    bufs = []
+
+    # disks must commit before network can be released
+    for disk in dom.disks:
+        try:
+            bufs.append(ReplicatedDisk(disk))
+        except ReplicatedDiskException, e:
+            print e
+            continue
+
+    if cfg.netbuffer:
+        for vif in dom.vifs:
+            bufs.append(Netbuffer(dom.domid))
+
+    fd = save.MigrationSocket((cfg.host, cfg.port))
+
+    def postsuspend():
+        'Begin external checkpointing after domain has paused'
+        if not cfg.timer:
+            # when not using a timer thread, sleep until now + interval
+            closure.starttime = time.time()
+
+        if closure.cmd == 's':
+            die()
+
+        for buf in bufs:
+            buf.postsuspend()
+
+        if closure.cmd == 's2':
+            die()
+
+    def preresume():
+        'Complete external checkpointing before domain resumes'
+        if closure.cmd == 'r':
+            die()
+
+        for buf in bufs:
+            buf.preresume()
+
+        if closure.cmd == 'r2':
+            die()
+
+    def commit():
+        'commit network buffer'
+        if closure.cmd == 'c':
+            die()
+
+        print >> sys.stderr, "PROF: flushed memory at %0.6f" % (time.time())
+
+        for buf in bufs:
+            buf.commit()
+
+        if closure.cmd == 'c2':
+            die()
+
+        # Since the domain is running at this point, it's a good time to
+        # check for control channel commands
+        getcommand()
+
+        if not cfg.timer:
+            endtime = time.time()
+            elapsed = (endtime - closure.starttime) * 1000
+
+            if elapsed < cfg.interval:
+                time.sleep((cfg.interval - elapsed) / 1000.0)
+
+        # False ends checkpointing
+        return True
+
+    if cfg.timer:
+        interval = cfg.interval
+    else:
+        interval = 0
+
+    rc = 0
+
+    checkpointer = save.Saver(cfg.domid, fd, postsuspend, preresume, commit,
+                              interval)
+
+    try:
+        checkpointer.start()
+    except save.CheckpointError, e:
+        print e
+        rc = 1
+    except KeyboardInterrupt:
+        pass
+    except SignalException:
+        print '*** signalled ***'
+
+    for buf in bufs:
+        buf.uninstall()
+
+    if cfg.nobackup:
+        # lame attempt to kill backup if protection is stopped deliberately.
+        # It would be much better to move this into the heartbeat "protocol".
+        print util.runcmd(['sudo', '-u', os.getlogin(), 'ssh', cfg.host, 'sudo', 'xm', 'destroy', dom.name])
+
+    sys.exit(rc)
+
+cfg = Cfg()
+try:
+    cfg.getargs()
+except CfgException, inst:
+    print str(inst)
+    cfg.usage()
+    sys.exit(1)
+
+try:
+    run(cfg)
+except vm.VMException, inst:
+    print str(inst)
+    sys.exit(1)
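
The script above drives every buffered device through three hooks per checkpoint and, when --timer is not given, sleeps out the remainder of the interval inside commit(). The following is a minimal sketch of those two ideas in isolation. It is not part of the patch: it is written for Python 3 (the patch targets Python 2), `remaining_sleep`, `RecordingDevice` and `run_epoch` are hypothetical names, and the hook order postsuspend, preresume, commit is an assumption inferred from the docstrings and from the argument order passed to save.Saver.

```python
# Illustrative sketch only; not part of the patch.

def remaining_sleep(start, now, interval_ms):
    """Return the seconds left in the checkpoint interval, or 0.0 if none.

    Mirrors the arithmetic in commit(): elapsed time is converted to
    milliseconds and compared with the configured interval.
    """
    elapsed_ms = (now - start) * 1000
    if elapsed_ms < interval_ms:
        return (interval_ms - elapsed_ms) / 1000.0
    return 0.0


class RecordingDevice:
    """Hypothetical stand-in for a BufferedDevice that logs each hook call."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def postsuspend(self):
        self.log.append((self.name, 'postsuspend'))

    def preresume(self):
        self.log.append((self.name, 'preresume'))

    def commit(self):
        self.log.append((self.name, 'commit'))


def run_epoch(bufs):
    """Run one checkpoint round over bufs in list order (assumed hook order)."""
    for hook in ('postsuspend', 'preresume', 'commit'):
        for buf in bufs:
            getattr(buf, hook)()


if __name__ == '__main__':
    log = []
    # Disks are appended before network buffers, as in run(), so each hook
    # reaches the disk first and the network buffer second.
    run_epoch([RecordingDevice('disk', log), RecordingDevice('net', log)])
    for entry in log:
        print(entry)
```

Because each hook walks the list in order, placing the disks ahead of the network buffers is what lets a disk commit complete before buffered network output is released.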

_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-devel