Changeset 103
- Timestamp:
- 11/19/07 22:35:05 (1 year ago)
- Files:
-
- projects/AsynQueue/trunk/asynqueue/jobs.py (modified) (2 diffs)
- projects/AsynQueue/trunk/asynqueue/test/test_jobs.py (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynQueue/trunk/asynqueue/jobs.py
r95 r103 169 169 instantiate my own if not. 170 170 171 I maintain a dict I{updates} of update tasks to perform for each jobID 172 before any (further) runs for that job. Each sequence has four elements:: 173 174 [funcName, args, kw, workersUpdated] 175 176 When a worker runs a given update task, that worker's ID is appended to the 177 I{workersUpdate} list that is the fourth element of I{updates}. That will 178 indicate that it needs not run the update task again. 179 171 180 @ivar queue: The TaskQueue instance I'm using. 172 181 … … 286 295 287 296 def _runUpdate(self, jobID, worker): 288 funcName, args, kw = self.updates[jobID] 289 return worker.remoteCaller('runJob', jobID, funcName, *args, **kw) 290 297 dList = [] 298 for funcName, args, kw, workersUpdated in self.updates[jobID]: 299 if worker.ID in workersUpdated: 300 continue 301 d = worker.remoteCaller('runJob', jobID, funcName, *args, **kw) 302 d.addCallback(lambda _: workersUpdated.append(worker.ID)) 303 dList.append(d) 304 return defer.DeferredList(dList) 305 291 306 def update(self, jobID, callName, *args, **kw): 292 307 """ 293 """ 294 self.updates[jobID] = callName, args, kw 308 Appends a new task to the update list for the specified I{jobID}, 309 running the new update task on all workers currently attached. 310 """ 311 if jobID not in self.updates: 312 self.updates[jobID] = [] 313 self.updates[jobID].append([callName, args, kw, []]) 295 314 dList = [ 296 self._runUpdate(jobID, worker) 297 for worker in self.queue.workers()] 315 self._runUpdate(jobID, worker) for worker in self.queue.workers()] 298 316 return defer.DeferredList(dList) 299 317 projects/AsynQueue/trunk/asynqueue/test/test_jobs.py
r96 r103 37 37 return G 38 38 39 def total(): 40 return sum(G) 41 39 42 def test(a, b, c=0): 40 43 return a + 2*b + 3*c … … 54 57 result = self.root.remote_newJob(JOB_ID, JOB_CODE) 55 58 self.failUnlessEqual(result[0], True) 56 self.failUnlessElementsEqual(result[1], ['setup', 'test', 'bogusable']) 59 self.failUnlessElementsEqual( 60 result[1], ['setup', 'total', 'test', 'bogusable']) 57 61 58 62 def test_newJob_bogus(self): … … 217 221 return d 218 222 223 def test_run_updates(self): 224 def gotJobID(jobID): 225 d = self.mgr.update(jobID, 'setup', 1) 226 d.addCallback(lambda _: self.mgr.update(jobID, 'setup', 2)) 227 d.addCallback(lambda _: self.mgr.run(jobID, 'total')) 228 d.addCallback(self.failUnlessEqual, 3) 229 return d 230 231 d = self._attach() 232 d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 233 d.addCallback(gotJobID) 234 return d 235 219 236 def test_run_one(self): 220 237 d = self._attach()
