| 1 |
# AsynQueue: |
|---|
| 2 |
# Asynchronous task queueing based on the Twisted framework, with task |
|---|
| 3 |
# prioritization and a powerful worker/manager interface. |
|---|
| 4 |
# |
|---|
| 5 |
# Copyright (C) 2006-2007 by Edwin A. Suominen, http://www.eepatents.com |
|---|
| 6 |
# |
|---|
| 7 |
# This program is free software; you can redistribute it and/or modify it under |
|---|
| 8 |
# the terms of the GNU General Public License as published by the Free Software |
|---|
| 9 |
# Foundation; either version 2 of the License, or (at your option) any later |
|---|
| 10 |
# version. |
|---|
| 11 |
# |
|---|
| 12 |
# This program is distributed in the hope that it will be useful, but WITHOUT |
|---|
| 13 |
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|---|
| 14 |
# FOR A PARTICULAR PURPOSE. See the file COPYING for more details. |
|---|
| 15 |
# |
|---|
| 16 |
# You should have received a copy of the GNU General Public License along with |
|---|
| 17 |
# this program; if not, write to the Free Software Foundation, Inc., 51 |
|---|
| 18 |
# Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA |
|---|
| 19 |
|
|---|
| 20 |
""" |
|---|
| 21 |
The task queue and its immediate support staff. |
|---|
| 22 |
""" |
|---|
| 23 |
|
|---|
| 24 |
# Imports |
|---|
| 25 |
import heapq |
|---|
| 26 |
from zope.interface import implements |
|---|
| 27 |
from twisted.python import failure |
|---|
| 28 |
from twisted.internet import reactor, interfaces, defer |
|---|
| 29 |
# Use C Deferreds if possible, for efficiency |
|---|
| 30 |
try: |
|---|
| 31 |
from twisted.internet import cdefer |
|---|
| 32 |
except: |
|---|
| 33 |
pass |
|---|
| 34 |
else: |
|---|
| 35 |
defer.Deferred = cdefer.Deferred |
|---|
| 36 |
|
|---|
| 37 |
import tasks |
|---|
| 38 |
from errors import QueueRunError, ImplementationError |
|---|
| 39 |
|
|---|
| 40 |
|
|---|
| 41 |
class Priority(object):
    """
    I provide simple, asynchronous access to a priority heap.

    @ivar heap: The list that C{heapq} keeps ordered as a heap of the items
      that have been put but not yet fetched.

    @ivar pendingGetCalls: The deferreds handed out by L{get} calls that were
      made while the heap was empty. New ones are inserted at the front, so
      the oldest is always at the end of the list.

    """
    def __init__(self):
        self.heap = []
        self.pendingGetCalls = []

    def shutdown(self):
        """
        Shuts down the priority heap, firing errbacks of the deferreds of any
        get requests that will not be fulfilled.

        The list of pending get requests is emptied before any errback is
        fired, so calling me again, or calling L{put} afterwards, can never
        fire one of those deferreds a second time.
        """
        # Take the waiting deferreds out of circulation first; an errback
        # handler may well call back into this object.
        abandoned = self.pendingGetCalls[:]
        del self.pendingGetCalls[:]
        if not abandoned:
            return
        theFailure = failure.Failure(
            QueueRunError("No more items forthcoming"))
        for d in abandoned:
            d.errback(theFailure)

    def get(self):
        """
        Gets an item with the highest priority (lowest value) from the heap,
        returning a deferred that fires when the item becomes available.

        If the heap is not empty, the returned deferred has already fired.
        """
        if self.heap:
            return defer.succeed(heapq.heappop(self.heap))
        d = defer.Deferred()
        # Newest request goes to the front; put() pops the oldest off the end
        self.pendingGetCalls.insert(0, d)
        return d

    def put(self, item):
        """
        Adds the supplied I{item} to the heap, firing the oldest getter
        deferred if any L{get} calls are pending.
        """
        heapq.heappush(self.heap, item)
        if self.pendingGetCalls:
            d = self.pendingGetCalls.pop()
            # Hand over whatever is now at the top of the heap, which is not
            # necessarily the item that was just pushed.
            d.callback(heapq.heappop(self.heap))

    def cancel(self, selector):
        """
        Removes all pending items from the heap that the supplied I{selector}
        function selects. The function must take an item as its sole argument
        and return C{True} if it selects the item for queue removal.
        """
        # Build the list of survivors in one pass rather than removing items
        # from the list while looping over it, which skips the item that
        # follows each removal. The slice assignment keeps the same list
        # object in place for anything else that refers to it.
        self.heap[:] = [item for item in self.heap if not selector(item)]
        # Restore the heap invariant that the removals may have broken
        heapq.heapify(self.heap)
