Changeset 91
- Timestamp:
- 10/04/07 18:42:02 (1 year ago)
- Files:
-
- projects/AsynQueue/trunk/asynqueue/jobs.py (modified) (8 diffs)
- projects/AsynQueue/trunk/asynqueue/tasks.py (modified) (2 diffs)
- projects/AsynQueue/trunk/asynqueue/test/test_base.py (modified) (1 diff)
- projects/AsynQueue/trunk/asynqueue/test/test_jobs.py (modified) (10 diffs)
- projects/AsynQueue/trunk/asynqueue/test/test_processworker.py (modified) (1 diff)
- projects/AsynQueue/trunk/asynqueue/test/test_tasks.py (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynQueue/trunk/asynqueue/jobs.py
r90 r91 30 30 import base, workers 31 31 32 VERBOSE = False 33 def log(msgProto, *args): 34 if VERBOSE: 35 print msgProto % args 36 37 38 class TrustError(Exception): 39 pass 40 32 41 33 42 class ChildRoot(pb.Root): … … 48 57 return result 49 58 59 def oops(self, *arg): 60 """ 61 Returns a C{False} status code for a remote call along with a string 62 traceback of the exception raised. You can supply your own exception or 63 L{Failure} instance. If you don't, the current exception will be used. 64 """ 65 if arg and isinstance(arg[0], Failure): 66 failureObject = arg[0] 67 else: 68 failureObject = Failure(*arg) 69 return False, failureObject.getTraceback() 70 50 71 def remote_newJob(self, jobID, jobCode): 51 72 """ … … 57 78 my process exits. 58 79 59 Returns C{True} if the I{jobCode} was accepted and executed OK, or 60 C{False} otherwise. 80 Returns a tuple containing a status value and the result of loading the 81 job. If all went well, the status is C{True} and the result is a list 82 of names of the callable objects in the job's namespace. If there was a 83 problem, the status is C{False} and the result is a string traceback of 84 the exception that was raised. 61 85 """ 62 86 if not self.trusted: 63 return False87 return self.oops(TrustError) 64 88 try: 65 89 namespace = {} 66 90 exec jobCode in namespace 67 91 except: 68 return False92 return self.oops() 69 93 self.jobs[jobID] = namespace 70 return True 94 return True, [x[0] for x in namespace.iteritems() if callable(x[1])] 71 95 72 96 def remote_runJob(self, jobID, callName, *args, **kw): … … 74 98 Calls the object that is present in the namespace of I{jobID} under the 75 99 specified I{callName}, with any supplied arguments and 76 keywords. Returns the result of the call, which may be a deferred. 100 keywords. 101 102 Returns a tuple containing a status value and the result of the 103 call. loading the job. If all went well, the status is C{True} and the 104 result is the result, of whatever type. If there was a problem, the 105 status is C{False} and the result is a string traceback of the 106 exception that was raised. 77 107 """ 78 108 if not self.trusted: 79 r aise RuntimeError("Caller is not trusted at this time")109 return self.oops(TrustError) 80 110 calledObject = self.jobs[jobID].get(callName, None) 81 111 if callable(calledObject): 82 return calledObject(*args, **kw) 83 raise AttributeError( 112 d = defer.maybeDeferred(calledObject, *args, **kw) 113 d.addCallback(lambda x: (True, x)) 114 d.addErrback(self.oops) 115 return d 116 return self.oops( 117 AttributeError( 84 118 "No callable object '%s' defined in namespace for job %d" \ 85 % (callName, jobID)) 119 % (callName, jobID))) 86 120 87 121 def remote_exit(self): … … 109 143 kw['noTypeCheck'] = True 110 144 workers.RemoteCallWorker.__init__(self, *args, **kw) 111 145 112 146 def _runNow(self, null, task): 113 147 funcName, args, kw = task.callTuple … … 153 187 return self.queue.shutdown() 154 188 189 def jobTried(self, result, jobID, worker): 190 """ 191 Callback for loading a new job or running a call on an existing one. 192 193 If the worker's root reference raised an unexpected failure, returns 194 C{False}. If everything went OK, returns C{True}. If there was a 195 failure that may not have been the worker's fault, returns C{None}. 196 """ 197 if hasattr(result, 'check'): 198 # Oops, failure from a bogus root reference. 199 log("Worker %d supplied nonconforming root reference", worker.ID) 200 return False 201 if result[0]: 202 msg = "Callable objects: %s" % ", ".join(result[1]) 203 log("Job %d loaded OK on worker %s\n%s", jobID, worker.ID, msg) 204 self.queue.qualifyWorker(worker, jobID) 205 return True 206 log("Job %d failed on worker %d:\n%s", jobID, worker.ID, result[1]) 207 return None 208 155 209 def attachChild(self, childRoot, N=3): 156 210 """ … … 158 212 PB root reference. 159 213 214 Tries to load all of the currently registered jobs on the worker. If an 215 unexpected failure (not a simple job-loading exception) arises, the 216 worker is not hired. 217 160 218 The default number (3) of job runs that the worker is willing to queue 161 219 up on its end can be overridden with the I{N} keyword. 162 163 Returns a deferred that fires with a unique ID for the interpreter when 164 all of the currently registered jobs have been tried on it. 165 """ 166 def tried(success, jobID): 167 if success: 168 self.queue.qualifyWorker(worker, jobID) 169 170 def oops(failure): 171 if failure.check(flavors.NoSuchMethod): 172 result[0] = Failure( 173 TypeError("Supplied root reference doesn't conform")) 174 220 221 Returns a deferred that fires with the worker's ID, or C{None} if not 222 hired. 223 """ 224 def jobTried(status): 225 if status is False: 226 mutable.append(None) 227 228 def allDone(null): 229 if len(mutable): 230 d = self.queue.detachWorker(worker) 231 d.addCallback(lambda _: None) 232 return d 233 return worker.ID 234 235 mutable = [] 175 236 worker = ChildWorker(childRoot, N) 176 result = [self.queue.attachWorker(worker)]237 self.queue.attachWorker(worker) 177 238 dList = [] 178 239 for jobID, jobInfo in self.jobs.iteritems(): 179 240 jobCode = jobInfo[0] 180 241 d = childRoot.callRemote('newJob', jobID, jobCode) 181 d.add Callback(tried, jobID)182 d.add Errback(oops)242 d.addBoth(self.jobTried, jobID, worker) 243 d.addCallback(jobTried) 183 244 dList.append(d) 184 return defer.DeferredList(dList).addCallback( lambda _: result[0])245 return defer.DeferredList(dList).addCallback(allDone) 185 246 186 247 def detachChild(self, childID): … … 206 267 207 268 """ 208 def tried(success, worker):209 if success:210 self.queue.qualifyWorker(worker, jobID)211 212 269 jobID = self._jobCounter = getattr(self, '_jobCounter', 0) + 1 213 self.jobs[jobID] = (jobCode, niceness)270 self.jobs[jobID] = [jobCode, niceness] 214 271 215 272 dList = [] 216 273 for worker in self.queue.workers(): 217 274 d = worker.remoteCaller('newJob', jobID, jobCode) 218 d.add Callback(tried, worker)275 d.addBoth(self.jobTried, jobID, worker) 219 276 dList.append(d) 220 277 d = defer.DeferredList(dList) projects/AsynQueue/trunk/asynqueue/tasks.py
r55 r91 290 290 request an initial assignment from each queue. 291 291 292 @return: An integer ID uniquely identifying the worker. 293 292 The method generates an integer ID uniquely identifying the worker, and 293 gives the worker an C{ID} attribute with the ID for its own reference, 294 The ID is returned as well. 294 295 """ 295 296 if not IWorker.providedBy(worker): … … 305 306 for series in qualifications: 306 307 self.assignmentFactory.request(worker, series) 307 workerID = getattr(self, '_workerCounter', 0) + 1308 workerID = worker.ID = getattr(self, '_workerCounter', 0) + 1 308 309 self._workerCounter = workerID 309 310 self.workers[workerID] = worker projects/AsynQueue/trunk/asynqueue/test/test_base.py
r90 r91 35 35 class Test_Priority(TestCase): 36 36 def setUp(self): 37 self.heap = Priority()37 self.heap = base.Priority() 38 38 39 def test_getInOrder(self): 40 dList = [] 41 for item in (2,1,4,0,3): 42 self.heap.put(item) 43 for item in xrange(5): 44 d = self.heap.get() 45 d.addCallback(self.failUnlessEqual, item) 46 dList.append(d) 47 return defer.DeferredList(dList) 48 49 def test_getBeforePut(self): 50 dList, items = [], [] 51 for item in xrange(5): 52 self.heap.put(item) 53 for item in xrange(5): 54 d = self.heap.get() 55 d.addCallback(items.append) 56 dList.append(d) 57 return defer.DeferredList(dList).addCallback( 58 lambda _: self.failUnlessEqual(sum(items), 10)) 59 39 60 def test_cancel(self): 40 self.fail("Need to test this!") 61 dList, items = [], [] 62 for item in xrange(5): 63 self.heap.put(item) 64 self.heap.cancel(lambda x: x is 2) 65 for item in xrange(4): 66 d = self.heap.get() 67 d.addCallback(items.append) 68 dList.append(d) 69 return defer.DeferredList(dList).addCallback( 70 lambda _: self.failUnlessEqual(items, [0,1,3,4])) 41 71 42 72 projects/AsynQueue/trunk/asynqueue/test/test_jobs.py
r79 r91 33 33 def test(a, b, c=0): 34 34 return a + 2*b + 3*c 35 36 def bogusable(x): 37 return 1.0 / x 38 35 39 """ 40 41 42 class Test_ChildRoot(mock.TestCase): 43 def setUp(self): 44 self.root = jobs.ChildRoot() 45 self.root.trusted = True 46 47 def test_newJob_OK(self): 48 result = self.root.remote_newJob(JOB_ID, JOB_CODE) 49 self.failUnlessEqual(result[0], True) 50 self.failUnlessElementsEqual(result[1], ['test', 'bogusable']) 51 52 def test_newJob_bogus(self): 53 bogusJobCode = JOB_CODE + "\nbogus\n" 54 result = self.root.remote_newJob(JOB_ID, bogusJobCode) 55 self.failUnlessEqual(result[0], False) 56 self.failUnless('bogus' in result[1]) 57 58 def test_runJob_OK(self): 59 def check(result): 60 self.failUnlessEqual(result[0], True) 61 self.failUnlessEqual(result[1], 600) 62 63 self.root.remote_newJob(JOB_ID, JOB_CODE) 64 d = self.root.remote_runJob(JOB_ID, 'test', 100, 100, 100) 65 d.addCallback(check) 66 return d 67 68 def test_runJob_bogus(self): 69 def check(result): 70 self.failUnlessEqual(result[0], False) 71 self.failUnless('ZeroDivisionError' in result[1]) 72 73 self.root.remote_newJob(JOB_ID, JOB_CODE) 74 d = self.root.remote_runJob(JOB_ID, 'bogusable', 0) 75 d.addCallback(check) 76 return d 36 77 37 78 … … 51 92 exec jobCode in namespace 52 93 self.jobs[jobID] = namespace 53 result = True 94 result = ( 95 True, [x[0] for x in namespace.iteritems() if callable(x[1])]) 54 96 elif not self.bogus and called == 'runJob': 55 97 namespace = self.jobs[args[0]] 56 98 calledObject = namespace[args[1]] 57 result = calledObject(*args[2:], **kw) 99 try: 100 result = (True, calledObject(*args[2:], **kw)) 101 except: 102 result = (False, failure.Failure().getTraceback()) 58 103 elif called == 'exit': 59 104 result = None … … 83 128 def setUp(self): 84 129 self.root = Mock_Root() 130 self.root.clearCalls() 85 131 self.worker = jobs.ChildWorker(self.root, noTypeCheck=True) 86 132 return self.root.callRemote('newJob', JOB_ID, JOB_CODE) … … 89 135 return self.worker.stop() 90 136 91 def test_SingleTask (self):137 def test_SingleTask_OK(self): 92 138 def checkResult(result): 93 self.failUnlessEqual(result, 1+2*2+3*3)139 self.failUnlessEqual(result, (True, 1+2*2+3*3)) 94 140 self.nextCall(self.root.calls) 95 141 self.nextCall('Root.callRemote') … … 103 149 return task.d 104 150 151 def test_SingleTask_Fail(self): 152 def checkResult(result): 153 self.failUnlessEqual(result[0], False) 154 self.failUnless('ZeroDivisionError' in result[1]) 155 156 task = Mock_Task('bogusable', (0,), {}, 0, JOB_ID) 157 self.worker.run(task) 158 task.d.addCallback(checkResult) 159 return task.d 160 105 161 106 162 class Test_JobManager(mock.TestCase): … … 116 172 def attached(childID): 117 173 self.childID = childID 174 return childID 118 175 119 176 self.root = Mock_Root(bogus) … … 129 186 130 187 def test_attachChild_bogus(self): 131 def check(result):132 self.failUnless(isinstance(result, failure.Failure))133 134 188 self.mgr.jobs[JOB_ID] = (JOB_CODE, 0) 135 return self._attach(bogus=True).add Both(check)189 return self._attach(bogus=True).addCallback(self.failUnlessEqual, None) 136 190 137 191 def test_new(self): … … 148 202 def test_run_one(self): 149 203 def check(result): 150 self.failUnlessEqual(result, 50)204 self.failUnlessEqual(result, (True, 50)) 151 205 152 206 d = self._attach() … … 167 221 yield wfd 168 222 results.append(wfd.getResult()) 169 self.failUnlessEqual( results, range(10))223 self.failUnlessEqual([x[1] for x in results], range(10)) 170 224 171 225 @defer.deferredGenerator … … 182 236 dList.append(d) 183 237 yield defer.waitForDeferred(defer.DeferredList(dList)) 184 self.failUnlessEqual( results, range(10))238 self.failUnlessEqual([x[1] for x in results], range(10)) projects/AsynQueue/trunk/asynqueue/test/test_processworker.py
r79 r91 45 45 self.mgr.children = {1:process} 46 46 d = root.callRemote('newJob', JOB_ID, JOB_CODE) 47 d.addCallback(self.failUnlessEqual, True)47 d.addCallback(self.failUnlessEqual, (True, ['test'])) 48 48 return d 49 49 projects/AsynQueue/trunk/asynqueue/test/test_tasks.py
r55 r91 172 172 if isinstance(x, defer.Deferred)], [True]) 173 173 174 def testHireSetWorkerID(self): 175 worker = MockWorker() 176 workerID = self.mgr.hire(worker) 177 self.failUnlessEqual(getattr(worker, 'ID', None), workerID) 178 174 179 def testHireClassQualifications(self): 175 180 class CQWorker(MockWorker):
