root/projects/AsynQueue/trunk/asynqueue/jobs.py

Revision 213, 18.7 kB (checked in by edsuom, 5 months ago)

Improved job updating, unit tests for processworker

Line 
1 # AsynQueue:
2 # Asynchronous task queueing based on the Twisted framework, with task
3 # prioritization and a powerful worker/manager interface.
4 #
5 # Copyright (C) 2006-2007 by Edwin A. Suominen, http://www.eepatents.com
6 #
7 # This program is free software; you can redistribute it and/or modify it under
8 # the terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 2 of the License, or (at your option) any later
10 # version.
11 #
12 # This program is distributed in the hope that it will be useful, but WITHOUT
13 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 # FOR A PARTICULAR PURPOSE.  See the file COPYING for more details.
15 #
16 # You should have received a copy of the GNU General Public License along with
17 # this program; if not, write to the Free Software Foundation, Inc., 51
18 # Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
19
20 """
21 Running jobs on python interpreters that are connected as children via
22 Twisted's Perspective Broker.
23 """
24
25 from zope.interface import implements, Interface
26 from twisted.internet import defer, reactor
27 from twisted.python import reflect
28 from twisted.python.rebuild import rebuild
29 from twisted.python.failure import Failure
30 from twisted.spread import pb, flavors
# Use C Deferreds if possible, for efficiency. Only a missing optional module
# is tolerated; anything else (e.g. KeyboardInterrupt) still propagates.
try:
    from twisted.internet import cdefer
except ImportError:
    pass
else:
    defer.Deferred = cdefer.Deferred
38
39 import base, workers
40
# Set False to silence log().
VERBOSE = True


def log(msgProto, *args):
    """
    Prints C{msgProto % args} to stdout if the module-level C{VERBOSE} flag
    is set.

    The function-call form of C{print} is used so the module works on
    Python 3 as well. On Python 2 a single parenthesized expression prints
    exactly as the old C{print} statement did.
    """
    if VERBOSE:
        print(msgProto % args)
46
class TrustError(Exception):
    """
    Signals that the parent interpreter has not been marked as trusted, so
    job code it supplies may not be loaded or run.
    """
