Changeset 78

Show
Ignore:
Timestamp:
08/18/07 00:49:01 (1 year ago)
Author:
edsuom
Message:

Got asynqueue jobs & processworker running and (pretty well) tested

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynQueue/branches/processworker/asynqueue/jobs.py

    r77 r78  
    2525from zope.interface import implements, Interface 
    2626from twisted.internet import defer 
     27from twisted.python.failure import Failure 
    2728from twisted.spread import pb, flavors 
    2829 
     
    122123        # This task's deferred is NOT returned! 
    123124 
     125    def stop(self): 
     126        d = workers.RemoteCallWorker.stop(self) 
     127        d.addBoth(lambda _: self.remoteCaller('exit')) 
     128        return d 
     129 
    124130 
    125131class JobManager(object): 
     
    130136    connection to the interpreters and balance the load across the workers 
    131137    while still permitting some priority queueing of jobs by niceness. 
    132     """ 
    133     def __init__(self): 
     138 
     139    You can supply an instance of L{base.TaskQueue} to the constructor. I will 
     140    instantiate my own if not. 
     141 
     142    @ivar queue: The TaskQueue instance I'm using. 
     143     
     144    """ 
     145    def __init__(self, queue=None): 
    134146        self.jobs  = {} 
    135         self.queue = base.TaskQueue() 
    136  
     147        if queue is None: 
     148            self.queue = base.TaskQueue() 
     149        else: 
     150            self.queue = queue 
     151     
    137152    def shutdown(self): 
    138153        """ 
     
    151166         
    152167        """ 
    153         def tried(success, worker, jobID): 
     168        def tried(success, jobID): 
    154169            if success: 
    155170                self.queue.qualifyWorker(worker, jobID) 
    156  
     171         
    157172        def oops(failure): 
    158173            if failure.check(flavors.NoSuchMethod): 
    159                 raise TypeError("Supplied root reference doesn't conform") 
    160          
     174                result[0] = Failure( 
     175                    TypeError("Supplied root reference doesn't conform")) 
     176 
    161177        worker = ChildWorker(childRoot) 
    162         childID = self.queue.attachWorker(worker) 
     178        result = [self.queue.attachWorker(worker)] 
    163179        dList = [] 
    164180        for jobID, jobInfo in self.jobs.iteritems(): 
    165181            jobCode = jobInfo[0] 
    166182            d = childRoot.callRemote('newJob', jobID, jobCode) 
    167             d.addCallback(tried, worker, jobID) 
     183            d.addCallback(tried, jobID) 
    168184            d.addErrback(oops) 
    169185            dList.append(d) 
    170         return defer.DeferredList(dList).addCallback(lambda _: childID
     186        return defer.DeferredList(dList).addCallback(lambda _: result[0]
    171187     
    172188    def detachChild(self, childID): 
  • projects/AsynQueue/branches/processworker/asynqueue/processworker.py

    r77 r78  
    2424""" 
    2525 
     26import sys 
    2627from twisted.internet import protocol, stdio, reactor, defer 
    2728from twisted.spread import pb 
     
    3435    I wrap a L{Protocol} instance in a L{ProcessProtocol} instance so that... 
    3536    """ 
    36     def __init__(self, proto): 
     37    def __init__(self, proto, startCallback=None, stopCallback=None): 
    3738        self.proto = proto 
     39        self.startCallback = startCallback 
     40        self.stopCallback = stopCallback 
    3841 
    3942    def connectionMade(self): 
     
    4245        """ 
    4346        self.proto.connectionMade() 
     47        if callable(self.startCallback): 
     48            self.startCallback() 
    4449     
    4550    def outReceived(self, data): 
     
    5459        When the process ends, the connection is lost. 
    5560        """ 
    56         self.proto.connectionLost(reason) 
     61        self.connectionLost(reason) 
    5762     
    5863    def makeConnection(self, transport): 
     
    6065        """ 
    6166        self.proto.transport = transport 
    62         self.proto.connectionMade() 
     67        self.connectionMade() 
    6368         
    6469    def dataReceived(self, data): 
     
    7378        """ 
    7479        self.proto.connectionLost(reason) 
     80        if callable(self.stopCallback): 
     81            self.stopCallback() 
    7582 
    7683 
    77 class Parent(object): 
     84class ChildManager(jobs.JobManager): 
    7885    """ 
    79     I manage a pool of one or more child python interpreters, attaching and 
    80     detaching them as workers to and from the supplied taskqueue. 
    81  
    82     @ivar d: A deferred that fires when all of the interpreters have been 
    83       started and the queue is ready to accept jobs for them. 
    84      
     86    I am a L{jobs.JobManager} that manages a pool of one or more child python 
     87    interpreters as at least some of its workers. 
    8588    """ 
    86     def __init__(self, queue, N=1): 
    87         def gotRoot(root): 
    88             worker = jobs.ChildWorker(root) 
    89             return queue.attachWorker(worker) 
    90  
    91         dList = [self.spawnChild().addCallback(gotRoot) for k in xrange(N)] 
    92         self.d = defer.DeferredList(dList) 
     89    @defer.deferredGenerator 
     90    def startup(self, N=1): 
     91        """ 
     92        Starts I{N} child interpreters and attaches them to my queue. Returns a 
     93        deferred that fires with a list of the IDs for the interpreters when 
     94        they have all been started and the queue is ready to accept jobs for 
     95        them. 
     96        """ 
     97        self.children = {} 
     98        for k in xrange(N): 
     99            wfd = defer.waitForDeferred(self.spawnChild()) 
     100            yield wfd 
     101            process, root = wfd.getResult() 
     102            wfd = defer.waitForDeferred(self.attachChild(root)) 
     103            yield wfd 
     104            childID = wfd.getResult() 
     105            self.children[childID] = process 
     106        yield self.children.keys() 
    93107     
    94108    def spawnChild(self): 
     
    99113        its server-side PB broker. 
    100114 
    101         Returns a deferred that fires with a reference to the child's root 
    102         object
     115        Returns a deferred that fires with the child's process and PB root 
     116        objects
    103117        """ 
    104118        factory = pb.PBClientFactory() 
    105119        broker = factory.buildProtocol(('127.0.0.1',)) 
    106         wrappedProtocol = ProtocolWrapper(broker) 
    107         d = reactor.spawnProcess( 
    108             wrappedProtocol, sys.executable, 
    109             args=(sys.executable, __file__), env=None) 
     120        d = defer.Deferred() 
     121        wrappedProtocol = ProtocolWrapper( 
     122            broker, startCallback=lambda: d.callback(None)) 
     123        args = (sys.executable, __file__) 
     124        process = reactor.spawnProcess( 
     125            wrappedProtocol, sys.executable, args=args, env=None) 
    110126        d.addCallback(lambda _: factory.getRootObject()) 
     127        d.addCallback(lambda root: (process, root)) 
    111128        return d 
     129 
     130    def shutdown(self): 
     131        def wrapUp(null): 
     132            for process in self.children.itervalues(): 
     133                process.loseConnection() 
     134            self.children.clear() 
     135         
     136        return jobs.JobManager.shutdown(self).addCallback(wrapUp) 
     137 
     138    def detachChild(self, childID): 
     139        result = jobs.JobManager.detachChild(self, childID) 
     140        if childID in self.children: 
     141            process = self.children.pop(childID) 
     142            process.loseConnection() 
     143        return result 
    112144 
    113145 
     
    118150    """ 
    119151    root = jobs.ChildRoot() 
     152    root.trusted = True 
    120153    factory = pb.PBServerFactory(root) 
    121154    broker = factory.buildProtocol(('127.0.0.1',)) 
  • projects/AsynQueue/branches/processworker/asynqueue/test/test_jobs.py

    r77 r78  
    3737 
    3838class Mock_Root(mock.Mock): 
    39     def __init__(self): 
     39    def __init__(self, bogus=False): 
     40        self.bogus = bogus 
    4041        self.callbacks = [] 
    4142        self.jobs = {} 
     
    4546 
    4647    def callRemote(self, called, *args, **kw): 
    47         if called == 'newJob': 
     48        if not self.bogus and called == 'newJob': 
    4849            jobID, jobCode = args[:2] 
    4950            namespace = {} 
     
    5152            self.jobs[jobID] = namespace 
    5253            result = True 
    53         elif called == 'runJob': 
     54        elif not self.bogus and called == 'runJob': 
    5455            namespace = self.jobs[args[0]] 
    5556            calledObject = namespace[args[1]] 
    5657            result = calledObject(*args[2:], **kw) 
     58        elif called == 'exit': 
     59            result = None 
    5760        else: 
    5861            result = pb.CopiedFailure(flavors.NoSuchMethod()) 
     
    110113        return self.mgr.shutdown() 
    111114 
    112     def _attach(self): 
     115    def _attach(self, bogus=False): 
    113116        def attached(childID): 
    114117            self.childID = childID 
    115118         
    116         self.root = Mock_Root(
     119        self.root = Mock_Root(bogus
    117120        return self.mgr.attachChild(self.root).addCallback(attached) 
    118121 
     
    124127        self.mgr.jobs[JOB_ID] = (JOB_CODE, 0) 
    125128        return self._attach().addCallback(check) 
     129 
     130    def test_attachChild_bogus(self): 
     131        def check(result): 
     132            self.failUnless(isinstance(result, failure.Failure)) 
     133 
     134        self.mgr.jobs[JOB_ID] = (JOB_CODE, 0) 
     135        return self._attach(bogus=True).addBoth(check) 
    126136 
    127137    def test_new(self): 
     
    136146        return d 
    137147 
    138     def test_run(self): 
     148    def test_run_one(self): 
    139149        def check(result): 
    140150            self.failUnlessEqual(result, 50) 
     
    145155        d.addCallback(check) 
    146156        return d 
     157 
     158    @defer.deferredGenerator 
     159    def test_run_several_sequentially(self): 
     160        results = [] 
     161        yield defer.waitForDeferred(self._attach()) 
     162        wfd = defer.waitForDeferred(self.mgr.new(JOB_CODE)) 
     163        yield wfd 
     164        jobID = wfd.getResult() 
     165        for x in xrange(10): 
     166            wfd = defer.waitForDeferred(self.mgr.run(jobID, 'test', x, 0)) 
     167            yield wfd 
     168            results.append(wfd.getResult()) 
     169        self.failUnlessEqual(results, range(10)) 
     170 
     171    @defer.deferredGenerator 
     172    def test_run_several_queued(self): 
     173        results = [] 
     174        yield defer.waitForDeferred(self._attach()) 
     175        wfd = defer.waitForDeferred(self.mgr.new(JOB_CODE)) 
     176        yield wfd 
     177        jobID = wfd.getResult() 
     178        dList = [] 
     179        for x in xrange(10): 
     180            d = self.mgr.run(jobID, 'test', x, 0) 
     181            d.addCallback(results.append) 
     182            dList.append(d) 
     183        yield defer.waitForDeferred(defer.DeferredList(dList)) 
     184        self.failUnlessEqual(results, range(10)) 
  • projects/AsynQueue/branches/processworker/asynqueue/workers.py

    r77 r78  
    236236        self.resignators = [] 
    237237        self.disconnectErrors = (pb.DeadReferenceError, pb.PBConnectionLost) 
    238         remoteReference.notifyOnDisconnect(self._resign) 
     238        remoteReference.notifyOnDisconnect(self.resign) 
    239239        # Setup attributes 
    240240        self.N = N 
     
    258258    def _oops(self, failure): 
    259259        if failure.check(*self.disconnectErrors): 
    260             self._resign() 
     260            self.resign() 
    261261        else: 
    262262            return failure 
     
    268268        task.d.callback(result) 
    269269     
    270     def _resign(self, *null): 
     270    def resign(self, *null): 
    271271        while self.resignators: 
    272272            callableObject = self.resignators.pop()