root/projects/AsynQueue/trunk/asynqueue/tasks.py

Revision 126, 16.0 kB (checked in by edsuom, 11 months ago)

Worker specialties, improved timeouts

Line 
1 # AsynQueue:
2 # Asynchronous task queueing based on the Twisted framework, with task
3 # prioritization and a powerful worker/manager interface.
4 #
5 # Copyright (C) 2006-2007 by Edwin A. Suominen, http://www.eepatents.com
6 #
7 # This program is free software; you can redistribute it and/or modify it under
8 # the terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 2 of the License, or (at your option) any later
10 # version.
11 #
12 # This program is distributed in the hope that it will be useful, but WITHOUT
13 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 # FOR A PARTICULAR PURPOSE.  See the file COPYING for more details.
15 #
16 # You should have received a copy of the GNU General Public License along with
17 # this program; if not, write to the Free Software Foundation, Inc., 51
18 # Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
19
20 """
21 Task management for the task queue workers
22 """
23
# Imports
from twisted.internet import defer, reactor
# Use C Deferreds if possible, for efficiency
try:
    from twisted.internet import cdefer
except ImportError:
    # The optional C implementation isn't installed; keep using the
    # pure-Python Deferred. Only ImportError is swallowed so that things like
    # KeyboardInterrupt still propagate.
    pass
else:
    defer.Deferred = cdefer.Deferred

from workers import IWorker
from errors import ImplementationError
36
37
class Task(object):
    """
    I represent a task that has been dispatched to a queue for running with a
    given scheduling I{niceness}.

    I generate a C{Deferred} that you fire by calling either my L{callback} or
    L{errback} with a result or failure, respectively, when the task is
    finally run and its result is obtained. You can call the deferred's
    versions of those methods directly, but my versions deal with things like
    repeated callbacks, which happen sometimes with task timeouts.

    @ivar d: A deferred to the eventual result of the task.

    @ivar series: A hashable object identifying the series of which this task
      is a part.

    @ivar callTuple: The C{(f, args, kw)} tuple describing the call to make.

    @ivar priority: The value that my ordering comparisons are based on.

    @ivar timeout: The number of seconds L{startTimer} waits before calling
      L{timedout}, or a false value for no timeout.

    @ivar callID: The pending timeout call set up by L{startTimer}, or C{None}
      if there isn't one.

    """
    # Class-level default so that callback and errback are safe to call on a
    # task whose timer was never started.
    callID = None

    def __init__(self, f, args, kw, priority, series, timeout=None):
        """
        @param f: The object to call (or otherwise hand to a worker).

        @param args: A tuple or list of positional arguments for the call.

        @param kw: A dict of keyword arguments for the call.

        @param priority: The priority used to order me against other tasks.

        @param series: The series of which I am a part.

        @param timeout: Optional number of seconds for L{startTimer}.

        @raise TypeError: If I{args} isn't a tuple or list, or I{kw} isn't a
          dict.
        """
        if not isinstance(args, (tuple, list)):
            raise TypeError("Second argument 'args' isn't a sequence")
        if not isinstance(kw, dict):
            raise TypeError("Third argument 'kw' isn't a dict")
        self.callTuple = (f, args, kw)
        self.priority = priority
        self.series = series
        self.d = defer.Deferred()
        self.timeout = timeout

    def startTimer(self):
        """
        Schedules a call to L{timedout} after my I{timeout}, if I have one.
        """
        if self.timeout:
            self.callID = reactor.callLater(self.timeout, self.timedout)
        else:
            self.callID = None

    def _cancelTimer(self):
        """
        Cancels my pending timeout call, if there is one.
        """
        if self.callID is not None:
            self.callID.cancel()
            self.callID = None

    def callback(self, result):
        """
        Cancels any pending timeout and fires my deferred with I{result},
        unless the deferred has already been fired.
        """
        self._cancelTimer()
        if not self.d.called:
            self.d.callback(result)

    def errback(self, result):
        """
        Cancels any pending timeout and fires my deferred's errback with
        I{result}, unless the deferred has already been fired.
        """
        self._cancelTimer()
        # Same guard as in callback: a failure that shows up after the
        # deferred already fired (e.g., from a timeout) is dropped rather
        # than firing the deferred a second time.
        if not self.d.called:
            self.d.errback(result)

    def timedout(self):
        """
        Called when my timeout expires. Fires my deferred with the tuple
        C{(False, "Timeout")} unless it has already been fired.
        """
        if not self.d.called:
            self.d.callback((False, "Timeout"))
        self.callID = None

    def __repr__(self):
        """
        Gives me an informative string representation
        """
        func, args, kw = self.callTuple
        argText = ", ".join([str(x) for x in args])
        kwText = "".join([", %s=%s" % item for item in kw.items()])
        className = func.__class__.__name__
        if className == "function":
            funcName = func.__name__
        elif callable(func):
            # Not every callable has a __name__, so fall back to just the
            # name of its class
            name = getattr(func, '__name__', None)
            if name is None:
                funcName = className
            else:
                funcName = "%s.%s" % (className, name)
        else:
            funcName = "<worker call> "
            # Wrapped in a 1-tuple so that a tuple 'func' formats cleanly
            argText = ("%s, " % (func,)) + argText
        return "Task: %s(%s%s)" % (funcName, argText, kwText)

    def _compare(self, other):
        """
        Returns a negative number, zero, or a positive number depending on
        whether I sort before, equal to, or after I{other}.
        """
        if other is None:
            return -1
        mine, theirs = self.priority, other.priority
        return (mine > theirs) - (mine < theirs)

    def __cmp__(self, other):
        """
        Numeric comparisons between tasks are based on their priority, with
        higher (lower-numbered) priorities being considered \"less\" and thus
        sorted first.

        A task will always have a higher priority, i.e., be comparatively
        I{less}, than a C{None} object, which is used as a shutdown signal
        instead of a task.
        """
        return self._compare(other)

    # Rich comparisons giving the same ordering as __cmp__, for interpreters
    # that don't consult __cmp__. Equality and hashing are deliberately left
    # alone.
    def __lt__(self, other):
        return self._compare(other) < 0

    def __le__(self, other):
        return self._compare(other) <= 0

    def __gt__(self, other):
        return self._compare(other) > 0

    def __ge__(self, other):
        return self._compare(other) >= 0
