Changeset 76
- Timestamp:
- 08/17/07 16:34:03 (1 year ago)
- Files:
-
- projects/AsynCluster/trunk/asyncluster/master/jobs.py (modified) (2 diffs)
- projects/AsynCluster/trunk/asyncluster/ndm/client.py (modified) (4 diffs)
- projects/AsynQueue/branches/processworker/asynqueue/jobs.py (added)
- projects/AsynQueue/branches/processworker/asynqueue/processworker.py (moved) (moved from projects/AsynQueue/branches/processworker/asynqueue/pbstdio.py) (4 diffs)
- projects/AsynQueue/branches/processworker/asynqueue/test/mock.py (added)
- projects/AsynQueue/branches/processworker/asynqueue/test/test_jobs.py (added)
- projects/AsynQueue/trunk/asynqueue/workers.py (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynCluster/trunk/asyncluster/master/jobs.py
r2 r76 26 26 from twisted.internet import defer 27 27 28 from twisted_goodies import taskqueue28 import asynqueue 29 29 30 30 31 class NodeWorker( taskqueue.RemoteCallWorker):31 class NodeWorker(asynqueue.RemoteCallWorker): 32 32 """ 33 I implement a L{ taskqueue.IWorker} that runs tasks, up to I{N} pending at a33 I implement a L{asynqueue.IWorker} that runs tasks, up to I{N} pending at a 34 34 time, in a particular job on a particular node. 35 35 """ … … 52 52 def __init__(self): 53 53 self.jobs = {} 54 self.queue = taskqueue.TaskQueue()54 self.queue = asynqueue.TaskQueue() 55 55 56 56 def shutdown(self): projects/AsynCluster/trunk/asyncluster/ndm/client.py
r16 r76 32 32 from twisted.spread import pb 33 33 34 from asynqueue import jobs 35 34 36 35 37 WINDOW_LINGER_TIME = 0.5 … … 43 45 44 46 45 class ClientRoot( pb.Referenceable):47 class ClientRoot(jobs.ChildRoot): 46 48 """ 47 49 I am the root resource for one cluster node. … … 50 52 self.main = main 51 53 self.serverPassword = serverPassword 52 self.trusted = False53 self.jobs = {}54 54 55 55 def remote_reverseLogin(self, password): … … 74 74 """ 75 75 pass 76 77 def remote_newJob(self, jobID, jobCode):78 """79 Registers the job identified by the specified integer I{jobID} and80 represented by Python code contained in the string I{jobCode}.81 82 @return: C{True} if the I{jobCode} was accepted and executed OK,83 C{False} otherwise.84 85 """86 if not self.trusted:87 return False88 try:89 namespace = {}90 exec jobCode in namespace91 except:92 return False93 self.jobs[jobID] = namespace94 return True95 96 def remote_runJob(self, jobID, callName, *args, **kw):97 """98 Runs the specified callable object with any supplied99 arguments and keywords in the namespace of the specified I{jobID}.100 101 @param jobID: An integer uniquely specifying an already-registered job.102 103 @param callName: A string with the name of a callable object in the104 job's namespace.105 106 @args: Any arguments for the call.107 108 @kw: Any keywords for the call.109 110 @return: A deferred to the eventual result of the call.111 112 """113 if self.trusted:114 calledObject = getattr(self.jobs[jobID], callName, None)115 if callable(calledObject):116 self.d_runningJob = defer.maybeDeferred(117 calledObject(*args, **kw))118 return self.d_runningJob119 76 120 77 projects/AsynQueue/branches/processworker/asynqueue/processworker.py
r75 r76 19 19 20 20 """ 21 Perspective Broker (PB) over STDIO 21 An IWorker implementation using Perspective Broker (PB) over STDIO. 22 22 23 23 Based on coding by Konrads Smelkovs … … 25 25 26 26 from twisted.internet import protocol, stdio, reactor 27 28 import base, workers 27 29 28 30 … … 64 66 """ 65 67 self.proto.connectionLost(reason) 68 69 70 class ChildRoot(pb.Root): 71 """ 72 I am the root resource for one child worker process. 73 """ 74 def __init__(self): 75 self.jobs = {} 76 self.queue = base.TaskQueue(workers.ThreadWorker()) 77 # What's this next line for???? 78 self.pbfactory = pb.PBServerFactory(self) 79 80 def remote_newJob(self, jobID, jobCode): 81 """ 82 Registers the job identified by the specified integer I{jobID} and 83 represented by Python code contained in the string I{jobCode}. 84 85 Returns a deferred that fires with C{True} if the I{jobCode} executed 86 OK, or C{False} otherwise. 87 """ 88 def tryJob(): 89 try: 90 namespace = {} 91 exec jobCode in namespace 92 except: 93 return False 94 self.jobs[jobID] = namespace 95 return True 96 97 return self.queue.call(tryJob, niceness=-10) 98 99 def remote_runJob(self, jobID, callName, *args, **kw): 100 """ 101 Runs the specified callable object with any supplied arguments and 102 keywords in the namespace of the specified I{jobID}. 66 103 104 @param jobID: An integer uniquely specifying an already-registered job. 105 106 @param callName: A string with the name of a callable object in the 107 job's namespace. 108 109 @args: Any arguments for the call. 110 111 @kw: Any keywords for the call. 112 113 @return: A deferred to the eventual result of the call. 114 115 """ 116 calledObject = getattr(self.jobs[jobID], callName, None) 117 if callable(calledObject): 118 return self.queue.call(calledObject, *args, **kw) 119 raise AttributeError( 120 "No callable object '%s' defined in namespace for job %d" \ 121 % (callName, jobID)) 122 123 def remote_exit(self): 124 """ 125 Terminates the child worker process. 126 """ 127 self.queue.shutdown().addCallback(lambda _: reactor.stop()) 128 129 130 class ProcessWorker(workers.RemoteCallWorker): 131 """ 132 I implement an L{asynqueue.IWorker} that runs tasks, up to I{N} pending at 133 a time, in a particular job and child process. 134 """ 135 def _runNow(self, null, task): 136 funcName, args, kw = task.callTuple 137 d = self.remoteCaller('runJob', task.series, funcName, *args, **kw) 138 job = (task, d) 139 self.jobs.append(job) 140 d.addBoth(self._doneTrying, job) 141 # This task's deferred is NOT returned! 142 143 67 144 68 145 def PBStdioChildConnector(factory): … … 90 167 return process 91 168 169 170 def runAsChild(): 171 """ 172 This function takes care of everything needed for a Python interpreter to 173 act as a child process worker. 174 """ 175 root = ChildRoot() 176 177 178 if __name__ == '__main__': 179 runAsChild() projects/AsynQueue/trunk/asynqueue/workers.py
r55 r76 114 114 115 115 116 class ThreadWorker :116 class ThreadWorker(object): 117 117 """ 118 118 I implement an L{IWorker} that runs tasks in a dedicated worker thread. … … 212 212 213 213 214 class RemoteCallWorker :214 class RemoteCallWorker(object): 215 215 """ 216 216 Instances of me provide an L{IWorker} that dispatches
