| 1 |
# AsynQueue: |
|---|
| 2 |
# Asynchronous task queueing based on the Twisted framework, with task |
|---|
| 3 |
# prioritization and a powerful worker/manager interface. |
|---|
| 4 |
# |
|---|
| 5 |
# Copyright (C) 2006-2007 by Edwin A. Suominen, http://www.eepatents.com |
|---|
| 6 |
# |
|---|
| 7 |
# This program is free software; you can redistribute it and/or modify it under |
|---|
| 8 |
# the terms of the GNU General Public License as published by the Free Software |
|---|
| 9 |
# Foundation; either version 2 of the License, or (at your option) any later |
|---|
| 10 |
# version. |
|---|
| 11 |
# |
|---|
| 12 |
# This program is distributed in the hope that it will be useful, but WITHOUT |
|---|
| 13 |
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|---|
| 14 |
# FOR A PARTICULAR PURPOSE. See the file COPYING for more details. |
|---|
| 15 |
# |
|---|
| 16 |
# You should have received a copy of the GNU General Public License along with |
|---|
| 17 |
# this program; if not, write to the Free Software Foundation, Inc., 51 |
|---|
| 18 |
# Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA |
|---|
| 19 |
|
|---|
| 20 |
""" |
|---|
| 21 |
Running jobs on python interpreters that are connected as children via |
|---|
| 22 |
Twisted's Perspective Broker. |
|---|
| 23 |
""" |
|---|
| 24 |
|
|---|
| 25 |
from zope.interface import implements, Interface |
|---|
| 26 |
from twisted.internet import defer, reactor |
|---|
| 27 |
from twisted.python import reflect |
|---|
| 28 |
from twisted.python.rebuild import rebuild |
|---|
| 29 |
from twisted.python.failure import Failure |
|---|
| 30 |
from twisted.spread import pb, flavors |
|---|
| 31 |
# Use C Deferreds if possible, for efficiency |
|---|
| 32 |
try: |
|---|
| 33 |
from twisted.internet import cdefer |
|---|
| 34 |
except: |
|---|
| 35 |
pass |
|---|
| 36 |
else: |
|---|
| 37 |
defer.Deferred = cdefer.Deferred |
|---|
| 38 |
|
|---|
| 39 |
import base, workers |
|---|
| 40 |
|
|---|
| 41 |
# Module-wide switch: log() prints only while this is true.
VERBOSE = True


def log(msgProto, *args):
    """
    Prints the message obtained by interpolating I{args} into the format
    string I{msgProto}, but only while the module-level C{VERBOSE} flag is
    set.
    """
    if VERBOSE:
        # The parenthesized single-expression form behaves identically as a
        # Python 2 print statement and as a Python 3 function call.
        print(msgProto % args)
|---|
| 45 |
|
|---|
| 46 |
|
|---|
| 47 |
class TrustError(Exception):
    """
    Reported by L{ChildRoot} when it is asked to load or run job code while
    its I{trusted} attribute is not set.
    """