120
121
class TaskFactory(object):
    """
    I generate L{Task} instances with the right priority setting for effective
    scheduling between tasks in one or more concurrently running task series.

    @ivar TaskClass: The class (or other callable) used to build each task.

    @ivar seriesNumbers: A C{dict} holding the last serial number issued for
      each series, keyed by series.

    """
    def __init__(self, TaskClass=Task):
        # Setting a non-default TaskClass is mostly for testing
        self.TaskClass = TaskClass
        self.seriesNumbers = {}

    def new(self, func, args, kw, niceness, series=None, timeout=None):
        """
        Call this to obtain a L{Task} instance that will run in the specified
        I{series} at a priority reflecting the specified I{niceness}.

        The equation for priority has been empirically determined as follows::

            p = k * (1 + nn**2)

        where C{k} is the serial number of the task within its series (see
        L{_serial}) and C{nn} is niceness shifted and scaled from -20...+20 to
        the range 0...4.

        @param func: A callable object to run as the task, the result of which
          will be sent to the callback for the deferred of the task returned by
          this method when it fires.

        @param args: A tuple containing any arguments to include in the call.

        @param kw: A dict containing any keywords to include in the call.

        @param niceness: An integer between -20 and +20.

        @param series: The series in which the task is to run.

        @param timeout: Passed on to the task constructor.

        @return: The new task instance.

        @raise ValueError: If I{niceness} isn't an integer between -20 and
          +20.

        """
        if not isinstance(niceness, int) or abs(niceness) > 20:
            raise ValueError(
                "Niceness must be an integer between -20 and +20")
        normalized = (niceness + 20) / 10.0
        priority = self._serial(series) * (1 + normalized**2)
        return self.TaskClass(func, args, kw, priority, series, timeout)

    def _serial(self, series):
        """
        Maintains serial numbers for tasks in one or more separate series, such
        that the numbers in each series increment independently except that any
        new series starts at a value greater than the maximum serial number
        currently found in any series.

        @return: The next serial number for I{series}, as a float.
        """
        if series not in self.seriesNumbers:
            # The values are copied into a real list first because a dict's
            # values() isn't always something a list can be added to
            eachSeries = [0] + list(self.seriesNumbers.values())
            self.seriesNumbers[series] = max(eachSeries)
        self.seriesNumbers[series] += 1
        return float(self.seriesNumbers[series])
173
174
class Assignment(object):
    """
    I represent the assignment of a single task to whichever worker object
    accepts me. Deep down, my real role is to provide something to fire the
    callback of a deferred with instead of just another deferred.

    @ivar task: The task that I assign.

    @ivar d: A deferred that is instantiated for a given instance of me, which
      fires when a worker accepts the assignment represented by that instance.

    """
    # We go through a lot of these objects and they're small, so let's make
    # them cheap to build
    __slots__ = ['task', 'd']

    def __init__(self, task):
        self.task = task
        self.d = defer.Deferred()

    def accept(self, worker):
        """
        Called when the worker accepts the assignment, firing my deferred
        (with C{None}), starting the timer of my task, and having the worker
        run the task.

        @param worker: The worker accepting the assignment.

        @return: Whatever the worker's C{run} method returns for my task,
          i.e., another deferred that fires when the worker is ready to accept
          B{another} assignment following this one.

        """
        self.d.callback(None)
        self.task.startTimer()
        return worker.run(self.task)