|---|
| 93 |
|
|---|
| 94 |
|
|---|
| 95 |
class LoadInfoProducer(object):
    """
    I produce information about the current load of a task queue. The
    information consists of the number of tasks currently queued, and
    is written as a single integer to my consumers whenever a task is
    queued up and again when it is completed.

    @ivar queued: The current count, which is never allowed to go below
      zero.

    @ivar producing: C{True} while I am writing updates to my consumers.

    @ivar consumers: A list of the consumers for whom I'm producing
      information.

    """
    implements(interfaces.IPushProducer)

    def __init__(self):
        self.queued = 0
        self.producing = True
        self.consumers = []

    def registerConsumer(self, consumer):
        """
        Call this with a provider of I{interfaces.IConsumer} and I'll
        produce for it in addition to any others already registered
        with me.
        """
        # The second argument registers me as a streaming (push) producer
        consumer.registerProducer(self, True)
        self.consumers.append(consumer)

    def shutdown(self):
        """
        Stops me from producing and unregisters me from all of my
        consumers, which I then forget about.

        Forgetting them makes a repeated shutdown harmless and ensures that
        nothing is written to an unregistered consumer if production is
        resumed later.
        """
        self.producing = False
        # Empty the list before making the calls so that a consumer that
        # reacts by calling stopProducing() finds nothing left to do.
        departing = self.consumers[:]
        del self.consumers[:]
        for consumer in departing:
            consumer.unregisterProducer()

    def oneLess(self):
        """
        Decrements the count of queued tasks and reports the new value.
        """
        self._update(-1)

    def oneMore(self):
        """
        Increments the count of queued tasks and reports the new value.
        """
        self._update(+1)

    def _update(self, increment):
        """
        Adds I{increment} to the count of queued tasks, clamping it at zero,
        and writes the resulting count to each consumer if I am currently
        producing.
        """
        self.queued = max(0, self.queued + increment)
        if not self.producing:
            return
        for consumer in self.consumers:
            consumer.write(self.queued)

    #--- IPushProducer implementation -----------------------------------------

    def pauseProducing(self):
        """
        Suspends the writing of updates; the count itself keeps being
        maintained.
        """
        self.producing = False

    def resumeProducing(self):
        """
        Resumes the writing of updates to my consumers.
        """
        self.producing = True

    def stopProducing(self):
        """
        Stops production for good by way of L{shutdown}.
        """
        self.shutdown()
