Changeset 121

Show
Ignore:
Timestamp:
02/14/08 22:27:24 (10 months ago)
Author:
edsuom
Message:

Working on node/worker client refactoring...

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynCluster/trunk/asyncluster/master/control.py

    r119 r121  
    4242    substitution of the message prototype. 
    4343    """ 
    44     msg = "N%02d" % nodeID 
     44    msg = "%4s" % nodeID 
    4545    if args: 
    4646        msg = "%s (%s): %s" % (msg, args[0], msgProto) 
     
    178178 
    179179 
     180class WorkerRoster(object): 
     181    """ 
     182    I manage a roster of child worker clients, generating new unique IDs as 
     183    requested and keeping track of which workers are running on which node. 
     184 
     185    @ivar pids: A dict of pids of child worker processes on the various nodes, 
     186      keyed by the unique ID I've assigned to that child worker. 
     187     
     188    """ 
     189    def __init__(self): 
     190        self.counter = 0 
     191        self.pids = {} 
     192 
     193    def all(self): 
     194        """ 
     195        Returns a list of all PIDs of all child worker processes on all 
     196        nodes. (There may be duplicates, due to the same PID being used on 
     197        different nodes.) 
     198        """ 
     199        return self.pids.values() 
     200 
     201         
     202     
     203 
    180204class Controller(object): 
    181205    """ 
    182206    I control everything, I{heh heh heh...} 
     207 
     208    @ivar nodes: A dict of tuples, keyed by node client ID, where each tuple 
     209      contains the local perspective instance and remote root reference for one 
     210      node client. 
     211     
    183212    """ 
    184213    def __init__(self, config): 
    185214        self.config = config 
     215        self.counter = 0 
    186216        self.nodes = {} 
     217        self.roster = WorkerRoster() 
    187218        self.jobber = jobs.JobManager() 
    188219 
     
    196227        return self.sessionManager 
    197228 
     229    @defer.deferredGenerator 
    198230    def attachNode(self, nodePerspective, nodeRoot): 
    199231        """ 
    200         Another mutually authenticated node client has connected, so give it a 
    201         unique ID, attach its root referenceable to my map of node roots under 
    202         that ID, and return a deferred that fires with the ID when the jobber 
    203         has attached the node as well. 
    204  
    205         TODO: Don't attach the node to the jobber if the perspective is for a 
    206         GUI client. 
    207         """ 
    208         def gotID(ID): 
    209             self.nodes[ID] = nodePerspective, nodeRoot 
    210             return ID 
    211          
    212         return self.jobber.attachChild(nodeRoot).addCallback(gotID) 
    213      
     232        Call when another mutually authenticated node client has 
     233        connected. Attaches its root referenceable to my map of node roots 
     234        under a new ID in the form 'N001', then has the node spawn as many 
     235        child worker clients as needed to keep all of its cores occupied. 
     236 
     237        Returns a deferred that fires with the node ID when all the spawning is 
     238        done. 
     239        """ 
     240        self.counter += 1 
     241        ID = "N%03d" % self.counter 
     242        self.nodes[ID] = nodePerspective, nodeRoot 
     243        wfd = defer.waitForDeferred(nodeRoot.callRemote('mips')) 
     244        yield wfd 
     245        N_cores = len(wfd.getResult()) 
     246        wfd = defer.waitForDeferred( 
     247            nodeRoot.callRemote('checkWorkers', self,roster.all())) 
     248        yield wfd 
     249 
     250 
    214251    def detachNode(self, ID): 
    215252        """ 
    216253        A node client has disconnected, so detach its root referenceable from 
    217254        my map of node roots. 
    218  
    219         AsynQueue's job manager takes care of detaching the node's worker from 
    220         the queue, so we don't get involved with that. 
    221255        """ 
    222256        self.nodes.pop(ID, None) 
     257        if hasattr(self, 'sessionManager'): 
     258            return self.sessionManager.end(ID, callClient=False) 
     259 
     260    def attachWorker(self, nodeRoot, ID): 
     261        """ 
     262        Call when another mutually authenticated worker client has 
     263        connected. Attaches its root referenceable to my jobber and returns a 
     264        new ID in the form 'W001' based on the integer ID assigned by the 
     265        jobber. 
     266        """ 
     267        # TODO 
     268        d = self.jobber.attachChild(nodeRoot) 
     269        d.addCallback(lambda ID: "W%03d" % ID) 
     270        return d 
     271 
     272    def detachWorker(self, ID): 
     273        """ 
     274        A worker client has disconnected, so detach it from the jobber and 
     275        remote its PID from the I{workers} map. 
     276        """ 
     277        # TODO 
    223278        if hasattr(self, 'sessionManager'): 
    224279            return self.sessionManager.end(ID, callClient=False) 
     
    231286    def nodeRemote(self, nodeID, called, *args, **kw): 
    232287        """ 
    233         Immediately runs a non-queued call to the object specified by the 
    234         string I{called} on the node identified by the integer I{nodeID}, 
    235         supplying any provided arguments or keywords. 
     288        Runs a remote call to the object specified by the string I{called} on 
     289        the node identified by the integer I{nodeID}, supplying any provided 
     290        arguments or keywords. 
    236291        """ 
    237292        if nodeID not in self.nodes: 
  • projects/AsynCluster/trunk/asyncluster/master/nodes.py

    r2 r121  
    44# any of the cluster node workstations. 
    55# 
    6 # Copyright (C) 2006-2007 by Edwin A. Suominen, http://www.eepatents.com 
     6# Copyright (C) 2006-2008 by Edwin A. Suominen, http://www.eepatents.com 
    77# 
    88# This program is free software; you can redistribute it and/or modify it under 
     
    3838    master server. 
    3939 
    40     @ivar ID: A unique ID for this node during a mutually authenticated 
    41         client-server connection. 
     40    @ivar ID: A unique ID for this client, established during a mutually 
     41      authenticated client-server connection, in the form 'N001' for node 
     42      clients and 'W001' for worker clients. 
    4243 
    43     @ivar userID: The ID of any user having a session underway on the node. 
     44    @ivar userID: The ID of any user having a session underway on the client if 
     45      it is a node rather than a worker. 
    4446     
    4547    """ 
     
    6466        this server to execute for computing jobs will not do bad things to it. 
    6567        """ 
    66         def responded(accepted): 
    67             if accepted: 
     68        def responded(acceptanceCode): 
     69            if acceptanceCode is None: 
     70                d = defer.succeed(None) 
     71            else: 
    6872                clientRoot.notifyOnDisconnect(self.detached) 
    69                 d = self.ctl.attachNode(self, clientRoot) 
     73                if acceptanceCode is True: 
     74                    d = self.ctl.attachNode(self, clientRoot) 
     75                else: 
     76                    d = self.ctl.attachWorker(clientRoot, acceptanceCode) 
    7077                d.addCallback(done) 
    71             else: 
    72                 d = defer.succeed(None) 
    7378            d.addCallback(lambda _: (pb.IPerspective, self, self.detached)) 
    7479            return d 
    75  
     80     
    7681        def done(ID): 
    77             print "N%02d: Attached" % ID 
     82            print "%4s: Attached" % ID 
    7883            self.ID = ID 
    7984         
     
    8893        """ 
    8994        if hasattr(self, 'ID'): 
    90             print "N%02d: Detached" % self.ID 
     95            print "%4s: Detached" % self.ID 
    9196            self.ctl.detachNode(self.ID) 
    9297            del self.ID 
     
    135140    def requestAvatar(self, avatarID, mind, *interfaces): 
    136141        """ 
    137         Returns a deferred of fires with the required 
    138         I{interface, perspective, logout} tuple after the perspective attempts 
    139         a reverse log into the client. 
     142        Returns a deferred that fires with the required I{interface, 
     143        perspective, logout} tuple after the perspective attempts a reverse 
     144        login to the client. 
    140145        """ 
    141146        if pb.IPerspective not in interfaces: 
  • projects/AsynCluster/trunk/asyncluster/ndm/client.py

    r119 r121  
    99# to the workstation users. 
    1010# 
    11 # Copyright (C) 2006-2007 by Edwin A. Suominen, http://www.eepatents.com 
     11# Copyright (C) 2006-2008 by Edwin A. Suominen, http://www.eepatents.com 
    1212# 
    1313# This program is free software; you can redistribute it and/or modify it under 
     
    2828""" 
    2929 
    30 import os 
     30import os, signal 
    3131from twisted.internet import defer, reactor, threads 
    3232from twisted.cred import credentials 
     
    3535from asynqueue import jobs 
    3636 
    37  
    38 WINDOW_LINGER_TIME = 0.5 
     37PYTHON="/usr/bin/python" 
     38 
     39 
     40def checkTrust(f): 
     41    if f.im_self.trusted: 
     42        return f 
     43    raise jobs.TrustError() 
    3944 
    4045 
     
    4651 
    4752 
    48 class ClientRoot(jobs.ChildRoot): 
    49     """ 
    50     I am the root resource for one cluster node, with the remote-callable 
    51     methods of a child worker plus a few extra for NDM purposes. 
    52     """ 
    53     def __init__(self, main, serverPassword): 
     53class ChildRoot(jobs.ChildRoot): 
     54    """ 
     55    I am the root resource for one child worker client capable of running 
     56    cluster jobs. 
     57    """ 
     58    trusted = False 
     59     
     60    def __init__(self, serverPassword, ID): 
     61        self.serverPassword, self.ID = serverPassword, ID 
     62 
     63    def remote_reverseLogin(self, password): 
     64        """ 
     65        The server calls this method with its own password to authenticate 
     66        itself to the client, in this case a child worker client. 
     67 
     68        If the server is authenticated, returns my unique integer 
     69        ID. Otherwise, returns C{None}. 
     70        """ 
     71        self.trusted = (password == self.serverPassword) 
     72        if self.trusted: 
     73            return self.ID 
     74 
     75 
     76class SessionRoot(pb.Root): 
     77    """ 
     78    I am the root resource for one NDM client capable of spawning worker 
     79    clients and managing user sessions. 
     80    """ 
     81    workerCmd = "from asyncluster.ndm import main; main.BaseManager()" 
     82     
     83    def __init__(self, serverPassword, main): 
     84        self.serverPassword = serverPassword 
    5485        self.main = main 
    55         self.serverPassword = serverPassword 
    56      
     86 
    5787    def remote_reverseLogin(self, password): 
    5888        """ 
    5989        The server calls this method with its own password to authenticate 
    60         itself to the client. 
    61  
    62         @return: C{True} if the server was successfully authenticated. 
     90        itself to the client, in this case a node client. 
     91 
     92        If the server is authenticated, returns C{True}. Otherwise, returns 
     93        C{None}. 
    6394        """ 
    6495        self.trusted = (password == self.serverPassword) 
    65         return self.trusted 
     96        if self.trusted: 
     97            return True 
    6698 
    6799    def remote_setTimeLeft(self, hoursLeft): 
     
    77109        pass 
    78110 
     111    @checkTrust 
    79112    def remote_bash(self, script): 
    80113        """ 
     
    89122        return threads.deferToThread(os.waitpid, pid, 0).addCallback(done) 
    90123 
    91     def remote_mips(self): 
     124    def _mips(self): 
    92125        """ 
    93126        Returns a list of bogomips float values for each core in the client's 
     
    102135        return values 
    103136 
     137    def _workersRunning(self): 
     138        """ 
     139        Returns the number of child worker processes currently running on the 
     140        client node. 
     141        """ 
     142        count = 0 
     143        for subdir in os.listdir("/proc/"): 
     144            if not subdir.isdigit(): 
     145                continue 
     146            procPath = "/proc/%s/cmdline" % subdir 
     147            if not os.access(procPath, os.R_OK): 
     148                continue 
     149            fh = open(procPath, 'rb') 
     150            cmdline = fh.read().split('\x00') 
     151            fh.close() 
     152            if cmdline[0] != PYTHON: 
     153                continue 
     154            if cmdline[1] != '-c': 
     155                continue 
     156            if cmdline[2] == self.workerCmd: 
     157                count += 1 
     158        return count 
     159 
     160    @checkTrust 
     161    def remote_spawnWorkers(self): 
     162        """ 
     163        Spawns child processes as needed to keep one child worker client 
     164        running for each CPU core of the node. 
     165        """ 
     166        N = len(self._mips()) - self._workersRunning() 
     167        for k in xrange(N): 
     168            os.spawnl(os.P_NOWAIT, PYTHON, PYTHON, "-c", self.workerCmd) 
     169     
     170    @checkTrust 
     171    def remote_kill(self, *pids): 
     172        """ 
     173        Kills the processes corresponding to the PID(s) supplied as one or more 
     174        integer arguments. 
     175        """ 
     176        for pid in pids: 
     177            os.kill(pid, signal.SIGHUP) 
     178 
     179     
    104180 
    105181class ClientFactory(pb.PBClientFactory): 
     
    116192    I connect to the master TCP server via PB and offer it L{ClientRoot} as my 
    117193    root resource object. 
    118     """ 
    119     def __init__(self, main): 
     194 
     195    If I am constructed with the I{ID} keyword set to an integer, any 
     196    connection to the server will be as a child worker client. Otherwise, any 
     197    such connection will be as a node client, in which case, I will obtain and 
     198    return a remote reference to the server's global session manager upon 
     199    connecting. 
     200    """ 
     201    def __init__(self, main, ID=None): 
    120202        self.main = main 
     203        self.ID = ID 
    121204     
    122205    def connect(self): 
    123206        """ 
    124         Connects to the master TCP server, returning a deferred that fires with 
    125         a remote reference to the server's global session manager. 
     207        Connects to the master TCP server. Returns a deferred that fires with 
     208        the perspective provided by the server if and when the connection 
     209        succeeds. 
    126210        """ 
    127211        def gotAnswer(answer): 
    128212            if pb.IUnjellyable.providedBy(answer): 
    129213                self.perspective = answer 
    130                 return answer.callRemote('getSessionManager') 
     214                return answer 
    131215            raise ConnectionError("Couldn't authorize connection to server") 
    132216         
     
    139223        credential = credentials.UsernamePassword(cc['user'], cc['password']) 
    140224        serverPassword = self.main.config['common']['server password'] 
    141         self.root = ClientRoot(self.main, serverPassword) 
     225        if self.ID is None: 
     226            self.root = SessionRoot(serverPassword, self.main) 
     227        else: 
     228            self.root = ChildRoot(serverPassword, ID) 
    142229        # Do the login 
    143230        return factory.login(credential, self.root).addBoth(gotAnswer) 
     
    156243        # When that happens, we will want to: 
    157244        # (1) end any active session, and 
    158         d.addCallback(lambda _: self.main.sessionEnd()) 
     245        if self.ID is None: 
     246            d.addCallback(lambda _: self.main.sessionEnd()) 
    159247        # (2) disconnect from the server 
    160248        d.addCallback(lambda _: self.connector.disconnect()) 
  • projects/AsynCluster/trunk/asyncluster/ndm/main.py

    r115 r121  
    99# to the workstation users. 
    1010# 
    11 # Copyright (C) 2006-2007 by Edwin A. Suominen, http://www.eepatents.com 
     11# Copyright (C) 2006-2008 by Edwin A. Suominen, http://www.eepatents.com 
    1212# 
    1313# This program is free software; you can redistribute it and/or modify it under 
     
    3232from twisted.internet import defer 
    3333 
     34import configobj, client 
     35 
    3436 
    3537CONFIG_PATH = "/etc/asyncluster.conf" 
     
    3840class BaseManager(object): 
    3941    """ 
    40     I am a base class for the node client, GUI or console. 
     42    I am a base class for node and worker clients alike. 
    4143 
    42     @ivar d: A deferred that fires when the client connects to the 
    43       AsynCluster server. 
    44  
    45     @ivar config: A L{configobj} config object loaded from the config file. 
     44    @cvar config: A L{configobj} config object loaded from the config file. 
    4645     
    4746    """ 
    48     def __init__(self): 
     47    config = configobj.ConfigObj(CONFIG_PATH) 
     48 
     49    def __init__(self, ID): 
    4950        # The Twisted reactor, with no GUI integration needed 
    5051        from twisted.internet import reactor; self.reactor = reactor 
     52        # The session-less client 
     53        self.client = client.Client(self, ID) 
    5154        # Go! 
    52         self.reactor.callWhenRunning(self.startup
     55        self.reactor.callWhenRunning(self.client.connect
    5356        self.reactor.run() 
    5457 
    55     def gotConnected(self, sessionMgr): 
    56         """ 
    57         Connected callback for console clients. 
    58         """ 
    59         self.sessionMgr = sessionMgr 
    60  
    61     def startup(self): 
    62         import configobj, client 
    63         self.config = configobj.ConfigObj(CONFIG_PATH) 
    64         self.client = client.Client(self) 
    65         self.activeUser = None 
    66         self.client.connect().addCallback(self.gotConnected) 
    67      
    6858    def shutdown(self): 
    6959        d = self.client.disconnect() 
     
    7161        return d 
    7262 
     63 
     64class NodeManager(BaseManager): 
     65    """ 
     66    I am a manager for a node clients. 
     67    """ 
     68    def __init__(self): 
     69        # Start PyQt4 with Twisted integration 
     70        from twisted_goodies.qtwisted import qt4reactor 
     71        from PyQt4.QtGui import QApplication 
     72        self.app = QApplication([]) 
     73        qt4reactor.install(self.app) 
     74        # Now get the regular reactor 
     75        from twisted.internet import reactor; self.reactor = reactor 
     76        # The gui module... 
     77        import gui; self.gui = gui 
     78        # Go! 
     79        self.reactor.callWhenRunning(self.startup) 
     80        self.reactor.run() 
     81 
     82    def startup(self): 
     83        """ 
     84        Instantiates a session-capable client and connects it to the server. 
     85        """ 
     86        def gotSessionMgr(sessionMgr): 
     87            self.sessionMgr = sessionMgr 
     88            self.loginWindow = self.gui.LoginWindow(self)         
     89 
     90        self.client = client.Client(self) 
     91        d = self.client.connect() 
     92        d.addCallback(lambda p: p.callRemote('getSessionManager')) 
     93        d.addCallback(gotSessionMgr) 
     94        return d 
     95 
    7396    def sessionBegin(self, user, password): 
    7497        """ 
    75         For all clients, requests a session for the specified I{user}, 
    76         authenticated with the supplied I{password}. 
     98        Requests a session for the specified I{user}, authenticated with the 
     99        supplied I{password}. 
    77100        """ 
    78101        def gotSessionAnswer(approved): 
     
    93116    def sessionUpdate(self, hoursLeft): 
    94117        """ 
    95         For all clients, updates the session. 
     118        Updates the session. 
    96119        """ 
    97120        if self.activeUser is None: 
     
    105128    def sessionEnd(self, callServer=True): 
    106129        """ 
    107         For all clients, ends the session. 
    108         """ 
    109         self.activeUser = None 
    110         if callServer: 
    111             return self.sessionMgr.callRemote('end') 
    112         return defer.succeed(None) 
    113  
    114  
    115 class GuiManager(BaseManager): 
    116     """ 
    117     """ 
    118     def __init__(self): 
    119         # Start PyQt4 with Twisted integration 
    120         from twisted_goodies.qtwisted import qt4reactor 
    121         from PyQt4.QtGui import QApplication 
    122         self.app = QApplication([]) 
    123         qt4reactor.install(self.app) 
    124         # Now get the regular reactor 
    125         from twisted.internet import reactor; self.reactor = reactor 
    126         # The gui module... 
    127         import gui; self.gui = gui 
    128         # Go! 
    129         self.reactor.callWhenRunning(self.startup) 
    130         self.reactor.run() 
    131  
    132     def gotConnected(self, sessionMgr): 
    133         """ 
    134         Connected callback for GUI clients. 
    135         """ 
    136         self.sessionMgr = sessionMgr 
    137         self.loginWindow = self.gui.LoginWindow(self)         
    138  
    139     def sessionEnd(self, callServer=True): 
    140         """ 
    141         Ends the session for GUI clients. 
     130        Ends the session, returning a deferred that fires when I'm ready for a 
     131        new session. 
    142132        """ 
    143133        self.loginWindow.show() 
     
    147137            self.sessionWindow.close() 
    148138            del self.sessionWindow 
    149         return BaseManager.sessionEnd(self, callServer) 
     139        self.activeUser = None 
     140        if callServer: 
     141            return self.sessionMgr.callRemote('end') 
     142        return defer.succeed(None) 
    150143 
    151144 
    152 if __name__ == '__main__': 
    153     from optparse import OptionParser 
    154     parser = OptionParser() 
    155     parser.add_option( 
    156         "-g", "--gui", 
    157         action="store_true", dest="gui", 
    158         help="Run the NDM application in a fixed-sized, unmanaged window") 
    159     opts, args = parser.parse_args() 
    160  
    161     if opts.gui: 
    162         # Run the NDM application in a fixed-sized, unmanaged window with the 
    163         # overall event loop under Twisted control. 
    164         GuiManager() 
    165     else: 
    166         # Run a console-only client with no user session. 
    167         BaseManager()     
    168  
    169