Changeset 213

Show
Ignore:
Timestamp:
06/22/08 00:19:21 (2 months ago)
Author:
edsuom
Message:

Improved job updating, unit tests for processworker

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynQueue/trunk/asynqueue/jobs.py

    r126 r213  
    366366        all new workers run the task for that job before they run any other 
    367367        tasks for it. 
    368         """ 
    369         if jobID not in self.updates: 
    370             self.updates[jobID] = [] 
    371         self.updates[jobID].append([callName, args, kw, []]) 
    372         dList = [ 
    373             self._runUpdate(jobID, worker) for worker in self.queue.workers()] 
     368 
     369        The updates are run via a direct remoteCall to each worker, not through 
     370        the queue. Because of the disconnect between queued and direct calls, 
     371        it is likely but not guaranteed that any jobs you have queued when this 
     372        method is called will run on a particular worker B{after} this update 
     373        is run. Wait for the deferred from this method to fire before queuing 
     374        any jobs that need the update to be in place before running. 
     375 
     376        If you don't want the task saved to the update list, but only run on 
     377        the workers currently attached, set the I{ephemeral} keyword C{True}. 
     378        """ 
     379        ephemeral = kw.pop('ephemeral', False) 
     380        if ephemeral: 
     381            dList = [ 
     382                worker.remoteCaller('runJob', jobID, callName, *args, **kw) 
     383                for worker in self.queue.workers()] 
     384        else: 
     385            if jobID not in self.updates: 
     386                self.updates[jobID] = [] 
     387            self.updates[jobID].append([callName, args, kw, []]) 
     388            dList = [ 
     389                self._runUpdate(jobID, worker) 
     390                for worker in self.queue.workers()] 
    374391        return defer.DeferredList(dList) 
    375392 
  • projects/AsynQueue/trunk/asynqueue/test/test_processworker.py

    r91 r213  
    2222""" 
    2323 
     24from twisted.internet import defer 
    2425import mock, processworker 
    2526 
     
    4950         
    5051        return self.mgr.spawnChild().addCallback(check) 
     52 
     53    @defer.deferredGenerator 
     54    def test_callOnChild(self): 
     55        wfd = defer.waitForDeferred(self.mgr.spawnChild()) 
     56        yield wfd 
     57        process, root = wfd.getResult() 
     58        self.mgr.children = {1:process} 
     59        wfd = defer.waitForDeferred( 
     60            root.callRemote('newJob', JOB_ID, JOB_CODE)) 
     61        yield wfd 
     62        self.failUnless(wfd.getResult(), "Child didn't accept job") 
     63        wfd = defer.waitForDeferred( 
     64            root.callRemote('runJob', 1, 'test', 10, 20)) 
     65        yield wfd 
     66        status, result = wfd.getResult() 
     67        self.failUnless(status, "Call failed") 
     68        self.failUnlessEqual(result, 50) 
     69         
     70    @defer.deferredGenerator 
     71    def test_runMultipleChildren(self): 
     72        N = 3 
     73        N_iter = 100 
     74        wfd = defer.waitForDeferred(self.mgr.startup(N)) 
     75        yield wfd 
     76        childIDs = wfd.getResult() 
     77        self.failUnlessEqual(len(childIDs), N) 
     78        wfd = defer.waitForDeferred(self.mgr.new(JOB_CODE)) 
     79        yield wfd 
     80        jobID = wfd.getResult() 
     81        dList = [] 
     82        for k in xrange(N_iter): 
     83            expectedResult = 10*k + 2*20*k + 3*30*k 
     84            d = self.mgr.run(jobID, 'test', 10*k, 20*k, 30*k) 
     85            d.addCallback(self.failUnlessEqual, expectedResult) 
     86            dList.append(d) 
     87        wfd = defer.waitForDeferred(defer.DeferredList(dList)) 
     88        yield wfd 
     89        wfd.getResult() 
     90         
     91 
     92                                     
     93