root/projects/AsynQueue/trunk/asynqueue/base.py

Revision 126, 14.1 kB (checked in by edsuom, 9 months ago)

Worker specialties, improved timeouts

Line 
1 # AsynQueue:
2 # Asynchronous task queueing based on the Twisted framework, with task
3 # prioritization and a powerful worker/manager interface.
4 #
5 # Copyright (C) 2006-2007 by Edwin A. Suominen, http://www.eepatents.com
6 #
7 # This program is free software; you can redistribute it and/or modify it under
8 # the terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 2 of the License, or (at your option) any later
10 # version.
11 #
12 # This program is distributed in the hope that it will be useful, but WITHOUT
13 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 # FOR A PARTICULAR PURPOSE.  See the file COPYING for more details.
15 #
16 # You should have received a copy of the GNU General Public License along with
17 # this program; if not, write to the Free Software Foundation, Inc., 51
18 # Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
19
20 """
21 The task queue and its immediate support staff.
22 """
23
24 # Imports
25 import heapq
26 from zope.interface import implements
27 from twisted.python import failure
28 from twisted.internet import reactor, interfaces, defer
# Use C Deferreds if possible, for efficiency. If the C extension cannot be
# imported, quietly keep the pure-Python Deferred; any other error is a real
# problem and is allowed to propagate instead of being swallowed.
try:
    from twisted.internet import cdefer
except ImportError:
    pass
else:
    defer.Deferred = cdefer.Deferred
36
37 import tasks
38 from errors import QueueRunError, ImplementationError
39
40
class Priority(object):
    """
    I provide simple, asynchronous access to a priority heap.

    Items live in a list kept in L{heapq} order, so the item that compares
    lowest is always the next one handed out. L{get} calls made while the
    heap is empty wait in line and are satisfied oldest-first as items
    arrive via L{put}.

    @ivar heap: The heap-ordered list of items not yet handed out.

    @ivar pendingGetCalls: Deferreds of L{get} calls still waiting for an
      item, newest first.
    """
    def __init__(self):
        self.heap = []
        self.pendingGetCalls = []

    def shutdown(self):
        """
        Shuts down the priority heap, firing errbacks of the deferreds of any
        get requests that will not be fulfilled.

        The pending list is swapped out before any errback runs, so calling
        me again, or a later L{put}, never tries to fire one of those
        deferreds a second time.
        """
        pending, self.pendingGetCalls = self.pendingGetCalls, []
        if pending:
            msg = "No more items forthcoming"
            theFailure = failure.Failure(QueueRunError(msg))
            for d in pending:
                d.errback(theFailure)

    def get(self):
        """
        Gets an item with the highest priority (lowest value) from the heap,
        returning a deferred that fires when the item becomes available.
        """
        if self.heap:
            return defer.succeed(heapq.heappop(self.heap))
        d = defer.Deferred()
        # Newest waiters go to the front; put() serves them from the back,
        # which makes the waiting line first-come, first-served.
        self.pendingGetCalls.insert(0, d)
        return d

    def put(self, item):
        """
        Adds the supplied I{item} to the heap, firing the oldest getter
        deferred if any L{get} calls are pending.
        """
        heapq.heappush(self.heap, item)
        if self.pendingGetCalls:
            d = self.pendingGetCalls.pop()
            d.callback(heapq.heappop(self.heap))

    def cancel(self, selector):
        """
        Removes all pending items from the heap that the supplied I{selector}
        function selects. The function must take an item as its sole argument
        and return C{True} if it selects the item for queue removal.

        The selector is called exactly once for every item on the heap.
        """
        # Rebuild the list in place rather than calling remove() while
        # iterating over it: that skips the element following each removal,
        # and list.remove() deletes the first *equal* item, which need not be
        # the one the selector actually picked. Slice assignment keeps the
        # same list object for anyone holding a reference to it.
        self.heap[:] = [item for item in self.heap if not selector(item)]
        heapq.heapify(self.heap)