|---|
| 49 |
|
|---|
| 50 |
|
|---|
| 51 |
class ChildRoot(pb.Root):
    """
    I am the root resource for one child worker interpreter.

    @ivar trusted: Set C{True} when the 'parent' python interpreter is trusted
      to provide code that I can run without concerns. Defaults to C{False}
      the first time it is read.

    @ivar jobs: A dict of job namespaces, keyed by job ID. Created empty the
      first time it is read.

    """
    def __getattr__(self, name):
        """
        Lazily supplies (and stores) the defaults for my I{jobs} and
        I{trusted} attributes. Any other missing attribute raises
        C{AttributeError} as usual.
        """
        if name == 'jobs':
            result = self.jobs = {}
        elif name == 'trusted':
            result = self.trusted = False
        else:
            raise AttributeError("No such attribute '%s'" % name)
        return result

    def oops(self, *arg):
        """
        Returns a C{False} status code for a remote call along with a string
        traceback of the exception raised. You can supply your own exception or
        L{Failure} instance. If you don't, the current exception will be used,
        so in that case call me from inside an C{except} clause.
        """
        if arg and isinstance(arg[0], Failure):
            failureObject = arg[0]
        else:
            failureObject = Failure(*arg)
        return False, failureObject.getTraceback()

    def _untrusted(self):
        """
        Returns the C{False} status and traceback reported when code is
        offered to me while I am not trusted.
        """
        # An exception *instance* is wrapped, not the class itself, so that
        # the traceback names TrustError as the exception type.
        return self.oops(
            TrustError("Parent interpreter is not trusted to supply code"))

    def remote_registerClasses(self, *args):
        """
        Instructs my broker to register the classes specified by the
        argument(s).

        The classes will be registered for B{all} jobs, and are specified by
        their string representations::

            <package(s).module.class>

        """
        modules = []
        for stringRep in args:
            # Load the class for the string representation
            cls = reflect.namedObject(stringRep)
            # Register instances of the class, including its type and module
            pb.setUnjellyableForClass(stringRep, cls)
            if cls.__module__ not in modules:
                modules.append(cls.__module__)
        # Try to build the modules for the classes in case they've changed
        # since the last run
        for module in modules:
            try:
                rebuild(reflect.namedModule(module), doLog=False)
            except Exception:
                # Rebuilding is strictly best-effort; the classes are
                # registered either way.
                pass

    def remote_newJob(self, jobID, jobCode):
        """
        Registers the job identified by the specified integer I{jobID} and
        represented by Python code contained in the string I{jobCode}.

        The namespace can include a no-argument function C{shutdown}, which may
        return a deferred. If it is present, the function will be called before
        my process exits.

        Returns a tuple containing a status value and the result of loading the
        job. If all went well, the status is C{True} and the result is a list
        of names of the callable objects in the job's namespace. If there was a
        problem (including my not being trusted), the status is C{False} and
        the result is a string traceback of the exception that was raised.
        """
        if not self.trusted:
            return self._untrusted()
        namespace = {}
        try:
            # SECURITY: this executes arbitrary code received from the parent
            # interpreter. The 'trusted' check above is the only safeguard.
            # The call form is valid on both Python 2 and Python 3.
            exec(jobCode, namespace)
        except:
            # Deliberately catches everything: whatever the job code raises
            # while loading is reported back instead of propagating.
            return self.oops()
        self.jobs[jobID] = namespace
        return True, [
            name for name, obj in namespace.items() if callable(obj)]

    def remote_runJob(self, jobID, callName, *args, **kw):
        """
        Calls the object that is present in the namespace of I{jobID} under the
        specified I{callName}, with any supplied arguments and
        keywords.

        Returns a tuple containing a status value and the result of the
        call, or a deferred that fires with such a tuple. If all went well,
        the status is C{True} and the result is the result, of whatever
        type. If there was a problem (I am not trusted, the job is unknown,
        the name is not a callable in its namespace, or the call failed), the
        status is C{False} and the result is a string traceback of the
        exception that was raised.
        """
        if not self.trusted:
            return self._untrusted()
        namespace = self.jobs.get(jobID)
        if namespace is None:
            # Report an unknown job through the normal status tuple rather
            # than letting a bare KeyError escape the remote call.
            return self.oops(KeyError("No job %s registered" % (jobID,)))
        calledObject = namespace.get(callName, None)
        if callable(calledObject):
            d = defer.maybeDeferred(calledObject, *args, **kw)
            d.addCallback(lambda x: (True, x))
            d.addErrback(self.oops)
            return d
        # %s rather than %d for the job ID, so that building the error
        # message cannot itself fail on a non-integer ID.
        return self.oops(
            AttributeError(
                "No callable object '%s' defined in namespace for job %s"
                % (callName, jobID)))

    def remote_forgetJob(self, jobID):
        """
        Call this with the I{jobID} of a job that is done and I will forget its
        namespace, thus freeing up memory. Unknown job IDs are ignored.
        """
        if jobID in self.jobs:
            del self.jobs[jobID]

    def remote_exit(self, stopReactor=False):
        """
        Calls any C{shutdown} callables present in my jobs' namespaces and
        returns a deferred that fires when all of them are done. If
        I{stopReactor} is set, my reactor is stopped at that point,
        terminating my child worker process.
        """
        dList = []
        for namespace in self.jobs.values():
            possibleShutdownFunction = namespace.get('shutdown', None)
            if callable(possibleShutdownFunction):
                dList.append(defer.maybeDeferred(possibleShutdownFunction))
        d = defer.DeferredList(dList)
        if stopReactor:
            d.addCallback(lambda _: reactor.stop())
        return d
