Changeset 110
- Timestamp:
- 11/23/07 23:01:05 (1 year ago)
- Files:
-
- projects/AsynQueue/trunk/asynqueue/jobs.py (modified) (9 diffs)
- projects/AsynQueue/trunk/asynqueue/test/test_jobs.py (modified) (8 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynQueue/trunk/asynqueue/jobs.py
r106 r110 70 70 return False, failureObject.getTraceback() 71 71 72 def remote_ allowClasses(self, *args):73 """ 74 Instructs my broker to allowthe classes specified by the argument(s).75 76 The classes will be allowed for B{all} jobs, and are specified by their72 def remote_registerClasses(self, *args): 73 """ 74 Instructs my broker to register the classes specified by the argument(s). 75 76 The classes will be registered for B{all} jobs, and are specified by their 77 77 string representations:: 78 78 … … 83 83 # Load the class for the string representation 84 84 cls = namedObject(stringRep) 85 # Allowinstances of the class, including its type and module86 pb. globalSecurity.allowInstancesOf(cls)85 # Register instances of the class, including its type and module 86 pb.setUnjellyableForClass(stringRep, cls) 87 87 88 88 def remote_newJob(self, jobID, jobCode): … … 198 198 199 199 """ 200 maxRetries = 2 201 200 202 def __init__(self, queue=None): 201 203 self.jobs = {} 202 204 self.updates = {} 203 self.allowedClasses = {} 205 self.callsPending = {} 206 self.registeredClasses = {} 204 207 if queue is None: 205 208 self.queue = base.TaskQueue() … … 255 258 def jobTried(status): 256 259 if status: 257 d = self._run AllowClasses(worker)260 d = self._runRegisterClasses(worker) 258 261 if jobID in self.updates: 259 262 d.addCallback(lambda _: self._runUpdate(jobID, worker)) … … 303 306 """ 304 307 jobID = self._jobCounter = getattr(self, '_jobCounter', 0) + 1 308 print "NEW", jobID 309 self.callsPending[jobID] = {} 305 310 self.jobs[jobID] = [jobCode, niceness] 306 311 … … 338 343 return defer.DeferredList(dList) 339 344 340 def _run AllowClasses(self, worker):345 def _runRegisterClasses(self, worker): 341 346 stringReps = [] 342 for stringRep, allowingWorkers in self.allowedClasses.iteritems():343 if worker.ID in allowingWorkers:347 for stringRep, registeredWorkers in self.registeredClasses.iteritems(): 348 if worker.ID in registeredWorkers: 344 349 continue 345 allowingWorkers.append(worker.ID)350 registeredWorkers.append(worker.ID) 346 351 stringReps.append(stringRep) 347 return worker.remoteCaller('allowClasses', *stringReps) 348 349 def allowClasses(self, *args): 350 """ 351 Instructs my current and future nodes to allow the classes specified by 352 the argument(s). The classes will be allowed for B{all} jobs. The 353 classes are specified by their string representations:: 352 return worker.remoteCaller('registerClasses', *stringReps) 353 354 def registerClasses(self, *args): 355 """ 356 Instructs my current and future nodes to register the classes specified 357 by the argument(s) as self-unjellyable and allowable past PB 358 security. The classes will be registered for B{all} jobs, and are 359 specified by their string representations:: 354 360 355 361 <package(s).module.class> 362 363 Use judiciously! 356 364 357 365 """ 358 366 for stringRep in args: 359 if stringRep not in self. allowedClasses:360 self. allowedClasses[stringRep] = []367 if stringRep not in self.registeredClasses: 368 self.registeredClasses[stringRep] = [] 361 369 dList = [ 362 self._runAllowClasses(worker) for worker in self.queue.workers()] 370 self._runRegisterClasses(worker) 371 for worker in self.queue.workers()] 363 372 return defer.DeferredList(dList) 364 373 … … 383 392 384 393 """ 385 def jobRan(result ):394 def jobRan(result, d): 386 395 status, result = result 396 retryCount, callName, args, kw = self.callsPending[jobID].pop(d) 387 397 if status: 388 398 return result 389 399 log("Error running job %d:\n%s", jobID, result) 390 391 def jobFailed(failure): 400 if retryCount < self.maxRetries: 401 kw['retryCount'] = retryCount + 1 402 return self.run(jobID, callName, *args, **kw) 403 404 def jobFailed(failure, d): 405 # TODO: Do something different here, as it's a more drastic 406 # situation 407 self.callsPending[jobID].pop(d) 392 408 log("Unexpected error running job %d:\n%s", 393 409 jobID, failure.getTraceback()) … … 398 414 kw['series'] = jobID 399 415 kw['niceness'] = self.jobs[jobID][1] 416 retryCount = kw.pop('retryCount', 0) 400 417 d = self.queue.call(callName, *args, **kw) 401 d.addCallbacks(jobRan, jobFailed) 418 d.addCallback(jobRan, d) 419 d.addErrback(jobFailed, d) 420 self.callsPending[jobID][d] = retryCount, callName, args, kw 402 421 return d 403 422 … … 410 429 self.jobs.pop(jobID, None) 411 430 self.updates.pop(jobID, None) 412 413 431 self.callsPending.pop(jobID, None) 432 433 projects/AsynQueue/trunk/asynqueue/test/test_jobs.py
r106 r110 56 56 57 57 58 class Test_ChildRoot_ allowClasses(mock.TestCase):58 class Test_ChildRoot_registerClasses(mock.TestCase): 59 59 class TestableChildRoot(jobs.ChildRoot): 60 60 def remote_take(self, thing): … … 77 77 return self.server.stopListening() 78 78 79 def test_ allowClasses(self):79 def test_registerClasses(self): 80 80 def check(result): 81 81 self.failUnless(result) … … 84 84 thingy = Thingy() 85 85 stringReps = ["%s.Thingy" % thingy.__module__] 86 d = self.ref.callRemote(" allowClasses", *stringReps)86 d = self.ref.callRemote("registerClasses", *stringReps) 87 87 d.addCallback(lambda _: self.ref.callRemote('take', thingy)) 88 88 d.addCallback(check) … … 133 133 self.callbacks = [] 134 134 self.jobs = {} 135 self. allowedClasses = []135 self.registeredClasses = [] 136 136 137 137 def notifyOnDisconnect(self, callback): … … 153 153 except: 154 154 result = (False, failure.Failure().getTraceback()) 155 elif called == ' allowClasses':156 self. allowedClasses.extend(list(args))155 elif called == 'registerClasses': 156 self.registeredClasses.extend(list(args)) 157 157 result = None 158 158 elif called == 'exit': … … 224 224 return self.mgr.shutdown() 225 225 226 def _newJob(self): 227 self.mgr.jobs[JOB_ID] = (JOB_CODE, 0) 228 self.mgr.callsPending[JOB_ID] = {} 229 226 230 def _attach(self, bogus=False): 227 231 def attached(childID): … … 237 241 self.nextCall('Root.callRemote', ('newJob', JOB_ID, JOB_CODE)) 238 242 239 self. mgr.jobs[JOB_ID] = (JOB_CODE, 0)243 self._newJob() 240 244 return self._attach().addCallback(check) 241 245 242 246 def test_attachChild_bogus(self): 243 self. mgr.jobs[JOB_ID] = (JOB_CODE, 0)247 self._newJob() 244 248 return self._attach(bogus=True).addCallback(self.failUnlessEqual, None) 245 249 246 250 def test_attachChild_withUpdate(self): 247 self. mgr.jobs[JOB_ID] = (JOB_CODE, 0)251 self._newJob() 248 252 # This must run first, on attachment 249 253 d1 = self.mgr.update(JOB_ID, 'setup', 1) … … 279 283 return d 280 284 281 def test_ allowClasses(self):285 def test_registerClasses(self): 282 286 def check(null): 283 self.failUnlessElementsEqual(self.root.allowedClasses, stringReps) 287 self.failUnlessElementsEqual( 288 self.root.registeredClasses, stringReps) 284 289 285 290 stringReps = ['foo.bar.SomeClass', 'foo.bar.AnotherClass'] 286 291 d = self._attach() 287 292 d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 288 d.addCallback(lambda _: self.mgr. allowClasses(*stringReps))293 d.addCallback(lambda _: self.mgr.registerClasses(*stringReps)) 289 294 d.addCallback(check) 290 295 return d