|---|
| 154 |
|
|---|
| 155 |
|
|---|
| 156 |
class TaskQueue(object):
    """
    I am a task queue for dispatching arbitrary callables to be run by one or
    more worker objects.

    You can construct me with one or more workers, or you can attach them later
    with L{attachWorker}, in which case you'll receive an ID that you can use
    to detach the worker.

    @keyword timeout: A number of seconds after which to more drastically
      terminate my workers if they haven't gracefully shut down by that point.

    @keyword warn: Set this option C{True} to only warn that a call
      made after queue shutdown is being ignored, rather than raising
      an exception.

    """
    def __init__(self, *args, **kw):
        # The options are settled first because the runner started by
        # _startup() reads self.timeout.
        self.timeout = kw.get('timeout', None)
        self.warnOnly = kw.get('warn', False)
        self._taskFactory = tasks.TaskFactory()
        self.mgr = tasks.WorkerManager()
        self.heap = Priority()
        self.loadInfoProducer = LoadInfoProducer()
        for worker in args:
            self.attachWorker(worker)
        self._startup()

    def _startup(self):
        """
        Starts up a L{defer.deferredGenerator} that runs the queue. This method
        can only be run once, by the constructor upon instantiation.

        The runner's deferred is kept as C{_d}, and a 'before shutdown'
        system event trigger for L{shutdown} is registered with the reactor
        and kept as C{_triggerID}.

        @raise QueueRunError: if the queue is already running.
        """
        @defer.deferredGenerator
        def runner():
            while True:
                wfd = defer.waitForDeferred(self.heap.get())
                yield wfd
                task = wfd.getResult()
                # A None object in place of a task is the shutdown signal
                if task is None:
                    break
                wfd = defer.waitForDeferred(self.mgr.assignment(task))
                yield wfd
                wfd.getResult()
            # Clean up after the loop exits
            wfd = defer.waitForDeferred(self.mgr.shutdown(self.timeout))
            yield wfd
            self.heap.shutdown()
            # The result of the runner is a list of any unfinished tasks.
            result = []
            try:
                result = wfd.getResult()
            except Exception:
                # Best effort: a failed worker shutdown just means that no
                # unfinished tasks get reported.
                pass
            yield result

        if self.isRunning():
            raise QueueRunError("Startup only occurs upon instantiation")
        self._d = runner()
        self._triggerID = reactor.addSystemEventTrigger(
            'before', 'shutdown', self.shutdown)

    def isRunning(self):
        """
        Returns C{True} if the queue is running, C{False} otherwise.

        The queue counts as running for as long as its reactor shutdown
        trigger is registered.
        """
        return hasattr(self, '_triggerID')

    def shutdown(self):
        """
        Initiates a shutdown of the queue by putting a C{None} object onto the
        priority heap instead of a task. The C{None} is meant to have the
        lowest possible priority; that ordering is up to the task objects it
        gets compared with.

        @return: A deferred that fires when all the workers have shut
          down, with a list of any tasks left unfinished in the queue. If
          the queue isn't running, the deferred fires immediately with an
          empty list.

        """
        def cleanup(unfinishedTasks):
            # Once the runner is done, the reactor no longer needs to call
            # this method at its own shutdown.
            if self.isRunning():
                reactor.removeSystemEventTrigger(self._triggerID)
                del self._triggerID
            return unfinishedTasks

        if self.isRunning():
            self.heap.put(None)
            d = self._d
        else:
            d = defer.succeed([])
        d.addCallback(cleanup)
        return d

    def attachWorker(self, worker):
        """
        Registers a new provider of IWorker for working on tasks from
        the queue, returning an integer ID that uniquely identifies
        the worker.

        See L{WorkerManager.hire}.
        """
        return self.mgr.hire(worker)

    def _getWorkerID(self, workerOrID):
        """
        Returns the ID of the worker supplied either as a worker object or as
        its ID, or C{None} if no such worker is attached.
        """
        if workerOrID in self.mgr.workers:
            return workerOrID
        for thisID, worker in self.mgr.workers.items():
            if worker == workerOrID:
                return thisID
        return None

    def detachWorker(self, workerOrID, reassign=False, crash=False):
        """
        Detaches and terminates the worker supplied or specified by its ID.

        If I{reassign} is set C{True}, any tasks left unfinished by
        the worker are put into new assignments for other or future
        workers. Otherwise, they are returned via the deferred's
        callback.

        If I{crash} is set C{True}, the worker manager is asked for a crash
        termination instead of one that uses my timeout.

        Returns C{None}, without doing anything, if the worker is unknown.

        See L{tasks.WorkerManager.terminate}.
        """
        ID = self._getWorkerID(workerOrID)
        if ID is None:
            return None
        if crash:
            return self.mgr.terminate(ID, crash=True, reassign=reassign)
        return self.mgr.terminate(ID, self.timeout, reassign=reassign)

    def qualifyWorker(self, worker, series):
        """
        Adds the specified I{series} to the qualifications of the supplied
        I{worker}.
        """
        if series not in worker.iQualified:
            worker.iQualified.append(series)
            self.mgr.assignmentFactory.request(worker, series)

    def workers(self, ID=None):
        """
        Returns the worker object specified by I{ID}, or C{None} if that worker
        is not employed with me.

        If no ID is specified, a list of the workers currently attached, in no
        particular order, will be returned instead.
        """
        if ID is None:
            return list(self.mgr.workers.values())
        return self.mgr.workers.get(ID, None)

    def _warnIgnored(self, func, args, kw):
        """
        Prints the warning that a call to I{func} with the supplied arguments
        and keywords is being ignored because the queue has been shut down.
        """
        argString = ", ".join([str(x) for x in args])
        kwString = ", ".join(
            ["%s=%s" % (str(name), str(value))
             for name, value in kw.items()])
        if args and kw:
            sepString = ", "
        else:
            sepString = ""
        # A single parenthesized string prints the same text whether print
        # is a statement or a function.
        print("Queue shut down, ignoring call\n %s(%s%s%s)\n"
              % (str(func), argString, sepString, kwString))

    def call(self, func, *args, **kw):
        """
        Puts a call to I{func} with any supplied arguments and keywords into
        the pipeline, returning a deferred to the eventual result of the call
        when it is eventually pulled from the pipeline and run.

        Scheduling of the call is impacted by the I{niceness} keyword that can
        be included in addition to any keywords for the call. As with UNIX
        niceness, the value should be an integer where 0 is normal scheduling,
        negative numbers are higher priority, and positive numbers are lower
        priority.

        Tasks in a series of tasks all having niceness N+10 are dequeued and
        run at approximately half the rate of tasks in another series with
        niceness N.

        All of the scheduling keywords below are removed from the keywords
        before the task is created, so none of them is ever passed on to
        I{func}.

        If the queue has been shut down and the I{warn} option is in effect,
        the call really is ignored: nothing is queued, the load count is left
        alone, and the deferred that is returned never fires.

        @keyword niceness: Scheduling niceness, an integer between -20 and 20,
          with lower numbers having higher scheduling priority as in UNIX
          C{nice} and C{renice}.

        @keyword series: A hashable object uniquely identifying a series for
          this task. Tasks of multiple different series will be run with
          somewhat concurrent scheduling between the series even if they are
          dumped into the queue in big batches, whereas tasks within a single
          series will always run in sequence (except for niceness adjustments).

        @keyword doNext: Set C{True} to assign highest possible priority, even
          higher than a deeply queued task with niceness = -20. Takes
          precedence over I{doLast} if both are set.

        @keyword doLast: Set C{True} to assign priority so low that any
          other-priority task gets run before this one, no matter how long this
          task has been queued up.

        @keyword timeout: A timeout interval in seconds from when a worker gets
          a task assignment for the call, after which the call will be retried.

        @raise QueueRunError: if the queue has been shut down and the I{warn}
          option is not in effect.

        """
        def oneLessPending(result):
            self.loadInfoProducer.oneLess()
            return result

        if not self.isRunning():
            if not self.warnOnly:
                raise QueueRunError(
                    "Cannot call after the queue has been shut down")
            self._warnIgnored(func, args, kw)
            # Nothing will ever run this call, so don't queue it or count it.
            # Callers still get a deferred, one that never fires.
            return defer.Deferred()

        # Strip every scheduling keyword before the task factory sees the
        # keywords, so that only the ones meant for func remain.
        niceness = kw.pop('niceness', 0)
        series = kw.pop('series', None)
        timeout = kw.pop('timeout', None)
        doNext = kw.pop('doNext', False)
        doLast = kw.pop('doLast', False)

        self.loadInfoProducer.oneMore()
        task = self._taskFactory.new(func, args, kw, niceness, series, timeout)
        if doNext:
            task.priority = -1000000
        elif doLast:
            task.priority = 1000000
        self.heap.put(task)
        task.d.addBoth(oneLessPending)
        return task.d

    def cancelSeries(self, series):
        """
        Cancels any pending tasks in the specified I{series}, unceremoniously
        removing them from the queue.
        """
        # getattr() with a default lets the selector cope with queued items
        # that have no series attribute, such as the None shutdown signal.
        self.heap.cancel(lambda item: getattr(item, 'series', None) == series)

    def subscribe(self, consumer):
        """
        Subscribes the supplied provider of L{interfaces.IConsumer}
        to updates on the number of tasks queued whenever it goes up or down.

        The figure is the integer number of calls currently pending, i.e., the
        number of tasks that have been queued up but haven't yet been called
        plus those that have been called but haven't yet returned a result.

        @raise ImplementationError: if I{consumer} does not provide
          L{interfaces.IConsumer}.
        """
        if not interfaces.IConsumer.providedBy(consumer):
            raise ImplementationError(
                "Object doesn't provide the IConsumer interface")
        self.loadInfoProducer.registerConsumer(consumer)
|---|