49
50
class ChildRoot(pb.Root):
    """
    I am the root resource for one child worker interpreter.

    @ivar jobs: A dict mapping each job ID to the namespace dict that resulted
      from executing that job's code. Created empty on first access.

    @ivar trusted: Set C{True} when the 'parent' python interpreter is trusted
      to provide code that I can run without concerns. Defaults to C{False}
      on first access, in which case I refuse to load or run any job.

    """
    def __getattr__(self, name):
        # __getattr__ is only consulted when normal lookup fails, so this
        # lazily creates the default instance attributes on first access.
        if name == 'jobs':
            result = self.jobs = {}
        elif name == 'trusted':
            result = self.trusted = False
        else:
            raise AttributeError("No such attribute '%s'" % name)
        return result

    def oops(self, *arg):
        """
        Returns a C{False} status code for a remote call along with a string
        traceback of the exception raised. You can supply your own exception
        instance or L{Failure} instance. If you don't, the current exception
        will be used.
        """
        if arg and isinstance(arg[0], Failure):
            failureObject = arg[0]
        else:
            failureObject = Failure(*arg)
        return False, failureObject.getTraceback()

    def remote_registerClasses(self, *args):
        """
        Instructs my broker to register the classes specified by the
        argument(s).

        The classes will be registered for B{all} jobs, and are specified by
        their string representations::

            <package(s).module.class>

        After registering, each distinct module containing one of the classes
        is rebuilt, on a best-effort basis, in case it has changed since the
        last run.
        """
        modules = []
        for stringRep in args:
            # Load the class for the string representation
            cls = reflect.namedObject(stringRep)
            # Register instances of the class, including its type and module
            pb.setUnjellyableForClass(stringRep, cls)
            if cls.__module__ not in modules:
                modules.append(cls.__module__)
        for module in modules:
            try:
                rebuild(reflect.namedModule(module), doLog=False)
            except Exception:
                # Best effort only: a module that can't be rebuilt simply
                # keeps the definitions it already has.
                pass

    def remote_newJob(self, jobID, jobCode):
        """
        Registers the job identified by the specified integer I{jobID} and
        represented by Python code contained in the string I{jobCode}.

        The namespace can include a no-argument function C{shutdown}, which may
        return a deferred. If it is present, the function will be called before
        my process exits.

        Returns a tuple containing a status value and the result of loading the
        job. If all went well, the status is C{True} and the result is a list
        of names of the callable objects in the job's namespace. If there was a
        problem, including my parent not being trusted, the status is C{False}
        and the result is a string traceback of the exception that was raised.
        """
        if not self.trusted:
            return self.oops(TrustError("Parent interpreter is not trusted"))
        namespace = {}
        try:
            # The call form is the exec statement on Python 2 and the
            # builtin function on Python 3.
            exec(jobCode, namespace)
        except:
            # Deliberately broad: arbitrary job code may raise anything, and
            # its traceback is reported back to the parent instead.
            return self.oops()
        self.jobs[jobID] = namespace
        return True, [
            name for name, obj in namespace.items() if callable(obj)]

    def remote_runJob(self, jobID, callName, *args, **kw):
        """
        Calls the object that is present in the namespace of I{jobID} under the
        specified I{callName}, with any supplied arguments and keywords.

        Returns a tuple (or a deferred firing with one) containing a status
        value and the result of the call. If all went well, the status is
        C{True} and the result is the call's result, of whatever type. If there
        was a problem (untrusted parent, unknown job, no such callable, or
        the call raised), the status is C{False} and the result is a string
        traceback of the exception that was raised.
        """
        if not self.trusted:
            return self.oops(TrustError("Parent interpreter is not trusted"))
        namespace = self.jobs.get(jobID, None)
        if namespace is None:
            # Report an unknown job as a status tuple, as documented, rather
            # than letting a KeyError escape to the remote caller.
            return self.oops(KeyError("No job %s loaded" % (jobID,)))
        calledObject = namespace.get(callName, None)
        if not callable(calledObject):
            return self.oops(
                AttributeError(
                    "No callable object '%s' defined in namespace for job %d"
                    % (callName, jobID)))
        d = defer.maybeDeferred(calledObject, *args, **kw)
        d.addCallback(lambda x: (True, x))
        d.addErrback(self.oops)
        return d

    def remote_forgetJob(self, jobID):
        """
        Call this with the I{jobID} of a job that is done and I will forget its
        namespace, thus freeing up memory. Unknown job IDs are ignored.
        """
        self.jobs.pop(jobID, None)

    def remote_exit(self, stopReactor=False):
        """
        Calls and waits for any C{shutdown} functions present in my jobs'
        namespaces. If I{stopReactor} is set, then stops the reactor, which
        terminates my child worker process.

        Returns a L{defer.DeferredList} of the shutdown calls.
        """
        dList = []
        for namespace in self.jobs.values():
            possibleShutdownFunction = namespace.get('shutdown', None)
            if callable(possibleShutdownFunction):
                dList.append(defer.maybeDeferred(possibleShutdownFunction))
        d = defer.DeferredList(dList)
        if stopReactor:
            d.addCallback(lambda _: reactor.stop())
        return d
180
181
class ChildWorker(workers.RemoteCallWorker):
    """
    I implement an I{IWorker} that runs tasks, up to I{N} pending at a time, in
    a particular job on a particular child interpreter.

    Each task is dispatched as a remote C{runJob} call, using the task's
    I{series} as the job ID.
    """
    def __init__(self, *args, **kw):
        # A C{_noTypeCheck} attribute set to a true value (used by unit tests)
        # forces the base class's C{noTypeCheck} keyword on.
        skipTypeCheck = getattr(self, '_noTypeCheck', False)
        if skipTypeCheck:
            kw['noTypeCheck'] = True
        workers.RemoteCallWorker.__init__(self, *args, **kw)

    def runNow(self, null, task):
        """
        Sends I{task} to the child as a C{runJob} call, records the
        C{(task, deferred)} pair in my I{jobs} list, and hands the remote
        deferred's outcome to L{doneTrying}.

        Nothing is returned: the remote call's deferred is deliberately NOT
        passed back to the caller.
        """
        funcName, args, kw = task.callTuple
        remoteCall = self.remoteCaller(
            'runJob', task.series, funcName, *args, **kw)
        entry = (task, remoteCall)
        self.jobs.append(entry)
        remoteCall.addBoth(self.doneTrying, entry)

    def stop(self):
        """
        Stops me via the base class, then tells the child to run its C{exit}
        method. Errors from either step are discarded.
        """
        def tellChildToExit(_):
            return self.remoteCaller('exit')

        def discardError(_):
            return None

        stopping = workers.RemoteCallWorker.stop(self)
        stopping.addBoth(tellChildToExit)
        stopping.addErrback(discardError)
        return stopping
