| 1 |
# AsynQueue: |
|---|
| 2 |
# Asynchronous task queueing based on the Twisted framework, with task |
|---|
| 3 |
# prioritization and a powerful worker/manager interface. |
|---|
| 4 |
# |
|---|
| 5 |
# Copyright (C) 2006-2007 by Edwin A. Suominen, http://www.eepatents.com |
|---|
| 6 |
# |
|---|
| 7 |
# This program is free software; you can redistribute it and/or modify it under |
|---|
| 8 |
# the terms of the GNU General Public License as published by the Free Software |
|---|
| 9 |
# Foundation; either version 2 of the License, or (at your option) any later |
|---|
| 10 |
# version. |
|---|
| 11 |
# |
|---|
| 12 |
# This program is distributed in the hope that it will be useful, but WITHOUT |
|---|
| 13 |
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|---|
| 14 |
# FOR A PARTICULAR PURPOSE. See the file COPYING for more details. |
|---|
| 15 |
# |
|---|
| 16 |
# You should have received a copy of the GNU General Public License along with |
|---|
| 17 |
# this program; if not, write to the Free Software Foundation, Inc., 51 |
|---|
| 18 |
# Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA |
|---|
| 19 |
|
|---|
| 20 |
""" |
|---|
| 21 |
Task management for the task queue workers |
|---|
| 22 |
""" |
|---|
| 23 |
|
|---|
| 24 |
# Imports |
|---|
| 25 |
from twisted.internet import defer, reactor |
|---|
| 26 |
# Use C Deferreds if possible, for efficiency |
|---|
| 27 |
try:
    from twisted.internet import cdefer
except ImportError:
    # The C implementation is an optional accelerator; without it the
    # regular Deferred class from twisted.internet.defer stays in place.
    pass
else:
    # Swap in the C Deferred for every user of the defer module
    defer.Deferred = cdefer.Deferred
|---|
| 33 |
|
|---|
| 34 |
from workers import IWorker |
|---|
| 35 |
from errors import ImplementationError |
|---|
| 36 |
|
|---|
| 37 |
|
|---|
| 38 |
class Task(object):
    """
    I represent a task that has been dispatched to a queue for running with a
    given scheduling I{niceness}.

    I generate a C{Deferred} that you fire by calling either my L{callback} or
    L{errback} with a result or failure, respectively, when the task is
    finally run and its result is obtained. You can call the deferred's
    versions of those methods directly, but my versions deal with things like
    repeated callbacks, which happen sometimes with task timeouts.

    @ivar d: A deferred to the eventual result of the task.

    @ivar series: A hashable object identifying the series of which this task
      is a part.

    @ivar callTuple: The 3-tuple C{(f, args, kw)} I was constructed with.

    @ivar priority: The value that orders me relative to other tasks; lower
      values sort first.

    @ivar timeout: The delay handed to C{reactor.callLater} by my
      L{startTimer} method, or a false value for no timeout.

    @ivar callID: The delayed call for my pending timeout, or C{None} if no
      timer is running.

    """
    def __init__(self, f, args, kw, priority, series, timeout=None):
        """
        @raise TypeError: if I{args} is not a tuple or list, or if I{kw} is
          not a dict.
        """
        if not isinstance(args, (tuple, list)):
            raise TypeError("Second argument 'args' isn't a sequence")
        if not isinstance(kw, dict):
            raise TypeError("Third argument 'kw' isn't a dict")
        self.callTuple = (f, args, kw)
        self.priority = priority
        self.series = series
        self.d = defer.Deferred()
        self.timeout = timeout
        # Defined up front so that callback and errback are safe to call even
        # if startTimer never ran.
        self.callID = None

    def startTimer(self):
        """
        Starts my timeout timer if I have a timeout set. When the timer
        expires, my L{timedout} method is called.
        """
        if self.timeout:
            self.callID = reactor.callLater(self.timeout, self.timedout)
        else:
            self.callID = None

    def _cancelTimer(self):
        """
        Forgets my timeout timer, cancelling it first if it is still pending.
        """
        callID, self.callID = self.callID, None
        if callID is not None and callID.active():
            callID.cancel()

    def callback(self, result):
        """
        Stops my timer and fires my deferred with I{result}, unless the
        deferred has already been fired.
        """
        self._cancelTimer()
        if not self.d.called:
            self.d.callback(result)

    def errback(self, result):
        """
        Stops my timer and fires the errback of my deferred with I{result},
        unless the deferred has already been fired.
        """
        self._cancelTimer()
        # Same guard as callback: a failure arriving after a timeout must not
        # fire the deferred a second time.
        if not self.d.called:
            self.d.errback(result)

    def timedout(self):
        """
        Called when my timer expires. Fires my deferred with the tuple
        C{(False, "Timeout")} unless it has already been fired.
        """
        if not self.d.called:
            self.d.callback((False, "Timeout"))
        self.callID = None

    def __repr__(self):
        """
        Gives me an informative string representation
        """
        func, args, kw = self.callTuple
        parts = [str(x) for x in args]
        parts.extend(["%s=%s" % item for item in kw.items()])
        if func.__class__.__name__ == "function":
            funcName = func.__name__
        elif callable(func):
            # Callable instances need not have a __name__ of their own
            funcName = "%s.%s" % (
                func.__class__.__name__,
                getattr(func, '__name__', '__call__'))
        else:
            # Not callable: show the object itself as the first argument
            funcName = "<worker call> "
            parts.insert(0, str(func))
        return "Task: %s(%s)" % (funcName, ", ".join(parts))

    def __cmp__(self, other):
        """
        Numeric comparisons between tasks are based on their priority, with
        higher (lower-numbered) priorities being considered \"less\" and thus
        sorted first.

        A task will always have a higher priority, i.e., be comparatively
        I{less}, than a C{None} object, which is used as a shutdown signal
        instead of a task.
        """
        if other is None:
            return -1
        mine, theirs = self.priority, other.priority
        # Same result as cmp(mine, theirs), without needing the builtin
        return (mine > theirs) - (mine < theirs)

    # Rich comparisons give the same ordering as __cmp__ on interpreters that
    # never call __cmp__.
    def __lt__(self, other):
        return self.__cmp__(other) < 0

    def __le__(self, other):
        return self.__cmp__(other) <= 0

    def __gt__(self, other):
        return self.__cmp__(other) > 0

    def __ge__(self, other):
        return self.__cmp__(other) >= 0
