Changeset 84
- Timestamp:
- 09/14/07 14:27:32 (1 year ago)
- Files:
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynQueue/trunk/asynqueue/jobs.py
r79 r84 24 24 25 25 from zope.interface import implements, Interface 26 from twisted.internet import defer 26 from twisted.internet import defer, reactor 27 27 from twisted.python.failure import Failure 28 28 from twisted.spread import pb, flavors … … 42 42 if name == 'jobs': 43 43 result = self.jobs = {} 44 elif name == 'queue':45 result = self.queue = base.TaskQueue(workers.ThreadWorker())46 44 elif name == 'trusted': 47 45 result = self.trusted = False … … 55 53 represented by Python code contained in the string I{jobCode}. 56 54 55 The namespace can include a no-argument function C{shutdown}, which may 56 return a deferred. If it is present, the function will be called before 57 my process exits. 58 57 59 Returns C{True} if the I{jobCode} was accepted and executed OK, or 58 60 C{False} otherwise. 59 61 """ 60 def tryJob():61 try:62 namespace = {}63 exec jobCode in namespace64 except:65 return False66 self.jobs[jobID] = namespace67 return True68 69 62 if not self.trusted: 70 63 return False 71 return self.queue.call(tryJob, niceness=-10) 64 try: 65 namespace = {} 66 exec jobCode in namespace 67 except: 68 return False 69 self.jobs[jobID] = namespace 70 return True 72 71 73 72 def remote_runJob(self, jobID, callName, *args, **kw): 74 73 """ 75 Runs the specified callable object with any supplied arguments and 76 keywords in the namespace of the specified I{jobID}. 77 78 @param jobID: An integer uniquely specifying an already-registered job. 79 80 @param callName: A string with the name of a callable object in the 81 job's namespace. 82 83 @args: Any arguments for the call. 84 85 @kw: Any keywords for the call. 86 87 @return: A deferred to the eventual result of the call. 88 74 Calls the object that is present in the namespace of I{jobID} under the 75 specified I{callName}, with any supplied arguments and 76 keywords. Returns the result of the call, which may be a deferred. 89 77 """ 90 78 if not self.trusted: … … 92 80 calledObject = getattr(self.jobs[jobID], callName, None) 93 81 if callable(calledObject): 94 return self.queue.call(calledObject,*args, **kw)82 return calledObject(*args, **kw) 95 83 raise AttributeError( 96 84 "No callable object '%s' defined in namespace for job %d" \ … … 99 87 def remote_exit(self): 100 88 """ 101 Terminates the child worker process. 102 """ 103 self.queue.shutdown().addCallback(lambda _: reactor.stop()) 89 Terminates my child worker process, calling and waiting for any 90 C{shutdown} methods present in my jobs' namespaces before stopping my 91 reactor. 92 """ 93 dList = [] 94 for namespace in self.jobs.itervalues(): 95 possibleShutdownFunction = namespace.get('shutdown', None) 96 if callable(possibleShutdownFunction): 97 dList.append(defer.maybeDeferred(possibleShutdownFunction)) 98 return defer.DeferredList(dList).addCallback(lambda _: reactor.stop()) 104 99 105 100 … … 158 153 return self.queue.shutdown() 159 154 160 def attachChild(self, childRoot): 161 """ 162 Attaches a new child interpreter worker. 163 164 @return: A deferred that fires with a unique ID for the interpreter 165 when all of the currently registered jobs have been tried on it. 166 155 def attachChild(self, childRoot, N=3): 156 """ 157 Attaches a new child interpreter worker using the supplied I{childRoot} 158 PB root reference. 159 160 The default number (3) of job runs that the worker is willing to queue 161 up on its end can be overridden with the I{N} keyword. 162 163 Returns a deferred that fires with a unique ID for the interpreter when 164 all of the currently registered jobs have been tried on it. 167 165 """ 168 166 def tried(success, jobID): … … 175 173 TypeError("Supplied root reference doesn't conform")) 176 174 177 worker = ChildWorker(childRoot )175 worker = ChildWorker(childRoot, N) 178 176 result = [self.queue.attachWorker(worker)] 179 177 dList = [] projects/AsynQueue/trunk/asynqueue/processworker.py
r83 r84 90 90 def startup(self, N=1): 91 91 """ 92 Starts I{N} child interpreters and attaches them to my queue. Returns a 93 deferred that fires with a list of the IDs for the interpreters when 94 they have all been started and the queue is ready to accept jobs for 95 them. 92 Starts I{N} child interpreters and attaches instances of 93 L{jobs.ChildWorker} for each of them to my queue. 94 95 The workers are set to accept just one job run at a time because 96 network latency isn't an issue. The PB connection is via STDIO. 97 98 Returns a deferred that fires with a list of the IDs for the 99 interpreters when they have all been started and the queue is ready to 100 accept jobs for them. 96 101 """ 97 102 if not hasattr(self, 'children'): … … 101 106 yield wfd 102 107 process, root = wfd.getResult() 103 wfd = defer.waitForDeferred(self.attachChild(root ))108 wfd = defer.waitForDeferred(self.attachChild(root, 1)) 104 109 yield wfd 105 110 childID = wfd.getResult() projects/Twisted-Goodies/branches/simpleserver-process-wsgi/twisted_goodies/simpleserver/http/wsgi/parent.py
r83 r84 70 70 jobCode = fh.read() 71 71 fh.close() 72 jobCode += '\n' + self.appCode() 72 73 d = self.mgr.new(jobCode) 73 74 d.addCallback(lambda jobID: setattr(self, 'jobID', jobID))
