Changeset 92
- Timestamp:
- 10/04/07 21:29:25 (1 year ago)
- Files:
-
- projects/AsynCluster/trunk/coreworker (modified) (1 diff)
- projects/AsynQueue/trunk/asynqueue/jobs.py (modified) (4 diffs)
- projects/AsynQueue/trunk/asynqueue/test/test_base.py (modified) (1 diff)
- projects/AsynQueue/trunk/asynqueue/test/test_jobs.py (modified) (3 diffs)
- projects/sAsync/branches/smartbroker.py (added)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynCluster/trunk/coreworker
r89 r92 29 29 30 30 # Let 'er rip... 31 main.Base Worker()31 main.BaseManager() projects/AsynQueue/trunk/asynqueue/jobs.py
r91 r92 189 189 def jobTried(self, result, jobID, worker): 190 190 """ 191 Callback f or loading a new job or running a call on an existing one.191 Callback from loading a new job. 192 192 193 193 If the worker's root reference raised an unexpected failure, returns … … 199 199 log("Worker %d supplied nonconforming root reference", worker.ID) 200 200 return False 201 if result[0]: 202 msg = "Callable objects: %s" % ", ".join(result[1]) 203 log("Job %d loaded OK on worker %s\n%s", jobID, worker.ID, msg) 204 self.queue.qualifyWorker(worker, jobID) 205 return True 206 log("Job %d failed on worker %d:\n%s", jobID, worker.ID, result[1]) 207 return None 201 if isinstance(result, (list, tuple)): 202 if result[0]: 203 msg = "Callable objects: %s" % ", ".join(result[1]) 204 log("Job %d loaded OK on worker %s\n%s", jobID, worker.ID, msg) 205 self.queue.qualifyWorker(worker, jobID) 206 return True 207 log("Job %d failed on worker %d:\n%s", jobID, worker.ID, result[1]) 208 return None 209 # Not a failure or status,result tuple, so just pass it along 210 return result 208 211 209 212 def attachChild(self, childRoot, N=3): … … 299 302 300 303 """ 304 def jobRan(result): 305 status, result = result 306 if status: 307 return result 308 log("Error running job %d:\n%s", jobID, result) 309 301 310 jobID = int(jobID) 302 311 if jobID not in self.jobs: … … 304 313 kw['series'] = jobID 305 314 kw['niceness'] = self.jobs[jobID][1] 306 return self.queue.call(callName, *args, **kw) 315 d = self.queue.call(callName, *args, **kw) 316 d.addBoth(jobRan) 317 return d 307 318 308 319 def cancel(self, jobID): projects/AsynQueue/trunk/asynqueue/test/test_base.py
r91 r92 129 129 return d 130 130 131 def test_cancelSeries(self):132 self.fail("Need to test this!")projects/AsynQueue/trunk/asynqueue/test/test_jobs.py
r91 r92 201 201 202 202 def test_run_one(self): 203 def check(result):204 self.failUnlessEqual(result, (True, 50))205 206 203 d = self._attach() 207 204 d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 208 205 d.addCallback(self.mgr.run, 'test', 10, 20) 209 d.addCallback( check)206 d.addCallback(self.failUnlessEqual, 50) 210 207 return d 211 208 … … 221 218 yield wfd 222 219 results.append(wfd.getResult()) 223 self.failUnlessEqual( [x[1] for x in results], range(10))220 self.failUnlessEqual(results, range(10)) 224 221 225 222 @defer.deferredGenerator … … 236 233 dList.append(d) 237 234 yield defer.waitForDeferred(defer.DeferredList(dList)) 238 self.failUnlessEqual( [x[1] for x in results], range(10))235 self.failUnlessEqual(results, range(10))
