Changeset 213
- Timestamp:
- 06/22/08 00:19:21 (2 months ago)
- Files:
-
- projects/AsynQueue/trunk/asynqueue/jobs.py (modified) (1 diff)
- projects/AsynQueue/trunk/asynqueue/test/test_processworker.py (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynQueue/trunk/asynqueue/jobs.py
r126 r213 366 366 all new workers run the task for that job before they run any other 367 367 tasks for it. 368 """ 369 if jobID not in self.updates: 370 self.updates[jobID] = [] 371 self.updates[jobID].append([callName, args, kw, []]) 372 dList = [ 373 self._runUpdate(jobID, worker) for worker in self.queue.workers()] 368 369 The updates are run via a direct remoteCall to each worker, not through 370 the queue. Because of the disconnect between queued and direct calls, 371 it is likely but not guaranteed that any jobs you have queued when this 372 method is called will run on a particular worker B{after} this update 373 is run. Wait for the deferred from this method to fire before queuing 374 any jobs that need the update to be in place before running. 375 376 If you don't want the task saved to the update list, but only run on 377 the workers currently attached, set the I{ephemeral} keyword C{True}. 378 """ 379 ephemeral = kw.pop('ephemeral', False) 380 if ephemeral: 381 dList = [ 382 worker.remoteCaller('runJob', jobID, callName, *args, **kw) 383 for worker in self.queue.workers()] 384 else: 385 if jobID not in self.updates: 386 self.updates[jobID] = [] 387 self.updates[jobID].append([callName, args, kw, []]) 388 dList = [ 389 self._runUpdate(jobID, worker) 390 for worker in self.queue.workers()] 374 391 return defer.DeferredList(dList) 375 392 projects/AsynQueue/trunk/asynqueue/test/test_processworker.py
r91 r213 22 22 """ 23 23 24 from twisted.internet import defer 24 25 import mock, processworker 25 26 … … 49 50 50 51 return self.mgr.spawnChild().addCallback(check) 52 53 @defer.deferredGenerator 54 def test_callOnChild(self): 55 wfd = defer.waitForDeferred(self.mgr.spawnChild()) 56 yield wfd 57 process, root = wfd.getResult() 58 self.mgr.children = {1:process} 59 wfd = defer.waitForDeferred( 60 root.callRemote('newJob', JOB_ID, JOB_CODE)) 61 yield wfd 62 self.failUnless(wfd.getResult(), "Child didn't accept job") 63 wfd = defer.waitForDeferred( 64 root.callRemote('runJob', 1, 'test', 10, 20)) 65 yield wfd 66 status, result = wfd.getResult() 67 self.failUnless(status, "Call failed") 68 self.failUnlessEqual(result, 50) 69 70 @defer.deferredGenerator 71 def test_runMultipleChildren(self): 72 N = 3 73 N_iter = 100 74 wfd = defer.waitForDeferred(self.mgr.startup(N)) 75 yield wfd 76 childIDs = wfd.getResult() 77 self.failUnlessEqual(len(childIDs), N) 78 wfd = defer.waitForDeferred(self.mgr.new(JOB_CODE)) 79 yield wfd 80 jobID = wfd.getResult() 81 dList = [] 82 for k in xrange(N_iter): 83 expectedResult = 10*k + 2*20*k + 3*30*k 84 d = self.mgr.run(jobID, 'test', 10*k, 20*k, 30*k) 85 d.addCallback(self.failUnlessEqual, expectedResult) 86 dList.append(d) 87 wfd = defer.waitForDeferred(defer.DeferredList(dList)) 88 yield wfd 89 wfd.getResult() 90 91 92 93
