Changeset 106
- Timestamp:
- 11/22/07 12:57:15 (1 year ago)
- Files:
-
- projects/AsynQueue/trunk/asynqueue/jobs.py (modified) (7 diffs)
- projects/AsynQueue/trunk/asynqueue/test/test_jobs.py (modified) (5 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynQueue/trunk/asynqueue/jobs.py
r103 r106 26 26 from twisted.internet import defer, reactor 27 27 from twisted.python.failure import Failure 28 from twisted.python.reflect import namedObject 28 29 from twisted.spread import pb, flavors 29 30 … … 68 69 failureObject = Failure(*arg) 69 70 return False, failureObject.getTraceback() 71 72 def remote_allowClasses(self, *args): 73 """ 74 Instructs my broker to allow the classes specified by the argument(s). 75 76 The classes will be allowed for B{all} jobs, and are specified by their 77 string representations:: 78 79 <package(s).module.class> 80 81 """ 82 for stringRep in args: 83 # Load the class for the string representation 84 cls = namedObject(stringRep) 85 # Allow instances of the class, including its type and module 86 pb.globalSecurity.allowInstancesOf(cls) 70 87 71 88 def remote_newJob(self, jobID, jobCode): … … 93 110 self.jobs[jobID] = namespace 94 111 return True, [x[0] for x in namespace.iteritems() if callable(x[1])] 95 112 96 113 def remote_runJob(self, jobID, callName, *args, **kw): 97 114 """ … … 184 201 self.jobs = {} 185 202 self.updates = {} 203 self.allowedClasses = {} 186 204 if queue is None: 187 205 self.queue = base.TaskQueue() … … 229 247 worker is not hired. 230 248 231 The default number ( 3) of job runs that the worker is willing to queue232 up on its end can be overridden with the I{N} keyword.233 249 The default number (three) of job runs that the worker is willing to 250 queue up on its end can be overridden with the I{N} keyword. 251 234 252 Returns a deferred that fires with the worker's ID, or C{None} if not 235 253 hired. 236 254 """ 237 255 def jobTried(status): 238 if status is False: 239 mutable.append(None) 240 elif jobID in self.updates: 241 return self._runUpdate(jobID, worker) 256 if status: 257 d = self._runAllowClasses(worker) 258 if jobID in self.updates: 259 d.addCallback(lambda _: self._runUpdate(jobID, worker)) 260 return d 261 mutable.append(None) 242 262 243 263 def allDone(null): … … 306 326 def update(self, jobID, callName, *args, **kw): 307 327 """ 308 Appends a new task to the update list for the specified I{jobID}, 309 running the new update task on all workers currently attached. 328 Appends a new task to the update list for the specified I{jobID}. Runs 329 the new update task on all workers currently attached and ensures that 330 all new workers run the task for that job before they run any other 331 tasks for it. 310 332 """ 311 333 if jobID not in self.updates: … … 314 336 dList = [ 315 337 self._runUpdate(jobID, worker) for worker in self.queue.workers()] 338 return defer.DeferredList(dList) 339 340 def _runAllowClasses(self, worker): 341 stringReps = [] 342 for stringRep, allowingWorkers in self.allowedClasses.iteritems(): 343 if worker.ID in allowingWorkers: 344 continue 345 allowingWorkers.append(worker.ID) 346 stringReps.append(stringRep) 347 return worker.remoteCaller('allowClasses', *stringReps) 348 349 def allowClasses(self, *args): 350 """ 351 Instructs my current and future nodes to allow the classes specified by 352 the argument(s). The classes will be allowed for B{all} jobs. The 353 classes are specified by their string representations:: 354 355 <package(s).module.class> 356 357 """ 358 for stringRep in args: 359 if stringRep not in self.allowedClasses: 360 self.allowedClasses[stringRep] = [] 361 dList = [ 362 self._runAllowClasses(worker) for worker in self.queue.workers()] 316 363 return defer.DeferredList(dList) 317 364 projects/AsynQueue/trunk/asynqueue/test/test_jobs.py
r103 r106 22 22 """ 23 23 24 from twisted.internet import defer 24 from twisted.internet import defer, reactor 25 25 from twisted.python import failure 26 from twisted.spread import pb, flavors 26 from twisted.spread import pb, flavors, jelly 27 27 28 28 import mock, jobs … … 49 49 50 50 51 class Test_ChildRoot(mock.TestCase): 51 class Thingy(jelly.Jellyable, jelly.Unjellyable): 52 def setFoo(self, foo): 53 self.foo = foo 54 def getFoo(self): 55 return self.foo 56 57 58 class Test_ChildRoot_allowClasses(mock.TestCase): 59 class TestableChildRoot(jobs.ChildRoot): 60 def remote_take(self, thing): 61 self.thing = thing 62 return True 63 64 def setUp(self): 65 self.root = self.TestableChildRoot() 66 self.root.trusted = True 67 self.server = reactor.listenTCP(0, pb.PBServerFactory(self.root)) 68 clientFactory = pb.PBClientFactory() 69 reactor.connectTCP( 70 "127.0.0.1", self.server.getHost().port, clientFactory) 71 d = clientFactory.getRootObject() 72 d.addCallback(lambda x: setattr(self, 'ref', x)) 73 return d 74 75 def tearDown(self): 76 self.ref.broker.transport.loseConnection() 77 return self.server.stopListening() 78 79 def test_allowClasses(self): 80 def check(result): 81 self.failUnless(result) 82 self.failUnless(isinstance(self.root.thing, Thingy)) 83 84 thingy = Thingy() 85 stringReps = ["%s.Thingy" % thingy.__module__] 86 d = self.ref.callRemote("allowClasses", *stringReps) 87 d.addCallback(lambda _: self.ref.callRemote('take', thingy)) 88 d.addCallback(check) 89 return d 90 91 92 class Test_ChildRoot_Other(mock.TestCase): 52 93 def setUp(self): 53 94 self.root = jobs.ChildRoot() … … 92 133 self.callbacks = [] 93 134 self.jobs = {} 135 self.allowedClasses = [] 94 136 95 137 def notifyOnDisconnect(self, callback): … … 111 153 except: 112 154 result = (False, failure.Failure().getTraceback()) 155 elif called == 'allowClasses': 156 self.allowedClasses.extend(list(args)) 157 result = None 113 158 elif called == 'exit': 114 159 result = None … … 232 277 d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 233 278 d.addCallback(gotJobID) 279 return d 280 281 def test_allowClasses(self): 282 def check(null): 283 self.failUnlessElementsEqual(self.root.allowedClasses, stringReps) 284 285 stringReps = ['foo.bar.SomeClass', 'foo.bar.AnotherClass'] 286 d = self._attach() 287 d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 288 d.addCallback(lambda _: self.mgr.allowClasses(*stringReps)) 289 d.addCallback(check) 234 290 return d 235 291