206
207
class JobManager(object):
    """
    I keep jobs running on python interpreters that are attached as children,
    maintaining a pipeline of no fewer than I{N} calls pending on each
    interpreter worker to minimize the effects of network latency for the PB
    connection to the interpreters and balance the load across the workers
    while still permitting some priority queueing of jobs by niceness.

    You can supply an instance of L{base.TaskQueue} to the constructor. I will
    instantiate my own if not.

    I maintain a dict I{updates} that maps each jobID to a list of update
    tasks to perform before any (further) runs for that job. Each update task
    is a list with four elements::

        [funcName, args, kw, workersUpdated]

    When a worker is sent a given update task, that worker's ID is appended
    to the I{workersUpdated} list that is the fourth element of the update
    task, indicating that it needs not run the update task again. If the
    update fails on that worker, the ID is removed again so a later update
    pass retries it.

    @ivar queue: The TaskQueue instance I'm using.

    @ivar jobs: A dict mapping each jobID to a C{[jobCode, niceness]} list.

    @ivar callsPending: A dict mapping each jobID to a dict of the
      not-yet-fired deferreds returned by L{run} for that job, each mapped to
      a C{[retryCount, callName, args, kw]} list.

    @ivar registeredClasses: A dict mapping the string representation of each
      class given to L{registerClasses} to a list of the IDs of the workers
      that have been told to register it.

    """
    maxRetries = 1

    def __init__(self, queue=None):
        self.jobs = {}
        self.updates = {}
        self.callsPending = {}
        self.registeredClasses = {}
        if queue is None:
            queue = base.TaskQueue()
        self.queue = queue

    def shutdown(self):
        """
        Shuts down my task queue, returning a deferred that fires when the
        queue has emptied and all interpreter workers have finished and been
        terminated. The task queue shutdown takes care of shutting down
        everything else, including any attached workers.
        """
        return self.queue.shutdown()

    def jobTried(self, result, jobID, worker):
        """
        Callback (and errback) from loading a new job.

        If the worker's root reference raised an unexpected failure, returns
        C{False}. If everything went OK, qualifies the worker for the job and
        returns C{True}. If there was a failure that may not have been the
        worker's fault (the job's code failed to load), returns C{None}. Any
        other result is passed along unchanged.
        """
        if isinstance(result, Failure):
            # Oops, failure from a bogus root reference.
            log("Worker %s supplied nonconforming root reference", worker.ID)
            return False
        if isinstance(result, (list, tuple)) and len(result) == 2:
            status, info = result
            if status:
                msg = "Callable objects: %s" % ", ".join(info)
                log("Job %d loaded OK on worker %s\n%s", jobID, worker.ID, msg)
                self.queue.qualifyWorker(worker, jobID)
                return True
            log("Job %d failed on worker %s:\n%s", jobID, worker.ID, info)
            return None
        # Not a failure or status,result tuple, so just pass it along
        return result

    def attachChild(self, childRoot, N=3):
        """
        Attaches a new child interpreter worker using the supplied I{childRoot}
        PB root reference.

        Tries to load all of the currently registered jobs on the worker. If an
        unexpected failure (not a simple job-loading exception) arises, the
        worker is not hired. A job that merely fails to load leaves the worker
        hired but not qualified for that job.

        For each job that loads, registered classes are sent to the worker
        and then any pending updates for that job are run on it. Registered
        classes are also sent once all loading is done, so a worker attached
        while no jobs exist still gets them.

        The default number (three) of job runs that the worker is willing to
        queue up on its end can be overridden with the I{N} keyword.

        Returns a deferred that fires with the worker's ID, or C{None} if not
        hired.
        """
        # IDs of jobs whose loading revealed a nonconforming root reference
        rejected = []

        def loaded(status, jobID):
            # jobID is bound per call here, not taken from the loop below,
            # so each callback acts on its own job.
            if status is False:
                rejected.append(jobID)
                return None
            if status:
                d = self._runRegisterClasses(worker)
                if jobID in self.updates:
                    d.addCallback(lambda _: self._runUpdate(jobID, worker))
                return d
            # status None: job code failed to load; worker is still hired.
            return None

        def registerFailed(failure):
            log("Couldn't register classes on worker %s:\n%s",
                worker.ID, failure.getTraceback())

        def allDone(null):
            if rejected:
                d = self.queue.detachWorker(worker)
                d.addCallback(lambda _: None)
                return d
            d = self._runRegisterClasses(worker)
            d.addErrback(registerFailed)
            d.addCallback(lambda _: worker.ID)
            return d

        worker = ChildWorker(childRoot, N)
        self.queue.attachWorker(worker)
        dList = []
        # Iterate over a snapshot in case callbacks fire synchronously and
        # change the job dict.
        for jobID, jobInfo in list(self.jobs.items()):
            jobCode = jobInfo[0]
            d = childRoot.callRemote('newJob', jobID, jobCode)
            d.addBoth(self.jobTried, jobID, worker)
            d.addCallback(loaded, jobID)
            dList.append(d)
        return defer.DeferredList(dList).addCallback(allDone)

    def detachChild(self, childID):
        """
        Detaches and terminates the child interpreter worker specified by the
        supplied I{childID}.
        """
        return self.queue.detachWorker(childID, reassign=True, crash=True)

    def new(self, jobCode, niceness=0):
        """
        Registers a new job for execution on qualified child interpreters.

        @param jobCode: A string containing Python code that defines the job
          and its namespace.

        @keyword niceness: Scheduling niceness for all calls of the job.

        @type niceness: An integer between -20 and 20, with lower numbers
          having higher scheduling priority as in UNIX C{nice} and C{renice}.

        @return: A deferred that fires with a unique ID for the job.

        """
        jobID = self._jobCounter = getattr(self, '_jobCounter', 0) + 1
        self.callsPending[jobID] = {}
        self.jobs[jobID] = [jobCode, niceness]
        dList = []
        for worker in self.queue.workers():
            d = worker.remoteCaller('newJob', jobID, jobCode)
            d.addBoth(self.jobTried, jobID, worker)
            dList.append(d)
        d = defer.DeferredList(dList)
        d.addCallback(lambda _: jobID)
        return d

    def _runUpdate(self, jobID, worker):
        """
        Sends I{worker} every update task for I{jobID} that it hasn't already
        been sent, returning a L{defer.DeferredList} of the remote calls.
        """
        dList = []
        for funcName, args, kw, workersUpdated in self.updates.get(jobID, []):
            if worker.ID in workersUpdated:
                continue
            # Claim the update for this worker before dispatching it, so an
            # overlapping _runUpdate call doesn't send it a second time.
            workersUpdated.append(worker.ID)
            d = worker.remoteCaller('runJob', jobID, funcName, *args, **kw)
            # Pass workersUpdated explicitly: a closure over the loop
            # variable would see only the last update's list.
            d.addBoth(self._updateDone, jobID, worker.ID, workersUpdated)
            dList.append(d)
        return defer.DeferredList(dList)

    def _updateDone(self, result, jobID, workerID, workersUpdated):
        """
        Handles the outcome of one update call. A C{(False, traceback)} result
        or a L{Failure} un-marks I{workerID} in I{workersUpdated} so that a
        later pass retries the update. A failure is passed along unchanged;
        anything else results in C{None}.
        """
        if isinstance(result, Failure):
            if workerID in workersUpdated:
                workersUpdated.remove(workerID)
            return result
        if isinstance(result, (list, tuple)) and result and not result[0]:
            if workerID in workersUpdated:
                workersUpdated.remove(workerID)
            info = result[1] if len(result) > 1 else ''
            log("Update of job %d failed on worker %s:\n%s",
                jobID, workerID, info)
        return None

    def update(self, jobID, callName, *args, **kw):
        """
        Appends a new task to the update list for the specified I{jobID}. Runs
        the new update task on all workers currently attached and ensures that
        all new workers run the task for that job before they run any other
        tasks for it.

        The updates are run via a direct remoteCall to each worker, not through
        the queue. Because of the disconnect between queued and direct calls,
        it is likely but not guaranteed that any jobs you have queued when this
        method is called will run on a particular worker B{after} this update
        is run. Wait for the deferred from this method to fire before queuing
        any jobs that need the update to be in place before running.

        If you don't want the task saved to the update list, but only run on
        the workers currently attached, set the I{ephemeral} keyword C{True}.
        """
        ephemeral = kw.pop('ephemeral', False)
        if ephemeral:
            dList = [
                worker.remoteCaller('runJob', jobID, callName, *args, **kw)
                for worker in self.queue.workers()]
        else:
            self.updates.setdefault(jobID, []).append(
                [callName, args, kw, []])
            dList = [
                self._runUpdate(jobID, worker)
                for worker in self.queue.workers()]
        return defer.DeferredList(dList)

    def _runRegisterClasses(self, worker):
        """
        Tells I{worker} to register every class it hasn't yet been told
        about. Returns the remote call's deferred, or an already-fired one
        if there is nothing new to register (skipping a needless round trip).
        """
        stringReps = []
        for stringRep, registeredWorkers in self.registeredClasses.items():
            if worker.ID in registeredWorkers:
                continue
            registeredWorkers.append(worker.ID)
            stringReps.append(stringRep)
        if not stringReps:
            return defer.succeed(None)
        return worker.remoteCaller('registerClasses', *stringReps)

    def registerClasses(self, *args):
        """
        Instructs my current and future nodes to register the classes specified
        by the argument(s) as self-unjellyable and allowable past PB
        security. The classes will be registered for B{all} jobs, and are
        specified by their string representations::

            <package(s).module.class>

        Use judiciously!

        """
        for stringRep in args:
            self.registeredClasses.setdefault(stringRep, [])
        dList = [
            self._runRegisterClasses(worker)
            for worker in self.queue.workers()]
        return defer.DeferredList(dList)

    def run(self, jobID, callName, *args, **kw):
        """
        Runs the specified I{jobID} by putting a call to the specified callable
        object in the job's namespace, with any supplied arguments and
        keywords, into the queue.

        Scheduling of the job is impacted by the niceness of the job itself. As
        with UNIX niceness, the value should be an integer where 0 is normal
        scheduling, negative numbers are higher priority, and positive numbers
        are lower priority. Calls for a job having niceness N+10 are dispatched
        at approximately half the rate of calls for a job with niceness N.

        All keywords except for the following are passed to the call:

          - B{timeout}: A timeout interval in seconds from when a worker
            gets a task assignment for the call, after which the call will be
            retried.

        A failed call is re-queued up to I{maxRetries} times, ahead of other
        calls. After that, or if the job is cancelled, the returned deferred
        fires with C{None}.

        @note: The task object generated contains the name of a callable (as a
          string) for the first element of its I{callTuple} attribute, instead
          of a callable itself.

        @return: A deferred to the eventual result of the call when it is
          eventually pulled from the queue and run.

        @raise ValueError: If no job I{jobID} is registered.

        """
        def queueJob(doNext=False):
            if doNext:
                kw['doNext'] = True
            dq = self.queue.call(callName, *args, **kw)
            dq.addErrback(lambda failure: (False, failure.getTraceback()))
            dq.addCallback(jobRan)

        def finish(result):
            # Fire d at most once, and only while it is still pending;
            # removing it also keeps callsPending from growing forever.
            pending = self.callsPending.get(jobID, {})
            if d in pending:
                del pending[d]
                d.callback(result)

        def jobRan(result):
            status, result = result
            if status:
                finish(result)
                return
            if result == 'Timeout':
                log("Timeout on job %d, retrying", jobID)
            else:
                log("Error running job %d:\n%s", jobID, result)
            tryAgain()

        def tryAgain():
            entry = self.callsPending.get(jobID, {}).get(d, None)
            if entry is not None and entry[0] < self.maxRetries:
                entry[0] += 1
                queueJob(True)
            else:
                finish(None)

        jobID = int(jobID)
        if jobID not in self.jobs:
            raise ValueError("No job '%s' registered" % jobID)
        kw['series'] = jobID
        kw['niceness'] = self.jobs[jobID][1]
        d = defer.Deferred()
        self.callsPending[jobID][d] = [0, callName, args, kw]
        queueJob()
        return d

    def cancel(self, jobID):
        """
        Cancels the specified I{jobID} and any jobs that may be queued for
        it. If the job doesn't exist, no error is raised.

        Any deferreds from L{run} for the job that haven't yet fired are
        fired with C{None}. Returns a L{defer.DeferredList} of the
        C{forgetJob} calls to the attached workers.
        """
        self.jobs.pop(jobID, None)
        self.updates.pop(jobID, None)
        # Drop pending calls before cancelling the series, so any errbacks
        # the queue fires during cancellation don't re-queue the call.
        pending = self.callsPending.pop(jobID, {})
        self.queue.cancelSeries(jobID)
        for d in list(pending):
            d.callback(None)
        dList = [
            worker.remoteCaller('forgetJob', jobID)
            for worker in self.queue.workers()]
        return defer.DeferredList(dList)
Note: See TracBrowser for help on using the browser.