94
class LoadInfoProducer(object):
    """
    I produce information about the current load of a task queue. The
    information consists of the number of tasks currently queued, and is
    written as a single integer to each of my consumers whenever a task is
    queued up and again when it is completed.

    @ivar queued: The current count; it is never allowed to go below zero.

    @ivar producing: C{False} while I am paused or after I have been shut
      down; no updates are written then.

    @ivar consumers: A list of the consumers for whom I'm producing
      information.

    """
    implements(interfaces.IPushProducer)

    def __init__(self):
        self.queued = 0
        self.producing = True
        self.consumers = []

    def registerConsumer(self, consumer):
        """
        Call this with a provider of I{interfaces.IConsumer} and I'll
        produce for it in addition to any others already registered
        with me. I register myself with it as a push (streaming) producer.
        """
        consumer.registerProducer(self, True)
        self.consumers.append(consumer)

    def shutdown(self):
        """
        Stops me from producing and unregisters me from all my consumers.

        The consumer list is detached before any consumer is notified. As a
        result, a repeated call, or a re-entrant one via L{stopProducing}
        from inside a consumer's C{unregisterProducer}, does not unregister
        any consumer a second time.
        """
        self.producing = False
        consumers, self.consumers = self.consumers, []
        for consumer in consumers:
            consumer.unregisterProducer()

    def oneLess(self):
        """
        Records that one task has left the queue.
        """
        self._update(-1)

    def oneMore(self):
        """
        Records that one more task has entered the queue.
        """
        self._update(+1)

    def _update(self, increment):
        """
        Adjusts the count by I{increment}, clamps it at zero, and writes it
        to every consumer if I am currently producing.
        """
        self.queued += increment
        if self.queued < 0:
            self.queued = 0
        if self.producing:
            for consumer in self.consumers:
                consumer.write(self.queued)

    #--- IPushProducer implementation -----------------------------------------

    def pauseProducing(self):
        """
        Suspends writes to all consumers until L{resumeProducing}.
        """
        self.producing = False

    def resumeProducing(self):
        """
        Resumes writing updates to my consumers.
        """
        self.producing = True

    def stopProducing(self):
        """
        Stops producing for good; see L{shutdown}.
        """
        self.shutdown()