205
206
class AssignmentFactory(object):
    """
    I generate L{Assignment} instances for workers to handle particular tasks.

    @ivar waiting: A C{dict}, keyed by series, of lists of the deferreds for
      assignment requests that are waiting for a task in that series.

    @ivar pending: A C{dict}, keyed by series, of lists of assignments that
      are waiting for a worker to request them.

    """
    def __init__(self):
        self.waiting = {}
        self.pending = {}

    def cancelRequests(self, worker):
        """
        Removes from my waiting lists every assignment request of the supplied
        I{worker} that is still waiting there. The request deferreds are not
        fired, and the I{assignments} attribute of the worker is left as it
        is.

        A worker with no I{assignments} attribute is silently ignored.
        """
        for series, dList in getattr(worker, 'assignments', {}).items():
            requestsWaiting = self.waiting.get(series, [])
            for d in dList:
                if d in requestsWaiting:
                    requestsWaiting.remove(d)

    def request(self, worker, series):
        """
        Called to request a new assignment in the specified I{series} of tasks
        for the supplied I{worker}.

        When a new assignment in the series is finally ready in the queue for
        this worker, the deferred for the assignment request will fire with an
        L{Assignment} instance that has been constructed with the task to be
        assigned.

        If the worker is gainfully employed when it accepts the assignment,
        and is not just wrapping up its work after having been fired, the
        worker will request another assignment when it finishes the task,
        provided it is still employed at that point.
        """
        def requestAnother(_):
            # The worker may have been fired while it was running the task,
            # in which case it must not get back in line for another one.
            if worker.hired:
                self.request(worker, series)

        def accept(assignment, d_request):
            worker.assignments[series].remove(d_request)
            if isinstance(assignment, Assignment):
                d = assignment.accept(worker)
                if worker.hired:
                    d.addCallback(requestAnother)
                return d

        assignments = getattr(worker, 'assignments', {})
        if self.pending.get(series):
            # There's a surplus of assignments in this series, so the request
            # is satisfied right away with the oldest one
            d = defer.succeed(self.pending[series].pop(0))
        else:
            d = defer.Deferred()
            self.waiting.setdefault(series, []).append(d)
        assignments.setdefault(series, []).append(d)
        worker.assignments = assignments
        # The callback is added to the deferred *after* being appended to the
        # worker's assignments list for this series. That way, we know that the
        # callback will be able to remove the deferred even if the deferred
        # fires immediately due to the queue having a surplus of assignments.
        d.addCallback(accept, d)

    def new(self, task):
        """
        Creates a new assignment for the supplied I{task}, returning a
        deferred that fires when the assignment has been accepted.

        If a request is waiting for a task in the series of the task, the
        oldest such request is fired with the assignment. Otherwise, the
        assignment is queued until a request for its series comes in.
        """
        series = task.series
        assignment = Assignment(task)
        if self.waiting.get(series):
            self.waiting[series].pop(0).callback(assignment)
        else:
            self.pending.setdefault(series, []).append(assignment)
        return assignment.d
