Changeset 125
- Timestamp:
- 02/20/08 21:38:39 (11 months ago)
- Files:
-
- projects/AsynCluster/trunk/asyncluster/master/control.py (modified) (4 diffs)
- projects/AsynCluster/trunk/asyncluster/master/nodes.py (modified) (1 diff)
- projects/AsynCluster/trunk/asyncluster/ndm/client.py (modified) (5 diffs)
- projects/AsynCluster/trunk/asyncluster/ndm/gui.py (modified) (2 diffs)
- projects/AsynCluster/trunk/asyncluster/ndm/node.py (modified) (6 diffs)
- projects/AsynCluster/trunk/asyncluster/ndm/test/test_client.py (modified) (5 diffs)
- projects/AsynCluster/trunk/asyncluster/ndm/worker.py (added)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynCluster/trunk/asyncluster/master/control.py
r124 r125 250 250 the node identified by the integer I{nodeID}, supplying any provided 251 251 arguments or keywords. 252 253 Returns a deferred that fires with the result of the remote call. 252 254 """ 253 255 if nodeID not in self.nodes: … … 264 266 the user identified by the string I{userID} has an active session, 265 267 supplying any provided arguments or keywords. 268 269 Returns a deferred that fires with the result of the remote call. 266 270 """ 267 271 for ID, nodeStuff in self.nodes.iteritems(): … … 273 277 return defer.fail(Failure( 274 278 RemoteCallError("Invalid user '%s'" % userID))) 279 280 def allRemote(self, called, *args, **kw): 281 """ 282 Runs a remote call to the object specified by the string I{called} on 283 all connected nodes, supplying any provided arguments or keywords. 284 285 Returns a deferred that fires with a list of results of the remote 286 calls, in no particular node order. 287 """ 288 for ID, nodeStuff in self.nodes.iteritems(): 289 d = nodeRoot.callRemote(called, *args, **kw) 290 d.addErrback(self._remoteError, ID) 291 dList.append(d) 292 return defer.gatherResults(dList) 275 293 276 294 … … 381 399 def remote_cancelJob(self, jobID): 382 400 """ 401 Cancels the job specified by I{jobID}. 383 402 """ 384 403 return self.ctl.jobber.cancel(jobID) 404 405 def remote_resetup(self, sourcePath): 406 """ 407 Instructs each connected node to do a new python setup in the specified 408 I{sourcePath} and respawn its coeterie of child worker process. 409 410 Terminates upon encountering any problems with the setup runs. 411 412 Returns a deferred that fires with C{True} if all went well, C{False} 413 if there were any problems. 414 """ 415 def next(successCodes): 416 for success in successCodes: 417 if not success: 418 return False 419 d = self.ctl.allRemote('spawnWorkers', True) 420 d.addCallbacks(lambda _: True, lambda _: False) 421 return d 422 423 d = self.ctl.allRemote('bash', "cd %s; python setup.py install") 424 return d.addCallback(next) 425 426 projects/AsynCluster/trunk/asyncluster/master/nodes.py
r124 r125 72 72 """ 73 73 def responded(acceptanceCode): 74 print "RESPONDED", acceptanceCode75 74 if acceptanceCode == 'node': 76 75 self.nodeClient = True projects/AsynCluster/trunk/asyncluster/ndm/client.py
r124 r125 28 28 """ 29 29 30 import os, signal 30 import os, signal, sys 31 31 from twisted.internet import defer, threads, reactor 32 32 from twisted.cred import credentials … … 36 36 37 37 PYTHON="/usr/bin/python" 38 CONFIG_PATH = "/etc/asyncluster.conf"39 38 40 39 … … 60 59 61 60 62 class ChildManager(object):63 """64 I manage child worker clients. Construct one instance of me per child65 worker process.66 67 @ivar config: A L{configobj} config object loaded from the config file.68 69 """70 def __init__(self):71 # The config object72 import configobj73 self.config = configobj.ConfigObj(CONFIG_PATH)74 # The session-less client75 self.client = Client(self)76 # Go!77 reactor.callWhenRunning(self.client.connect)78 reactor.run()79 80 def shutdown(self):81 d = self.client.disconnect()82 d.addCallback(lambda _: reactor.stop())83 return d84 85 86 61 class ChildRoot(jobs.ChildRoot): 87 62 """ … … 112 87 clients and managing user sessions. 113 88 """ 114 workerCmd = "from asyncluster.ndm import client; client.ChildManager()"89 workerCmd = "from asyncluster.ndm import worker; worker.run()" 115 90 116 91 def __init__(self, serverPassword, main): … … 209 184 for k in xrange(N): 210 185 os.spawnl(os.P_NOWAIT, PYTHON, PYTHON, "-c", self.workerCmd) 211 186 212 187 213 188 class ClientFactory(pb.PBClientFactory): 214 189 """ 215 I am a client factory that raises a L{ConnectionError} if I fail to connect 216 to the AsynCluster server. 217 """ 218 def clientConnectionFailed(self, connector, reason): 219 raise ConnectionError("Couldn't connect to server") 220 221 def connectionLost(self): 190 I am a client factory that terminates my Python process upon disconnection 191 from the AsynCluster server. 192 """ 193 def clientConnectionLost(self, *args, **kw): 222 194 """ 223 195 Called to terminate my process upon loss of connection to the PB server. 224 196 """ 225 reactor.stop() 226 197 print "Connection lost to server!" 198 pb.PBClientFactory.clientConnectionLost(self, *args, **kw) 199 try: 200 reactor.stop() 201 except: 202 pass 203 227 204 228 205 class Client(object): projects/AsynCluster/trunk/asyncluster/ndm/gui.py
r124 r125 25 25 26 26 """ 27 The main module of the NDM application. Installs a PyQt4 QApplication() object 28 into Twisted's qtreactor(). 27 GUI operation of non-headless NDM application. 28 29 Installs a PyQt4 QApplication() object into Twisted's qtreactor(). 29 30 """ 30 31 31 32 # Start PyQt4 with Twisted integration 32 import os, pwd 33 from twisted_goodies.qtwisted import qt4reactor 34 from PyQt4.QtGui import QApplication 35 app = QApplication([]) 36 qt4reactor.install(app) 37 38 # Now the regular imports 39 import os 40 from twisted.internet import defer, reactor, protocol 33 41 from PyQt4 import QtCore, QtGui 34 42 35 # Other dependency imports36 from twisted.internet import defer, reactor, protocol37 43 from asyncluster import util 38 44 … … 57 63 # Fixed Size and centered (initial) position 58 64 size = [int(x) for x in self.main.config['display']['size']] 59 center = [getattr( self.main.desktop.size(), x)()/265 center = [getattr(app.desktop().size(), x)()/2 60 66 for x in ('width', 'height')] 61 67 rect = QtCore.QRect() projects/AsynCluster/trunk/asyncluster/ndm/node.py
r124 r125 25 25 26 26 """ 27 The main module of the NDM application. Installs a PyQt4 QApplication() object28 into Twisted's qtreactor(). 27 The main module for node workers. 28 29 29 """ 30 31 # Start PyQt4 with Twisted integration32 from twisted_goodies.qtwisted import qt4reactor33 from PyQt4.QtGui import QApplication34 app = QApplication([])35 qt4reactor.install(app)36 37 # Now the regular imports38 import os39 from twisted.internet import defer, reactor40 41 import configobj, client, gui42 30 43 31 … … 47 35 class Manager(object): 48 36 """ 49 I am a manager for a node clients. 37 I manage a node client. Instantiate me with I{headless} set C{True} if 38 there will be no GUI for user logins or display management. 50 39 51 @ cvar config: A L{configobj} config object loaded from the config file.40 @ivar config: A L{configobj} config object loaded from the config file. 52 41 53 42 """ 54 def __init__(self): 43 def __init__(self, headless=False): 44 import configobj 55 45 self.config = configobj.ConfigObj(CONFIG_PATH) 56 self.desktop = app.desktop() 46 if headless: 47 self.gui = None 48 else: 49 import gui 50 self.gui = gui 51 # Regular reactor import comes after possible Qt reactor integration in 52 # gui module 53 from twisted.internet import reactor 57 54 reactor.callWhenRunning(self.startup) 58 55 reactor.run() … … 64 61 def gotSessionMgr(sessionMgr): 65 62 self.sessionMgr = sessionMgr 66 self.loginWindow = gui.LoginWindow(self) 63 if self.gui: 64 self.loginWindow = self.gui.LoginWindow(self) 67 65 66 import client 68 67 self.client = client.Client(self, session=True) 69 68 d = self.client.connect() 70 69 d.addCallback(lambda p: p.callRemote('getSessionManager')) 71 70 d.addCallback(gotSessionMgr) 72 return d73 74 def shutdown(self):75 d = self.client.disconnect()76 d.addCallback(lambda _: reactor.stop())77 71 return d 78 72 … … 86 80 if hasattr(self, 'loginWindow'): 87 81 self.loginWindow.hide() 88 self.sessionWindow = gui.SessionWindow(self, user)82 self.sessionWindow = self.gui.SessionWindow(self, user) 89 83 self.sessionWindow.show() 90 84 self.activeUser = user … … 114 108 new session. 115 109 """ 116 self.loginWindow.show() 117 self.loginWindow.repaint() 110 if hasattr(self, 'loginWindow'): 111 self.loginWindow.show() 112 self.loginWindow.repaint() 118 113 if hasattr(self, 'sessionWindow'): 119 114 self.sessionWindow.wmStop() … … 123 118 if callServer: 124 119 return self.sessionMgr.callRemote('end') 120 from twisted.internet import defer 125 121 return defer.succeed(None) 126 122 127 123 128 if __name__ == "__main__":124 def run(): 129 125 Manager() 126 127 def runHeadless(): 128 Manager(headless=True) projects/AsynCluster/trunk/asyncluster/ndm/test/test_client.py
r124 r125 35 35 36 36 37 def deferToDelay(delay=0. 3):37 def deferToDelay(delay=0.4): 38 38 d = defer.Deferred() 39 39 reactor.callLater(delay, d.callback, None) … … 85 85 def test_pids(self): 86 86 def next(null): 87 pidsAfter = [x for x in self.root._pids() if x not in pidsBefore] 88 self.failUnlessEqual(pidsAfter, [pid]) 89 os.kill(pid, signal.SIGHUP) 87 pidsNew = [x for x in self.root._pids() if x not in pidsBefore] 88 self.failUnlessEqual(len(pidsNew), 1) 89 pidNew = pidsNew[0] 90 self.failUnless(pidNew > pid) 91 os.kill(pidNew, signal.SIGHUP) 90 92 91 93 pidsBefore = self.root._pids() … … 94 96 client.PYTHON, client.PYTHON, "-c", self.root.workerCmd) 95 97 return deferToDelay().addCallback(next) 98 99 def _killWorkers(self): 100 for pid in self.root._pids(): 101 os.kill(pid, signal.SIGHUP) 96 102 97 103 def test_spawnWorkers_normal(self): … … 107 113 self.root.remote_spawnWorkers() 108 114 self.failUnlessEqual(pids, self.root._pids()) 109 110 for pid in self.root._pids(): 111 os.kill(pid, signal.SIGHUP) 115 # Kill the spawned worker process(es) 116 self._killWorkers() 117 118 self._killWorkers() 112 119 return deferToDelay().addCallback(first) 113 120 … … 121 128 for pid in self.root._pids(): 122 129 self.failIf(pid in pidsA) 123 130 self._killWorkers() 131 124 132 # The worker(s) after the first spawn 125 133 self.root.remote_spawnWorkers()