|---|
| 180 |
|
|---|
| 181 |
|
|---|
| 182 |
class ChildWorker(workers.RemoteCallWorker):
    """
    I implement an I{IWorker} that runs tasks, up to I{N} pending at a time, in
    a particular job on a particular child interpreter.
    """
    def __init__(self, *args, **kw):
        """
        Passes all arguments and keywords on to the base class constructor.
        As a unit testing hook, a true I{_noTypeCheck} attribute forces the
        I{noTypeCheck} keyword on.
        """
        skipTypeCheck = getattr(self, '_noTypeCheck', False)
        if skipTypeCheck:
            kw['noTypeCheck'] = True
        workers.RemoteCallWorker.__init__(self, *args, **kw)

    def runNow(self, null, task):
        """
        Starts a remote C{runJob} call for the supplied I{task}, using the
        task's series as the job ID, and records the task along with the
        deferred for the call in my list of jobs. Nothing is returned.
        """
        callName, callArgs, callKw = task.callTuple
        pending = self.remoteCaller(
            'runJob', task.series, callName, *callArgs, **callKw)
        entry = (task, pending)
        self.jobs.append(entry)
        pending.addBoth(self.doneTrying, entry)
        # This task's deferred is NOT returned!

    def stop(self):
        """
        Stops me the way my base class does and then, whatever the outcome
        of that, asks the child interpreter to C{exit}. Any failure from the
        exit call is swallowed. Returns a deferred for the whole sequence.
        """
        def exitChild(ignored):
            return self.remoteCaller('exit')

        def swallow(failure):
            return None

        d = workers.RemoteCallWorker.stop(self)
        d.addBoth(exitChild)
        d.addErrback(swallow)
        return d
