# HG changeset patch # User Brendan Cully # Date 1272694699 25200 # Node ID 35a9de6bca43769acf76cbd3a4a1f8c4f245da4b # Parent f0786e8c7fd7b56f0c5712649fe52870d84488de Remus: move device handling into its own module Signed-off-by: Brendan Cully diff --git a/tools/remus/remus b/tools/python/xen/remus/device.py copy from tools/remus/remus copy to tools/python/xen/remus/device.py --- a/tools/remus/remus +++ b/tools/python/xen/remus/device.py @@ -1,66 +1,12 @@ -#!/usr/bin/env python +# Remus device interface # -# This is a save process which also buffers outgoing I/O between -# rounds, so that external viewers never see anything that hasn't -# been committed at the backup -# -# TODO: fencing. +# Coordinates with devices at suspend, resume, and commit hooks -import optparse, os, re, select, signal, sys, time -from xen.remus import save, vm -from xen.xend import XendOptions -from xen.remus import netlink, qdisc, util +import os -class CfgException(Exception): pass +import netlink, qdisc, util -class Cfg(object): - def __init__(self): - # must be set - self.domid = 0 - - self.host = 'localhost' - self.port = XendOptions.instance().get_xend_relocation_port() - self.interval = 200 - self.netbuffer = True - self.timer = False - - parser = optparse.OptionParser() - parser.usage = '%prog [options] domain [destination]' - parser.add_option('-i', '--interval', dest='interval', type='int', - metavar='MS', - help='checkpoint every MS milliseconds') - parser.add_option('-p', '--port', dest='port', type='int', - help='send stream to port PORT', metavar='PORT') - parser.add_option('', '--no-net', dest='nonet', action='store_true', - help='run without net buffering (benchmark option)') - parser.add_option('', '--timer', dest='timer', action='store_true', - help='force pause at checkpoint interval (experimental)') - self.parser = parser - - def usage(self): - self.parser.print_help() - - def getargs(self): - opts, args = self.parser.parse_args() - - if opts.interval: - self.interval = opts.interval - if opts.port: - self.port = opts.port - if opts.nonet: - self.netbuffer = False - if opts.timer: - self.timer = True - - if not args: - raise CfgException('Missing domain') - self.domid = args[0] - if (len(args) > 1): - self.host = args[1] - -class ReplicatedDiskException(Exception): pass - -class BufferedDevice(object): +class CheckpointedDevice(object): 'Base class for buffered devices' def postsuspend(self): @@ -75,7 +21,9 @@ 'called when backup has acknowledged checkpoint reception' pass -class ReplicatedDisk(BufferedDevice): +class ReplicatedDiskException(Exception): pass + +class ReplicatedDisk(CheckpointedDevice): """ Send a checkpoint message to a replicated disk while the domain is paused between epochs. @@ -114,9 +62,9 @@ if msg != 'done': print 'Unknown message: %s' % msg -class NetbufferException(Exception): pass +class BufferedNICException(Exception): pass -class Netbuffer(BufferedDevice): +class BufferedNIC(CheckpointedDevice): """ Buffer a protected domain's network output between rounds so that nothing is issued that a failover might not know about. @@ -133,7 +81,7 @@ self.devname = self._startimq(domid) dev = self.rth.getlink(self.devname) if not dev: - raise NetbufferException('could not find device %s' % self.devname) + raise BufferedNICException('could not find device %s' % self.devname) self.dev = dev['index'] self.handle = qdisc.TC_H_ROOT self.q = qdisc.QueueQdisc() @@ -164,8 +112,8 @@ self.installed = True return if q['kind'] != 'pfifo_fast': - raise NetbufferException('there is already a queueing ' - 'discipline on %s' % self.devname) + raise BufferedNICException('there is already a queueing ' + 'discipline on %s' % self.devname) print 'installing buffer on %s' % self.devname req = qdisc.addrequest(self.dev, self.handle, self.q) @@ -190,155 +138,3 @@ util.runcmd("%s -A FORWARD -i %s -j imq --todev %s" % (imqebt, vid, imqdev)) return imqdev - -class SignalException(Exception): pass - -def run(cfg): - closure = lambda: None - closure.cmd = None - - def sigexception(signo, frame): - raise SignalException(signo) - - def die(): - # I am not sure what the best way to die is. xm destroy is another option, - # or we could attempt to trigger some instant reboot. - print "dying..." - print util.runcmd(['sudo', 'ifdown', 'eth2']) - # dangling imq0 handle on vif locks up the system - for buf in bufs: - buf.uninstall() - print util.runcmd(['sudo', 'xm', 'destroy', cfg.domid]) - print util.runcmd(['sudo', 'ifup', 'eth2']) - - def getcommand(): - """Get a command to execute while running. - Commands include: - s: die prior to postsuspend hook - s2: die after postsuspend hook - r: die prior to preresume hook - r2: die after preresume hook - c: die prior to commit hook - c2: die after commit hook - """ - r, w, x = select.select([sys.stdin], [], [], 0) - if sys.stdin not in r: - return - - cmd = sys.stdin.readline().strip() - if cmd not in ('s', 's2', 'r', 'r2', 'c', 'c2'): - print "unknown command: %s" % cmd - closure.cmd = cmd - - signal.signal(signal.SIGTERM, sigexception) - - dom = vm.VM(cfg.domid) - - # set up I/O buffers - bufs = [] - - # disks must commit before network can be released - for disk in dom.disks: - try: - bufs.append(ReplicatedDisk(disk)) - except ReplicatedDiskException, e: - print e - continue - - if cfg.netbuffer: - for vif in dom.vifs: - bufs.append(Netbuffer(dom.domid)) - - fd = save.MigrationSocket((cfg.host, cfg.port)) - - def postsuspend(): - 'Begin external checkpointing after domain has paused' - if not cfg.timer: - # when not using a timer thread, sleep until now + interval - closure.starttime = time.time() - - if closure.cmd == 's': - die() - - for buf in bufs: - buf.postsuspend() - - if closure.cmd == 's2': - die() - - def preresume(): - 'Complete external checkpointing before domain resumes' - if closure.cmd == 'r': - die() - - for buf in bufs: - buf.preresume() - - if closure.cmd == 'r2': - die() - - def commit(): - 'commit network buffer' - if closure.cmd == 'c': - die() - - print >> sys.stderr, "PROF: flushed memory at %0.6f" % (time.time()) - - for buf in bufs: - buf.commit() - - if closure.cmd == 'c2': - die() - - # Since the domain is running at this point, it's a good time to - # check for control channel commands - getcommand() - - if not cfg.timer: - endtime = time.time() - elapsed = (endtime - closure.starttime) * 1000 - - if elapsed < cfg.interval: - time.sleep((cfg.interval - elapsed) / 1000.0) - - # False ends checkpointing - return True - - if cfg.timer: - interval = cfg.interval - else: - interval = 0 - - rc = 0 - - checkpointer = save.Saver(cfg.domid, fd, postsuspend, preresume, commit, - interval) - - try: - checkpointer.start() - except save.CheckpointError, e: - print e - rc = 1 - except KeyboardInterrupt: - pass - except SignalException: - print '*** signalled ***' - - for buf in bufs: - buf.uninstall() - - sys.exit(rc) - -cfg = Cfg() -try: - cfg.getargs() -except CfgException, inst: - print str(inst) - cfg.usage() - sys.exit(1) - -try: - run(cfg) -except vm.VMException, inst: - print str(inst) - sys.exit(1) diff --git a/tools/remus/remus b/tools/remus/remus --- a/tools/remus/remus +++ b/tools/remus/remus @@ -7,9 +7,10 @@ # TODO: fencing. import optparse, os, re, select, signal, sys, time -from xen.remus import save, vm + +from xen.remus import save, util, vm +from xen.remus.device import ReplicatedDisk, BufferedNIC from xen.xend import XendOptions -from xen.remus import netlink, qdisc, util class CfgException(Exception): pass @@ -58,139 +59,6 @@ if (len(args) > 1): self.host = args[1] -class ReplicatedDiskException(Exception): pass - -class BufferedDevice(object): - 'Base class for buffered devices' - - def postsuspend(self): - 'called after guest has suspended' - pass - - def preresume(self): - 'called before guest resumes' - pass - - def commit(self): - 'called when backup has acknowledged checkpoint reception' - pass - -class ReplicatedDisk(BufferedDevice): - """ - Send a checkpoint message to a replicated disk while the domain - is paused between epochs. - """ - FIFODIR = '/var/run/tap' - - def __init__(self, disk): - # look up disk, make sure it is tap:buffer, and set up socket - # to request commits. - self.ctlfd = None - - if not disk.uname.startswith('tap:remus:') and not disk.uname.startswith('tap:tapdisk:remus:'): - raise ReplicatedDiskException('Disk is not replicated: %s' % - str(disk)) - fifo = re.match("tap:.*(remus.*)\|", disk.uname).group(1).replace(':', '_') - absfifo = os.path.join(self.FIFODIR, fifo) - absmsgfifo = absfifo + '.msg' - - self.installed = False - self.ctlfd = open(absfifo, 'w+b') - self.msgfd = open(absmsgfifo, 'r+b') - - def __del__(self): - self.uninstall() - - def uninstall(self): - if self.ctlfd: - self.ctlfd.close() - self.ctlfd = None - - def postsuspend(self): - os.write(self.ctlfd.fileno(), 'flush') - - def commit(self): - msg = os.read(self.msgfd.fileno(), 4) - if msg != 'done': - print 'Unknown message: %s' % msg - -class NetbufferException(Exception): pass - -class Netbuffer(BufferedDevice): - """ - Buffer a protected domain's network output between rounds so that - nothing is issued that a failover might not know about. - """ - # shared rtnetlink handle - rth = None - - def __init__(self, domid): - self.installed = False - - if not self.rth: - self.rth = netlink.rtnl() - - self.devname = self._startimq(domid) - dev = self.rth.getlink(self.devname) - if not dev: - raise NetbufferException('could not find device %s' % self.devname) - self.dev = dev['index'] - self.handle = qdisc.TC_H_ROOT - self.q = qdisc.QueueQdisc() - - def __del__(self): - self.uninstall() - - def postsuspend(self): - if not self.installed: - self._setup() - - self._sendqmsg(qdisc.TC_QUEUE_CHECKPOINT) - - def commit(self): - '''Called when checkpoint has been acknowledged by - the backup''' - self._sendqmsg(qdisc.TC_QUEUE_RELEASE) - - def _sendqmsg(self, action): - self.q.action = action - req = qdisc.changerequest(self.dev, self.handle, self.q) - self.rth.talk(req.pack()) - - def _setup(self): - q = self.rth.getqdisc(self.dev) - if q: - if q['kind'] == 'queue': - self.installed = True - return - if q['kind'] != 'pfifo_fast': - raise NetbufferException('there is already a queueing ' - 'discipline on %s' % self.devname) - - print 'installing buffer on %s' % self.devname - req = qdisc.addrequest(self.dev, self.handle, self.q) - self.rth.talk(req.pack()) - self.installed = True - - def uninstall(self): - if self.installed: - req = qdisc.delrequest(self.dev, self.handle) - self.rth.talk(req.pack()) - self.installed = False - - def _startimq(self, domid): - # stopgap hack to set up IMQ for an interface. Wrong in many ways. - imqebt = '/usr/lib/xen/bin/imqebt' - imqdev = 'imq0' - vid = 'vif%d.0' % domid - for mod in ['sch_queue', 'imq', 'ebt_imq']: - util.runcmd(['modprobe', mod]) - util.runcmd("ip link set %s up" % (imqdev)) - util.runcmd("%s -F FORWARD" % (imqebt)) - util.runcmd("%s -A FORWARD -i %s -j imq --todev %s" % (imqebt, vid, imqdev)) - - return imqdev - class SignalException(Exception): pass def run(cfg):