Changeset 91

Show
Ignore:
Timestamp:
10/04/07 18:42:02 (1 year ago)
Author:
edsuom
Message:

Fixes, improvements, and tests for asynqueue.jobs

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynQueue/trunk/asynqueue/jobs.py

    r90 r91  
    3030import base, workers 
    3131 
     32VERBOSE = False 
     33def log(msgProto, *args): 
     34    if VERBOSE: 
     35        print msgProto % args 
     36 
     37 
     38class TrustError(Exception): 
     39    pass 
     40 
    3241 
    3342class ChildRoot(pb.Root): 
     
    4857        return result 
    4958 
     59    def oops(self, *arg): 
     60        """ 
     61        Returns a C{False} status code for a remote call along with a string 
     62        traceback of the exception raised. You can supply your own exception or 
     63        L{Failure} instance. If you don't, the current exception will be used. 
     64        """ 
     65        if arg and isinstance(arg[0], Failure): 
     66            failureObject = arg[0] 
     67        else: 
     68            failureObject = Failure(*arg) 
     69        return False, failureObject.getTraceback() 
     70 
    5071    def remote_newJob(self, jobID, jobCode): 
    5172        """ 
     
    5778        my process exits. 
    5879 
    59         Returns C{True} if the I{jobCode} was accepted and executed OK, or 
    60         C{False} otherwise. 
     80        Returns a tuple containing a status value and the result of loading the 
     81        job. If all went well, the status is C{True} and the result is a list 
     82        of names of the callable objects in the job's namespace. If there was a 
     83        problem, the status is C{False} and the result is a string traceback of 
     84        the exception that was raised. 
    6185        """ 
    6286        if not self.trusted: 
    63             return False 
     87            return self.oops(TrustError) 
    6488        try: 
    6589            namespace = {} 
    6690            exec jobCode in namespace 
    6791        except: 
    68             return False 
     92            return self.oops() 
    6993        self.jobs[jobID] = namespace 
    70         return True 
     94        return True, [x[0] for x in namespace.iteritems() if callable(x[1])] 
    7195     
    7296    def remote_runJob(self, jobID, callName, *args, **kw): 
     
    7498        Calls the object that is present in the namespace of I{jobID} under the 
    7599        specified I{callName}, with any supplied arguments and 
    76         keywords. Returns the result of the call, which may be a deferred. 
     100        keywords. 
     101 
     102        Returns a tuple containing a status value and the result of the 
     103        call. loading the job.  If all went well, the status is C{True} and the 
     104        result is the result, of whatever type. If there was a problem, the 
     105        status is C{False} and the result is a string traceback of the 
     106        exception that was raised. 
    77107        """ 
    78108        if not self.trusted: 
    79             raise RuntimeError("Caller is not trusted at this time"
     109            return self.oops(TrustError
    80110        calledObject = self.jobs[jobID].get(callName, None) 
    81111        if callable(calledObject): 
    82             return calledObject(*args, **kw) 
    83         raise AttributeError( 
     112            d = defer.maybeDeferred(calledObject, *args, **kw) 
     113            d.addCallback(lambda x: (True, x)) 
     114            d.addErrback(self.oops) 
     115            return d 
     116        return self.oops( 
     117            AttributeError( 
    84118            "No callable object '%s' defined in namespace for job %d" \ 
    85             % (callName, jobID)) 
     119            % (callName, jobID))) 
    86120 
    87121    def remote_exit(self): 
     
    109143            kw['noTypeCheck'] = True 
    110144        workers.RemoteCallWorker.__init__(self, *args, **kw) 
    111      
     145 
    112146    def _runNow(self, null, task): 
    113147        funcName, args, kw = task.callTuple 
     
    153187        return self.queue.shutdown() 
    154188 
     189    def jobTried(self, result, jobID, worker): 
     190        """ 
     191        Callback for loading a new job or running a call on an existing one. 
     192 
     193        If the worker's root reference raised an unexpected failure, returns 
     194        C{False}. If everything went OK, returns C{True}. If there was a 
     195        failure that may not have been the worker's fault, returns C{None}. 
     196        """ 
     197        if hasattr(result, 'check'): 
     198            # Oops, failure from a bogus root reference. 
     199            log("Worker %d supplied nonconforming root reference", worker.ID) 
     200            return False 
     201        if result[0]: 
     202            msg = "Callable objects: %s" % ", ".join(result[1]) 
     203            log("Job %d loaded OK on worker %s\n%s", jobID, worker.ID, msg) 
     204            self.queue.qualifyWorker(worker, jobID) 
     205            return True 
     206        log("Job %d failed on worker %d:\n%s", jobID, worker.ID, result[1]) 
     207        return None 
     208 
    155209    def attachChild(self, childRoot, N=3): 
    156210        """ 
     
    158212        PB root reference. 
    159213 
     214        Tries to load all of the currently registered jobs on the worker. If an 
     215        unexpected failure (not a simple job-loading exception) arises, the 
     216        worker is not hired. 
     217 
    160218        The default number (3) of job runs that the worker is willing to queue 
    161219        up on its end can be overridden with the I{N} keyword. 
    162          
    163         Returns a deferred that fires with a unique ID for the interpreter when 
    164         all of the currently registered jobs have been tried on it. 
    165         """ 
    166         def tried(success, jobID): 
    167             if success: 
    168                 self.queue.qualifyWorker(worker, jobID) 
    169          
    170         def oops(failure): 
    171             if failure.check(flavors.NoSuchMethod): 
    172                 result[0] = Failure( 
    173                     TypeError("Supplied root reference doesn't conform")) 
    174  
     220 
     221        Returns a deferred that fires with the worker's ID, or C{None} if not 
     222        hired. 
     223        """ 
     224        def jobTried(status): 
     225            if status is False: 
     226                mutable.append(None) 
     227         
     228        def allDone(null): 
     229            if len(mutable): 
     230                d = self.queue.detachWorker(worker) 
     231                d.addCallback(lambda _: None) 
     232                return d 
     233            return worker.ID 
     234 
     235        mutable = [] 
    175236        worker = ChildWorker(childRoot, N) 
    176         result = [self.queue.attachWorker(worker)] 
     237        self.queue.attachWorker(worker) 
    177238        dList = [] 
    178239        for jobID, jobInfo in self.jobs.iteritems(): 
    179240            jobCode = jobInfo[0] 
    180241            d = childRoot.callRemote('newJob', jobID, jobCode) 
    181             d.addCallback(tried, jobID
    182             d.addErrback(oops
     242            d.addBoth(self.jobTried, jobID, worker
     243            d.addCallback(jobTried
    183244            dList.append(d) 
    184         return defer.DeferredList(dList).addCallback(lambda _: result[0]
     245        return defer.DeferredList(dList).addCallback(allDone
    185246     
    186247    def detachChild(self, childID): 
     
    206267     
    207268        """ 
    208         def tried(success, worker): 
    209             if success: 
    210                 self.queue.qualifyWorker(worker, jobID) 
    211  
    212269        jobID = self._jobCounter = getattr(self, '_jobCounter', 0) + 1 
    213         self.jobs[jobID] = (jobCode, niceness) 
     270        self.jobs[jobID] = [jobCode, niceness] 
    214271         
    215272        dList = [] 
    216273        for worker in self.queue.workers(): 
    217274            d = worker.remoteCaller('newJob', jobID, jobCode) 
    218             d.addCallback(tried, worker) 
     275            d.addBoth(self.jobTried, jobID, worker) 
    219276            dList.append(d) 
    220277        d = defer.DeferredList(dList) 
  • projects/AsynQueue/trunk/asynqueue/tasks.py

    r55 r91  
    290290        request an initial assignment from each queue. 
    291291 
    292         @return: An integer ID uniquely identifying the worker. 
    293          
     292        The method generates an integer ID uniquely identifying the worker, and 
     293        gives the worker an C{ID} attribute with the ID for its own reference, 
     294        The ID is returned as well. 
    294295        """ 
    295296        if not IWorker.providedBy(worker): 
     
    305306        for series in qualifications: 
    306307            self.assignmentFactory.request(worker, series) 
    307         workerID = getattr(self, '_workerCounter', 0) + 1 
     308        workerID = worker.ID = getattr(self, '_workerCounter', 0) + 1 
    308309        self._workerCounter = workerID 
    309310        self.workers[workerID] = worker 
  • projects/AsynQueue/trunk/asynqueue/test/test_base.py

    r90 r91  
    3535class Test_Priority(TestCase): 
    3636    def setUp(self): 
    37         self.heap = Priority() 
     37        self.heap = base.Priority() 
    3838 
     39    def test_getInOrder(self): 
     40        dList = [] 
     41        for item in (2,1,4,0,3): 
     42            self.heap.put(item) 
     43        for item in xrange(5): 
     44            d = self.heap.get() 
     45            d.addCallback(self.failUnlessEqual, item) 
     46            dList.append(d) 
     47        return defer.DeferredList(dList) 
     48 
     49    def test_getBeforePut(self): 
     50        dList, items = [], [] 
     51        for item in xrange(5): 
     52            self.heap.put(item) 
     53        for item in xrange(5): 
     54            d = self.heap.get() 
     55            d.addCallback(items.append) 
     56            dList.append(d) 
     57        return defer.DeferredList(dList).addCallback( 
     58            lambda _: self.failUnlessEqual(sum(items), 10)) 
     59     
    3960    def test_cancel(self): 
    40         self.fail("Need to test this!") 
     61        dList, items = [], [] 
     62        for item in xrange(5): 
     63            self.heap.put(item) 
     64        self.heap.cancel(lambda x: x is 2) 
     65        for item in xrange(4): 
     66            d = self.heap.get() 
     67            d.addCallback(items.append) 
     68            dList.append(d) 
     69        return defer.DeferredList(dList).addCallback( 
     70            lambda _: self.failUnlessEqual(items, [0,1,3,4])) 
    4171 
    4272 
  • projects/AsynQueue/trunk/asynqueue/test/test_jobs.py

    r79 r91  
    3333def test(a, b, c=0): 
    3434    return a + 2*b + 3*c 
     35 
     36def bogusable(x): 
     37    return 1.0 / x 
     38 
    3539""" 
     40 
     41 
     42class Test_ChildRoot(mock.TestCase): 
     43    def setUp(self): 
     44        self.root = jobs.ChildRoot() 
     45        self.root.trusted = True 
     46 
     47    def test_newJob_OK(self): 
     48        result = self.root.remote_newJob(JOB_ID, JOB_CODE) 
     49        self.failUnlessEqual(result[0], True) 
     50        self.failUnlessElementsEqual(result[1], ['test', 'bogusable']) 
     51     
     52    def test_newJob_bogus(self): 
     53        bogusJobCode = JOB_CODE + "\nbogus\n" 
     54        result = self.root.remote_newJob(JOB_ID, bogusJobCode) 
     55        self.failUnlessEqual(result[0], False) 
     56        self.failUnless('bogus' in result[1]) 
     57 
     58    def test_runJob_OK(self): 
     59        def check(result): 
     60            self.failUnlessEqual(result[0], True) 
     61            self.failUnlessEqual(result[1], 600) 
     62 
     63        self.root.remote_newJob(JOB_ID, JOB_CODE) 
     64        d = self.root.remote_runJob(JOB_ID, 'test', 100, 100, 100) 
     65        d.addCallback(check) 
     66        return d 
     67 
     68    def test_runJob_bogus(self): 
     69        def check(result): 
     70            self.failUnlessEqual(result[0], False) 
     71            self.failUnless('ZeroDivisionError' in result[1]) 
     72 
     73        self.root.remote_newJob(JOB_ID, JOB_CODE) 
     74        d = self.root.remote_runJob(JOB_ID, 'bogusable', 0) 
     75        d.addCallback(check) 
     76        return d 
    3677 
    3778 
     
    5192            exec jobCode in namespace 
    5293            self.jobs[jobID] = namespace 
    53             result = True 
     94            result = ( 
     95                True, [x[0] for x in namespace.iteritems() if callable(x[1])]) 
    5496        elif not self.bogus and called == 'runJob': 
    5597            namespace = self.jobs[args[0]] 
    5698            calledObject = namespace[args[1]] 
    57             result = calledObject(*args[2:], **kw) 
     99            try: 
     100                result = (True, calledObject(*args[2:], **kw)) 
     101            except: 
     102                result = (False, failure.Failure().getTraceback()) 
    58103        elif called == 'exit': 
    59104            result = None 
     
    83128    def setUp(self): 
    84129        self.root = Mock_Root() 
     130        self.root.clearCalls() 
    85131        self.worker = jobs.ChildWorker(self.root, noTypeCheck=True) 
    86132        return self.root.callRemote('newJob', JOB_ID, JOB_CODE) 
     
    89135        return self.worker.stop() 
    90136 
    91     def test_SingleTask(self): 
     137    def test_SingleTask_OK(self): 
    92138        def checkResult(result): 
    93             self.failUnlessEqual(result, 1+2*2+3*3
     139            self.failUnlessEqual(result, (True, 1+2*2+3*3)
    94140            self.nextCall(self.root.calls) 
    95141            self.nextCall('Root.callRemote') 
     
    103149        return task.d 
    104150 
     151    def test_SingleTask_Fail(self): 
     152        def checkResult(result): 
     153            self.failUnlessEqual(result[0], False) 
     154            self.failUnless('ZeroDivisionError' in result[1]) 
     155 
     156        task = Mock_Task('bogusable', (0,), {}, 0, JOB_ID) 
     157        self.worker.run(task) 
     158        task.d.addCallback(checkResult) 
     159        return task.d 
     160 
    105161 
    106162class Test_JobManager(mock.TestCase): 
     
    116172        def attached(childID): 
    117173            self.childID = childID 
     174            return childID 
    118175         
    119176        self.root = Mock_Root(bogus) 
     
    129186 
    130187    def test_attachChild_bogus(self): 
    131         def check(result): 
    132             self.failUnless(isinstance(result, failure.Failure)) 
    133  
    134188        self.mgr.jobs[JOB_ID] = (JOB_CODE, 0) 
    135         return self._attach(bogus=True).addBoth(check
     189        return self._attach(bogus=True).addCallback(self.failUnlessEqual, None
    136190 
    137191    def test_new(self): 
     
    148202    def test_run_one(self): 
    149203        def check(result): 
    150             self.failUnlessEqual(result, 50
     204            self.failUnlessEqual(result, (True, 50)
    151205         
    152206        d = self._attach() 
     
    167221            yield wfd 
    168222            results.append(wfd.getResult()) 
    169         self.failUnlessEqual(results, range(10)) 
     223        self.failUnlessEqual([x[1] for x in results], range(10)) 
    170224 
    171225    @defer.deferredGenerator 
     
    182236            dList.append(d) 
    183237        yield defer.waitForDeferred(defer.DeferredList(dList)) 
    184         self.failUnlessEqual(results, range(10)) 
     238        self.failUnlessEqual([x[1] for x in results], range(10)) 
  • projects/AsynQueue/trunk/asynqueue/test/test_processworker.py

    r79 r91  
    4545            self.mgr.children = {1:process} 
    4646            d = root.callRemote('newJob', JOB_ID, JOB_CODE) 
    47             d.addCallback(self.failUnlessEqual, True
     47            d.addCallback(self.failUnlessEqual, (True, ['test'])
    4848            return d 
    4949         
  • projects/AsynQueue/trunk/asynqueue/test/test_tasks.py

    r55 r91  
    172172                 if isinstance(x, defer.Deferred)], [True]) 
    173173 
     174    def testHireSetWorkerID(self): 
     175        worker = MockWorker() 
     176        workerID = self.mgr.hire(worker) 
     177        self.failUnlessEqual(getattr(worker, 'ID', None), workerID) 
     178 
    174179    def testHireClassQualifications(self): 
    175180        class CQWorker(MockWorker):