Show
Ignore:
Timestamp:
11/26/07 16:30:56 (1 year ago)
Author:
edsuom
Message:

Task reassignment after disconnections is working and tested, and a timeout feature has been added to allow any stray tasks that got lost in space after a disconnect to be retried

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynQueue/trunk/asynqueue/base.py

    r90 r112  
    189189                    break 
    190190                wfd = defer.waitForDeferred(self.mgr.assignment(task)) 
    191                 yield wfd 
     191                yield wfd; wfd.getResult() 
    192192            # Clean up after the loop exits 
    193193            wfd = defer.waitForDeferred(self.mgr.shutdown(self.timeout)) 
     
    195195            self.heap.shutdown() 
    196196            # The result of the runner is a list of any unfinished tasks. 
    197             yield wfd.getResult() 
     197            result = [] 
     198            try: 
     199                result = wfd.getResult() 
     200            except: 
     201                pass 
     202            yield result 
    198203         
    199204        if self.isRunning(): 
     
    248253            if worker == workerOrID: 
    249254                return thisID 
    250         raise ValueError("No such worker") 
    251255     
    252256    def detachWorker(self, workerOrID, reassign=False, crash=False): 
     
    261265        See L{tasks.WorkerManager.terminate}. 
    262266        """ 
    263         def terminated(unfinishedTasks): 
    264             for task in unfinishedTasks: 
    265                 self.mgr.assignment(task) 
    266  
    267267        ID = self._getWorkerID(workerOrID) 
     268        if ID is None: 
     269            return 
    268270        if crash: 
    269             d = self.mgr.terminate(ID, crash=True
     271            d = self.mgr.terminate(ID, crash=True, reassign=reassign
    270272        else: 
    271             d = self.mgr.terminate(ID, self.timeout) 
    272         if reassign: 
    273             d.addCallback(terminated) 
     273            d = self.mgr.terminate(ID, self.timeout, reassign=reassign) 
    274274        return d 
    275275