|---|
| 120 |
|
|---|
| 121 |
|
|---|
| 122 |
class TaskFactory(object):
    """
    I generate L{Task} instances with the right priority setting for effective
    scheduling between tasks in one or more concurrently running task series.

    @ivar TaskClass: The class that my L{new} method instantiates.

    @ivar seriesNumbers: A C{dict} holding the last serial number issued for
      each task series, keyed by series.

    """
    def __init__(self, TaskClass=Task):
        # Setting a non-default TaskClass is mostly for testing
        self.TaskClass = TaskClass
        self.seriesNumbers = {}

    def new(self, func, args, kw, niceness, series=None, timeout=None):
        """
        Call this to obtain a L{Task} instance that will run in the specified
        I{series} at a priority reflecting the specified I{niceness}.

        The equation for priority has been empirically determined as follows::

            p = k * (1 + nn**2)

        where C{k} is the serial number of the new task within its series,
        as issued by L{_serial}, and C{nn} is niceness normalized from
        -20...+20 to the range 0...4.

        @param func: A callable object to run as the task, the result of which
          will be sent to the callback for the deferred of the task returned by
          this method when it fires.

        @param args: A tuple containing any arguments to include in the call.

        @param kw: A dict containing any keywords to include in the call.

        @param niceness: An integer from -20 to +20.

        @param series: The task series to which the new task belongs.

        @param timeout: Passed through unchanged to the task constructor.

        @raise ValueError: if I{niceness} is not an integer or is outside the
          range -20...+20.

        """
        if not isinstance(niceness, int) or abs(niceness) > 20:
            raise ValueError(
                "Niceness must be an integer between -20 and +20")
        positivized = niceness + 20
        priority = self._serial(series) * (1 + (float(positivized)/10)**2)
        return self.TaskClass(func, args, kw, priority, series, timeout)

    def _serial(self, series):
        """
        Maintains serial numbers for tasks in one or more separate series, such
        that the numbers in each series increment independently except that any
        new series starts at a value greater than the maximum serial number
        currently found in any series.

        @return: The next serial number for I{series}, as a C{float}.
        """
        if series not in self.seriesNumbers:
            # list() lets the concatenation work whether values() returns a
            # list or a view; the leading 0 covers the very first series.
            eachSeries = [0] + list(self.seriesNumbers.values())
            self.seriesNumbers[series] = max(eachSeries)
        self.seriesNumbers[series] += 1
        return float(self.seriesNumbers[series])
