Changeset 103

Show
Ignore:
Timestamp:
11/19/07 22:35:05 (1 year ago)
Author:
edsuom
Message:

Now can line up multiple update calls per job

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynQueue/trunk/asynqueue/jobs.py

    r95 r103  
    169169    instantiate my own if not. 
    170170 
     171    I maintain a dict I{updates} of update tasks to perform for each jobID 
     172    before any (further) runs for that job. Each sequence has four elements:: 
     173 
     174        [funcName, args, kw, workersUpdated] 
     175 
     176    When a worker runs a given update task, that worker's ID is appended to the 
     177    I{workersUpdate} list that is the fourth element of I{updates}. That will 
     178    indicate that it needs not run the update task again. 
     179 
    171180    @ivar queue: The TaskQueue instance I'm using. 
    172181     
     
    286295 
    287296    def _runUpdate(self, jobID, worker): 
    288         funcName, args, kw = self.updates[jobID] 
    289         return worker.remoteCaller('runJob', jobID, funcName, *args, **kw) 
    290  
     297        dList = [] 
     298        for funcName, args, kw, workersUpdated in self.updates[jobID]: 
     299            if worker.ID in workersUpdated: 
     300                continue 
     301            d = worker.remoteCaller('runJob', jobID, funcName, *args, **kw) 
     302            d.addCallback(lambda _: workersUpdated.append(worker.ID)) 
     303            dList.append(d) 
     304        return defer.DeferredList(dList) 
     305     
    291306    def update(self, jobID, callName, *args, **kw): 
    292307        """ 
    293         """ 
    294         self.updates[jobID] = callName, args, kw 
     308        Appends a new task to the update list for the specified I{jobID}, 
     309        running the new update task on all workers currently attached. 
     310        """ 
     311        if jobID not in self.updates: 
     312            self.updates[jobID] = [] 
     313        self.updates[jobID].append([callName, args, kw, []]) 
    295314        dList = [ 
    296             self._runUpdate(jobID, worker) 
    297             for worker in self.queue.workers()] 
     315            self._runUpdate(jobID, worker) for worker in self.queue.workers()] 
    298316        return defer.DeferredList(dList) 
    299317     
  • projects/AsynQueue/trunk/asynqueue/test/test_jobs.py

    r96 r103  
    3737    return G 
    3838 
     39def total(): 
     40    return sum(G) 
     41 
    3942def test(a, b, c=0): 
    4043    return a + 2*b + 3*c 
     
    5457        result = self.root.remote_newJob(JOB_ID, JOB_CODE) 
    5558        self.failUnlessEqual(result[0], True) 
    56         self.failUnlessElementsEqual(result[1], ['setup', 'test', 'bogusable']) 
     59        self.failUnlessElementsEqual( 
     60            result[1], ['setup', 'total', 'test', 'bogusable']) 
    5761     
    5862    def test_newJob_bogus(self): 
     
    217221        return d 
    218222 
     223    def test_run_updates(self): 
     224        def gotJobID(jobID): 
     225            d = self.mgr.update(jobID, 'setup', 1) 
     226            d.addCallback(lambda _: self.mgr.update(jobID, 'setup', 2)) 
     227            d.addCallback(lambda _: self.mgr.run(jobID, 'total')) 
     228            d.addCallback(self.failUnlessEqual, 3) 
     229            return d 
     230         
     231        d = self._attach() 
     232        d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 
     233        d.addCallback(gotJobID) 
     234        return d 
     235 
    219236    def test_run_one(self): 
    220237        d = self._attach()