Changeset 112
- Timestamp:
- 11/26/07 16:30:56 (1 year ago)
- Files:
-
- projects/AsynQueue/branches/debug_reassign (copied) (copied from projects/AsynQueue/trunk)
- projects/AsynQueue/branches/debug_reassign/asynqueue/base.py (copied) (copied from projects/AsynQueue/trunk/asynqueue/base.py) (4 diffs)
- projects/AsynQueue/branches/debug_reassign/asynqueue/debug.py (added)
- projects/AsynQueue/branches/debug_reassign/asynqueue/jobs.py (copied) (copied from projects/AsynQueue/trunk/asynqueue/jobs.py) (9 diffs)
- projects/AsynQueue/branches/debug_reassign/asynqueue/processworker.py (copied) (copied from projects/AsynQueue/trunk/asynqueue/processworker.py)
- projects/AsynQueue/branches/debug_reassign/asynqueue/tasks.py (copied) (copied from projects/AsynQueue/trunk/asynqueue/tasks.py) (6 diffs)
- projects/AsynQueue/branches/debug_reassign/asynqueue/test/mock.py (modified) (2 diffs)
- projects/AsynQueue/branches/debug_reassign/asynqueue/test/test_base.py (copied) (copied from projects/AsynQueue/trunk/asynqueue/test/test_base.py)
- projects/AsynQueue/branches/debug_reassign/asynqueue/test/test_jobs.py (copied) (copied from projects/AsynQueue/trunk/asynqueue/test/test_jobs.py) (13 diffs)
- projects/AsynQueue/branches/debug_reassign/asynqueue/test/test_processworker.py (copied) (copied from projects/AsynQueue/trunk/asynqueue/test/test_processworker.py)
- projects/AsynQueue/branches/debug_reassign/asynqueue/test/test_tasks.py (copied) (copied from projects/AsynQueue/trunk/asynqueue/test/test_tasks.py)
- projects/AsynQueue/branches/debug_reassign/asynqueue/test/util.py (modified) (1 diff)
- projects/AsynQueue/branches/debug_reassign/asynqueue/workers.py (copied) (copied from projects/AsynQueue/trunk/asynqueue/workers.py) (5 diffs)
- projects/AsynQueue/branches/debug_reassign/ez_setup.py (copied) (copied from projects/AsynQueue/trunk/ez_setup.py)
- projects/AsynQueue/trunk/asynqueue/base.py (modified) (4 diffs)
- projects/AsynQueue/trunk/asynqueue/jobs.py (modified) (8 diffs)
- projects/AsynQueue/trunk/asynqueue/tasks.py (modified) (6 diffs)
- projects/AsynQueue/trunk/asynqueue/test/mock.py (modified) (2 diffs)
- projects/AsynQueue/trunk/asynqueue/test/test_jobs.py (modified) (14 diffs)
- projects/AsynQueue/trunk/asynqueue/test/util.py (modified) (1 diff)
- projects/AsynQueue/trunk/asynqueue/workers.py (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynQueue/branches/debug_reassign/asynqueue/base.py
r90 r112 189 189 break 190 190 wfd = defer.waitForDeferred(self.mgr.assignment(task)) 191 yield wfd 191 yield wfd; wfd.getResult() 192 192 # Clean up after the loop exits 193 193 wfd = defer.waitForDeferred(self.mgr.shutdown(self.timeout)) … … 195 195 self.heap.shutdown() 196 196 # The result of the runner is a list of any unfinished tasks. 197 yield wfd.getResult() 197 result = [] 198 try: 199 result = wfd.getResult() 200 except: 201 pass 202 yield result 198 203 199 204 if self.isRunning(): … … 248 253 if worker == workerOrID: 249 254 return thisID 250 raise ValueError("No such worker")251 255 252 256 def detachWorker(self, workerOrID, reassign=False, crash=False): … … 261 265 See L{tasks.WorkerManager.terminate}. 262 266 """ 263 def terminated(unfinishedTasks):264 for task in unfinishedTasks:265 self.mgr.assignment(task)266 267 267 ID = self._getWorkerID(workerOrID) 268 if ID is None: 269 return 268 270 if crash: 269 d = self.mgr.terminate(ID, crash=True )271 d = self.mgr.terminate(ID, crash=True, reassign=reassign) 270 272 else: 271 d = self.mgr.terminate(ID, self.timeout) 272 if reassign: 273 d.addCallback(terminated) 273 d = self.mgr.terminate(ID, self.timeout, reassign=reassign) 274 274 return d 275 275 projects/AsynQueue/branches/debug_reassign/asynqueue/jobs.py
r110 r112 37 37 38 38 39 from debug import dTrace 40 41 39 42 class TrustError(Exception): 40 43 pass … … 136 139 % (callName, jobID))) 137 140 138 def remote_exit(self ):141 def remote_exit(self, stopReactor=False): 139 142 """ 140 143 Terminates my child worker process, calling and waiting for any … … 147 150 if callable(possibleShutdownFunction): 148 151 dList.append(defer.maybeDeferred(possibleShutdownFunction)) 149 return defer.DeferredList(dList).addCallback(lambda _: reactor.stop()) 152 d = defer.DeferredList(dList) 153 if stopReactor: 154 d.addCallback(lambda _: reactor.stop()) 155 return d 150 156 151 157 … … 172 178 d = workers.RemoteCallWorker.stop(self) 173 179 d.addBoth(lambda _: self.remoteCaller('exit')) 180 d.addErrback(lambda _: None) 174 181 return d 175 182 … … 214 221 Shuts down my task queue, returning a deferred that fires when the 215 222 queue has emptied and all interpreter workers have finished and been 216 terminated. 217 """ 218 return self.queue.shutdown() 223 terminated. The task queue shutdown takes care of shutting down 224 everything else, including any attached workers. 225 """ 226 return self.queue.shutdown().addCallback(lambda _: dTrace()) 219 227 220 228 def jobTried(self, result, jobID, worker): … … 306 314 """ 307 315 jobID = self._jobCounter = getattr(self, '_jobCounter', 0) + 1 308 print "NEW", jobID309 316 self.callsPending[jobID] = {} 310 317 self.jobs[jobID] = [jobCode, niceness] … … 384 391 at approximately half the rate of calls for a job with niceness N. 385 392 393 All keywords except for the following are passed to the call: 394 395 - B{timeout}: A timeout interval in seconds after which the call will 396 be retried. 397 386 398 @note: The task object generated contains the name of a callable (as a 387 399 string) for the first element of its I{callTuple} attribute, instead … … 392 404 393 405 """ 394 def jobRan(result, d): 406 def queueJob(doNext=False): 407 if doNext: 408 kw['doNext'] = True 409 dq = self.queue.call(callName, *args, **kw) 410 print "QJ", timeout 411 if timeout: 412 dq.setTimeout( 413 timeout, 414 timeoutFunc=lambda d: jobRan((False, "Timeout"))) 415 dq.addErrback(jobFailed) 416 dq.addCallback(jobRan) 417 418 def jobRan(result): 395 419 status, result = result 396 retryCount, callName, args, kw = self.callsPending[jobID].pop(d)397 420 if status: 398 return result 399 log("Error running job %d:\n%s", jobID, result) 400 if retryCount < self.maxRetries: 401 kw['retryCount'] = retryCount + 1 402 return self.run(jobID, callName, *args, **kw) 403 404 def jobFailed(failure, d): 405 # TODO: Do something different here, as it's a more drastic 406 # situation 407 self.callsPending[jobID].pop(d) 408 log("Unexpected error running job %d:\n%s", 409 jobID, failure.getTraceback()) 410 421 del self.callsPending[jobID][d] 422 d.callback(result) 423 else: 424 log("Error running job %d:\n%s", jobID, result) 425 tryAgain() 426 427 def jobFailed(failure): 428 return False, failure.getTraceback() 429 430 def tryAgain(): 431 if jobID in self.callsPending: 432 retryCount, callName, args, kw = self.callsPending[jobID][d] 433 print "TA", retryCount, callName 434 if True or retryCount < self.maxRetries: 435 self.callsPending[jobID][d][0] = retryCount + 1 436 queueJob(True) 437 return 438 d.callback(None) 439 411 440 jobID = int(jobID) 412 441 if jobID not in self.jobs: 413 442 raise ValueError("No job '%s' registered" % jobID) 443 timeout = kw.pop('timeout', None) 414 444 kw['series'] = jobID 415 445 kw['niceness'] = self.jobs[jobID][1] 416 retryCount = kw.pop('retryCount', 0) 417 d = self.queue.call(callName, *args, **kw) 418 d.addCallback(jobRan, d) 419 d.addErrback(jobFailed, d) 420 self.callsPending[jobID][d] = retryCount, callName, args, kw 446 d = defer.Deferred() 447 self.callsPending[jobID][d] = [0, callName, args, kw] 448 queueJob() 421 449 return d 422 450 … … 430 458 self.updates.pop(jobID, None) 431 459 self.callsPending.pop(jobID, None) 432 433 projects/AsynQueue/branches/debug_reassign/asynqueue/tasks.py
r91 r112 28 28 from errors import ImplementationError 29 29 30 import debug 30 31 31 32 class Task(object): 32 33 """ 33 34 I represent a task that has been dispatched to a queue for running with a 34 given scheduling I{niceness}. I generate a C{Deferred}, accessible as an 35 attribute I{d}, firing it when the task is finally run and its result is 36 obtained. 35 given scheduling I{niceness}. 36 37 I generate a C{Deferred} that you fire by calling either my L{callback} or 38 L{errback} with a result or failure, respectively, when the the task is 39 finally run and its result is obtained. You can call the deferred's 40 versions of those methods directly, but my versions deal with things like 41 repeated callbacks, which happen sometimes with task timeouts. 37 42 38 43 @ivar d: A deferred to the eventual result of the task. … … 52 57 self.d = defer.Deferred() 53 58 59 def called(self): 60 return self.d.called 61 62 def callback(self, result): 63 if not self.d.called: 64 self.d.callback(result) 65 66 def errback(self, result): 67 self.d.errback(result) 68 54 69 def __repr__(self): 55 70 """ … … 309 324 self._workerCounter = workerID 310 325 self.workers[workerID] = worker 326 worker.setResignator( 327 lambda : self.terminate(worker.ID, crash=True, reassign=True)) 311 328 return workerID 312 329 313 def terminate(self, workerID, timeout=None, crash=False ):330 def terminate(self, workerID, timeout=None, crash=False, reassign=False): 314 331 """ 315 332 Removes a worker from my work force, canceling all of its unfullfilled … … 324 341 325 342 @return: A deferred that fires when the worker has been removed, 326 gracefully or not, with a list of the worker's unfinished tasks. 343 gracefully or not, with a list of any tasks left unfinished and not 344 reassigned. 327 345 328 346 """ … … 338 356 return [] 339 357 return result 358 359 def reassignTasks(tasks): 360 for task in tasks: 361 debug.write("RT: %d", id(task.d)) 362 self.assignmentFactory.new(task) 363 return [] 340 364 341 365 worker = self.workers.pop(workerID, None) … … 345 369 self.assignmentFactory.cancelRequests(worker) 346 370 if crash: 347 return defer.succeed(worker.crash()) 348 d = worker.stop() 349 if timeout: 350 callID = reactor.callLater(timeout, crashTheWorker, worker, d) 351 d.addCallback(stopped) 371 d = defer.succeed(worker.crash()) 352 372 else: 353 # No tasks left unfinished if deferred fires without timeout 354 d.addCallback(lambda _: []) 373 d = worker.stop() 374 if timeout: 375 callID = reactor.callLater(timeout, crashTheWorker, worker, d) 376 d.addCallback(stopped) 377 else: 378 # No tasks left unfinished if deferred fires without timeout 379 d.addCallback(lambda _: []) 380 if reassign: 381 d.addCallback(reassignTasks) 355 382 return d 356 383 projects/AsynQueue/branches/debug_reassign/asynqueue/test/mock.py
r79 r112 26 26 from twisted.internet import defer, reactor 27 27 from twisted.trial import unittest 28 29 30 DELAY = 0.05 31 32 def deferToLater(*args, **kw): 33 delay = kw.pop('delay', DELAY) 34 if args: 35 arg = args[0] 36 else: 37 arg = None 38 d = defer.Deferred() 39 if callable(arg): 40 d.addCallback(lambda _: arg(*args[1:], **kw)) 41 reactor.callLater(delay, d.callback, None) 42 else: 43 reactor.callLater(delay, d.callback, arg) 44 return d 45 46 def fireLater(d, value=None, delay=None): 47 """ 48 Fires the supplied deferred I{d} with I{value}, or C{None} if no value 49 supplied, after the standard or specified I{delay}. 50 """ 51 if delay is None: 52 delay = DELAY 53 reactor.callLater(delay, d.callback, value) 28 54 29 55 … … 122 148 123 149 def deferToLater(self, *args, **kw): 124 delay = kw.pop('delay', self.delay) 125 if args: 126 arg = args[0] 127 else: 128 arg = None 129 d = defer.Deferred() 130 if callable(arg): 131 d.addCallback(lambda _: arg(*args[1:], **kw)) 132 reactor.callLater(delay, d.callback, None) 133 else: 134 reactor.callLater(delay, d.callback, arg) 135 return d 136 137 def fireLater(self, d, value=None, delay=None): 138 """ 139 Fires the supplied deferred I{d} with I{value}, or C{None} if no value 140 supplied, after the standard or specified I{delay}. 141 """ 142 if delay is None: 143 delay = self.delay 144 reactor.callLater(delay, d.callback, value) 150 kw['delay'] = self.delay 151 return deferToLater(*args, **kw) 152 153 def fireLater(self, *args, **kw): 154 kw['delay'] = self.delay 155 return fireLater(*args, **kw) 145 156 146 157 projects/AsynQueue/branches/debug_reassign/asynqueue/test/test_jobs.py
r110 r112 28 28 import mock, jobs 29 29 30 #import twisted.internet.base 31 #twisted.internet.base.DelayedCall.debug = True 32 30 33 JOB_ID = 1 31 34 32 35 JOB_CODE = """ 33 36 G = [] 37 TRIES = [] 34 38 35 39 def setup(x): … … 45 49 def bogusable(x): 46 50 return 1.0 / x 51 52 def failFirstTime(): 53 TRIES.append(None) 54 if len(TRIES) == 1: 55 raise Exception 56 return len(TRIES) 47 57 48 58 """ … … 99 109 self.failUnlessEqual(result[0], True) 100 110 self.failUnlessElementsEqual( 101 result[1], ['setup', 'total', 'test', 'bogusable']) 111 result[1], 112 ['setup', 'total', 'test', 'bogusable', 'failFirstTime']) 102 113 103 114 def test_newJob_bogus(self): … … 131 142 def __init__(self, bogus=False): 132 143 self.bogus = bogus 144 self.jobs = {} 133 145 self.callbacks = [] 134 self.jobs = {}135 146 self.registeredClasses = [] 136 147 … … 215 226 216 227 217 class Test_JobManager (mock.TestCase):228 class Test_JobManager_Basics(mock.TestCase): 218 229 def setUp(self): 219 230 jobs.ChildWorker._noTypeCheck = True … … 248 259 return self._attach(bogus=True).addCallback(self.failUnlessEqual, None) 249 260 261 def test_registerClasses(self): 262 def check(null): 263 self.failUnlessElementsEqual( 264 self.root.registeredClasses, stringReps) 265 266 stringReps = ['foo.bar.SomeClass', 'foo.bar.AnotherClass'] 267 d = self._attach() 268 d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 269 d.addCallback(lambda _: self.mgr.registerClasses(*stringReps)) 270 d.addCallback(check) 271 return d 272 273 274 class JobManagerBC(mock.TestCase): 275 class TestableChildRoot(jobs.ChildRoot): 276 trusted = True 277 278 def setUp(self): 279 self.servers = [] 280 self.mgr = jobs.JobManager() 281 return self.getReferenceToRoot() 282 283 def getReferenceToRoot(self): 284 def got(ref): 285 self.ref = ref 286 return ref 287 288 root = self.TestableChildRoot() 289 server = reactor.listenTCP(0, pb.PBServerFactory(root)) 290 self.servers.append(server) 291 clientFactory = pb.PBClientFactory() 292 reactor.connectTCP( 293 "127.0.0.1", server.getHost().port, clientFactory) 294 return clientFactory.getRootObject().addCallback(got) 295 296 def tearDown(self): 297 def closeConnection(null): 298 self.ref.broker.transport.loseConnection() 299 for server in self.servers: 300 server.stopListening() 301 302 return self.mgr.shutdown().addCallback(closeConnection) 303 304 def _newJob(self): 305 self.mgr.jobs[JOB_ID] = (JOB_CODE, 0) 306 self.mgr.callsPending[JOB_ID] = {} 307 308 def _attach(self, ref): 309 def attached(childID): 310 self.childID = childID 311 return childID 312 313 return self.mgr.attachChild(ref, N=1).addCallback(attached) 314 315 316 class Test_JobManager_Admin(JobManagerBC): 250 317 def test_attachChild_withUpdate(self): 251 318 self._newJob() … … 253 320 d1 = self.mgr.update(JOB_ID, 'setup', 1) 254 321 # The actual attachment event chain 255 d2 = self._attach( )322 d2 = self._attach(self.ref) 256 323 d2.addCallback(self.mgr.run, 'setup', 2) 257 324 d2.addCallback(self.failUnlessEqual, [1, 2]) … … 260 327 261 328 def test_new(self): 329 def attached(null): 330 self.failUnlessEqual(getattr(self, 'childID', None), 1) 331 return self.mgr.new(JOB_CODE).addCallback(check) 332 262 333 def check(jobID): 263 334 self.failUnlessEqual(jobID, 1) … … 265 336 self.failUnlessEqual(worker.iQualified, [1]) 266 337 267 d = self._attach() 268 d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 269 d.addCallback(check) 270 return d 338 return self._attach(self.ref).addCallback(attached) 271 339 272 340 def test_run_updates(self): … … 278 346 return d 279 347 280 d = self._attach( )348 d = self._attach(self.ref) 281 349 d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 282 350 d.addCallback(gotJobID) 283 351 return d 284 352 285 def test_registerClasses(self): 286 def check(null): 287 self.failUnlessElementsEqual( 288 self.root.registeredClasses, stringReps) 289 290 stringReps = ['foo.bar.SomeClass', 'foo.bar.AnotherClass'] 291 d = self._attach() 292 d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 293 d.addCallback(lambda _: self.mgr.registerClasses(*stringReps)) 294 d.addCallback(check) 295 return d 296 353 def test_disconnect_cancel(self): 354 def gotJobID(jobID): 355 self.ref.broker.transport.loseConnection() 356 d = self.mgr.run(jobID, 'test', 0, 0).addCallback(runOver) 357 self.mgr.cancel(jobID) 358 return d 359 360 def runOver(result): 361 print "Job Canceled" 362 363 d = self._attach(self.ref) 364 d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 365 d.addCallback(gotJobID) 366 return d 367 368 def test_disconnect_reassign(self): 369 def gotJobID(jobID): 370 self.ref.broker.transport.loseConnection() 371 return mock.deferToLater(delay=1.0).addCallback(delayDone, jobID) 372 373 def delayDone(null, jobID): 374 d = self.mgr.run(jobID, 'test', 1, 2) 375 d.addCallback(self.failUnlessEqual, 5) 376 self.getReferenceToRoot().addCallback(self._attach) 377 return d 378 379 d = self._attach(self.ref) 380 d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 381 d.addCallback(gotJobID) 382 return d 383 384 385 386 class Test_JobManager_Run(JobManagerBC): 297 387 def test_run_one(self): 298 d = self._attach( )388 d = self._attach(self.ref) 299 389 d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 300 390 d.addCallback(self.mgr.run, 'test', 10, 20) … … 305 395 def test_run_several_sequentially(self): 306 396 results = [] 307 yield defer.waitForDeferred(self._attach( ))397 yield defer.waitForDeferred(self._attach(self.ref)) 308 398 wfd = defer.waitForDeferred(self.mgr.new(JOB_CODE)) 309 399 yield wfd … … 318 408 def test_run_several_queued(self): 319 409 results = [] 320 yield defer.waitForDeferred(self._attach( ))410 yield defer.waitForDeferred(self._attach(self.ref)) 321 411 wfd = defer.waitForDeferred(self.mgr.new(JOB_CODE)) 322 412 yield wfd … … 329 419 yield defer.waitForDeferred(defer.DeferredList(dList)) 330 420 self.failUnlessEqual(results, range(10)) 421 422 def test_retry_after_failure(self): 423 d = self._attach(self.ref) 424 d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 425 d.addCallback(self.mgr.run, 'failFirstTime') 426 d.addCallback(self.failUnlessEqual, 2) 427 return d 428 projects/AsynQueue/branches/debug_reassign/asynqueue/test/util.py
r2 r112 65 65 self.iQualified = [] 66 66 67 def setResignator(self, callableObject): 68 pass 69 67 70 def run(self, task): 68 71 def ran(result, d): projects/AsynQueue/branches/debug_reassign/asynqueue/workers.py
r95 r112 29 29 import errors 30 30 31 import debug 31 32 32 33 class IWorker(Interface): … … 149 150 # call will not reach this point. 150 151 except Exception, e: 151 reactor.callFromThread(task. d.errback, failure.Failure(e))152 reactor.callFromThread(task.errback, failure.Failure(e)) 152 153 else: 153 reactor.callFromThread(task. d.callback, result)154 reactor.callFromThread(task.callback, result) 154 155 # Broken out of loop, ready for the thread to end 155 156 reactor.callFromThread(self.d.callback, None) … … 203 204 thread. 204 205 """ 205 if self.task is not None and not self.task. d.called:206 if self.task is not None and not self.task.called: 206 207 result = [self.task] 207 208 else: … … 259 260 job = (task, d) 260 261 self.jobs.append(job) 261 d.addCallback(self.doneTrying, job) 262 d.addErrback(self.oops) 262 d.addBoth(self.doneTrying, job) 263 263 # The task's deferred is NOT returned! 264 264 265 def oops(self, failure):266 if failure.check(*self.disconnectErrors):267 self.resign()268 else:269 return failure270 271 265 def doneTrying(self, result, job): 266 if hasattr(result, 'getTraceback'): 267 print "OOPS", result.getTraceback() 268 if result.check(*self.disconnectErrors): 269 # This was a disconnect error, so bail out now; don't remove 270 # the job or signal the run request queue that the job is done. 271 return 272 print "DT" 272 273 self.jobs.remove(job) 273 274 self.runRequestQueue.put(None) 274 275 task = job[0] 275 task. d.callback(result)276 task.callback(result) 276 277 277 278 def resign(self, *null): … … 296 297 running and I can accept another one. 297 298 """ 298 if getattr(self, 'isShuttingDown', False): 299 raise errors.QueueRunError 299 debug.write("RUN: %d", id(task.d)) 300 #if getattr(self, 'isShuttingDown', False): 301 # raise errors.QueueRunError 300 302 return self.runRequestQueue.get().addCallback(self.runNow, task) 301 303 projects/AsynQueue/trunk/asynqueue/base.py
r90 r112 189 189 break 190 190 wfd = defer.waitForDeferred(self.mgr.assignment(task)) 191 yield wfd 191 yield wfd; wfd.getResult() 192 192 # Clean up after the loop exits 193 193 wfd = defer.waitForDeferred(self.mgr.shutdown(self.timeout)) … … 195 195 self.heap.shutdown() 196 196 # The result of the runner is a list of any unfinished tasks. 197 yield wfd.getResult() 197 result = [] 198 try: 199 result = wfd.getResult() 200 except: 201 pass 202 yield result 198 203 199 204 if self.isRunning(): … … 248 253 if worker == workerOrID: 249 254 return thisID 250 raise ValueError("No such worker")251 255 252 256 def detachWorker(self, workerOrID, reassign=False, crash=False): … … 261 265 See L{tasks.WorkerManager.terminate}. 262 266 """ 263 def terminated(unfinishedTasks):264 for task in unfinishedTasks:265 self.mgr.assignment(task)266 267 267 ID = self._getWorkerID(workerOrID) 268 if ID is None: 269 return 268 270 if crash: 269 d = self.mgr.terminate(ID, crash=True )271 d = self.mgr.terminate(ID, crash=True, reassign=reassign) 270 272 else: 271 d = self.mgr.terminate(ID, self.timeout) 272 if reassign: 273 d.addCallback(terminated) 273 d = self.mgr.terminate(ID, self.timeout, reassign=reassign) 274 274 return d 275 275 projects/AsynQueue/trunk/asynqueue/jobs.py
r110 r112 136 136 % (callName, jobID))) 137 137 138 def remote_exit(self ):138 def remote_exit(self, stopReactor=False): 139 139 """ 140 140 Terminates my child worker process, calling and waiting for any … … 147 147 if callable(possibleShutdownFunction): 148 148 dList.append(defer.maybeDeferred(possibleShutdownFunction)) 149 return defer.DeferredList(dList).addCallback(lambda _: reactor.stop()) 149 d = defer.DeferredList(dList) 150 if stopReactor: 151 d.addCallback(lambda _: reactor.stop()) 152 return d 150 153 151 154 … … 172 175 d = workers.RemoteCallWorker.stop(self) 173 176 d.addBoth(lambda _: self.remoteCaller('exit')) 177 d.addErrback(lambda _: None) 174 178 return d 175 179 … … 214 218 Shuts down my task queue, returning a deferred that fires when the 215 219 queue has emptied and all interpreter workers have finished and been 216 terminated. 220 terminated. The task queue shutdown takes care of shutting down 221 everything else, including any attached workers. 217 222 """ 218 223 return self.queue.shutdown() … … 306 311 """ 307 312 jobID = self._jobCounter = getattr(self, '_jobCounter', 0) + 1 308 print "NEW", jobID309 313 self.callsPending[jobID] = {} 310 314 self.jobs[jobID] = [jobCode, niceness] … … 384 388 at approximately half the rate of calls for a job with niceness N. 385 389 390 All keywords except for the following are passed to the call: 391 392 - B{timeout}: A timeout interval in seconds after which the call will 393 be retried. 394 386 395 @note: The task object generated contains the name of a callable (as a 387 396 string) for the first element of its I{callTuple} attribute, instead … … 392 401 393 402 """ 394 def jobRan(result, d): 403 def queueJob(doNext=False): 404 if doNext: 405 kw['doNext'] = True 406 dq = self.queue.call(callName, *args, **kw) 407 if timeout: 408 dq.setTimeout( 409 timeout, 410 timeoutFunc=lambda d: jobRan((False, "Timeout"))) 411 dq.addErrback(jobFailed) 412 dq.addCallback(jobRan) 413 414 def jobRan(result): 395 415 status, result = result 396 retryCount, callName, args, kw = self.callsPending[jobID].pop(d)397 416 if status: 398 return result 399 log("Error running job %d:\n%s", jobID, result) 400 if retryCount < self.maxRetries: 401 kw['retryCount'] = retryCount + 1 402 return self.run(jobID, callName, *args, **kw) 403 404 def jobFailed(failure, d): 405 # TODO: Do something different here, as it's a more drastic 406 # situation 407 self.callsPending[jobID].pop(d) 408 log("Unexpected error running job %d:\n%s", 409 jobID, failure.getTraceback()) 410 417 del self.callsPending[jobID][d] 418 d.callback(result) 419 else: 420 log("Error running job %d:\n%s", jobID, result) 421 tryAgain() 422 423 def jobFailed(failure): 424 return False, failure.getTraceback() 425 426 def tryAgain(): 427 if jobID in self.callsPending: 428 retryCount, callName, args, kw = self.callsPending[jobID][d] 429 if retryCount < self.maxRetries: 430 self.callsPending[jobID][d][0] = retryCount + 1 431 queueJob(True) 432 return 433 d.callback(None) 434 411 435 jobID = int(jobID) 412 436 if jobID not in self.jobs: 413 437 raise ValueError("No job '%s' registered" % jobID) 438 timeout = kw.pop('timeout', None) 414 439 kw['series'] = jobID 415 440 kw['niceness'] = self.jobs[jobID][1] 416 retryCount = kw.pop('retryCount', 0) 417 d = self.queue.call(callName, *args, **kw) 418 d.addCallback(jobRan, d) 419 d.addErrback(jobFailed, d) 420 self.callsPending[jobID][d] = retryCount, callName, args, kw 441 d = defer.Deferred() 442 self.callsPending[jobID][d] = [0, callName, args, kw] 443 queueJob() 421 444 return d 422 445 … … 430 453 self.updates.pop(jobID, None) 431 454 self.callsPending.pop(jobID, None) 432 433 projects/AsynQueue/trunk/asynqueue/tasks.py
r91 r112 32 32 """ 33 33 I represent a task that has been dispatched to a queue for running with a 34 given scheduling I{niceness}. I generate a C{Deferred}, accessible as an 35 attribute I{d}, firing it when the task is finally run and its result is 36 obtained. 34 given scheduling I{niceness}. 35 36 I generate a C{Deferred} that you fire by calling either my L{callback} or 37 L{errback} with a result or failure, respectively, when the the task is 38 finally run and its result is obtained. You can call the deferred's 39 versions of those methods directly, but my versions deal with things like 40 repeated callbacks, which happen sometimes with task timeouts. 37 41 38 42 @ivar d: A deferred to the eventual result of the task. … … 52 56 self.d = defer.Deferred() 53 57 58 def callback(self, result): 59 if not self.d.called: 60 self.d.callback(result) 61 62 def errback(self, result): 63 self.d.errback(result) 64 54 65 def __repr__(self): 55 66 """ … … 309 320 self._workerCounter = workerID 310 321 self.workers[workerID] = worker 322 worker.setResignator( 323 lambda : self.terminate(worker.ID, crash=True, reassign=True)) 311 324 return workerID 312 325 313 def terminate(self, workerID, timeout=None, crash=False ):326 def terminate(self, workerID, timeout=None, crash=False, reassign=False): 314 327 """ 315 328 Removes a worker from my work force, canceling all of its unfullfilled … … 324 337 325 338 @return: A deferred that fires when the worker has been removed, 326 gracefully or not, with a list of the worker's unfinished tasks. 339 gracefully or not, with a list of any tasks left unfinished and not 340 reassigned. 327 341 328 342 """ … … 338 352 return [] 339 353 return result 354 355 def reassignTasks(tasks): 356 for task in tasks: 357 self.assignmentFactory.new(task) 358 return [] 340 359 341 360 worker = self.workers.pop(workerID, None) … … 345 364 self.assignmentFactory.cancelRequests(worker) 346 365 if crash: 347 return defer.succeed(worker.crash()) 348 d = worker.stop() 349 if timeout: 350 callID = reactor.callLater(timeout, crashTheWorker, worker, d) 351 d.addCallback(stopped) 366 d = defer.succeed(worker.crash()) 352 367 else: 353 # No tasks left unfinished if deferred fires without timeout 354 d.addCallback(lambda _: []) 368 d = worker.stop() 369 if timeout: 370 callID = reactor.callLater(timeout, crashThe
