Changeset 110

Show
Ignore:
Timestamp:
11/23/07 23:01:05 (1 year ago)
Author:
edsuom
Message:

Self-jellyable classes are registered, not just allowed

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynQueue/trunk/asynqueue/jobs.py

    r106 r110  
    7070        return False, failureObject.getTraceback() 
    7171 
    72     def remote_allowClasses(self, *args): 
    73         """ 
    74         Instructs my broker to allow the classes specified by the argument(s). 
    75  
    76         The classes will be allowed for B{all} jobs, and are specified by their 
     72    def remote_registerClasses(self, *args): 
     73        """ 
     74        Instructs my broker to register the classes specified by the argument(s). 
     75 
     76        The classes will be registered for B{all} jobs, and are specified by their 
    7777        string representations:: 
    7878         
     
    8383            # Load the class for the string representation 
    8484            cls = namedObject(stringRep) 
    85             # Allow instances of the class, including its type and module 
    86             pb.globalSecurity.allowInstancesOf(cls) 
     85            # Register instances of the class, including its type and module 
     86            pb.setUnjellyableForClass(stringRep, cls) 
    8787 
    8888    def remote_newJob(self, jobID, jobCode): 
     
    198198     
    199199    """ 
     200    maxRetries = 2 
     201     
    200202    def __init__(self, queue=None): 
    201203        self.jobs  = {} 
    202204        self.updates = {} 
    203         self.allowedClasses = {} 
     205        self.callsPending = {} 
     206        self.registeredClasses = {} 
    204207        if queue is None: 
    205208            self.queue = base.TaskQueue() 
     
    255258        def jobTried(status): 
    256259            if status: 
    257                 d = self._runAllowClasses(worker) 
     260                d = self._runRegisterClasses(worker) 
    258261                if jobID in self.updates: 
    259262                    d.addCallback(lambda _: self._runUpdate(jobID, worker)) 
     
    303306        """ 
    304307        jobID = self._jobCounter = getattr(self, '_jobCounter', 0) + 1 
     308        print "NEW", jobID 
     309        self.callsPending[jobID] = {} 
    305310        self.jobs[jobID] = [jobCode, niceness] 
    306311         
     
    338343        return defer.DeferredList(dList) 
    339344 
    340     def _runAllowClasses(self, worker): 
     345    def _runRegisterClasses(self, worker): 
    341346        stringReps = [] 
    342         for stringRep, allowingWorkers in self.allowedClasses.iteritems(): 
    343             if worker.ID in allowingWorkers: 
     347        for stringRep, registeredWorkers in self.registeredClasses.iteritems(): 
     348            if worker.ID in registeredWorkers: 
    344349                continue 
    345             allowingWorkers.append(worker.ID) 
     350            registeredWorkers.append(worker.ID) 
    346351            stringReps.append(stringRep) 
    347         return worker.remoteCaller('allowClasses', *stringReps) 
    348      
    349     def allowClasses(self, *args): 
    350         """ 
    351         Instructs my current and future nodes to allow the classes specified by 
    352         the argument(s). The classes will be allowed for B{all} jobs. The 
    353         classes are specified by their string representations:: 
     352        return worker.remoteCaller('registerClasses', *stringReps) 
     353     
     354    def registerClasses(self, *args): 
     355        """ 
     356        Instructs my current and future nodes to register the classes specified 
     357        by the argument(s) as self-unjellyable and allowable past PB 
     358        security. The classes will be registered for B{all} jobs, and are 
     359        specified by their string representations:: 
    354360         
    355361            <package(s).module.class> 
     362 
     363        Use judiciously! 
    356364         
    357365        """ 
    358366        for stringRep in args: 
    359             if stringRep not in self.allowedClasses: 
    360                 self.allowedClasses[stringRep] = [] 
     367            if stringRep not in self.registeredClasses: 
     368                self.registeredClasses[stringRep] = [] 
    361369        dList = [ 
    362             self._runAllowClasses(worker) for worker in self.queue.workers()] 
     370            self._runRegisterClasses(worker) 
     371            for worker in self.queue.workers()] 
    363372        return defer.DeferredList(dList) 
    364373     
     
    383392 
    384393        """ 
    385         def jobRan(result): 
     394        def jobRan(result, d): 
    386395            status, result = result 
     396            retryCount, callName, args, kw = self.callsPending[jobID].pop(d) 
    387397            if status: 
    388398                return result 
    389399            log("Error running job %d:\n%s", jobID, result) 
    390  
    391         def jobFailed(failure): 
     400            if retryCount < self.maxRetries: 
     401                kw['retryCount'] = retryCount + 1 
     402                return self.run(jobID, callName, *args, **kw) 
     403 
     404        def jobFailed(failure, d): 
     405            # TODO: Do something different here, as it's a more drastic 
     406            # situation 
     407            self.callsPending[jobID].pop(d) 
    392408            log("Unexpected error running job %d:\n%s", 
    393409                jobID, failure.getTraceback()) 
     
    398414        kw['series'] = jobID 
    399415        kw['niceness'] = self.jobs[jobID][1] 
     416        retryCount = kw.pop('retryCount', 0) 
    400417        d = self.queue.call(callName, *args, **kw) 
    401         d.addCallbacks(jobRan, jobFailed) 
     418        d.addCallback(jobRan, d) 
     419        d.addErrback(jobFailed, d) 
     420        self.callsPending[jobID][d] = retryCount, callName, args, kw 
    402421        return d 
    403422 
     
    410429        self.jobs.pop(jobID, None) 
    411430        self.updates.pop(jobID, None) 
    412          
    413          
     431        self.callsPending.pop(jobID, None) 
     432         
     433         
  • projects/AsynQueue/trunk/asynqueue/test/test_jobs.py

    r106 r110  
    5656 
    5757 
    58 class Test_ChildRoot_allowClasses(mock.TestCase): 
     58class Test_ChildRoot_registerClasses(mock.TestCase): 
    5959    class TestableChildRoot(jobs.ChildRoot): 
    6060        def remote_take(self, thing): 
     
    7777        return self.server.stopListening() 
    7878     
    79     def test_allowClasses(self): 
     79    def test_registerClasses(self): 
    8080        def check(result): 
    8181            self.failUnless(result) 
     
    8484        thingy = Thingy() 
    8585        stringReps = ["%s.Thingy" % thingy.__module__] 
    86         d = self.ref.callRemote("allowClasses", *stringReps) 
     86        d = self.ref.callRemote("registerClasses", *stringReps) 
    8787        d.addCallback(lambda _: self.ref.callRemote('take', thingy)) 
    8888        d.addCallback(check) 
     
    133133        self.callbacks = [] 
    134134        self.jobs = {} 
    135         self.allowedClasses = [] 
     135        self.registeredClasses = [] 
    136136 
    137137    def notifyOnDisconnect(self, callback): 
     
    153153            except: 
    154154                result = (False, failure.Failure().getTraceback()) 
    155         elif called == 'allowClasses': 
    156             self.allowedClasses.extend(list(args)) 
     155        elif called == 'registerClasses': 
     156            self.registeredClasses.extend(list(args)) 
    157157            result = None 
    158158        elif called == 'exit': 
     
    224224        return self.mgr.shutdown() 
    225225 
     226    def _newJob(self): 
     227        self.mgr.jobs[JOB_ID] = (JOB_CODE, 0) 
     228        self.mgr.callsPending[JOB_ID] = {} 
     229 
    226230    def _attach(self, bogus=False): 
    227231        def attached(childID): 
     
    237241            self.nextCall('Root.callRemote', ('newJob', JOB_ID, JOB_CODE)) 
    238242 
    239         self.mgr.jobs[JOB_ID] = (JOB_CODE, 0
     243        self._newJob(
    240244        return self._attach().addCallback(check) 
    241245 
    242246    def test_attachChild_bogus(self): 
    243         self.mgr.jobs[JOB_ID] = (JOB_CODE, 0
     247        self._newJob(
    244248        return self._attach(bogus=True).addCallback(self.failUnlessEqual, None) 
    245249 
    246250    def test_attachChild_withUpdate(self): 
    247         self.mgr.jobs[JOB_ID] = (JOB_CODE, 0
     251        self._newJob(
    248252        # This must run first, on attachment 
    249253        d1 = self.mgr.update(JOB_ID, 'setup', 1) 
     
    279283        return d 
    280284 
    281     def test_allowClasses(self): 
     285    def test_registerClasses(self): 
    282286        def check(null): 
    283             self.failUnlessElementsEqual(self.root.allowedClasses, stringReps) 
     287            self.failUnlessElementsEqual( 
     288                self.root.registeredClasses, stringReps) 
    284289 
    285290        stringReps = ['foo.bar.SomeClass', 'foo.bar.AnotherClass'] 
    286291        d = self._attach() 
    287292        d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 
    288         d.addCallback(lambda _: self.mgr.allowClasses(*stringReps)) 
     293        d.addCallback(lambda _: self.mgr.registerClasses(*stringReps)) 
    289294        d.addCallback(check) 
    290295        return d