Changeset 122

Show
Ignore:
Timestamp:
02/14/08 23:18:31 (10 months ago)
Author:
edsuom
Message:

Working on node/worker client refactoring...

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynCluster/trunk/asyncluster/master/control.py

    r121 r122  
    178178 
    179179 
    180 class WorkerRoster(object): 
    181     """ 
    182     I manage a roster of child worker clients, generating new unique IDs as 
    183     requested and keeping track of which workers are running on which node. 
    184  
    185     @ivar pids: A dict of pids of child worker processes on the various nodes, 
    186       keyed by the unique ID I've assigned to that child worker. 
    187      
    188     """ 
    189     def __init__(self): 
    190         self.counter = 0 
    191         self.pids = {} 
    192  
    193     def all(self): 
    194         """ 
    195         Returns a list of all PIDs of all child worker processes on all 
    196         nodes. (There may be duplicates, due to the same PID being used on 
    197         different nodes.) 
    198         """ 
    199         return self.pids.values() 
    200  
    201          
    202      
    203  
    204180class Controller(object): 
    205181    """ 
     
    215191        self.counter = 0 
    216192        self.nodes = {} 
    217         self.roster = WorkerRoster() 
    218193        self.jobber = jobs.JobManager() 
    219194 
     
    227202        return self.sessionManager 
    228203 
    229     @defer.deferredGenerator 
    230204    def attachNode(self, nodePerspective, nodeRoot): 
    231205        """ 
    232206        Call when another mutually authenticated node client has 
    233207        connected. Attaches its root referenceable to my map of node roots 
    234         under a new ID in the form 'N001', then has the node spawn as many 
    235         child worker clients as needed to keep all of its cores occupied. 
     208        under a new integer ID, then has the node spawn as many child worker 
     209        clients as needed to keep all of its cores occupied. 
    236210 
    237211        Returns a deferred that fires with the node ID when all the spawning is 
     
    239213        """ 
    240214        self.counter += 1 
    241         ID = "N%03d" % self.counter 
    242         self.nodes[ID] = nodePerspective, nodeRoot 
    243         wfd = defer.waitForDeferred(nodeRoot.callRemote('mips')) 
    244         yield wfd 
    245         N_cores = len(wfd.getResult()) 
    246         wfd = defer.waitForDeferred( 
    247             nodeRoot.callRemote('checkWorkers', self,roster.all())) 
    248         yield wfd 
    249  
     215        self.nodes[self.counter] = nodePerspective, nodeRoot 
     216        d = nodeRoot.callRemote('spawnWorkers') 
     217        return d.addCallback(lambda _: self.counter) 
    250218 
    251219    def detachNode(self, ID): 
     
    258226            return self.sessionManager.end(ID, callClient=False) 
    259227 
    260     def attachWorker(self, nodeRoot, ID): 
     228    def attachWorker(self, nodeRoot): 
    261229        """ 
    262230        Call when another mutually authenticated worker client has 
    263231        connected. Attaches its root referenceable to my jobber and returns a 
    264         new ID in the form 'W001' based on the integer ID assigned by the 
    265         jobber. 
    266         """ 
    267         # TODO 
    268         d = self.jobber.attachChild(nodeRoot) 
    269         d.addCallback(lambda ID: "W%03d" % ID) 
    270         return d 
     232        deferred that fires with a new integer ID as assigned by the jobber. 
     233        """ 
     234        return self.jobber.attachChild(nodeRoot) 
    271235 
    272236    def detachWorker(self, ID): 
    273237        """ 
    274         A worker client has disconnected, so detach it from the jobber and 
    275         remote its PID from the I{workers} map. 
    276         """ 
    277         # TODO 
    278         if hasattr(self, 'sessionManager'): 
    279             return self.sessionManager.end(ID, callClient=False) 
    280  
     238        A worker client has disconnected, so detach it from the jobber. 
     239        """ 
     240        return self.jobber.detachChild(ID) 
     241     
    281242    def _remoteError(self, failure, ID): 
    282243        if failure.check(pb.DeadReferenceError, pb.PBConnectionLost): 
  • projects/AsynCluster/trunk/asyncluster/master/nodes.py

    r121 r122  
    3434class Perspective(pb.Avatar): 
    3535    """ 
    36     Each node's PB client receives a reference to its very own instance of me 
    37     as its perspective upon making an authenticated TCP connection to the node 
    38     master server. 
     36    Each PB node and worker client receives a reference to its very own 
     37    instance of me as its perspective upon making an authenticated TCP 
     38    connection to the node master server. 
    3939 
    4040    @ivar ID: A unique ID for this client, established during a mutually 
    41       authenticated client-server connection, in the form 'N001' for node 
    42       clients and 'W001' for worker clients. 
     41      authenticated client-server connection. 
    4342 
    4443    @ivar userID: The ID of any user having a session underway on the client if 
    4544      it is a node rather than a worker. 
     45 
     46    @ivar nodeClient: A Boolean that is set C{True} if my client is a node 
     47      client, as opposed to a child worker client. 
    4648     
    4749    """ 
     
    5557        """ 
    5658        return self.ctl.getSessionManager() 
     59 
     60    def _printableID(self): 
     61        return "%s%4d" % ("WN"[self.nodeClient], self.ID) 
    5762 
    5863    def attached(self, clientRoot): 
     
    6873        def responded(acceptanceCode): 
    6974            if acceptanceCode is None: 
    70                 d = defer.succeed(None) 
     75                return (pb.IPerspective, self, lambda _: None) 
     76            if acceptanceCode is True: 
     77                self.nodeClient = True 
     78                d = self.ctl.attachNode(self, clientRoot) 
    7179            else: 
    72                 clientRoot.notifyOnDisconnect(self.detached) 
    73                 if acceptanceCode is True: 
    74                     d = self.ctl.attachNode(self, clientRoot) 
    75                 else: 
    76                     d = self.ctl.attachWorker(clientRoot, acceptanceCode) 
    77                 d.addCallback(done) 
    78             d.addCallback(lambda _: (pb.IPerspective, self, self.detached)) 
    79             return d 
     80                self.nodeClient = False 
     81                d = self.ctl.attachWorker(clientRoot) 
     82            clientRoot.notifyOnDisconnect(self.detached) 
     83            return d.addCallback(doneAttaching) 
    8084     
    81         def done(ID): 
    82             print "%4s: Attached" % ID 
     85        def doneAttaching(ID): 
    8386            self.ID = ID 
     87            print "%s: Attached" % self._printableID() 
     88            return pb.IPerspective, self, self.detached 
    8489         
    8590        serverPassword = self.ctl.config['common']['server password'] 
    8691        d = clientRoot.callRemote('reverseLogin', serverPassword) 
    87         d.addCallback(responded) 
    88         return d 
     92        return d.addCallback(responded) 
    8993 
    9094    def detached(self, *null): 
    9195        """ 
    92         Called when the node client disconnects. 
     96        Called when the client disconnects. 
    9397        """ 
    9498        if hasattr(self, 'ID'): 
    95             print "%4s: Detached" % self.ID 
    96             self.ctl.detachNode(self.ID) 
     99            print "%s: Detached" % self._printableID() 
     100            if self.nodeClient: 
     101                d = self.ctl.detachNode(self.ID) 
     102            else: 
     103                d = self.ctl.detachWorker(self.ID) 
    97104            del self.ID 
     105            return d 
    98106 
    99107 
  • projects/AsynCluster/trunk/asyncluster/master/test/test_control.py

    r2 r122  
    9898    def setUp(self): 
    9999        def testableInit(self): 
     100            self.counter = 0 
    100101            self.nodes = {} 
    101102            self.jobber = mock.JobManager() 
     
    112113        def checkAttached(null): 
    113114            self.failUnless(expectedID in self.ctl.nodes) 
    114             self.failUnlessEqual( 
    115                 self.ctl.jobber.attached.get(expectedID, None), root) 
    116115             
    117116        d = self.ctl.attachNode(node, root) 
     
    191190        d.addCallback(self.ctl.nodeRemote, 'message', president) 
    192191        d.addCallback(checkCalled, president) 
    193         self.ctl.detachNode(self._nodeID
     192        d.addCallback(lambda _: self.ctl.detachNode(self._nodeID)
    194193        d.addCallback( 
    195194            lambda _: self.ctl.nodeRemote(self._nodeID, 'message', president)) 
  • projects/AsynCluster/trunk/asyncluster/ndm/client.py

    r121 r122  
    187187        raise ConnectionError("Couldn't connect to server") 
    188188 
     189    def connectionLost(self): 
     190        """ 
     191        Called to terminate my process upon loss of connection to the PB server. 
     192        """ 
     193        reactor.stop() 
     194         
    189195 
    190196class Client(object): 
     
    193199    root resource object. 
    194200 
    195     If I am constructed with the I{ID} keyword set to an integer, any 
    196     connection to the server will be as a child worker client. Otherwise, any 
    197     such connection will be as a node client, in which case, I will obtain and 
     201    If I am constructed with the I{session} keyword set C{True}, any connection 
     202    to the server will be as a node client. In that case, I will obtain and 
    198203    return a remote reference to the server's global session manager upon 
    199     connecting. 
    200     """ 
    201     def __init__(self, main, ID=None): 
     204    connecting. Otherwise, I'm just a lowly worker client. 
     205    """ 
     206    def __init__(self, main, session=False): 
    202207        self.main = main 
    203         self.ID = ID 
     208        self.session = session 
    204209     
    205210    def connect(self): 
     
    223228        credential = credentials.UsernamePassword(cc['user'], cc['password']) 
    224229        serverPassword = self.main.config['common']['server password'] 
    225         if self.ID is None
     230        if self.session
    226231            self.root = SessionRoot(serverPassword, self.main) 
    227232        else: 
    228             self.root = ChildRoot(serverPassword, ID
     233            self.root = ChildRoot(serverPassword
    229234        # Do the login 
    230235        return factory.login(credential, self.root).addBoth(gotAnswer) 
     
    243248        # When that happens, we will want to: 
    244249        # (1) end any active session, and 
    245         if self.ID is None
     250        if self.session
    246251            d.addCallback(lambda _: self.main.sessionEnd()) 
    247252        # (2) disconnect from the server 
  • projects/AsynCluster/trunk/asyncluster/ndm/main.py

    r121 r122  
    4747    config = configobj.ConfigObj(CONFIG_PATH) 
    4848 
    49     def __init__(self, ID): 
     49    def __init__(self): 
    5050        # The Twisted reactor, with no GUI integration needed 
    5151        from twisted.internet import reactor; self.reactor = reactor 
    5252        # The session-less client 
    53         self.client = client.Client(self, ID
     53        self.client = client.Client(self
    5454        # Go! 
    5555        self.reactor.callWhenRunning(self.client.connect) 
     
    8888            self.loginWindow = self.gui.LoginWindow(self)         
    8989 
    90         self.client = client.Client(self
     90        self.client = client.Client(self, session=True
    9191        d = self.client.connect() 
    9292        d.addCallback(lambda p: p.callRemote('getSessionManager'))