root/projects/AsynQueue/trunk/asynqueue/workers.py

Revision 126, 13.2 kB (checked in by edsuom, 9 months ago)

Worker specialties, improved timouts

Line 
1 # AsynQueue:
2 # Asynchronous task queueing based on the Twisted framework, with task
3 # prioritization and a powerful worker/manager interface.
4 #
5 # Copyright (C) 2006-2007 by Edwin A. Suominen, http://www.eepatents.com
6 #
7 # This program is free software; you can redistribute it and/or modify it under
8 # the terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 2 of the License, or (at your option) any later
10 # version.
11 #
12 # This program is distributed in the hope that it will be useful, but WITHOUT
13 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 # FOR A PARTICULAR PURPOSE.  See the file COPYING for more details.
15 #
16 # You should have received a copy of the GNU General Public License along with
17 # this program; if not, write to the Free Software Foundation, Inc., 51
18 # Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
19
20 """
21 The worker interface and some implementors.
22 """
23
24 from zope.interface import implements, invariant, Interface, Attribute
25 from twisted.python import failure
26 from twisted.internet import defer, reactor
27 from twisted.spread import pb
28
29 import errors
30
31
32 class IWorker(Interface):
33     """
34     Provided by worker objects that can have tasks assigned to them for
35     processing.
36
37     All worker objects are considered qualified to run tasks of the default
38     C{None} series. To indicate that subclasses or subclass instances are
39     qualified to run tasks of user-defined series in addition to the default,
40     the hashable object that identifies the additional series must be listed in
41     the C{cQualified} or C{iQualified} class or instance attributes,
42     respectively.
43         
44     """
45     cQualified = Attribute(
46         """
47         A class-attribute list containing all series for which all instances of
48         the subclass are qualified to run tasks.
49         """)
50
51     iQualified = Attribute(
52         """
53         An instance-attribute list containing all series for which the subclass
54         instance is qualified to run tasks.
55         """)
56
57     def _check_qualifications(ob):
58         """
59         Qualification attributes must be present as lists.
60         """
61         for attrName in ('cQualified', 'iQualified'):
62             x = getattr(ob, attrName, None)
63             if not isinstance(x, list):
64                 raise errors.InvariantError(ob)
65     invariant(_check_qualifications)
66
67     def setResignator(callableObject):
68         """
69         Registers the supplied I{callableObject} to be called if the
70         worker deems it necessary to resign, e.g., a remote connection
71         has been lost.
72         """
73
74     def run(task):
75         """
76         Adds the task represented by the specified I{task} object to the list
77         of tasks pending for this worker, to be run however and whenever the
78         worker sees fit.
79
80         Make sure that any callbacks you add to the task's internal deferred
81         object C{task.d} return the callback argument. Otherwise, the result of
82         your task will be lost in the callback chain.
83         
84         @return: A deferred that fires when the worker is ready to be assigned
85           another task.
86
87         """
88
89     def stop():
90         """
91         Attempts to gracefully shut down the worker, returning a deferred that
92         fires when the worker is done with all assigned tasks and will not
93         cause any errors if the reactor is stopped or its object is deleted.
94
95         The deferred returned by your implementation of this method must not
96         fire until B{after} the results of all pending tasks have been
97         obtained. Thus the deferred must be chained to each C{task.d} somehow.
98
99         Make sure that any callbacks you add to the task's internal deferred
100         object C{task.d} return the callback argument. Otherwise, the result of
101         your task will be lost in the callback chain.
102         """
103
104     def crash():
105         """
106         Takes drastic action to shut down the worker, rudely and
107         synchronously.
108
109         @return: A list of I{task} objects, one for each task left
110           uncompleted. You shouldn't have to call this method if no
111           tasks are left pending; the L{shutdown} method should be
112           enough in that case.
113         
114         """
115
116
117 class ThreadWorker(object):
118     """
119     I implement an L{IWorker} that runs tasks in a dedicated worker thread.
120
121     You can define one or more specialties that I am qualified to handle with
122     string arguments.
123     """
124     implements(IWorker)
125     cQualified = []
126
127     def __init__(self, *specialties):
128         import threading
129         self.iQualified = list(specialties)
130         self.event = threading.Event()
131         self.thread = threading.Thread(target=self._loop)
132         self.thread.start()
133
134     def _loop(self):
135         """
136         Runs a loop in a dedicated thread that waits for new tasks. The loop
137         exits when a C{None} object is supplied as a task.
138         """
139         while True:
140             # Wait here on the threading.Event object
141             self.event.wait()
142             task = self.task
143             if task is None:
144                 break
145             # Ready for the task attribute to be set to another task object
146             self.event.clear()
147             reactor.callFromThread(self.d.callback, None)
148             f, args, kw = task.callTuple
149             try:
150                 result = f(*args, **kw)
151                 # If the task causes the thread to hang, the method
152                 # call will not reach this point.
153             except Exception, e:
154                 reactor.callFromThread(task.errback, failure.Failure(e))
155             else:
156                 reactor.callFromThread(task.callback, result)
157         # Broken out of loop, ready for the thread to end
158         reactor.callFromThread(self.d.callback, None)
159
160     def setResignator(self, callableObject):
161         """
162         There's nothing that would make me resign.
163         """
164
165     def run(self, task):
166         """
167         Starts a thread for this worker if one not started already, along with
168         a L{threading.Event} object for cross-thread signaling.
169         """
170         if hasattr(self, 'd') and not self.d.called:
171             raise errors.ImplementationError(
172                 "Task Loop not ready to deal with a task now")
173         self.d = defer.Deferred()
174         self.task = task
175         self.event.set()
176         return self.d
177    
178     def stop(self):
179         """
180         The returned deferred fires when the task loop has ended and its thread
181         terminated.
182         """
183         def joinIfPossible(null):
184             if hasattr(self, 'task'):
185                 self.thread.join()
186
187         if hasattr(self, 'task') and self.task is None:
188             d = defer.succeed(None)
189         else:
190             d = defer.Deferred()
191             if hasattr(self, 'd') and not self.d.called:
192                 d.addCallback(lambda _: self.stop())
193                 self.d.chainDeferred(d)
194             else:
195                 d.addCallback(joinIfPossible)
196                 self.d = d
197                 self.task = None
198                 self.event.set()
199         return d
200
201     def crash(self):
202         """
203         Unfortunately, a thread can only terminate itself, so calling
204         this method only forces firing of the deferred returned from a
205         previous call to L{stop} and returns the task that hung the
206         thread.
207         """
208         if self.task is not None and not self.task.d.called:
209             result = [self.task]
210         else:
211             # This shouldn't happen
212             result = []
213         if hasattr(self, 'd') and not self.d.called:
214             del self.task
215             self.d.callback(None)
216
217
218 class RemoteCallWorker(object):
219     """
220     Instances of me provide an L{IWorker} that dispatches
221     C{callRemote} tasks, no more than I{N} at a time, to a particular
222     I{remoteReference} to a referenceable at a connected PB server.
223
224     @ivar remoteCaller: The I{callRemote} method of the remoteReference.
225     
226     """
227     implements(IWorker)
228     cQualified = []
229
230     def __init__(self, remoteReference, N=3, noTypeCheck=False):
231         self.N = N
232         self.iQualified = []
233         self.remoteCaller = remoteReference.callRemote
234         # Check supplied remote reference object
235         if not noTypeCheck:
236             # Disabling type checking is mostly for unit testing, where a mock
237             # RemoteReference may be used.
238             if not isinstance(remoteReference, pb.RemoteReference):
239                 raise TypeError(
240                     "You must construct me with a PB RemoteReference")
241         self.startup(remoteReference)
242
243     def startup(self, remoteReference):
244         """
245         Starts things up with the remote reference in hand. Useful to have this
246         as a separate method when you're subclassing and doing difference
247         constructor stuff.
248         """
249         # Setup resignation-upon-disconnect
250         self.resignators = []
251         self.disconnectErrors = (pb.DeadReferenceError, pb.PBConnectionLost)
252         remoteReference.notifyOnDisconnect(self.resign)
253         # Prepare the run request queue
254         self.jobs = []
255         self.runRequestQueue = defer.DeferredQueue()
256         for k in xrange(self.N):
257             self.runRequestQueue.put(None)
258
259     def runNow(self, null, task):
260         suffix, args, kw = task.callTuple
261         d = self.remoteCaller(suffix, *args, **kw)
262         job = (task, d)
263         self.jobs.append(job)
264         d.addBoth(self.doneTrying, job)
265         # The task's deferred is NOT returned!
266
267     def doneTrying(self, result, job):
268         if hasattr(result, 'getTraceback'):
269             print "OOPS", result.getTraceback()
270             if result.check(*self.disconnectErrors):
271                 # This was a disconnect error, so bail out now; don't remove
272                 # the job or signal the run request queue that the job is done.
273                 return
274         self.jobs.remove(job)
275         self.runRequestQueue.put(None)
276         task = job[0]
277         task.callback(result)
278    
279     def resign(self, *null):
280         while self.resignators:
281             callableObject = self.resignators.pop()
282             callableObject()
283    
284     def setResignator(self, callableObject):
285         """
286         I will resign upon having one of my tasks turn up a connection
287         fault.
288         """
289         self.resignators.append(callableObject)
290    
291     def run(self, task):
292         """
293         Runs the specified task, which must be a string specifying the suffix
294         portion of a method of the referenceable, e.g., I{'foo'} for
295         C{remote_foo} or C{perspective_foo}.
296
297         Returns a deferred that fires when one of the pending tasks is done
298         running and I can accept another one.
299         """
300         #if getattr(self, 'isShuttingDown', False):
301         #    raise errors.QueueRunError
302         return self.runRequestQueue.get().addCallback(self.runNow, task)
303    
304     def stop(self):
305         """
306         The returned deferred fires when all pending tasks are done.
307         """
308         self.isShuttingDown = True
309         return defer.DeferredList([job[1] for job in self.jobs])
310
311     def crash(self):
312         """
313         Return all tasks not completed by the (disconnected) PB server.
314         """
315         return [job[0] for job in self.jobs]
316
317
318 class RemoteInterfaceWorker(RemoteCallWorker):
319     """
320     Construct an instance of me with a I{remoteReference} and one or more
321     interfaces it provides, as arguments.
322     """
323     def __init__(self, remoteReference, *interfaces, **kw):
324         # Keywords
325         subseries = kw.get('subseries', None)
326         N = kw.get('N', 3)
327         # The base class constructor
328         RemoteCallWorker.__init__(self, remoteReference, N)
329         # Define the interface(s)
330         self.interfaces = interfaces
331         for interface in interfaces:
332             qualification = interface.__name__
333             if subseries:
334                 qualification += ":%s" % subseries
335             self.iQualified.append(qualification)
336         # Caching suffixes of approved remote calls makes for faster error
337         # checking as those calls are repeated
338         self.suffixCache = []
339
340     def names(self, items):
341         nameListing = [x.__name__ for x in items]
342         nameListing[-1] = "or " + nameListing[-1]
343         joinString = (" ", ", ")[len(nameListing) > 2]
344         return joinString.join(nameListing)
345
346     def checkSuffix(self, suffix):
347         for interface in self.interfaces:
348             for attrName in interface:
349                 if attrName.endswith('_'+suffix):
350                     self.suffixCache.append(suffix)
351                     return
352         names = self.names(self.interfaces)
353         raise AttributeError(
354             "No remote method *_%s provided by interface %s" % (suffix, names))
355
356     def runNow(self, null, task):
357         suffix, args, kw = task.callTuple
358         if suffix not in self.suffixCache:
359             self.checkSuffix(suffix)
360         d = self.remoteCaller(suffix, *args, **kw)
361         job = (task, d)
362         self.jobs.append(job)
363         d.addBoth(self.doneTrying, job)
364         # The task's deferred is NOT returned!
Note: See TracBrowser for help on using the browser.