Changeset 126
- Timestamp:
- 02/22/08 22:41:33 (9 months ago)
- Files:
-
- projects/AsynQueue/trunk/asynqueue/base.py (modified) (3 diffs)
- projects/AsynQueue/trunk/asynqueue/jobs.py (modified) (6 diffs)
- projects/AsynQueue/trunk/asynqueue/tasks.py (modified) (6 diffs)
- projects/AsynQueue/trunk/asynqueue/test/util.py (modified) (2 diffs)
- projects/AsynQueue/trunk/asynqueue/workers.py (modified) (1 diff)
- projects/AsynQueue/trunk/setup.py (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynQueue/trunk/asynqueue/base.py
r112 r126 26 26 from zope.interface import implements 27 27 from twisted.python import failure 28 from twisted.internet import defer, reactor, interfaces 28 from twisted.internet import reactor, interfaces, defer 29 # Use C Deferreds if possible, for efficiency 30 try: 31 from twisted.internet import cdefer 32 except: 33 pass 34 else: 35 defer.Deferred = cdefer.Deferred 29 36 30 37 import tasks … … 328 335 task has been queued up. 329 336 337 @keyword timeout: A timeout interval in seconds from when a worker gets 338 a task assignment for the call, after which the call will be retried. 339 330 340 """ 331 341 def oneLessPending(result): … … 351 361 niceness = kw.pop('niceness', 0) 352 362 series = kw.pop('series', None) 353 task = self._taskFactory.new(func, args, kw, niceness, series) 363 timeout = kw.pop('timeout', None) 364 task = self._taskFactory.new(func, args, kw, niceness, series, timeout) 354 365 if kw.pop('doNext', False): 355 366 task.priority = -1000000 projects/AsynQueue/trunk/asynqueue/jobs.py
r116 r126 25 25 from zope.interface import implements, Interface 26 26 from twisted.internet import defer, reactor 27 from twisted.python import reflect 28 from twisted.python.rebuild import rebuild 27 29 from twisted.python.failure import Failure 28 from twisted.python import reflect29 30 from twisted.spread import pb, flavors 31 # Use C Deferreds if possible, for efficiency 32 try: 33 from twisted.internet import cdefer 34 except: 35 pass 36 else: 37 defer.Deferred = cdefer.Deferred 30 38 31 39 import base, workers … … 89 97 if cls.__module__ not in modules: 90 98 modules.append(cls.__module__) 91 # Try to reload the modules for the classes in case they've changed99 # Try to build the modules for the classes in case they've changed 92 100 # since the last run 93 101 for module in modules: 94 102 try: 95 re load(reflect.namedModule(module))103 rebuild(reflect.namedModule(module), doLog=False) 96 104 except: 97 105 pass … … 221 229 222 230 """ 223 maxRetries = 2231 maxRetries = 1 224 232 225 233 def __init__(self, queue=None): … … 409 417 All keywords except for the following are passed to the call: 410 418 411 - B{timeout}: A timeout interval in seconds after which the call will 412 be retried. 419 - B{timeout}: A timeout interval in seconds from when a worker 420 gets a task assignment for the call, after which the call will be 421 retried. 413 422 414 423 @note: The task object generated contains the name of a callable (as a … … 424 433 kw['doNext'] = True 425 434 dq = self.queue.call(callName, *args, **kw) 426 if timeout: 427 dq.setTimeout( 428 timeout, 429 timeoutFunc=lambda d: jobRan((False, "Timeout"))) 430 dq.addErrback(jobFailed) 435 dq.addErrback(lambda failure: (False, failure.getTraceback())) 431 436 dq.addCallback(jobRan) 432 437 433 438 def jobRan(result): 434 439 status, result = result 435 if status and jobID in self.callsPending:436 if d in self.callsPending [jobID]:440 if status: 441 if d in self.callsPending.get(jobID, []): 437 442 del self.callsPending[jobID][d] 438 443 d.callback(result) 444 elif result == 'Timeout': 445 log("Timeout on job %d, retrying", jobID) 446 tryAgain() 439 447 else: 440 448 log("Error running job %d:\n%s", jobID, result) 441 449 tryAgain() 442 443 def jobFailed(failure):444 return False, failure.getTraceback()445 450 446 451 def tryAgain(): … … 456 461 if jobID not in self.jobs: 457 462 raise ValueError("No job '%s' registered" % jobID) 458 timeout = kw.pop('timeout', None)459 463 kw['series'] = jobID 460 464 kw['niceness'] = self.jobs[jobID][1] projects/AsynQueue/trunk/asynqueue/tasks.py
r112 r126 24 24 # Imports 25 25 from twisted.internet import defer, reactor 26 # Use C Deferreds if possible, for efficiency 27 try: 28 from twisted.internet import cdefer 29 except: 30 pass 31 else: 32 defer.Deferred = cdefer.Deferred 26 33 27 34 from workers import IWorker … … 46 53 47 54 """ 48 def __init__(self, f, args, kw, priority, series ):55 def __init__(self, f, args, kw, priority, series, timeout=None): 49 56 if not isinstance(args, (tuple, list)): 50 57 raise TypeError("Second argument 'args' isn't a sequence") … … 55 62 self.series = series 56 63 self.d = defer.Deferred() 64 self.timeout = timeout 65 66 def startTimer(self): 67 if self.timeout: 68 self.callID = reactor.callLater(self.timeout, self.timedout) 69 else: 70 self.callID = None 57 71 58 72 def callback(self, result): 73 if self.callID: 74 self.callID.cancel() 75 self.callID = None 59 76 if not self.d.called: 60 77 self.d.callback(result) 61 78 62 79 def errback(self, result): 80 if self.callID: 81 self.callID.cancel() 82 self.callID = None 63 83 self.d.errback(result) 84 85 def timedout(self): 86 if not self.d.called: 87 self.d.callback((False, "Timeout")) 88 self.callID = None 64 89 65 90 def __repr__(self): … … 105 130 self.seriesNumbers = {} 106 131 107 def new(self, func, args, kw, niceness, series=None ):132 def new(self, func, args, kw, niceness, series=None, timeout=None): 108 133 """ 109 134 Call this to obtain a L{Task} instance that will run in the specified … … 131 156 positivized = niceness + 20 132 157 priority = self._serial(series) * (1 + (float(positivized)/10)**2) 133 return self.TaskClass(func, args, kw, priority, series )158 return self.TaskClass(func, args, kw, priority, series, timeout) 134 159 135 160 def _serial(self, series): … … 176 201 """ 177 202 self.d.callback(None) 203 self.task.startTimer() 178 204 return worker.run(self.task) 179 205 projects/AsynQueue/trunk/asynqueue/test/util.py
r112 r126 38 38 39 39 class MockTask(object): 40 def __init__(self, f, args, kw, priority, series ):40 def __init__(self, f, args, kw, priority, series, timeout=None): 41 41 self.ran = False 42 42 self.callTuple = (f, args, kw) … … 52 52 def __str__(self): 53 53 return str(self.callTuple[0]) 54 55 def startTimer(self): 56 pass 54 57 55 58 projects/AsynQueue/trunk/asynqueue/workers.py
r112 r126 118 118 """ 119 119 I implement an L{IWorker} that runs tasks in a dedicated worker thread. 120 121 You can define one or more specialties that I am qualified to handle with 122 string arguments. 120 123 """ 121 124 implements(IWorker) 122 125 cQualified = [] 123 126 124 def __init__(self ):127 def __init__(self, *specialties): 125 128 import threading 126 self.iQualified = []129 self.iQualified = list(specialties) 127 130 self.event = threading.Event() 128 131 self.thread = threading.Thread(target=self._loop) projects/AsynQueue/trunk/setup.py
r79 r126 30 30 31 31 ### Define setup options 32 kw = {'version':'0. 2',32 kw = {'version':'0.3', 33 33 'license':'GPL', 34 34 'platforms':'OS Independent', … … 47 47 kw['keywords'] = [ 48 48 'Twisted', 'asynchronous', 'threads', 49 'taskqueue', 'queue', 'priority', 'tasks', 'jobs'] 49 'taskqueue', 'queue', 'priority', 'tasks', 'jobs', 'nodes', 'cluster'] 50 50 51 51 52 kw['classifiers'] = [ 52 53 'Development Status :: 4 - Beta', 54 53 55 'Intended Audience :: Developers', 56 'Intended Audience :: Science/Research', 57 54 58 'License :: OSI Approved :: GNU General Public License (GPL)', 55 59 'Operating System :: OS Independent', 56 60 'Programming Language :: Python', 61 62 'Topic :: System :: Distributed Computing', 63 'Topic :: Software Development :: Object Brokering', 57 64 'Topic :: Software Development :: Libraries :: Python Modules', 58 65 ] 66 59 67 60 68 kw['description'] = " ".join("""
