ChangeSet 1.1327.2.3, 2005/04/21 15:11:29+01:00, mjw@xxxxxxxxxxxxxxxxxxx
Add some locking to console handling.
Remove a dead file.
Signed-off-by: Mike Wray <mike.wray@xxxxxx>
b/tools/python/xen/xend/server/console.py | 202 ++++++++++++++++++------------
tools/python/xen/xend/EventTypes.py | 34 -----
2 files changed, 126 insertions(+), 110 deletions(-)
diff -Nru a/tools/python/xen/xend/EventTypes.py
b/tools/python/xen/xend/EventTypes.py
--- a/tools/python/xen/xend/EventTypes.py 2005-05-13 16:03:44 -04:00
+++ /dev/null Wed Dec 31 16:00:00 196900
@@ -1,34 +0,0 @@
-# Copyright (C) 2004 Mike Wray <mike.wray@xxxxxx>
-
-## XEND_DOMAIN_CREATE = "xend.domain.create": dom
-## create:
-## xend.domain.destroy: dom, reason:died/crashed
-## xend.domain.up ?
-
-## xend.domain.unpause: dom
-## xend.domain.pause: dom
-## xend.domain.shutdown: dom
-## xend.domain.destroy: dom
-
-## xend.domain.migrate.begin: dom, to
-## Begin tells: src host, src domain uri, dst host. Dst id known?
-## err: src host, src domain uri, dst host, dst id if known, status (of
domain: ok, dead,...), reason
-## end: src host, src domain uri, dst host, dst uri
-
-## Events for both ends of migrate: for exporter and importer?
-## Include migrate id so can tie together.
-## Have uri /xend/migrate/<id> for migrate info (migrations in progress).
-
-## (xend.domain.migrate.begin (src <host>) (src.domain <id>)
-## (dst <host>) (id <migrate id>))
-
-## xend.domain.migrate.end:
-## (xend.domain.migrate.end (domain <id>) (to <host>)
-
-## xend.node.up: xend uri
-## xend.node.down: xend uri
-
-## xend.error ?
-
-## format:
-
diff -Nru a/tools/python/xen/xend/server/console.py
b/tools/python/xen/xend/server/console.py
--- a/tools/python/xen/xend/server/console.py 2005-05-13 16:03:44 -04:00
+++ b/tools/python/xen/xend/server/console.py 2005-05-13 16:03:44 -04:00
@@ -1,6 +1,7 @@
# Copyright (C) 2004 Mike Wray <mike.wray@xxxxxx>
import socket
+import threading
from xen.web import reactor, protocol
@@ -86,6 +87,7 @@
def __init__(self, controller, id, config, recreate=False):
Dev.__init__(self, controller, id, config)
+ self.lock = threading.RLock()
self.status = self.STATUS_NEW
self.addr = None
self.conn = None
@@ -107,9 +109,13 @@
[self.id, self.getDomain(), self.console_port])
def init(self, recreate=False, reboot=False):
- self.destroyed = False
- self.channel = self.getChannel()
- self.listen()
+ try:
+ self.lock.acquire()
+ self.destroyed = False
+ self.channel = self.getChannel()
+ self.listen()
+ finally:
+ self.lock.release()
def checkConsolePort(self, console_port):
"""Check that a console port is not in use by another console.
@@ -121,29 +127,41 @@
ctrl.checkConsolePort(console_port)
def sxpr(self):
- val = ['console',
- ['status', self.status ],
- ['id', self.id ],
- ['domain', self.getDomain() ] ]
- val.append(['local_port', self.getLocalPort() ])
- val.append(['remote_port', self.getRemotePort() ])
- val.append(['console_port', self.console_port ])
- val.append(['index', self.getIndex()])
- if self.addr:
- val.append(['connected', self.addr[0], self.addr[1]])
+ try:
+ self.lock.acquire()
+ val = ['console',
+ ['status', self.status ],
+ ['id', self.id ],
+ ['domain', self.getDomain() ] ]
+ val.append(['local_port', self.getLocalPort() ])
+ val.append(['remote_port', self.getRemotePort() ])
+ val.append(['console_port', self.console_port ])
+ val.append(['index', self.getIndex()])
+ if self.addr:
+ val.append(['connected', self.addr[0], self.addr[1]])
+ finally:
+ self.lock.release()
return val
def getLocalPort(self):
- if self.channel:
- return self.channel.getLocalPort()
- else:
- return 0
+ try:
+ self.lock.acquire()
+ if self.channel:
+ return self.channel.getLocalPort()
+ else:
+ return 0
+ finally:
+ self.lock.release()
def getRemotePort(self):
- if self.channel:
- return self.channel.getRemotePort()
- else:
- return 0
+ try:
+ self.lock.acquire()
+ if self.channel:
+ return self.channel.getRemotePort()
+ else:
+ return 0
+ finally:
+ self.lock.release()
def uri(self):
"""Get the uri to use to connect to the console.
@@ -166,23 +184,31 @@
print 'ConsoleDev>destroy>', self, reboot
if reboot:
return
- self.status = self.STATUS_CLOSED
- if self.conn:
- self.conn.loseConnection()
- self.listener.stopListening()
+ try:
+ self.lock.acquire()
+ self.status = self.STATUS_CLOSED
+ if self.conn:
+ self.conn.loseConnection()
+ self.listener.stopListening()
+ finally:
+ self.lock.release()
def listen(self):
"""Listen for TCP connections to the console port..
"""
- if self.closed():
- return
- if self.listener:
- pass
- else:
- self.status = self.STATUS_LISTENING
- cf = ConsoleFactory(self, self.id)
- interface = xroot.get_console_address()
- self.listener = reactor.listenTCP(self.console_port, cf,
interface=interface)
+ try:
+ self.lock.acquire()
+ if self.closed():
+ return
+ if self.listener:
+ pass
+ else:
+ self.status = self.STATUS_LISTENING
+ cf = ConsoleFactory(self, self.id)
+ interface = xroot.get_console_address()
+ self.listener = reactor.listenTCP(self.console_port, cf,
interface=interface)
+ finally:
+ self.lock.release()
def connect(self, addr, conn):
"""Connect a TCP connection to the console.
@@ -193,27 +219,35 @@
returns 0 if ok, negative otherwise
"""
- if self.closed():
- return -1
- if self.connected():
- return -1
- self.addr = addr
- self.conn = conn
- self.status = self.STATUS_CONNECTED
- self.writeOutput()
+ try:
+ self.lock.acquire()
+ if self.closed():
+ return -1
+ if self.connected():
+ return -1
+ self.addr = addr
+ self.conn = conn
+ self.status = self.STATUS_CONNECTED
+ self.writeOutput()
+ finally:
+ self.lock.release()
return 0
def disconnect(self, conn=None):
"""Disconnect the TCP connection to the console.
"""
print 'ConsoleDev>disconnect>', conn
- if conn and conn != self.conn: return
- if self.conn:
- self.conn.loseConnection()
- self.addr = None
- self.conn = None
- self.status = self.STATUS_LISTENING
- self.listen()
+ try:
+ self.lock.acquire()
+ if conn and conn != self.conn: return
+ if self.conn:
+ self.conn.loseConnection()
+ self.addr = None
+ self.conn = None
+ self.status = self.STATUS_LISTENING
+ self.listen()
+ finally:
+ self.lock.release()
def receiveOutput(self, msg):
"""Receive output console data from the console channel.
@@ -223,30 +257,38 @@
subtype minor message typ
"""
# Treat the obuf as a ring buffer.
- data = msg.get_payload()
- data_n = len(data)
- if self.obuf.space() < data_n:
- self.obuf.discard(data_n)
- if self.obuf.space() < data_n:
- data = data[-self.obuf.space():]
- self.obuf.write(data)
- self.writeOutput()
+ try:
+ self.lock.acquire()
+ data = msg.get_payload()
+ data_n = len(data)
+ if self.obuf.space() < data_n:
+ self.obuf.discard(data_n)
+ if self.obuf.space() < data_n:
+ data = data[-self.obuf.space():]
_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-changelog
|