|---|
| 206 |
|
|---|
| 207 |
|
|---|
| 208 |
class JobManager(object):
    """
    I keep jobs running on python interpreters that are attached as children,
    maintaining a pipeline of no fewer than I{N} calls pending on each
    interpreter worker to minimize the effects of network latency for the PB
    connection to the interpreters and balance the load across the workers
    while still permitting some priority queueing of jobs by niceness.

    You can supply an instance of L{base.TaskQueue} to the constructor. I will
    instantiate my own if not.

    I maintain a dict I{updates} of update tasks to perform for each jobID
    before any (further) runs for that job. Each entry in the dict is a list
    of four-element sequences::

        [funcName, args, kw, workersUpdated]

    When a worker runs a given update task, that worker's ID is appended to the
    I{workersUpdated} list that is the fourth element of the sequence. That
    will indicate that it needs not run the update task again.

    @ivar queue: The TaskQueue instance I'm using.

    @ivar jobs: A dict of C{[jobCode, niceness]} lists, keyed by job ID.

    @ivar callsPending: A dict, keyed by job ID, of dicts that map the
      deferred of each unfinished call to a list C{[retryCount, callName,
      args, kw]}.

    @ivar registeredClasses: A dict mapping the string representation of each
      registered class to a list of IDs of the workers it was sent to.

    @cvar maxRetries: How many times a failed call is queued again before
      its deferred is fired with C{None}.

    """
    maxRetries = 1

    def __init__(self, queue=None):
        self.jobs = {}
        self.updates = {}
        self.callsPending = {}
        self.registeredClasses = {}
        if queue is None:
            queue = base.TaskQueue()
        self.queue = queue

    def shutdown(self):
        """
        Shuts down my task queue, returning a deferred that fires when the
        queue has emptied and all interpreter workers have finished and been
        terminated. The task queue shutdown takes care of shutting down
        everything else, including any attached workers.
        """
        return self.queue.shutdown()

    def jobTried(self, result, jobID, worker):
        """
        Callback from loading a new job.

        If the worker's root reference raised an unexpected failure, returns
        C{False}. If everything went OK, qualifies the I{worker} for the job
        with my queue and returns C{True}. If there was a failure that may
        not have been the worker's fault, returns C{None}. Anything else is
        returned unchanged.
        """
        if hasattr(result, 'check'):
            # Oops, failure from a bogus root reference.
            log("Worker %d supplied nonconforming root reference", worker.ID)
            return False
        if isinstance(result, (list, tuple)):
            if result[0]:
                msg = "Callable objects: %s" % ", ".join(result[1])
                log("Job %d loaded OK on worker %s\n%s", jobID, worker.ID, msg)
                self.queue.qualifyWorker(worker, jobID)
                return True
            log("Job %d failed on worker %d:\n%s", jobID, worker.ID, result[1])
            return None
        # Not a failure or status,result tuple, so just pass it along
        return result

    def attachChild(self, childRoot, N=3):
        """
        Attaches a new child interpreter worker using the supplied I{childRoot}
        PB root reference.

        Tries to load all of the currently registered jobs on the worker. If an
        unexpected failure (not a simple job-loading exception) arises, the
        worker is not hired.

        The default number (three) of job runs that the worker is willing to
        queue up on its end can be overridden with the I{N} keyword.

        Returns a deferred that fires with the worker's ID, or C{None} if not
        hired.
        """
        def jobLoaded(status, jobID):
            # The job ID arrives as a callback argument. Reading the loop
            # variable of the enclosing scope instead would yield whatever
            # job the loop ended on by the time this callback fires.
            if status:
                d = self._runRegisterClasses(worker)
                if jobID in self.updates:
                    d.addCallback(lambda _: self._runUpdate(jobID, worker))
                return d
            if status is False:
                # Only an unexpected failure disqualifies the worker. A job
                # that simply failed to load (status None) leaves the worker
                # attached but unqualified for that job, as in new().
                rejections.append(jobID)

        def allDone(null):
            if rejections:
                d = self.queue.detachWorker(worker)
                d.addCallback(lambda _: None)
                return d
            return worker.ID

        rejections = []
        worker = ChildWorker(childRoot, N)
        self.queue.attachWorker(worker)
        dList = []
        for jobID, jobInfo in self.jobs.items():
            jobCode = jobInfo[0]
            d = childRoot.callRemote('newJob', jobID, jobCode)
            d.addBoth(self.jobTried, jobID, worker)
            d.addCallback(jobLoaded, jobID)
            dList.append(d)
        return defer.DeferredList(dList).addCallback(allDone)

    def detachChild(self, childID):
        """
        Detaches and terminates the child interpreter worker specified by the
        supplied I{childID}.
        """
        return self.queue.detachWorker(childID, reassign=True, crash=True)

    def new(self, jobCode, niceness=0):
        """
        Registers a new job for execution on qualified child interpreters.

        @param jobCode: A string containing Python code that defines the job
          and its namespace.

        @keyword niceness: Scheduling niceness for all calls of the job.

        @type niceness: An integer between -20 and 20, with lower numbers
          having higher scheduling priority as in UNIX C{nice} and C{renice}.

        @return: A deferred that fires with a unique ID for the job, once
          every currently attached worker has tried to load it.

        """
        # Job IDs count up from 1; the counter comes into existence with the
        # first job.
        jobID = self._jobCounter = getattr(self, '_jobCounter', 0) + 1
        self.callsPending[jobID] = {}
        self.jobs[jobID] = [jobCode, niceness]

        dList = []
        for worker in self.queue.workers():
            d = worker.remoteCaller('newJob', jobID, jobCode)
            d.addBoth(self.jobTried, jobID, worker)
            dList.append(d)
        d = defer.DeferredList(dList)
        d.addCallback(lambda _: jobID)
        return d

    def _runUpdate(self, jobID, worker):
        """
        Runs on the I{worker} every update task of I{jobID} that the worker
        has not yet been recorded as having run. Returns a deferred that
        fires when all of those calls are done.
        """
        dList = []
        for funcName, args, kw, workersUpdated in self.updates[jobID]:
            if worker.ID in workersUpdated:
                continue
            d = worker.remoteCaller('runJob', jobID, funcName, *args, **kw)
            # Bind this iteration's list as a default argument; a plain
            # closure would see only the list of the last update task.
            d.addCallback(
                lambda _, updated=workersUpdated: updated.append(worker.ID))
            dList.append(d)
        return defer.DeferredList(dList)

    def update(self, jobID, callName, *args, **kw):
        """
        Appends a new task to the update list for the specified I{jobID}. Runs
        the new update task on all workers currently attached and ensures that
        all new workers run the task for that job before they run any other
        tasks for it.

        The updates are run via a direct remoteCall to each worker, not through
        the queue. Because of the disconnect between queued and direct calls,
        it is likely but not guaranteed that any jobs you have queued when this
        method is called will run on a particular worker B{after} this update
        is run. Wait for the deferred from this method to fire before queuing
        any jobs that need the update to be in place before running.

        If you don't want the task saved to the update list, but only run on
        the workers currently attached, set the I{ephemeral} keyword C{True}.
        """
        ephemeral = kw.pop('ephemeral', False)
        if ephemeral:
            dList = [
                worker.remoteCaller('runJob', jobID, callName, *args, **kw)
                for worker in self.queue.workers()]
        else:
            if jobID not in self.updates:
                self.updates[jobID] = []
            self.updates[jobID].append([callName, args, kw, []])
            dList = [
                self._runUpdate(jobID, worker)
                for worker in self.queue.workers()]
        return defer.DeferredList(dList)

    def _runRegisterClasses(self, worker):
        """
        Has the I{worker} register every class it has not yet been sent,
        recording its ID against each such class. Returns the deferred from
        the remote call.
        """
        stringReps = []
        for stringRep, registeredWorkers in self.registeredClasses.items():
            if worker.ID in registeredWorkers:
                continue
            registeredWorkers.append(worker.ID)
            stringReps.append(stringRep)
        return worker.remoteCaller('registerClasses', *stringReps)

    def registerClasses(self, *args):
        """
        Instructs my current and future nodes to register the classes specified
        by the argument(s) as self-unjellyable and allowable past PB
        security. The classes will be registered for B{all} jobs, and are
        specified by their string representations::

            <package(s).module.class>

        Use judiciously!

        """
        for stringRep in args:
            if stringRep not in self.registeredClasses:
                self.registeredClasses[stringRep] = []
        dList = [
            self._runRegisterClasses(worker)
            for worker in self.queue.workers()]
        return defer.DeferredList(dList)

    def run(self, jobID, callName, *args, **kw):
        """
        Runs the specified I{jobID} by putting a call to the specified callable
        object in the job's namespace, with any supplied arguments and
        keywords, into the queue.

        Scheduling of the job is impacted by the niceness of the job itself. As
        with UNIX niceness, the value should be an integer where 0 is normal
        scheduling, negative numbers are higher priority, and positive numbers
        are lower priority. Calls for a job having niceness N+10 are dispatched
        at approximately half the rate of calls for a job with niceness N.

        All keywords except for the following are passed to the call:

            - B{timeout}: A timeout interval in seconds from when a worker
              gets a task assignment for the call, after which the call will be
              retried.

        @note: The task object generated contains the name of a callable (as a
          string) for the first element of its I{callTuple} attribute, instead
          of a callable itself.

        @return: A deferred to the eventual result of the call when it is
          eventually pulled from the queue and run. It fires with C{None} if
          the call still fails after I{maxRetries} retries, or fails after
          the job has been cancelled.

        @raise ValueError: if no job is registered under I{jobID}.

        """
        def queueJob(doNext=False):
            if doNext:
                kw['doNext'] = True
            dq = self.queue.call(callName, *args, **kw)
            dq.addErrback(lambda failure: (False, failure.getTraceback()))
            dq.addCallback(jobRan)

        def forget():
            # Drops the bookkeeping entry for this call, if it still exists.
            self.callsPending.get(jobID, {}).pop(d, None)

        def jobRan(result):
            status, result = result
            if status:
                forget()
                d.callback(result)
            elif result == 'Timeout':
                log("Timeout on job %d, retrying", jobID)
                tryAgain()
            else:
                log("Error running job %d:\n%s", jobID, result)
                tryAgain()

        def tryAgain():
            # The entry is gone if the job was cancelled in the meantime, in
            # which case no retry is attempted.
            pending = self.callsPending.get(jobID, {}).get(d)
            if pending is not None and pending[0] < self.maxRetries:
                pending[0] += 1
                queueJob(True)
                return
            # Giving up: release the entry so that failed calls do not
            # accumulate for as long as the job stays registered.
            forget()
            d.callback(None)

        jobID = int(jobID)
        if jobID not in self.jobs:
            raise ValueError("No job '%s' registered" % jobID)
        kw['series'] = jobID
        kw['niceness'] = self.jobs[jobID][1]
        d = defer.Deferred()
        self.callsPending[jobID][d] = [0, callName, args, kw]
        queueJob()
        return d

    def cancel(self, jobID):
        """
        Cancels the specified I{jobID} and any jobs that may be queued for
        it. If the job doesn't exist, no error is raised. Returns a deferred
        that fires when all attached workers have been told to forget the
        job.
        """
        self.queue.cancelSeries(jobID)
        self.jobs.pop(jobID, None)
        self.updates.pop(jobID, None)
        self.callsPending.pop(jobID, None)
        dList = [
            worker.remoteCaller('forgetJob', jobID)
            for worker in self.queue.workers()]
        return defer.DeferredList(dList)
|---|