Changeset 112

Show
Ignore:
Timestamp:
11/26/07 16:30:56 (1 year ago)
Author:
edsuom
Message:

Task reassignment after disconnections is working and tested, and a timeout feature has been added to allow any stray tasks that got lost in space after a disconnect to be retried

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynQueue/branches/debug_reassign/asynqueue/base.py

    r90 r112  
    189189                    break 
    190190                wfd = defer.waitForDeferred(self.mgr.assignment(task)) 
    191                 yield wfd 
     191                yield wfd; wfd.getResult() 
    192192            # Clean up after the loop exits 
    193193            wfd = defer.waitForDeferred(self.mgr.shutdown(self.timeout)) 
     
    195195            self.heap.shutdown() 
    196196            # The result of the runner is a list of any unfinished tasks. 
    197             yield wfd.getResult() 
     197            result = [] 
     198            try: 
     199                result = wfd.getResult() 
     200            except: 
     201                pass 
     202            yield result 
    198203         
    199204        if self.isRunning(): 
     
    248253            if worker == workerOrID: 
    249254                return thisID 
    250         raise ValueError("No such worker") 
    251255     
    252256    def detachWorker(self, workerOrID, reassign=False, crash=False): 
     
    261265        See L{tasks.WorkerManager.terminate}. 
    262266        """ 
    263         def terminated(unfinishedTasks): 
    264             for task in unfinishedTasks: 
    265                 self.mgr.assignment(task) 
    266  
    267267        ID = self._getWorkerID(workerOrID) 
     268        if ID is None: 
     269            return 
    268270        if crash: 
    269             d = self.mgr.terminate(ID, crash=True
     271            d = self.mgr.terminate(ID, crash=True, reassign=reassign
    270272        else: 
    271             d = self.mgr.terminate(ID, self.timeout) 
    272         if reassign: 
    273             d.addCallback(terminated) 
     273            d = self.mgr.terminate(ID, self.timeout, reassign=reassign) 
    274274        return d 
    275275 
  • projects/AsynQueue/branches/debug_reassign/asynqueue/jobs.py

    r110 r112  
    3737 
    3838 
     39from debug import dTrace 
     40 
     41 
    3942class TrustError(Exception): 
    4043    pass 
     
    136139            % (callName, jobID))) 
    137140 
    138     def remote_exit(self): 
     141    def remote_exit(self, stopReactor=False): 
    139142        """ 
    140143        Terminates my child worker process, calling and waiting for any 
     
    147150            if callable(possibleShutdownFunction): 
    148151                dList.append(defer.maybeDeferred(possibleShutdownFunction)) 
    149         return defer.DeferredList(dList).addCallback(lambda _: reactor.stop()) 
     152        d = defer.DeferredList(dList) 
     153        if stopReactor: 
     154            d.addCallback(lambda _: reactor.stop()) 
     155        return d 
    150156 
    151157 
     
    172178        d = workers.RemoteCallWorker.stop(self) 
    173179        d.addBoth(lambda _: self.remoteCaller('exit')) 
     180        d.addErrback(lambda _: None) 
    174181        return d 
    175182 
     
    214221        Shuts down my task queue, returning a deferred that fires when the 
    215222        queue has emptied and all interpreter workers have finished and been 
    216         terminated. 
    217         """ 
    218         return self.queue.shutdown() 
     223        terminated. The task queue shutdown takes care of shutting down 
     224        everything else, including any attached workers. 
     225        """ 
     226        return self.queue.shutdown().addCallback(lambda _: dTrace()) 
    219227 
    220228    def jobTried(self, result, jobID, worker): 
     
    306314        """ 
    307315        jobID = self._jobCounter = getattr(self, '_jobCounter', 0) + 1 
    308         print "NEW", jobID 
    309316        self.callsPending[jobID] = {} 
    310317        self.jobs[jobID] = [jobCode, niceness] 
     
    384391        at approximately half the rate of calls for a job with niceness N. 
    385392 
     393        All keywords except for the following are passed to the call: 
     394 
     395          - B{timeout}: A timeout interval in seconds after which the call will 
     396            be retried. 
     397           
    386398        @note: The task object generated contains the name of a callable (as a 
    387399          string) for the first element of its I{callTuple} attribute, instead 
     
    392404 
    393405        """ 
    394         def jobRan(result, d): 
     406        def queueJob(doNext=False): 
     407            if doNext: 
     408                kw['doNext'] = True 
     409            dq = self.queue.call(callName, *args, **kw) 
     410            print "QJ", timeout 
     411            if timeout: 
     412                dq.setTimeout( 
     413                    timeout, 
     414                    timeoutFunc=lambda d: jobRan((False, "Timeout"))) 
     415            dq.addErrback(jobFailed) 
     416            dq.addCallback(jobRan) 
     417         
     418        def jobRan(result): 
    395419            status, result = result 
    396             retryCount, callName, args, kw = self.callsPending[jobID].pop(d) 
    397420            if status: 
    398                 return result 
    399             log("Error running job %d:\n%s", jobID, result) 
    400             if retryCount < self.maxRetries: 
    401                 kw['retryCount'] = retryCount + 1 
    402                 return self.run(jobID, callName, *args, **kw) 
    403  
    404         def jobFailed(failure, d): 
    405             # TODO: Do something different here, as it's a more drastic 
    406             # situation 
    407             self.callsPending[jobID].pop(d) 
    408             log("Unexpected error running job %d:\n%s", 
    409                 jobID, failure.getTraceback()) 
    410          
     421                del self.callsPending[jobID][d] 
     422                d.callback(result) 
     423            else: 
     424                log("Error running job %d:\n%s", jobID, result) 
     425                tryAgain() 
     426 
     427        def jobFailed(failure): 
     428            return False, failure.getTraceback() 
     429 
     430        def tryAgain(): 
     431            if jobID in self.callsPending: 
     432                retryCount, callName, args, kw = self.callsPending[jobID][d] 
     433                print "TA", retryCount, callName 
     434                if True or retryCount < self.maxRetries: 
     435                    self.callsPending[jobID][d][0] = retryCount + 1 
     436                    queueJob(True) 
     437                    return 
     438            d.callback(None) 
     439 
    411440        jobID = int(jobID) 
    412441        if jobID not in self.jobs: 
    413442            raise ValueError("No job '%s' registered" % jobID) 
     443        timeout = kw.pop('timeout', None) 
    414444        kw['series'] = jobID 
    415445        kw['niceness'] = self.jobs[jobID][1] 
    416         retryCount = kw.pop('retryCount', 0) 
    417         d = self.queue.call(callName, *args, **kw) 
    418         d.addCallback(jobRan, d) 
    419         d.addErrback(jobFailed, d) 
    420         self.callsPending[jobID][d] = retryCount, callName, args, kw 
     446        d = defer.Deferred() 
     447        self.callsPending[jobID][d] = [0, callName, args, kw] 
     448        queueJob() 
    421449        return d 
    422450 
     
    430458        self.updates.pop(jobID, None) 
    431459        self.callsPending.pop(jobID, None) 
    432          
    433          
  • projects/AsynQueue/branches/debug_reassign/asynqueue/tasks.py

    r91 r112  
    2828from errors import ImplementationError 
    2929 
     30import debug 
    3031 
    3132class Task(object): 
    3233    """ 
    3334    I represent a task that has been dispatched to a queue for running with a 
    34     given scheduling I{niceness}. I generate a C{Deferred}, accessible as an 
    35     attribute I{d}, firing it when the task is finally run and its result is 
    36     obtained. 
     35    given scheduling I{niceness}. 
     36 
     37    I generate a C{Deferred} that you fire by calling either my L{callback} or 
     38    L{errback} with a result or failure, respectively, when the the task is 
     39    finally run and its result is obtained. You can call the deferred's 
     40    versions of those methods directly, but my versions deal with things like 
     41    repeated callbacks, which happen sometimes with task timeouts. 
    3742     
    3843    @ivar d: A deferred to the eventual result of the task. 
     
    5257        self.d = defer.Deferred() 
    5358 
     59    def called(self): 
     60        return self.d.called 
     61 
     62    def callback(self, result): 
     63        if not self.d.called: 
     64            self.d.callback(result) 
     65 
     66    def errback(self, result): 
     67        self.d.errback(result) 
     68     
    5469    def __repr__(self): 
    5570        """ 
     
    309324        self._workerCounter = workerID 
    310325        self.workers[workerID] = worker 
     326        worker.setResignator( 
     327            lambda : self.terminate(worker.ID, crash=True, reassign=True)) 
    311328        return workerID 
    312329     
    313     def terminate(self, workerID, timeout=None, crash=False): 
     330    def terminate(self, workerID, timeout=None, crash=False, reassign=False): 
    314331        """ 
    315332        Removes a worker from my work force, canceling all of its unfullfilled 
     
    324341 
    325342        @return: A deferred that fires when the worker has been removed, 
    326           gracefully or not, with a list of the worker's unfinished tasks. 
     343          gracefully or not, with a list of any tasks left unfinished and not 
     344          reassigned. 
    327345         
    328346        """ 
     
    338356                return [] 
    339357            return result 
     358 
     359        def reassignTasks(tasks): 
     360            for task in tasks: 
     361                debug.write("RT: %d", id(task.d)) 
     362                self.assignmentFactory.new(task) 
     363            return [] 
    340364         
    341365        worker = self.workers.pop(workerID, None) 
     
    345369        self.assignmentFactory.cancelRequests(worker) 
    346370        if crash: 
    347             return defer.succeed(worker.crash()) 
    348         d = worker.stop() 
    349         if timeout: 
    350             callID = reactor.callLater(timeout, crashTheWorker, worker, d) 
    351             d.addCallback(stopped) 
     371            d = defer.succeed(worker.crash()) 
    352372        else: 
    353             # No tasks left unfinished if deferred fires without timeout 
    354             d.addCallback(lambda _: []) 
     373            d = worker.stop() 
     374            if timeout: 
     375                callID = reactor.callLater(timeout, crashTheWorker, worker, d) 
     376                d.addCallback(stopped) 
     377            else: 
     378                # No tasks left unfinished if deferred fires without timeout 
     379                d.addCallback(lambda _: []) 
     380        if reassign: 
     381            d.addCallback(reassignTasks) 
    355382        return d 
    356383     
  • projects/AsynQueue/branches/debug_reassign/asynqueue/test/mock.py

    r79 r112  
    2626from twisted.internet import defer, reactor 
    2727from twisted.trial import unittest 
     28 
     29 
     30DELAY = 0.05 
     31 
     32def deferToLater(*args, **kw): 
     33    delay = kw.pop('delay', DELAY) 
     34    if args: 
     35        arg = args[0] 
     36    else: 
     37        arg = None 
     38    d = defer.Deferred() 
     39    if callable(arg): 
     40        d.addCallback(lambda _: arg(*args[1:], **kw)) 
     41        reactor.callLater(delay, d.callback, None) 
     42    else: 
     43        reactor.callLater(delay, d.callback, arg) 
     44    return d 
     45 
     46def fireLater(d, value=None, delay=None): 
     47    """ 
     48    Fires the supplied deferred I{d} with I{value}, or C{None} if no value 
     49    supplied, after the standard or specified I{delay}. 
     50    """ 
     51    if delay is None: 
     52        delay = DELAY 
     53    reactor.callLater(delay, d.callback, value) 
    2854 
    2955 
     
    122148 
    123149    def deferToLater(self, *args, **kw): 
    124         delay = kw.pop('delay', self.delay) 
    125         if args: 
    126             arg = args[0] 
    127         else: 
    128             arg = None 
    129         d = defer.Deferred() 
    130         if callable(arg): 
    131             d.addCallback(lambda _: arg(*args[1:], **kw)) 
    132             reactor.callLater(delay, d.callback, None) 
    133         else: 
    134             reactor.callLater(delay, d.callback, arg) 
    135         return d 
    136  
    137     def fireLater(self, d, value=None, delay=None): 
    138         """ 
    139         Fires the supplied deferred I{d} with I{value}, or C{None} if no value 
    140         supplied, after the standard or specified I{delay}. 
    141         """ 
    142         if delay is None: 
    143             delay = self.delay 
    144         reactor.callLater(delay, d.callback, value) 
     150        kw['delay'] = self.delay 
     151        return deferToLater(*args, **kw) 
     152     
     153    def fireLater(self, *args, **kw): 
     154        kw['delay'] = self.delay 
     155        return fireLater(*args, **kw) 
    145156 
    146157 
  • projects/AsynQueue/branches/debug_reassign/asynqueue/test/test_jobs.py

    r110 r112  
    2828import mock, jobs 
    2929 
     30#import twisted.internet.base 
     31#twisted.internet.base.DelayedCall.debug = True 
     32 
    3033JOB_ID = 1 
    3134 
    3235JOB_CODE = """ 
    3336G = [] 
     37TRIES = [] 
    3438 
    3539def setup(x): 
     
    4549def bogusable(x): 
    4650    return 1.0 / x 
     51 
     52def failFirstTime(): 
     53    TRIES.append(None) 
     54    if len(TRIES) == 1: 
     55        raise Exception 
     56    return len(TRIES) 
    4757 
    4858""" 
     
    99109        self.failUnlessEqual(result[0], True) 
    100110        self.failUnlessElementsEqual( 
    101             result[1], ['setup', 'total', 'test', 'bogusable']) 
     111            result[1], 
     112            ['setup', 'total', 'test', 'bogusable', 'failFirstTime']) 
    102113     
    103114    def test_newJob_bogus(self): 
     
    131142    def __init__(self, bogus=False): 
    132143        self.bogus = bogus 
     144        self.jobs = {} 
    133145        self.callbacks = [] 
    134         self.jobs = {} 
    135146        self.registeredClasses = [] 
    136147 
     
    215226 
    216227 
    217 class Test_JobManager(mock.TestCase): 
     228class Test_JobManager_Basics(mock.TestCase): 
    218229    def setUp(self): 
    219230        jobs.ChildWorker._noTypeCheck = True 
     
    248259        return self._attach(bogus=True).addCallback(self.failUnlessEqual, None) 
    249260 
     261    def test_registerClasses(self): 
     262        def check(null): 
     263            self.failUnlessElementsEqual( 
     264                self.root.registeredClasses, stringReps) 
     265 
     266        stringReps = ['foo.bar.SomeClass', 'foo.bar.AnotherClass'] 
     267        d = self._attach() 
     268        d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 
     269        d.addCallback(lambda _: self.mgr.registerClasses(*stringReps)) 
     270        d.addCallback(check) 
     271        return d 
     272 
     273 
     274class JobManagerBC(mock.TestCase): 
     275    class TestableChildRoot(jobs.ChildRoot): 
     276        trusted = True 
     277     
     278    def setUp(self): 
     279        self.servers = [] 
     280        self.mgr = jobs.JobManager() 
     281        return self.getReferenceToRoot() 
     282     
     283    def getReferenceToRoot(self): 
     284        def got(ref): 
     285            self.ref = ref 
     286            return ref 
     287         
     288        root = self.TestableChildRoot() 
     289        server = reactor.listenTCP(0, pb.PBServerFactory(root)) 
     290        self.servers.append(server) 
     291        clientFactory = pb.PBClientFactory() 
     292        reactor.connectTCP( 
     293            "127.0.0.1", server.getHost().port, clientFactory) 
     294        return clientFactory.getRootObject().addCallback(got) 
     295     
     296    def tearDown(self): 
     297        def closeConnection(null): 
     298            self.ref.broker.transport.loseConnection() 
     299            for server in self.servers: 
     300                server.stopListening() 
     301         
     302        return self.mgr.shutdown().addCallback(closeConnection) 
     303     
     304    def _newJob(self): 
     305        self.mgr.jobs[JOB_ID] = (JOB_CODE, 0) 
     306        self.mgr.callsPending[JOB_ID] = {} 
     307 
     308    def _attach(self, ref): 
     309        def attached(childID): 
     310            self.childID = childID 
     311            return childID 
     312         
     313        return self.mgr.attachChild(ref, N=1).addCallback(attached) 
     314 
     315 
     316class Test_JobManager_Admin(JobManagerBC): 
    250317    def test_attachChild_withUpdate(self): 
    251318        self._newJob() 
     
    253320        d1 = self.mgr.update(JOB_ID, 'setup', 1) 
    254321        # The actual attachment event chain 
    255         d2 = self._attach(
     322        d2 = self._attach(self.ref
    256323        d2.addCallback(self.mgr.run, 'setup', 2) 
    257324        d2.addCallback(self.failUnlessEqual, [1, 2]) 
     
    260327 
    261328    def test_new(self): 
     329        def attached(null): 
     330            self.failUnlessEqual(getattr(self, 'childID', None), 1) 
     331            return self.mgr.new(JOB_CODE).addCallback(check) 
     332         
    262333        def check(jobID): 
    263334            self.failUnlessEqual(jobID, 1) 
     
    265336            self.failUnlessEqual(worker.iQualified, [1]) 
    266337         
    267         d = self._attach() 
    268         d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 
    269         d.addCallback(check) 
    270         return d 
     338        return self._attach(self.ref).addCallback(attached) 
    271339 
    272340    def test_run_updates(self): 
     
    278346            return d 
    279347         
    280         d = self._attach(
     348        d = self._attach(self.ref
    281349        d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 
    282350        d.addCallback(gotJobID) 
    283351        return d 
    284352 
    285     def test_registerClasses(self): 
    286         def check(null): 
    287             self.failUnlessElementsEqual( 
    288                 self.root.registeredClasses, stringReps) 
    289  
    290         stringReps = ['foo.bar.SomeClass', 'foo.bar.AnotherClass'] 
    291         d = self._attach() 
    292         d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 
    293         d.addCallback(lambda _: self.mgr.registerClasses(*stringReps)) 
    294         d.addCallback(check) 
    295         return d 
    296  
     353    def test_disconnect_cancel(self): 
     354        def gotJobID(jobID): 
     355            self.ref.broker.transport.loseConnection() 
     356            d = self.mgr.run(jobID, 'test', 0, 0).addCallback(runOver) 
     357            self.mgr.cancel(jobID) 
     358            return d 
     359 
     360        def runOver(result): 
     361            print "Job Canceled" 
     362         
     363        d = self._attach(self.ref) 
     364        d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 
     365        d.addCallback(gotJobID) 
     366        return d 
     367         
     368    def test_disconnect_reassign(self): 
     369        def gotJobID(jobID): 
     370            self.ref.broker.transport.loseConnection() 
     371            return mock.deferToLater(delay=1.0).addCallback(delayDone, jobID) 
     372 
     373        def delayDone(null, jobID): 
     374            d = self.mgr.run(jobID, 'test', 1, 2) 
     375            d.addCallback(self.failUnlessEqual, 5) 
     376            self.getReferenceToRoot().addCallback(self._attach) 
     377            return d 
     378         
     379        d = self._attach(self.ref) 
     380        d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 
     381        d.addCallback(gotJobID) 
     382        return d 
     383 
     384 
     385 
     386class Test_JobManager_Run(JobManagerBC): 
    297387    def test_run_one(self): 
    298         d = self._attach(
     388        d = self._attach(self.ref
    299389        d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 
    300390        d.addCallback(self.mgr.run, 'test', 10, 20) 
     
    305395    def test_run_several_sequentially(self): 
    306396        results = [] 
    307         yield defer.waitForDeferred(self._attach()) 
     397        yield defer.waitForDeferred(self._attach(self.ref)) 
    308398        wfd = defer.waitForDeferred(self.mgr.new(JOB_CODE)) 
    309399        yield wfd 
     
    318408    def test_run_several_queued(self): 
    319409        results = [] 
    320         yield defer.waitForDeferred(self._attach()) 
     410        yield defer.waitForDeferred(self._attach(self.ref)) 
    321411        wfd = defer.waitForDeferred(self.mgr.new(JOB_CODE)) 
    322412        yield wfd 
     
    329419        yield defer.waitForDeferred(defer.DeferredList(dList)) 
    330420        self.failUnlessEqual(results, range(10)) 
     421 
     422    def test_retry_after_failure(self): 
     423        d = self._attach(self.ref) 
     424        d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 
     425        d.addCallback(self.mgr.run, 'failFirstTime') 
     426        d.addCallback(self.failUnlessEqual, 2) 
     427        return d 
     428         
  • projects/AsynQueue/branches/debug_reassign/asynqueue/test/util.py

    r2 r112  
    6565        self.iQualified = [] 
    6666 
     67    def setResignator(self, callableObject): 
     68        pass 
     69 
    6770    def run(self, task): 
    6871        def ran(result, d): 
  • projects/AsynQueue/branches/debug_reassign/asynqueue/workers.py

    r95 r112  
    2929import errors 
    3030 
     31import debug 
    3132 
    3233class IWorker(Interface): 
     
    149150                # call will not reach this point. 
    150151            except Exception, e: 
    151                 reactor.callFromThread(task.d.errback, failure.Failure(e)) 
     152                reactor.callFromThread(task.errback, failure.Failure(e)) 
    152153            else: 
    153                 reactor.callFromThread(task.d.callback, result) 
     154                reactor.callFromThread(task.callback, result) 
    154155        # Broken out of loop, ready for the thread to end 
    155156        reactor.callFromThread(self.d.callback, None) 
     
    203204        thread. 
    204205        """ 
    205         if self.task is not None and not self.task.d.called: 
     206        if self.task is not None and not self.task.called: 
    206207            result = [self.task] 
    207208        else: 
     
    259260        job = (task, d) 
    260261        self.jobs.append(job) 
    261         d.addCallback(self.doneTrying, job) 
    262         d.addErrback(self.oops) 
     262        d.addBoth(self.doneTrying, job) 
    263263        # The task's deferred is NOT returned! 
    264264 
    265     def oops(self, failure): 
    266         if failure.check(*self.disconnectErrors): 
    267             self.resign() 
    268         else: 
    269             return failure 
    270      
    271265    def doneTrying(self, result, job): 
     266        if hasattr(result, 'getTraceback'): 
     267            print "OOPS", result.getTraceback() 
     268            if result.check(*self.disconnectErrors): 
     269                # This was a disconnect error, so bail out now; don't remove 
     270                # the job or signal the run request queue that the job is done. 
     271                return 
     272        print "DT" 
    272273        self.jobs.remove(job) 
    273274        self.runRequestQueue.put(None) 
    274275        task = job[0] 
    275         task.d.callback(result) 
     276        task.callback(result) 
    276277     
    277278    def resign(self, *null): 
     
    296297        running and I can accept another one. 
    297298        """ 
    298         if getattr(self, 'isShuttingDown', False): 
    299             raise errors.QueueRunError 
     299        debug.write("RUN: %d", id(task.d)) 
     300        #if getattr(self, 'isShuttingDown', False): 
     301        #    raise errors.QueueRunError 
    300302        return self.runRequestQueue.get().addCallback(self.runNow, task) 
    301303     
  • projects/AsynQueue/trunk/asynqueue/base.py

    r90 r112  
    189189                    break 
    190190                wfd = defer.waitForDeferred(self.mgr.assignment(task)) 
    191                 yield wfd 
     191                yield wfd; wfd.getResult() 
    192192            # Clean up after the loop exits 
    193193            wfd = defer.waitForDeferred(self.mgr.shutdown(self.timeout)) 
     
    195195            self.heap.shutdown() 
    196196            # The result of the runner is a list of any unfinished tasks. 
    197             yield wfd.getResult() 
     197            result = [] 
     198            try: 
     199                result = wfd.getResult() 
     200            except: 
     201                pass 
     202            yield result 
    198203         
    199204        if self.isRunning(): 
     
    248253            if worker == workerOrID: 
    249254                return thisID 
    250         raise ValueError("No such worker") 
    251255     
    252256    def detachWorker(self, workerOrID, reassign=False, crash=False): 
     
    261265        See L{tasks.WorkerManager.terminate}. 
    262266        """ 
    263         def terminated(unfinishedTasks): 
    264             for task in unfinishedTasks: 
    265                 self.mgr.assignment(task) 
    266  
    267267        ID = self._getWorkerID(workerOrID) 
     268        if ID is None: 
     269            return 
    268270        if crash: 
    269             d = self.mgr.terminate(ID, crash=True
     271            d = self.mgr.terminate(ID, crash=True, reassign=reassign
    270272        else: 
    271             d = self.mgr.terminate(ID, self.timeout) 
    272         if reassign: 
    273             d.addCallback(terminated) 
     273            d = self.mgr.terminate(ID, self.timeout, reassign=reassign) 
    274274        return d 
    275275 
  • projects/AsynQueue/trunk/asynqueue/jobs.py

    r110 r112  
    136136            % (callName, jobID))) 
    137137 
    138     def remote_exit(self): 
     138    def remote_exit(self, stopReactor=False): 
    139139        """ 
    140140        Terminates my child worker process, calling and waiting for any 
     
    147147            if callable(possibleShutdownFunction): 
    148148                dList.append(defer.maybeDeferred(possibleShutdownFunction)) 
    149         return defer.DeferredList(dList).addCallback(lambda _: reactor.stop()) 
     149        d = defer.DeferredList(dList) 
     150        if stopReactor: 
     151            d.addCallback(lambda _: reactor.stop()) 
     152        return d 
    150153 
    151154 
     
    172175        d = workers.RemoteCallWorker.stop(self) 
    173176        d.addBoth(lambda _: self.remoteCaller('exit')) 
     177        d.addErrback(lambda _: None) 
    174178        return d 
    175179 
     
    214218        Shuts down my task queue, returning a deferred that fires when the 
    215219        queue has emptied and all interpreter workers have finished and been 
    216         terminated. 
     220        terminated. The task queue shutdown takes care of shutting down 
     221        everything else, including any attached workers. 
    217222        """ 
    218223        return self.queue.shutdown() 
     
    306311        """ 
    307312        jobID = self._jobCounter = getattr(self, '_jobCounter', 0) + 1 
    308         print "NEW", jobID 
    309313        self.callsPending[jobID] = {} 
    310314        self.jobs[jobID] = [jobCode, niceness] 
     
    384388        at approximately half the rate of calls for a job with niceness N. 
    385389 
     390        All keywords except for the following are passed to the call: 
     391 
     392          - B{timeout}: A timeout interval in seconds after which the call will 
     393            be retried. 
     394           
    386395        @note: The task object generated contains the name of a callable (as a 
    387396          string) for the first element of its I{callTuple} attribute, instead 
     
    392401 
    393402        """ 
    394         def jobRan(result, d): 
     403        def queueJob(doNext=False): 
     404            if doNext: 
     405                kw['doNext'] = True 
     406            dq = self.queue.call(callName, *args, **kw) 
     407            if timeout: 
     408                dq.setTimeout( 
     409                    timeout, 
     410                    timeoutFunc=lambda d: jobRan((False, "Timeout"))) 
     411            dq.addErrback(jobFailed) 
     412            dq.addCallback(jobRan) 
     413         
     414        def jobRan(result): 
    395415            status, result = result 
    396             retryCount, callName, args, kw = self.callsPending[jobID].pop(d) 
    397416            if status: 
    398                 return result 
    399             log("Error running job %d:\n%s", jobID, result) 
    400             if retryCount < self.maxRetries: 
    401                 kw['retryCount'] = retryCount + 1 
    402                 return self.run(jobID, callName, *args, **kw) 
    403  
    404         def jobFailed(failure, d): 
    405             # TODO: Do something different here, as it's a more drastic 
    406             # situation 
    407             self.callsPending[jobID].pop(d) 
    408             log("Unexpected error running job %d:\n%s", 
    409                 jobID, failure.getTraceback()) 
    410          
     417                del self.callsPending[jobID][d] 
     418                d.callback(result) 
     419            else: 
     420                log("Error running job %d:\n%s", jobID, result) 
     421                tryAgain() 
     422 
     423        def jobFailed(failure): 
     424            return False, failure.getTraceback() 
     425 
     426        def tryAgain(): 
     427            if jobID in self.callsPending: 
     428                retryCount, callName, args, kw = self.callsPending[jobID][d] 
     429                if retryCount < self.maxRetries: 
     430                    self.callsPending[jobID][d][0] = retryCount + 1 
     431                    queueJob(True) 
     432                    return 
     433            d.callback(None) 
     434 
    411435        jobID = int(jobID) 
    412436        if jobID not in self.jobs: 
    413437            raise ValueError("No job '%s' registered" % jobID) 
     438        timeout = kw.pop('timeout', None) 
    414439        kw['series'] = jobID 
    415440        kw['niceness'] = self.jobs[jobID][1] 
    416         retryCount = kw.pop('retryCount', 0) 
    417         d = self.queue.call(callName, *args, **kw) 
    418         d.addCallback(jobRan, d) 
    419         d.addErrback(jobFailed, d) 
    420         self.callsPending[jobID][d] = retryCount, callName, args, kw 
     441        d = defer.Deferred() 
     442        self.callsPending[jobID][d] = [0, callName, args, kw] 
     443        queueJob() 
    421444        return d 
    422445 
     
    430453        self.updates.pop(jobID, None) 
    431454        self.callsPending.pop(jobID, None) 
    432          
    433          
  • projects/AsynQueue/trunk/asynqueue/tasks.py

    r91 r112  
    3232    """ 
    3333    I represent a task that has been dispatched to a queue for running with a 
    34     given scheduling I{niceness}. I generate a C{Deferred}, accessible as an 
    35     attribute I{d}, firing it when the task is finally run and its result is 
    36     obtained. 
     34    given scheduling I{niceness}. 
     35 
     36    I generate a C{Deferred} that you fire by calling either my L{callback} or 
     37    L{errback} with a result or failure, respectively, when the the task is 
     38    finally run and its result is obtained. You can call the deferred's 
     39    versions of those methods directly, but my versions deal with things like 
     40    repeated callbacks, which happen sometimes with task timeouts. 
    3741     
    3842    @ivar d: A deferred to the eventual result of the task. 
     
    5256        self.d = defer.Deferred() 
    5357 
     58    def callback(self, result): 
     59        if not self.d.called: 
     60            self.d.callback(result) 
     61 
     62    def errback(self, result): 
     63        self.d.errback(result) 
     64     
    5465    def __repr__(self): 
    5566        """ 
     
    309320        self._workerCounter = workerID 
    310321        self.workers[workerID] = worker 
     322        worker.setResignator( 
     323            lambda : self.terminate(worker.ID, crash=True, reassign=True)) 
    311324        return workerID 
    312325     
    313     def terminate(self, workerID, timeout=None, crash=False): 
     326    def terminate(self, workerID, timeout=None, crash=False, reassign=False): 
    314327        """ 
    315328        Removes a worker from my work force, canceling all of its unfullfilled 
     
    324337 
    325338        @return: A deferred that fires when the worker has been removed, 
    326           gracefully or not, with a list of the worker's unfinished tasks. 
     339          gracefully or not, with a list of any tasks left unfinished and not 
     340          reassigned. 
    327341         
    328342        """ 
     
    338352                return [] 
    339353            return result 
     354 
     355        def reassignTasks(tasks): 
     356            for task in tasks: 
     357                self.assignmentFactory.new(task) 
     358            return [] 
    340359         
    341360        worker = self.workers.pop(workerID, None) 
     
    345364        self.assignmentFactory.cancelRequests(worker) 
    346365        if crash: 
    347             return defer.succeed(worker.crash()) 
    348         d = worker.stop() 
    349         if timeout: 
    350             callID = reactor.callLater(timeout, crashTheWorker, worker, d) 
    351             d.addCallback(stopped) 
     366            d = defer.succeed(worker.crash()) 
    352367        else: 
    353             # No tasks left unfinished if deferred fires without timeout 
    354             d.addCallback(lambda _: []) 
     368            d = worker.stop(