|---|
| 173 |
|
|---|
| 174 |
|
|---|
| 175 |
class Assignment(object):
    """
    I pair one task with whichever worker ends up accepting it. I exist
    mostly so that a deferred's callback can be fired with something other
    than another deferred.

    @ivar task: The task that I was constructed with.

    @ivar d: A deferred, created along with each instance of me, that fires
      when a worker accepts the assignment represented by that instance.

    """
    # Many of these small objects get created, so slots keep them cheap
    __slots__ = ['task', 'd']

    def __init__(self, task):
        self.d = defer.Deferred()
        self.task = task

    def accept(self, worker):
        """
        Called when I{worker} takes me on. My deferred is fired with C{None},
        the timer of my task is started, and the task is handed to the
        worker's C{run} method.

        @return: Whatever the worker's C{run} method returns: a deferred that
          fires when the worker is ready to accept B{another} assignment
          following this one.

        """
        self.d.callback(None)
        assigned = self.task
        assigned.startTimer()
        return worker.run(assigned)
|---|
| 205 |
|
|---|
| 206 |
|
|---|
| 207 |
class AssignmentFactory(object):
    """
    I generate L{Assignment} instances for workers to handle particular tasks.

    @ivar waiting: A C{dict}, keyed by task series, of lists of deferreds for
      assignment requests that have not yet been matched with a task.

    @ivar pending: A C{dict}, keyed by task series, of lists of L{Assignment}
      instances that no worker has requested yet.

    """
    def __init__(self):
        self.waiting = {}
        self.pending = {}

    def cancelRequests(self, worker):
        """
        Withdraws from my waiting lists every assignment request deferred
        that is listed in the C{assignments} attribute of the supplied
        I{worker}, so that no new task gets matched with those requests. A
        worker without an C{assignments} attribute is left alone.
        """
        for series, dList in getattr(worker, 'assignments', {}).items():
            requestsWaiting = self.waiting.get(series, [])
            for d in dList:
                if d in requestsWaiting:
                    requestsWaiting.remove(d)

    def request(self, worker, series):
        """
        Called to request a new assignment in the specified I{series} of tasks
        for the supplied I{worker}.

        When a new assignment in the series is finally ready in the queue for
        this worker, the deferred for the assignment request will fire with an
        L{Assignment} instance that has been constructed with the task to be
        assigned.

        If the worker is still gainfully employed when it accepts the
        assignment, and is not just wrapping up its work after having been
        fired, the worker will request another assignment when it finishes the
        task, provided that it has not been fired in the meantime.
        """
        def requestAgain(result):
            # Checked again once the task is done: a worker that was fired
            # while running the task must not rejoin the waiting list.
            if worker.hired:
                self.request(worker, series)

        def accept(assignment, d_request):
            worker.assignments[series].remove(d_request)
            if isinstance(assignment, Assignment):
                d = assignment.accept(worker)
                if worker.hired:
                    d.addCallback(requestAgain)
                return d

        assignments = getattr(worker, 'assignments', {})
        if self.pending.get(series, []):
            # A task is already queued up for this series, so take it now
            d = defer.succeed(self.pending[series].pop(0))
        else:
            d = defer.Deferred()
            self.waiting.setdefault(series, []).append(d)
        assignments.setdefault(series, []).append(d)
        worker.assignments = assignments
        # The callback is added to the deferred *after* being appended to the
        # worker's assignments list for this series. That way, we know that the
        # callback will be able to remove the deferred even if the deferred
        # fires immediately due to the queue having a surplus of assignments.
        d.addCallback(accept, d)

    def new(self, task):
        """
        Creates and queues a new assignment for the supplied I{task}, returning
        a deferred that fires when the assignment has been accepted.

        If an assignment request is waiting for the series of the task, the
        oldest such request is fired with the new assignment right away.
        Otherwise, the assignment is kept pending until one is requested.
        """
        series = task.series
        assignment = Assignment(task)
        if self.waiting.get(series, []):
            self.waiting[series].pop(0).callback(assignment)
        else:
            self.pending.setdefault(series, []).append(assignment)
        return assignment.d
