Changeset 125

Show
Ignore:
Timestamp:
02/20/08 21:38:39 (11 months ago)
Author:
edsuom
Message:

Node/worker clients nearing completion

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynCluster/trunk/asyncluster/master/control.py

    r124 r125  
    250250        the node identified by the integer I{nodeID}, supplying any provided 
    251251        arguments or keywords. 
     252 
     253        Returns a deferred that fires with the result of the remote call. 
    252254        """ 
    253255        if nodeID not in self.nodes: 
     
    264266        the user identified by the string I{userID} has an active session, 
    265267        supplying any provided arguments or keywords. 
     268 
     269        Returns a deferred that fires with the result of the remote call. 
    266270        """ 
    267271        for ID, nodeStuff in self.nodes.iteritems(): 
     
    273277        return defer.fail(Failure( 
    274278            RemoteCallError("Invalid user '%s'" % userID))) 
     279 
     280    def allRemote(self, called, *args, **kw): 
     281        """ 
     282        Runs a remote call to the object specified by the string I{called} on 
     283        all connected nodes, supplying any provided arguments or keywords. 
     284 
     285        Returns a deferred that fires with a list of results of the remote 
     286        calls, in no particular node order. 
     287        """ 
     288        for ID, nodeStuff in self.nodes.iteritems(): 
     289            d = nodeRoot.callRemote(called, *args, **kw) 
     290            d.addErrback(self._remoteError, ID) 
     291            dList.append(d) 
     292        return defer.gatherResults(dList) 
    275293 
    276294 
     
    381399    def remote_cancelJob(self, jobID): 
    382400        """ 
     401        Cancels the job specified by I{jobID}. 
    383402        """ 
    384403        return self.ctl.jobber.cancel(jobID) 
     404 
     405    def remote_resetup(self, sourcePath): 
     406        """ 
     407        Instructs each connected node to do a new python setup in the specified 
     408        I{sourcePath} and respawn its coeterie of child worker process. 
     409 
     410        Terminates upon encountering any problems with the setup runs. 
     411 
     412        Returns a deferred that fires with C{True} if all went well, C{False} 
     413        if there were any problems. 
     414        """ 
     415        def next(successCodes): 
     416            for success in successCodes: 
     417                if not success: 
     418                    return False 
     419            d = self.ctl.allRemote('spawnWorkers', True) 
     420            d.addCallbacks(lambda _: True, lambda _: False) 
     421            return d 
     422         
     423        d = self.ctl.allRemote('bash', "cd %s; python setup.py install") 
     424        return d.addCallback(next) 
     425         
     426         
  • projects/AsynCluster/trunk/asyncluster/master/nodes.py

    r124 r125  
    7272        """ 
    7373        def responded(acceptanceCode): 
    74             print "RESPONDED", acceptanceCode 
    7574            if acceptanceCode == 'node': 
    7675                self.nodeClient = True 
  • projects/AsynCluster/trunk/asyncluster/ndm/client.py

    r124 r125  
    2828""" 
    2929 
    30 import os, signal 
     30import os, signal, sys 
    3131from twisted.internet import defer, threads, reactor 
    3232from twisted.cred import credentials 
     
    3636 
    3737PYTHON="/usr/bin/python" 
    38 CONFIG_PATH = "/etc/asyncluster.conf" 
    3938 
    4039 
     
    6059 
    6160 
    62 class ChildManager(object): 
    63     """ 
    64     I manage child worker clients. Construct one instance of me per child 
    65     worker process. 
    66  
    67     @ivar config: A L{configobj} config object loaded from the config file. 
    68      
    69     """ 
    70     def __init__(self): 
    71         # The config object 
    72         import configobj 
    73         self.config = configobj.ConfigObj(CONFIG_PATH) 
    74         # The session-less client 
    75         self.client = Client(self) 
    76         # Go! 
    77         reactor.callWhenRunning(self.client.connect) 
    78         reactor.run() 
    79  
    80     def shutdown(self): 
    81         d = self.client.disconnect() 
    82         d.addCallback(lambda _: reactor.stop()) 
    83         return d 
    84  
    85  
    8661class ChildRoot(jobs.ChildRoot): 
    8762    """ 
     
    11287    clients and managing user sessions. 
    11388    """ 
    114     workerCmd = "from asyncluster.ndm import client; client.ChildManager()" 
     89    workerCmd = "from asyncluster.ndm import worker; worker.run()" 
    11590     
    11691    def __init__(self, serverPassword, main): 
     
    209184        for k in xrange(N): 
    210185            os.spawnl(os.P_NOWAIT, PYTHON, PYTHON, "-c", self.workerCmd) 
    211      
     186 
    212187 
    213188class ClientFactory(pb.PBClientFactory): 
    214189    """ 
    215     I am a client factory that raises a L{ConnectionError} if I fail to connect 
    216     to the AsynCluster server. 
    217     """ 
    218     def clientConnectionFailed(self, connector, reason): 
    219         raise ConnectionError("Couldn't connect to server") 
    220  
    221     def connectionLost(self): 
     190    I am a client factory that terminates my Python process upon disconnection 
     191    from the AsynCluster server. 
     192    """ 
     193    def clientConnectionLost(self, *args, **kw): 
    222194        """ 
    223195        Called to terminate my process upon loss of connection to the PB server. 
    224196        """ 
    225         reactor.stop() 
    226          
     197        print "Connection lost to server!" 
     198        pb.PBClientFactory.clientConnectionLost(self, *args, **kw) 
     199        try: 
     200            reactor.stop() 
     201        except: 
     202            pass 
     203 
    227204 
    228205class Client(object): 
  • projects/AsynCluster/trunk/asyncluster/ndm/gui.py

    r124 r125  
    2525 
    2626""" 
    27 The main module of the NDM application. Installs a PyQt4 QApplication() object 
    28 into Twisted's qtreactor(). 
     27GUI operation of non-headless NDM application. 
     28 
     29Installs a PyQt4 QApplication() object into Twisted's qtreactor(). 
    2930""" 
    3031 
    3132# Start PyQt4 with Twisted integration 
    32 import os, pwd 
     33from twisted_goodies.qtwisted import qt4reactor 
     34from PyQt4.QtGui import QApplication 
     35app = QApplication([]) 
     36qt4reactor.install(app) 
     37 
     38# Now the regular imports 
     39import os 
     40from twisted.internet import defer, reactor, protocol 
    3341from PyQt4 import QtCore, QtGui 
    3442 
    35 # Other dependency imports 
    36 from twisted.internet import defer, reactor, protocol 
    3743from asyncluster import util 
    3844 
     
    5763        # Fixed Size and centered (initial) position 
    5864        size = [int(x) for x in self.main.config['display']['size']] 
    59         center = [getattr(self.main.desktop.size(), x)()/2 
     65        center = [getattr(app.desktop().size(), x)()/2 
    6066                  for x in ('width', 'height')] 
    6167        rect = QtCore.QRect() 
  • projects/AsynCluster/trunk/asyncluster/ndm/node.py

    r124 r125  
    2525 
    2626""" 
    27 The main module of the NDM application. Installs a PyQt4 QApplication() object 
    28 into Twisted's qtreactor(). 
     27The main module for node workers. 
     28 
    2929""" 
    30  
    31 # Start PyQt4 with Twisted integration 
    32 from twisted_goodies.qtwisted import qt4reactor 
    33 from PyQt4.QtGui import QApplication 
    34 app = QApplication([]) 
    35 qt4reactor.install(app) 
    36  
    37 # Now the regular imports 
    38 import os 
    39 from twisted.internet import defer, reactor 
    40  
    41 import configobj, client, gui 
    4230 
    4331 
     
    4735class Manager(object): 
    4836    """ 
    49     I am a manager for a node clients. 
     37    I manage a node client. Instantiate me with I{headless} set C{True} if 
     38    there will be no GUI for user logins or display management. 
    5039     
    51     @cvar config: A L{configobj} config object loaded from the config file. 
     40    @ivar config: A L{configobj} config object loaded from the config file. 
    5241     
    5342    """ 
    54     def __init__(self): 
     43    def __init__(self, headless=False): 
     44        import configobj 
    5545        self.config = configobj.ConfigObj(CONFIG_PATH) 
    56         self.desktop = app.desktop() 
     46        if headless: 
     47            self.gui = None 
     48        else: 
     49            import gui 
     50            self.gui = gui 
     51        # Regular reactor import comes after possible Qt reactor integration in 
     52        # gui module 
     53        from twisted.internet import reactor 
    5754        reactor.callWhenRunning(self.startup) 
    5855        reactor.run() 
     
    6461        def gotSessionMgr(sessionMgr): 
    6562            self.sessionMgr = sessionMgr 
    66             self.loginWindow = gui.LoginWindow(self)         
     63            if self.gui: 
     64                self.loginWindow = self.gui.LoginWindow(self)         
    6765 
     66        import client 
    6867        self.client = client.Client(self, session=True) 
    6968        d = self.client.connect() 
    7069        d.addCallback(lambda p: p.callRemote('getSessionManager')) 
    7170        d.addCallback(gotSessionMgr) 
    72         return d 
    73  
    74     def shutdown(self): 
    75         d = self.client.disconnect() 
    76         d.addCallback(lambda _: reactor.stop()) 
    7771        return d 
    7872 
     
    8680                if hasattr(self, 'loginWindow'): 
    8781                    self.loginWindow.hide() 
    88                     self.sessionWindow = gui.SessionWindow(self, user) 
     82                    self.sessionWindow = self.gui.SessionWindow(self, user) 
    8983                    self.sessionWindow.show() 
    9084                self.activeUser = user 
     
    114108        new session. 
    115109        """ 
    116         self.loginWindow.show() 
    117         self.loginWindow.repaint() 
     110        if hasattr(self, 'loginWindow'): 
     111            self.loginWindow.show() 
     112            self.loginWindow.repaint() 
    118113        if hasattr(self, 'sessionWindow'): 
    119114            self.sessionWindow.wmStop() 
     
    123118        if callServer: 
    124119            return self.sessionMgr.callRemote('end') 
     120        from twisted.internet import defer 
    125121        return defer.succeed(None) 
    126122 
    127123 
    128 if __name__ == "__main__"
     124def run()
    129125    Manager() 
     126 
     127def runHeadless(): 
     128    Manager(headless=True) 
  • projects/AsynCluster/trunk/asyncluster/ndm/test/test_client.py

    r124 r125  
    3535 
    3636 
    37 def deferToDelay(delay=0.3): 
     37def deferToDelay(delay=0.4): 
    3838    d = defer.Deferred() 
    3939    reactor.callLater(delay, d.callback, None) 
     
    8585    def test_pids(self): 
    8686        def next(null): 
    87             pidsAfter = [x for x in self.root._pids() if x not in pidsBefore] 
    88             self.failUnlessEqual(pidsAfter, [pid]) 
    89             os.kill(pid, signal.SIGHUP) 
     87            pidsNew = [x for x in self.root._pids() if x not in pidsBefore] 
     88            self.failUnlessEqual(len(pidsNew), 1) 
     89            pidNew = pidsNew[0] 
     90            self.failUnless(pidNew > pid) 
     91            os.kill(pidNew, signal.SIGHUP) 
    9092         
    9193        pidsBefore = self.root._pids() 
     
    9496            client.PYTHON, client.PYTHON, "-c", self.root.workerCmd) 
    9597        return deferToDelay().addCallback(next) 
     98 
     99    def _killWorkers(self): 
     100        for pid in self.root._pids(): 
     101            os.kill(pid, signal.SIGHUP) 
    96102 
    97103    def test_spawnWorkers_normal(self): 
     
    107113            self.root.remote_spawnWorkers() 
    108114            self.failUnlessEqual(pids, self.root._pids()) 
    109          
    110         for pid in self.root._pids(): 
    111             os.kill(pid, signal.SIGHUP) 
     115            # Kill the spawned worker process(es) 
     116            self._killWorkers() 
     117             
     118        self._killWorkers() 
    112119        return deferToDelay().addCallback(first) 
    113120     
     
    121128            for pid in self.root._pids(): 
    122129                self.failIf(pid in pidsA) 
    123  
     130            self._killWorkers() 
     131         
    124132        # The worker(s) after the first spawn 
    125133        self.root.remote_spawnWorkers()