Changeset 78
- Timestamp:
- 08/18/07 00:49:01 (1 year ago)
- Files:
-
- projects/AsynQueue/branches/processworker/asynqueue/jobs.py (modified) (4 diffs)
- projects/AsynQueue/branches/processworker/asynqueue/processworker.py (modified) (8 diffs)
- projects/AsynQueue/branches/processworker/asynqueue/test/test_jobs.py (modified) (7 diffs)
- projects/AsynQueue/branches/processworker/asynqueue/test/test_processworker.py (added)
- projects/AsynQueue/branches/processworker/asynqueue/workers.py (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynQueue/branches/processworker/asynqueue/jobs.py
r77 r78 25 25 from zope.interface import implements, Interface 26 26 from twisted.internet import defer 27 from twisted.python.failure import Failure 27 28 from twisted.spread import pb, flavors 28 29 … … 122 123 # This task's deferred is NOT returned! 123 124 125 def stop(self): 126 d = workers.RemoteCallWorker.stop(self) 127 d.addBoth(lambda _: self.remoteCaller('exit')) 128 return d 129 124 130 125 131 class JobManager(object): … … 130 136 connection to the interpreters and balance the load across the workers 131 137 while still permitting some priority queueing of jobs by niceness. 132 """ 133 def __init__(self): 138 139 You can supply an instance of L{base.TaskQueue} to the constructor. I will 140 instantiate my own if not. 141 142 @ivar queue: The TaskQueue instance I'm using. 143 144 """ 145 def __init__(self, queue=None): 134 146 self.jobs = {} 135 self.queue = base.TaskQueue() 136 147 if queue is None: 148 self.queue = base.TaskQueue() 149 else: 150 self.queue = queue 151 137 152 def shutdown(self): 138 153 """ … … 151 166 152 167 """ 153 def tried(success, worker,jobID):168 def tried(success, jobID): 154 169 if success: 155 170 self.queue.qualifyWorker(worker, jobID) 156 171 157 172 def oops(failure): 158 173 if failure.check(flavors.NoSuchMethod): 159 raise TypeError("Supplied root reference doesn't conform") 160 174 result[0] = Failure( 175 TypeError("Supplied root reference doesn't conform")) 176 161 177 worker = ChildWorker(childRoot) 162 childID = self.queue.attachWorker(worker)178 result = [self.queue.attachWorker(worker)] 163 179 dList = [] 164 180 for jobID, jobInfo in self.jobs.iteritems(): 165 181 jobCode = jobInfo[0] 166 182 d = childRoot.callRemote('newJob', jobID, jobCode) 167 d.addCallback(tried, worker,jobID)183 d.addCallback(tried, jobID) 168 184 d.addErrback(oops) 169 185 dList.append(d) 170 return defer.DeferredList(dList).addCallback(lambda _: childID)186 return defer.DeferredList(dList).addCallback(lambda _: result[0]) 171 187 172 188 def detachChild(self, childID): projects/AsynQueue/branches/processworker/asynqueue/processworker.py
r77 r78 24 24 """ 25 25 26 import sys 26 27 from twisted.internet import protocol, stdio, reactor, defer 27 28 from twisted.spread import pb … … 34 35 I wrap a L{Protocol} instance in a L{ProcessProtocol} instance so that... 35 36 """ 36 def __init__(self, proto ):37 def __init__(self, proto, startCallback=None, stopCallback=None): 37 38 self.proto = proto 39 self.startCallback = startCallback 40 self.stopCallback = stopCallback 38 41 39 42 def connectionMade(self): … … 42 45 """ 43 46 self.proto.connectionMade() 47 if callable(self.startCallback): 48 self.startCallback() 44 49 45 50 def outReceived(self, data): … … 54 59 When the process ends, the connection is lost. 55 60 """ 56 self. proto.connectionLost(reason)61 self.connectionLost(reason) 57 62 58 63 def makeConnection(self, transport): … … 60 65 """ 61 66 self.proto.transport = transport 62 self. proto.connectionMade()67 self.connectionMade() 63 68 64 69 def dataReceived(self, data): … … 73 78 """ 74 79 self.proto.connectionLost(reason) 80 if callable(self.stopCallback): 81 self.stopCallback() 75 82 76 83 77 class Parent(object):84 class ChildManager(jobs.JobManager): 78 85 """ 79 I manage a pool of one or more child python interpreters, attaching and 80 detaching them as workers to and from the supplied taskqueue. 81 82 @ivar d: A deferred that fires when all of the interpreters have been 83 started and the queue is ready to accept jobs for them. 84 86 I am a L{jobs.JobManager} that manages a pool of one or more child python 87 interpreters as at least some of its workers. 85 88 """ 86 def __init__(self, queue, N=1): 87 def gotRoot(root): 88 worker = jobs.ChildWorker(root) 89 return queue.attachWorker(worker) 90 91 dList = [self.spawnChild().addCallback(gotRoot) for k in xrange(N)] 92 self.d = defer.DeferredList(dList) 89 @defer.deferredGenerator 90 def startup(self, N=1): 91 """ 92 Starts I{N} child interpreters and attaches them to my queue. Returns a 93 deferred that fires with a list of the IDs for the interpreters when 94 they have all been started and the queue is ready to accept jobs for 95 them. 96 """ 97 self.children = {} 98 for k in xrange(N): 99 wfd = defer.waitForDeferred(self.spawnChild()) 100 yield wfd 101 process, root = wfd.getResult() 102 wfd = defer.waitForDeferred(self.attachChild(root)) 103 yield wfd 104 childID = wfd.getResult() 105 self.children[childID] = process 106 yield self.children.keys() 93 107 94 108 def spawnChild(self): … … 99 113 its server-side PB broker. 100 114 101 Returns a deferred that fires with a reference to the child'sroot102 object .115 Returns a deferred that fires with the child's process and PB root 116 objects. 103 117 """ 104 118 factory = pb.PBClientFactory() 105 119 broker = factory.buildProtocol(('127.0.0.1',)) 106 wrappedProtocol = ProtocolWrapper(broker) 107 d = reactor.spawnProcess( 108 wrappedProtocol, sys.executable, 109 args=(sys.executable, __file__), env=None) 120 d = defer.Deferred() 121 wrappedProtocol = ProtocolWrapper( 122 broker, startCallback=lambda: d.callback(None)) 123 args = (sys.executable, __file__) 124 process = reactor.spawnProcess( 125 wrappedProtocol, sys.executable, args=args, env=None) 110 126 d.addCallback(lambda _: factory.getRootObject()) 127 d.addCallback(lambda root: (process, root)) 111 128 return d 129 130 def shutdown(self): 131 def wrapUp(null): 132 for process in self.children.itervalues(): 133 process.loseConnection() 134 self.children.clear() 135 136 return jobs.JobManager.shutdown(self).addCallback(wrapUp) 137 138 def detachChild(self, childID): 139 result = jobs.JobManager.detachChild(self, childID) 140 if childID in self.children: 141 process = self.children.pop(childID) 142 process.loseConnection() 143 return result 112 144 113 145 … … 118 150 """ 119 151 root = jobs.ChildRoot() 152 root.trusted = True 120 153 factory = pb.PBServerFactory(root) 121 154 broker = factory.buildProtocol(('127.0.0.1',)) projects/AsynQueue/branches/processworker/asynqueue/test/test_jobs.py
r77 r78 37 37 38 38 class Mock_Root(mock.Mock): 39 def __init__(self): 39 def __init__(self, bogus=False): 40 self.bogus = bogus 40 41 self.callbacks = [] 41 42 self.jobs = {} … … 45 46 46 47 def callRemote(self, called, *args, **kw): 47 if called == 'newJob':48 if not self.bogus and called == 'newJob': 48 49 jobID, jobCode = args[:2] 49 50 namespace = {} … … 51 52 self.jobs[jobID] = namespace 52 53 result = True 53 elif called == 'runJob':54 elif not self.bogus and called == 'runJob': 54 55 namespace = self.jobs[args[0]] 55 56 calledObject = namespace[args[1]] 56 57 result = calledObject(*args[2:], **kw) 58 elif called == 'exit': 59 result = None 57 60 else: 58 61 result = pb.CopiedFailure(flavors.NoSuchMethod()) … … 110 113 return self.mgr.shutdown() 111 114 112 def _attach(self ):115 def _attach(self, bogus=False): 113 116 def attached(childID): 114 117 self.childID = childID 115 118 116 self.root = Mock_Root( )119 self.root = Mock_Root(bogus) 117 120 return self.mgr.attachChild(self.root).addCallback(attached) 118 121 … … 124 127 self.mgr.jobs[JOB_ID] = (JOB_CODE, 0) 125 128 return self._attach().addCallback(check) 129 130 def test_attachChild_bogus(self): 131 def check(result): 132 self.failUnless(isinstance(result, failure.Failure)) 133 134 self.mgr.jobs[JOB_ID] = (JOB_CODE, 0) 135 return self._attach(bogus=True).addBoth(check) 126 136 127 137 def test_new(self): … … 136 146 return d 137 147 138 def test_run (self):148 def test_run_one(self): 139 149 def check(result): 140 150 self.failUnlessEqual(result, 50) … … 145 155 d.addCallback(check) 146 156 return d 157 158 @defer.deferredGenerator 159 def test_run_several_sequentially(self): 160 results = [] 161 yield defer.waitForDeferred(self._attach()) 162 wfd = defer.waitForDeferred(self.mgr.new(JOB_CODE)) 163 yield wfd 164 jobID = wfd.getResult() 165 for x in xrange(10): 166 wfd = defer.waitForDeferred(self.mgr.run(jobID, 'test', x, 0)) 167 yield wfd 168 results.append(wfd.getResult()) 169 self.failUnlessEqual(results, range(10)) 170 171 @defer.deferredGenerator 172 def test_run_several_queued(self): 173 results = [] 174 yield defer.waitForDeferred(self._attach()) 175 wfd = defer.waitForDeferred(self.mgr.new(JOB_CODE)) 176 yield wfd 177 jobID = wfd.getResult() 178 dList = [] 179 for x in xrange(10): 180 d = self.mgr.run(jobID, 'test', x, 0) 181 d.addCallback(results.append) 182 dList.append(d) 183 yield defer.waitForDeferred(defer.DeferredList(dList)) 184 self.failUnlessEqual(results, range(10)) projects/AsynQueue/branches/processworker/asynqueue/workers.py
r77 r78 236 236 self.resignators = [] 237 237 self.disconnectErrors = (pb.DeadReferenceError, pb.PBConnectionLost) 238 remoteReference.notifyOnDisconnect(self. _resign)238 remoteReference.notifyOnDisconnect(self.resign) 239 239 # Setup attributes 240 240 self.N = N … … 258 258 def _oops(self, failure): 259 259 if failure.check(*self.disconnectErrors): 260 self. _resign()260 self.resign() 261 261 else: 262 262 return failure … … 268 268 task.d.callback(result) 269 269 270 def _resign(self, *null):270 def resign(self, *null): 271 271 while self.resignators: 272 272 callableObject = self.resignators.pop()