|---|
| 272 |
|
|---|
| 273 |
|
|---|
| 274 |
class WorkerManager(object):
    """
    I manage one or more providers of L{IWorker} for a particular instance of
    L{TaskQueue}.

    When a new worker is hired with my L{hire} method, I run the C{request}
    method of my L{AssignmentFactory} to request that the worker be assigned a
    task from the queue of each task series for which it is qualified.

    When the worker finally gets the assignment, it accepts it, which fires
    the internal deferred of the L{Assignment} object. The worker then runs
    the assigned task and requests another assignment of a task in the same
    series when it's done, unless I've terminated the worker in the meantime.

    Each worker object maintains a dictionary of deferreds for each of its
    outstanding assignment requests so that I can cancel them if I terminate
    the worker. The requests are cancelled by withdrawing those deferreds from
    the lists of waiting requests kept by my assignment factory. See my
    L{terminate} method.

    @ivar workers: A C{dict} of worker objects that are currently employed by
      me, keyed by a unique integer ID code for each worker.

    @ivar assignmentFactory: The L{AssignmentFactory} that matches tasks with
      the assignment requests of my workers.

    """
    def __init__(self):
        self.workers = {}
        self.assignmentFactory = AssignmentFactory()

    def shutdown(self, timeout=None):
        """
        Shutdown all my workers, then fire them, in turn. Returns a
        deferred that fires when my entire work force has been
        terminated. The deferred result is a list of all tasks, if
        any, that were left unfinished by the work force.

        @param timeout: Passed to L{terminate} for each worker.
        """
        def gotResults(results):
            # Flatten the per-worker lists into a single list
            unfinishedTasks = []
            for result in results:
                unfinishedTasks.extend(result)
            return unfinishedTasks

        # Iterate over a snapshot of the IDs, because terminate removes each
        # worker from the dict being walked.
        dList = [
            self.terminate(workerID, timeout=timeout)
            for workerID in list(self.workers.keys())]
        return defer.gatherResults(dList).addCallback(gotResults)

    def hire(self, worker):
        """
        Adds a new worker to my work force.

        Makes sure that there is an assignment request queue for each task
        series for which the worker is qualified, then has the new worker
        request an initial assignment from each queue.

        The method generates an integer ID uniquely identifying the worker, and
        gives the worker an C{ID} attribute with the ID for its own reference,
        The ID is returned as well.

        @raise ImplementationError: if I{worker} doesn't provide L{IWorker}.
        """
        if not IWorker.providedBy(worker):
            raise ImplementationError(
                "'%s' doesn't provide the IWorker interface" % worker)
        IWorker.validateInvariants(worker)

        worker.hired = True
        worker.assignments = {}
        # Every worker serves the None series, plus whatever series it lists
        # in its optional cQualified and iQualified attributes. Using extend
        # accepts tuples as well as lists there.
        qualifications = [None]
        qualifications.extend(getattr(worker, 'cQualified', []))
        qualifications.extend(getattr(worker, 'iQualified', []))
        for series in qualifications:
            self.assignmentFactory.request(worker, series)
        workerID = worker.ID = getattr(self, '_workerCounter', 0) + 1
        self._workerCounter = workerID
        self.workers[workerID] = worker
        # A worker that resigns gets crashed, and its unfinished tasks are
        # put back in line for other workers.
        worker.setResignator(
            lambda : self.terminate(worker.ID, crash=True, reassign=True))
        return workerID

    def terminate(self, workerID, timeout=None, crash=False, reassign=False):
        """
        Removes a worker from my work force, canceling all of its unfulfilled
        assignment requests back from the queue and then attempting to shut it
        down gracefully via its C{stop} method.

        The I{timeout} keyword can be set to a number of seconds after which
        the worker will be terminated rudely via its C{crash} method if it
        hasn't shut down gracefully by then. If the I{crash} keyword is set
        C{True}, the worker is crashed right away without waiting for it to run
        through its pending tasks.

        If the I{reassign} keyword is set C{True}, any tasks left unfinished
        are queued as new assignments and the deferred fires with an empty
        list instead.

        @return: A deferred that fires when the worker has been removed,
          gracefully or not, with a list of any tasks left unfinished and not
          reassigned. If no worker with the given ID is employed, the deferred
          has already fired with an empty list.

        """
        def crashTheWorker(worker, d):
            unfinished = worker.crash()
            # Fire deferred with list of unfinished tasks
            d.callback(unfinished)

        def stopped(result):
            if callID.active():
                callID.cancel()
                # No tasks left unfinished if deferred fires normally
                return []
            # The timeout call already ran, so the result is whatever the
            # deferred was fired with
            return result

        def reassignTasks(tasks):
            for task in tasks:
                self.assignmentFactory.new(task)
            return []

        worker = self.workers.pop(workerID, None)
        if worker is None:
            return defer.succeed([])
        worker.hired = False
        self.assignmentFactory.cancelRequests(worker)
        if crash:
            d = defer.succeed(worker.crash())
        else:
            d = worker.stop()
            if timeout:
                callID = reactor.callLater(timeout, crashTheWorker, worker, d)
                d.addCallback(stopped)
            else:
                # No tasks left unfinished if deferred fires without timeout
                d.addCallback(lambda _: [])
        if reassign:
            d.addCallback(reassignTasks)
        return d

    def assignment(self, task):
        """
        Generates a new assignment for the supplied I{task}.

        If the worker that runs the task is still working for me when it
        becomes ready for another task following this one, an assignment
        request will be entered for it to obtain another task of the same
        series.

        @return: A deferred that fires when the task has been assigned to a
          worker and it has accepted the assignment.

        """
        return self.assignmentFactory.new(task)
|---|