155
class TaskQueue(object):
    """
    I am a task queue for dispatching arbitrary callables to be run by one or
    more worker objects.

    You can construct me with one or more workers, or you can attach them later
    with L{attachWorker}, in which you'll receive an ID that you can use to
    detach the worker.

    @keyword timeout: A number of seconds after which to more drastically
      terminate my workers if they haven't gracefully shut down by that point.

    @keyword warn: Set this option C{True} to only warn that a call
      made after queue shutdown is being ignored, rather than raising
      an exception.

    """
    def __init__(self, *args, **kw):
        # Settings are stored before the runner starts, so nothing reached
        # from the startup path can find them missing.
        self.timeout = kw.get('timeout', None)
        self.warnOnly = kw.get('warn', False)
        self._shutdownRequested = False
        self._taskFactory = tasks.TaskFactory()
        self.mgr = tasks.WorkerManager()
        self.heap = Priority()
        self.loadInfoProducer = LoadInfoProducer()
        for worker in args:
            self.attachWorker(worker)
        self._startup()

    def _startup(self):
        """
        Starts up a L{defer.deferredGenerator} that runs the queue. This method
        can only be run once, by the constructor upon instantiation.

        The runner repeatedly takes the next item off my priority heap and
        waits for the worker manager to handle its assignment. A C{None}
        item ends the loop. After that, the workers are shut down with my
        I{timeout}, and then the heap is shut down. The runner's deferred,
        kept as C{self._d}, fires with the worker manager's list of
        unfinished tasks, or an empty list if the worker shutdown failed.

        @raise QueueRunError: If I am already running.
        """
        @defer.deferredGenerator
        def runner():
            while True:
                wfd = defer.waitForDeferred(self.heap.get())
                yield wfd
                task = wfd.getResult()
                if task is None:
                    # The shutdown sentinel put on the heap by shutdown()
                    break
                wfd = defer.waitForDeferred(self.mgr.assignment(task))
                yield wfd
                wfd.getResult()
            # Clean up after the loop exits
            wfd = defer.waitForDeferred(self.mgr.shutdown(self.timeout))
            yield wfd
            self.heap.shutdown()
            # The result of the runner is a list of any unfinished tasks. A
            # failed worker shutdown is deliberately reported as "nothing
            # unfinished" so that queue shutdown always completes.
            try:
                result = wfd.getResult()
            except Exception:
                result = []
            yield result

        if self.isRunning():
            raise QueueRunError("Startup only occurs upon instantiation")
        self._d = runner()
        self._triggerID = reactor.addSystemEventTrigger(
            'before', 'shutdown', self.shutdown)

    def isRunning(self):
        """
        Returns C{True} if the queue is running, C{False} otherwise.
        """
        return hasattr(self, '_triggerID')

    def shutdown(self):
        """
        Initiates a shutdown of the queue by putting a lowest-possible priority
        C{None} object onto the priority heap instead of a task.

        Only the first call made while I am running enqueues that sentinel;
        later calls just wait for the same shutdown to finish. Each call gets
        its own deferred, so callbacks added by one caller cannot change the
        result another caller receives.

        @return: A deferred that fires when all the workers have shut
          down, with a list of any tasks left unfinished in the queue.

        """
        def cleanup(unfinishedTasks):
            if hasattr(self, '_triggerID'):
                reactor.removeSystemEventTrigger(self._triggerID)
                del self._triggerID
            return unfinishedTasks

        if not self.isRunning():
            return defer.succeed([])
        if not getattr(self, '_shutdownRequested', False):
            self._shutdownRequested = True
            self.heap.put(None)
            self._d.addCallback(cleanup)
        d = defer.Deferred()

        def relay(result):
            # Give this caller the shared outcome while passing it along
            # unchanged to every later caller's relay.
            if isinstance(result, failure.Failure):
                d.errback(result)
            else:
                d.callback(result)
            return result

        self._d.addBoth(relay)
        return d

    def attachWorker(self, worker):
        """
        Registers a new provider of IWorker for working on tasks from
        the queue, returning an integer ID that uniquely identifies
        the worker.

        See L{WorkerManager.hire}.
        """
        return self.mgr.hire(worker)

    def _getWorkerID(self, workerOrID):
        """
        Returns I{workerOrID} itself if it is one of my worker IDs. Otherwise,
        returns the ID of the attached worker equal to it, or C{None} if
        there is no such worker.
        """
        if workerOrID in self.mgr.workers:
            return workerOrID
        for thisID, worker in self.mgr.workers.items():
            if worker == workerOrID:
                return thisID
        return None

    def detachWorker(self, workerOrID, reassign=False, crash=False):
        """
        Detaches and terminates the worker supplied or specified by its ID.

        If I{reassign} is set C{True}, any tasks left unfinished by
        the worker are put into new assignments for other or future
        workers. Otherwise, they are returned via the deferred's
        callback.

        If I{crash} is set C{True}, the worker is terminated with
        C{crash=True} and my I{timeout} is not passed.

        See L{tasks.WorkerManager.terminate}.

        @return: The deferred from the worker manager's C{terminate}, or
          C{None} if no matching worker is attached.
        """
        ID = self._getWorkerID(workerOrID)
        if ID is None:
            return None
        if crash:
            return self.mgr.terminate(ID, crash=True, reassign=reassign)
        return self.mgr.terminate(ID, self.timeout, reassign=reassign)

    def qualifyWorker(self, worker, series):
        """
        Adds the specified I{series} to the qualifications of the supplied
        I{worker}, and requests an assignment for that series from the
        worker manager's assignment factory. Does nothing if the worker is
        already qualified for it.
        """
        if series not in worker.iQualified:
            worker.iQualified.append(series)
            self.mgr.assignmentFactory.request(worker, series)

    def workers(self, ID=None):
        """
        Returns the worker object specified by I{ID}, or C{None} if that worker
        is not employed with me.

        If no ID is specified, a list of the workers currently attached, in no
        particular order, will be returned instead.
        """
        if ID is None:
            return self.mgr.workers.values()
        return self.mgr.workers.get(ID, None)

    def call(self, func, *args, **kw):
        """
        Puts a call to I{func} with any supplied arguments and keywords into
        the pipeline, returning a deferred to the eventual result of the call
        when it is eventually pulled from the pipeline and run.

        Scheduling of the call is impacted by the I{niceness} keyword that can
        be included in addition to any keywords for the call. As with UNIX
        niceness, the value should be an integer where 0 is normal scheduling,
        negative numbers are higher priority, and positive numbers are lower
        priority.

        Tasks in a series of tasks all having niceness N+10 are dequeued and
        run at approximately half the rate of tasks in another series with
        niceness N.

        None of the scheduling keywords below is passed on to I{func}.

        @keyword niceness: Scheduling niceness, an integer between -20 and 20,
          with lower numbers having higher scheduling priority as in UNIX
          C{nice} and C{renice}.

        @keyword series: A hashable object uniquely identifying a series for
          this task. Tasks of multiple different series will be run with
          somewhat concurrent scheduling between the series even if they are
          dumped into the queue in big batches, whereas tasks within a single
          series will always run in sequence (except for niceness adjustments).

        @keyword doNext: Set C{True} to assign highest possible priority, even
          higher than a deeply queued task with niceness = -20.

        @keyword doLast: Set C{True} to assign priority so low that any
          other-priority task gets run before this one, no matter how long this
          task has been queued up.

        @keyword timeout: A timeout interval in seconds from when a worker gets
          a task assignment for the call, after which the call will be retried.

        @return: A deferred to the eventual result of the call. If I have
          been shut down and was constructed with C{warn=True}, the call is
          ignored and the returned deferred has already fired with C{None}.

        @raise QueueRunError: If I have been shut down and was not
          constructed with C{warn=True}.

        """
        def oneLessPending(result):
            self.loadInfoProducer.oneLess()
            return result

        if not self.isRunning():
            if not self.warnOnly:
                raise QueueRunError(
                    "Cannot call after the queue has been shut down")
            argString = ", ".join([str(x) for x in args])
            kwString = ", ".join(
                ["%s=%s" % (str(name), str(value))
                 for name, value in kw.items()])
            if args and kw:
                sepString = ", "
            else:
                sepString = ""
            print("Queue shut down, ignoring call\n  %s(%s%s%s)\n"
                  % (str(func), argString, sepString, kwString))
            # Really ignore the call. Queueing it would leave a task that no
            # runner will ever dequeue, plus a load count that never drops.
            return defer.succeed(None)
        # Strip every scheduling keyword *before* the task is built, so that
        # none of them can leak through into func's own keyword arguments.
        niceness = kw.pop('niceness', 0)
        series = kw.pop('series', None)
        timeout = kw.pop('timeout', None)
        doNext = kw.pop('doNext', False)
        doLast = kw.pop('doLast', False)
        task = self._taskFactory.new(func, args, kw, niceness, series, timeout)
        if doNext:
            task.priority = -1000000
        elif doLast:
            task.priority = 1000000
        # Count the task only once it exists, so a failure while building
        # it cannot leave the load count permanently inflated.
        self.loadInfoProducer.oneMore()
        self.heap.put(task)
        task.d.addBoth(oneLessPending)
        return task.d

    def cancelSeries(self, series):
        """
        Cancels any pending tasks in the specified I{series}, unceremoniously
        removing them from the queue.

        The C{None} shutdown sentinel is never removed, even when I{series}
        is C{None}. The deferreds of cancelled tasks are not fired. Each
        removed task is, however, taken off the load count reported to
        subscribers, because it is no longer pending.
        """
        cancelled = []

        def selector(item):
            if item is None:
                # The shutdown sentinel; removing it would stall shutdown.
                return False
            if getattr(item, 'series', None) == series:
                cancelled.append(item)
                return True
            return False

        self.heap.cancel(selector)
        for _ in cancelled:
            self.loadInfoProducer.oneLess()

    def subscribe(self, consumer):
        """
        Subscribes the supplied provider of L{interfaces.IConsumer}
        to updates on the number of tasks queued whenever it goes up or down.

        The figure is the integer number of calls currently pending, i.e., the
        number of tasks that have been queued up but haven't yet been called
        plus those that have been called but haven't yet returned a result.

        @raise ImplementationError: If I{consumer} does not provide
          L{interfaces.IConsumer}.
        """
        if interfaces.IConsumer.providedBy(consumer):
            self.loadInfoProducer.registerConsumer(consumer)
        else:
            raise ImplementationError(
                "Object doesn't provide the IConsumer interface")
Note: See TracBrowser for help on using the browser.