[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-devel] [PATCH 2 of 3] Remus: add control script to activate remus on a VM



# HG changeset patch
# User Brendan Cully <brendan@xxxxxxxxx>
# Date 1258074147 28800
# Node ID 4e36da19dc8f433910be8adabd8a3e4e5cead5d6
# Parent  213fb814acf431d2a382e8f9c09b4cea106c0958
Remus: add control script to activate remus on a VM

Signed-off-by: Brendan Cully <brendan@xxxxxxxxx>

diff --git a/tools/Makefile b/tools/Makefile
--- a/tools/Makefile
+++ b/tools/Makefile
@@ -33,6 +33,7 @@
 SUBDIRS-$(CONFIG_IOEMU) += ioemu-dir
 SUBDIRS-y += xenpmd
 SUBDIRS-y += libxl
+SUBDIRS-y += remus
 
 # These don't cross-compile
 ifeq ($(XEN_COMPILE_ARCH),$(XEN_TARGET_ARCH))
diff --git a/tools/remus/Makefile b/tools/remus/Makefile
new file mode 100644
--- /dev/null
+++ b/tools/remus/Makefile
@@ -0,0 +1,20 @@
+XEN_ROOT=../..
+include $(XEN_ROOT)/tools/Rules.mk
+
+SCRIPTS = remus
+
+.PHONY: all
+all: build
+
+.PHONY: build
+build:
+       echo "Nothing to do"
+
+.PHONY: install
+install:
+       $(INSTALL_DIR) $(DESTDIR)$(BINDIR)
+       $(INSTALL_PYTHON_PROG) $(SCRIPTS) $(DESTDIR)$(BINDIR)
+
+.PHONY: clean
+clean:
+       echo "Nothing to do"
diff --git a/tools/remus/README b/tools/remus/README
new file mode 100644
--- /dev/null
+++ b/tools/remus/README
@@ -0,0 +1,4 @@
+Remus provides fault tolerance for virtual machines by sending continuous
+checkpoints to a backup, which will activate if the target VM fails.
+
+See the website at http://nss.cs.ubc.ca/remus/ for details.
diff --git a/tools/remus/remus b/tools/remus/remus
new file mode 100755
--- /dev/null
+++ b/tools/remus/remus
@@ -0,0 +1,362 @@
+#!/usr/bin/env python
+#
+# This is a save process which also buffers outgoing I/O between
+# rounds, so that external viewers never see anything that hasn't
+# been committed at the backup
+#
+# TODO: fencing.
+
+import optparse, os, re, select, signal, sys, time
+from xen.remus import save, vm
+from xen.xend import XendOptions
+from xen.remus import netlink, qdisc, util
+
+class CfgException(Exception): pass
+
+class Cfg(object):
+    def __init__(self):
+        # must be set
+        self.domid = 0
+
+        self.host = 'localhost'
+        self.port = XendOptions.instance().get_xend_relocation_port()
+        self.interval = 200
+        self.netbuffer = True
+        self.nobackup = False
+        self.timer = False
+
+        parser = optparse.OptionParser()
+        parser.usage = '%prog [options] domain [destination]'
+        parser.add_option('-i', '--interval', dest='interval', type='int',
+                          metavar='MS',
+                          help='checkpoint every MS milliseconds')
+        parser.add_option('-p', '--port', dest='port', type='int',
+                          help='send stream to port PORT', metavar='PORT')
+        parser.add_option('', '--no-net', dest='nonet', action='store_true',
+                          help='run without net buffering (benchmark option)')
+        parser.add_option('', '--timer', dest='timer', action='store_true',
+                          help='force pause at checkpoint interval 
(experimental)')
+        parser.add_option('', '--no-backup', dest='nobackup',
+                          action='store_true',
+                          help='prevent backup from starting up (benchmark '
+                          'option)')
+        self.parser = parser
+
+    def usage(self):
+        self.parser.print_help()
+
+    def getargs(self):
+        opts, args = self.parser.parse_args()
+
+        if opts.interval:
+            self.interval = opts.interval
+        if opts.port:
+            self.port = opts.port
+        if opts.nonet:
+            self.netbuffer = False
+        if opts.timer:
+            self.timer = True
+
+        if not args:
+            raise CfgException('Missing domain')
+        self.domid = args[0]
+        if (len(args) > 1):
+            self.host = args[1]
+
+class ReplicatedDiskException(Exception): pass
+
+class BufferedDevice(object):
+    'Base class for buffered devices'
+
+    def postsuspend(self):
+        'called after guest has suspended'
+        pass
+
+    def preresume(self):
+        'called before guest resumes'
+        pass
+
+    def commit(self):
+        'called when backup has acknowledged checkpoint reception'
+        pass
+
+class ReplicatedDisk(BufferedDevice):
+    """
+    Send a checkpoint message to a replicated disk while the domain
+    is paused between epochs.
+    """
+    FIFODIR = '/var/run/tap'
+
+    def __init__(self, disk):
+        # look up disk, make sure it is tap:buffer, and set up socket
+        # to request commits.
+        self.ctlfd = None
+
+        if not disk.uname.startswith('tap:remus:') and not 
disk.uname.startswith('tap:tapdisk:remus:'):
+            raise ReplicatedDiskException('Disk is not replicated: %s' %
+                                        str(disk))
+        fifo = re.match("tap:.*(remus.*)\|", disk.uname).group(1).replace(':', 
'_')
+        absfifo = os.path.join(self.FIFODIR, fifo)
+        absmsgfifo = absfifo + '.msg'
+
+        self.installed = False
+        self.ctlfd = open(absfifo, 'w+b')
+        self.msgfd = open(absmsgfifo, 'r+b')
+
+    def __del__(self):
+        self.uninstall()
+
+    def setup(self):
+        #self.ctlfd.write('buffer')
+        #self.ctlfd.flush()
+        self.installed = True
+
+    def uninstall(self):
+        if self.ctlfd:
+            self.ctlfd.close()
+            self.ctlfd = None
+
+    def postsuspend(self):
+        if not self.installed:
+            self.setup()
+
+        os.write(self.ctlfd.fileno(), 'flush')
+
+    def commit(self):
+        msg = os.read(self.msgfd.fileno(), 4)
+        if msg != 'done':
+            print 'Unknown message: %s' % msg
+
+class NetbufferException(Exception): pass
+
+class Netbuffer(BufferedDevice):
+    """
+    Buffer a protected domain's network output between rounds so that
+    nothing is issued that a failover might not know about.
+    """
+    # shared rtnetlink handle
+    rth = None
+
+    def __init__(self, domid):
+        self.installed = False
+
+        if not self.rth:
+            self.rth = netlink.rtnl()
+
+        self.devname = self._startimq(domid)
+        dev = self.rth.getlink(self.devname)
+        if not dev:
+            raise NetbufferException('could not find device %s' % self.devname)
+        self.dev = dev['index']
+        self.handle = qdisc.TC_H_ROOT
+        self.q = qdisc.QueueQdisc()
+
+    def __del__(self):
+        self.uninstall()
+
+    def postsuspend(self):
+        if not self.installed:
+            self._setup()
+
+        self._sendqmsg(qdisc.TC_QUEUE_CHECKPOINT)
+
+    def commit(self):
+        '''Called when checkpoint has been acknowledged by
+        the backup'''
+        self._sendqmsg(qdisc.TC_QUEUE_RELEASE)
+
+    def _sendqmsg(self, action):
+        self.q.action = action
+        req = qdisc.changerequest(self.dev, self.handle, self.q)
+        self.rth.talk(req.pack())
+
+    def _setup(self):
+        q = self.rth.getqdisc(self.dev)
+        if q:
+            if q['kind'] == 'queue':
+                self.installed = True
+                return
+            if q['kind'] != 'pfifo_fast':
+                raise NetbufferException('there is already a queueing '
+                                         'discipline on %s' % self.devname)
+
+        print 'installing buffer on %s' % self.devname
+        req = qdisc.addrequest(self.dev, self.handle, self.q)
+        self.rth.talk(req.pack())
+        self.installed = True
+
+    def uninstall(self):
+        if self.installed:
+            req = qdisc.delrequest(self.dev, self.handle)
+            self.rth.talk(req.pack())
+            self.installed = False
+
+    def _startimq(self, domid):
+        # stopgap hack to set up IMQ for an interface. Wrong in many ways.
+        imqebt = '/usr/lib/xen/bin/imqebt'
+        imqdev = 'imq0'
+        vid = 'vif%d.0' % domid
+        for mod in ['sch_queue', 'imq', 'ebt_imq']:
+            util.runcmd(['modprobe', mod])
+        util.runcmd("ip link set %s up" % (imqdev))
+        util.runcmd("ebtables -F FORWARD")
+        util.runcmd("ebtables -A FORWARD -i %s -j imq --todev %s" % (vid, 
imqdev))
+
+        return imqdev
+
+class SignalException(Exception): pass
+
+def run(cfg):
+    closure = lambda: None
+    closure.cmd = None
+
+    def sigexception(signo, frame):
+        raise SignalException(signo)
+
+    def die():
+        # I am not sure what the best way to die is. xm destroy is another 
option,
+        # or we could attempt to trigger some instant reboot.
+        print "dying..."
+        print util.runcmd(['sudo', 'ifdown', 'eth2'])
+        # dangling imq0 handle on vif locks up the system
+        for buf in bufs:
+            buf.uninstall()
+        print util.runcmd(['sudo', 'xm', 'destroy', cfg.domid])
+        print util.runcmd(['sudo', 'ifup', 'eth2'])
+
+    def getcommand():
+        """Get a command to execute while running.
+        Commands include:
+          s: die prior to postsuspend hook
+          s2: die after postsuspend hook
+          r: die prior to preresume hook
+          r2: die after preresume hook
+          c: die prior to commit hook
+          c2: die after commit hook
+          """
+        r, w, x = select.select([sys.stdin], [], [], 0)
+        if sys.stdin not in r:
+            return
+
+        cmd = sys.stdin.readline().strip()
+        if cmd not in ('s', 's2', 'r', 'r2', 'c', 'c2'):
+            print "unknown command: %s" % cmd
+        closure.cmd = cmd
+
+    signal.signal(signal.SIGTERM, sigexception)
+
+    dom = vm.VM(cfg.domid)
+
+    # set up I/O buffers
+    bufs = []
+
+    # disks must commit before network can be released
+    for disk in dom.disks:
+        try:
+            bufs.append(ReplicatedDisk(disk))
+        except ReplicatedDiskException, e:
+            print e
+            continue
+
+    if cfg.netbuffer:
+        for vif in dom.vifs:
+            bufs.append(Netbuffer(dom.domid))
+
+    fd = save.MigrationSocket((cfg.host, cfg.port))
+
+    def postsuspend():
+        'Begin external checkpointing after domain has paused'
+        if not cfg.timer:
+            # when not using a timer thread, sleep until now + interval
+            closure.starttime = time.time()
+
+        if closure.cmd == 's':
+            die()
+
+        for buf in bufs:
+            buf.postsuspend()
+
+        if closure.cmd == 's2':
+            die()
+
+    def preresume():
+        'Complete external checkpointing before domain resumes'
+        if closure.cmd == 'r':
+            die()
+
+        for buf in bufs:
+            buf.preresume()
+
+        if closure.cmd == 'r2':
+            die()
+
+    def commit():
+        'commit network buffer'
+        if closure.cmd == 'c':
+            die()
+
+        print >> sys.stderr, "PROF: flushed memory at %0.6f" % (time.time())
+
+        for buf in bufs:
+            buf.commit()
+
+        if closure.cmd == 'c2':
+            die()
+
+        # Since the domain is running at this point, it's a good time to
+        # check for control channel commands
+        getcommand()
+
+        if not cfg.timer:
+            endtime = time.time()
+            elapsed = (endtime - closure.starttime) * 1000
+
+            if elapsed < cfg.interval:
+                time.sleep((cfg.interval - elapsed) / 1000.0)
+
+        # False ends checkpointing
+        return True
+
+    if cfg.timer:
+        interval = cfg.interval
+    else:
+        interval = 0
+
+    rc = 0
+
+    checkpointer = save.Saver(cfg.domid, fd, postsuspend, preresume, commit,
+                              interval)
+
+    try:
+        checkpointer.start()
+    except save.CheckpointError, e:
+        print e
+        rc = 1
+    except KeyboardInterrupt:
+        pass
+    except SignalException:
+        print '*** signalled ***'
+
+    for buf in bufs:
+        buf.uninstall()
+
+    if cfg.nobackup:
+        # lame attempt to kill backup if protection is stopped deliberately.
+        # It would be much better to move this into the heartbeat "protocol".
+        print util.runcmd(['sudo', '-u', os.getlogin(), 'ssh', cfg.host, 
'sudo', 'xm', 'destroy', dom.name])
+
+    sys.exit(rc)
+
+cfg = Cfg()
+try:
+    cfg.getargs()
+except CfgException, inst:
+    print str(inst)
+    cfg.usage()
+    sys.exit(1)
+
+try:
+    run(cfg)
+except vm.VMException, inst:
+    print str(inst)
+    sys.exit(1)

_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-devel


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.