Changeset 76

Show
Ignore:
Timestamp:
08/17/07 16:34:03 (1 year ago)
Author:
edsuom
Message:

Refactoring PB-connected worker code from asyncluster.master.jobs to asynqueue.jobs; unit testing

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynCluster/trunk/asyncluster/master/jobs.py

    r2 r76  
    2626from twisted.internet import defer 
    2727 
    28 from twisted_goodies import taskqueue 
     28import asynqueue 
    2929 
    3030 
    31 class NodeWorker(taskqueue.RemoteCallWorker): 
     31class NodeWorker(asynqueue.RemoteCallWorker): 
    3232    """ 
    33     I implement a L{taskqueue.IWorker} that runs tasks, up to I{N} pending at a 
     33    I implement a L{asynqueue.IWorker} that runs tasks, up to I{N} pending at a 
    3434    time, in a particular job on a particular node. 
    3535    """ 
     
    5252    def __init__(self): 
    5353        self.jobs  = {} 
    54         self.queue = taskqueue.TaskQueue() 
     54        self.queue = asynqueue.TaskQueue() 
    5555 
    5656    def shutdown(self): 
  • projects/AsynCluster/trunk/asyncluster/ndm/client.py

    r16 r76  
    3232from twisted.spread import pb 
    3333 
     34from asynqueue import jobs 
     35 
    3436 
    3537WINDOW_LINGER_TIME = 0.5 
     
    4345 
    4446 
    45 class ClientRoot(pb.Referenceable): 
     47class ClientRoot(jobs.ChildRoot): 
    4648    """ 
    4749    I am the root resource for one cluster node. 
     
    5052        self.main = main 
    5153        self.serverPassword = serverPassword 
    52         self.trusted = False 
    53         self.jobs = {} 
    5454     
    5555    def remote_reverseLogin(self, password): 
     
    7474        """ 
    7575        pass 
    76  
    77     def remote_newJob(self, jobID, jobCode): 
    78         """ 
    79         Registers the job identified by the specified integer I{jobID} and 
    80         represented by Python code contained in the string I{jobCode}. 
    81  
    82         @return: C{True} if the I{jobCode} was accepted and executed OK, 
    83             C{False} otherwise. 
    84              
    85         """ 
    86         if not self.trusted: 
    87             return False 
    88         try: 
    89             namespace = {} 
    90             exec jobCode in namespace 
    91         except: 
    92             return False 
    93         self.jobs[jobID] = namespace 
    94         return True 
    95      
    96     def remote_runJob(self, jobID, callName, *args, **kw): 
    97         """ 
    98         Runs the specified callable object with any supplied 
    99         arguments and keywords in the namespace of the specified I{jobID}. 
    100          
    101         @param jobID: An integer uniquely specifying an already-registered job. 
    102  
    103         @param callName: A string with the name of a callable object in the 
    104             job's namespace. 
    105  
    106         @args: Any arguments for the call. 
    107  
    108         @kw: Any keywords for the call. 
    109          
    110         @return: A deferred to the eventual result of the call. 
    111          
    112         """ 
    113         if self.trusted: 
    114             calledObject = getattr(self.jobs[jobID], callName, None) 
    115             if callable(calledObject): 
    116                 self.d_runningJob = defer.maybeDeferred( 
    117                     calledObject(*args, **kw)) 
    118                 return self.d_runningJob 
    11976                 
    12077 
  • projects/AsynQueue/branches/processworker/asynqueue/processworker.py

    r75 r76  
    1919 
    2020""" 
    21 Perspective Broker (PB) over STDIO 
     21An IWorker implementation using Perspective Broker (PB) over STDIO. 
    2222 
    2323Based on coding by Konrads Smelkovs 
     
    2525 
    2626from twisted.internet import protocol, stdio, reactor 
     27 
     28import base, workers 
    2729 
    2830 
     
    6466        """ 
    6567        self.proto.connectionLost(reason) 
     68 
     69 
     70class ChildRoot(pb.Root): 
     71    """ 
     72    I am the root resource for one child worker process. 
     73    """ 
     74    def __init__(self): 
     75        self.jobs = {} 
     76        self.queue = base.TaskQueue(workers.ThreadWorker()) 
     77        # What's this next line for???? 
     78        self.pbfactory = pb.PBServerFactory(self) 
     79 
     80    def remote_newJob(self, jobID, jobCode): 
     81        """ 
     82        Registers the job identified by the specified integer I{jobID} and 
     83        represented by Python code contained in the string I{jobCode}. 
     84 
     85        Returns a deferred that fires with C{True} if the I{jobCode} executed 
     86        OK, or C{False} otherwise. 
     87        """ 
     88        def tryJob(): 
     89            try: 
     90                namespace = {} 
     91                exec jobCode in namespace 
     92            except: 
     93                return False 
     94            self.jobs[jobID] = namespace 
     95            return True 
     96 
     97        return self.queue.call(tryJob, niceness=-10) 
     98     
     99    def remote_runJob(self, jobID, callName, *args, **kw): 
     100        """ 
     101        Runs the specified callable object with any supplied arguments and 
     102        keywords in the namespace of the specified I{jobID}. 
    66103         
     104        @param jobID: An integer uniquely specifying an already-registered job. 
     105 
     106        @param callName: A string with the name of a callable object in the 
     107          job's namespace. 
     108 
     109        @args: Any arguments for the call. 
     110 
     111        @kw: Any keywords for the call. 
     112         
     113        @return: A deferred to the eventual result of the call. 
     114         
     115        """ 
     116        calledObject = getattr(self.jobs[jobID], callName, None) 
     117        if callable(calledObject): 
     118            return self.queue.call(calledObject, *args, **kw) 
     119        raise AttributeError( 
     120            "No callable object '%s' defined in namespace for job %d" \ 
     121            % (callName, jobID)) 
     122 
     123    def remote_exit(self): 
     124        """ 
     125        Terminates the child worker process. 
     126        """ 
     127        self.queue.shutdown().addCallback(lambda _: reactor.stop()) 
     128 
     129 
     130class ProcessWorker(workers.RemoteCallWorker): 
     131    """ 
     132    I implement an L{asynqueue.IWorker} that runs tasks, up to I{N} pending at 
     133    a time, in a particular job and child process. 
     134    """ 
     135    def _runNow(self, null, task): 
     136        funcName, args, kw = task.callTuple 
     137        d = self.remoteCaller('runJob', task.series, funcName, *args, **kw) 
     138        job = (task, d) 
     139        self.jobs.append(job) 
     140        d.addBoth(self._doneTrying, job) 
     141        # This task's deferred is NOT returned! 
     142 
     143 
    67144 
    68145def PBStdioChildConnector(factory): 
     
    90167    return process 
    91168 
     169 
     170def runAsChild(): 
     171    """ 
     172    This function takes care of everything needed for a Python interpreter to 
     173    act as a child process worker. 
     174    """ 
     175    root = ChildRoot() 
     176 
     177 
     178if __name__ == '__main__': 
     179    runAsChild() 
  • projects/AsynQueue/trunk/asynqueue/workers.py

    r55 r76  
    114114 
    115115 
    116 class ThreadWorker
     116class ThreadWorker(object)
    117117    """ 
    118118    I implement an L{IWorker} that runs tasks in a dedicated worker thread. 
     
    212212 
    213213 
    214 class RemoteCallWorker
     214class RemoteCallWorker(object)
    215215    """ 
    216216    Instances of me provide an L{IWorker} that dispatches