272
273
class WorkerManager(object):
    """
    I manage one or more providers of L{IWorker} for a particular instance of
    L{TaskQueue}.

    When a new worker is hired with my L{hire} method, I run the
    L{AssignmentFactory.request} method of my assignment factory to request
    that the worker be assigned a task from the queue of each task series for
    which it is qualified.

    When the worker finally gets an assignment, the L{Assignment} object's
    internal deferred is fired and the worker runs the assigned task. It
    requests another assignment of a task in the same series when it's done,
    unless I've terminated the worker in the meantime.

    Each worker object maintains a dictionary of deferreds for each of its
    outstanding assignment requests so that I can cancel them if I terminate
    the worker. The requests are cancelled by having my assignment factory
    remove them from its lists of waiting requests. See my L{terminate}
    method.

    @ivar workers: A C{dict} of worker objects that are currently employed by
      me, keyed by a unique integer ID code for each worker.

    @ivar assignmentFactory: The L{AssignmentFactory} through which my
      workers request assignments and I create them.

    """
    def __init__(self):
        self.workers = {}
        self.assignmentFactory = AssignmentFactory()

    def shutdown(self, timeout=None):
        """
        Shutdown all my workers, then fire them, in turn. Returns a
        deferred that fires when my entire work force has been
        terminated. The deferred result is a list of all tasks, if
        any, that were left unfinished by the work force.

        @param timeout: Passed on to L{terminate} for each worker.
        """
        def gotResults(results):
            unfinishedTasks = []
            for result in results:
                unfinishedTasks.extend(result)
            return unfinishedTasks

        # Work from a snapshot of the IDs, because terminate removes each
        # worker from the dict that would otherwise be iterated over.
        dList = [
            self.terminate(workerID, timeout=timeout)
            for workerID in list(self.workers)]
        return defer.gatherResults(dList).addCallback(gotResults)

    def hire(self, worker):
        """
        Adds a new worker to my work force.

        Has the new worker request an initial assignment for the C{None}
        series and for each task series listed in its optional C{cQualified}
        and C{iQualified} attributes.

        The method generates an integer ID uniquely identifying the worker, and
        gives the worker an C{ID} attribute with the ID for its own reference.
        The ID is returned as well.

        The worker is also given a resignator, a callable taking no arguments
        that terminates the worker by crashing it and reassigning its
        unfinished tasks.

        @raise ImplementationError: If the worker doesn't provide L{IWorker}.
        """
        if not IWorker.providedBy(worker):
            raise ImplementationError(
                "'%s' doesn't provide the IWorker interface" % worker)
        IWorker.validateInvariants(worker)

        worker.hired = True
        worker.assignments = {}
        # The qualification attributes are copied into lists so that any kind
        # of sequence can be concatenated here
        qualifications = [None] +\
                         list(getattr(worker, 'cQualified', [])) +\
                         list(getattr(worker, 'iQualified', []))
        for series in qualifications:
            self.assignmentFactory.request(worker, series)
        workerID = worker.ID = getattr(self, '_workerCounter', 0) + 1
        self._workerCounter = workerID
        self.workers[workerID] = worker
        worker.setResignator(
            lambda : self.terminate(worker.ID, crash=True, reassign=True))
        return workerID

    def terminate(self, workerID, timeout=None, crash=False, reassign=False):
        """
        Removes a worker from my work force, canceling all of its unfulfilled
        assignment requests back from the queue and then attempting to shut it
        down gracefully via its C{stop} method.

        The I{timeout} keyword can be set to a number of seconds after which
        the worker will be terminated rudely via its C{crash} method if it
        hasn't shut down gracefully by then. If the I{crash} keyword is set
        C{True}, the worker is crashed right away without waiting for it to run
        through its pending tasks.

        If the I{reassign} keyword is set C{True}, any tasks left unfinished
        are put back into the queue as new assignments, and the deferred
        fires with an empty list.

        If there is no worker with the specified I{workerID}, nothing is done
        and the deferred fires immediately with an empty list.

        @return: A deferred that fires when the worker has been removed,
          gracefully or not, with a list of any tasks left unfinished and not
          reassigned.

        """
        def crashTheWorker(worker, d):
            unfinished = worker.crash()
            # Fire deferred with list of unfinished tasks
            # NOTE(review): d is the deferred that came from worker.stop();
            # confirm that the worker won't try to fire it again after having
            # been crashed.
            d.callback(unfinished)

        def stopped(result):
            # Only added as a callback when there's a timeout, so callID is
            # always defined by the time this runs
            if callID.active():
                callID.cancel()
                # No tasks left unfinished if deferred fires normally
                return []
            # The timeout call is what fired the deferred, with the unfinished
            # tasks from the crash as its result
            return result

        def reassignTasks(tasks):
            for task in tasks:
                self.assignmentFactory.new(task)
            return []

        worker = self.workers.pop(workerID, None)
        if worker is None:
            return defer.succeed([])
        worker.hired = False
        self.assignmentFactory.cancelRequests(worker)
        if crash:
            d = defer.succeed(worker.crash())
        else:
            d = worker.stop()
            if timeout:
                callID = reactor.callLater(timeout, crashTheWorker, worker, d)
                d.addCallback(stopped)
            else:
                # No tasks left unfinished if deferred fires without timeout
                d.addCallback(lambda _: [])
        if reassign:
            d.addCallback(reassignTasks)
        return d

    def assignment(self, task):
        """
        Generates a new assignment for the supplied I{task}.

        If the worker that runs the task is still working for me when it
        becomes ready for another task following this one, an assignment
        request will be entered for it to obtain another task of the same
        series.

        @return: A deferred that fires when the task has been assigned to a
          worker and it has accepted the assignment.

        """
        return self.assignmentFactory.new(task)
Note: See TracBrowser for help on using the browser.