Changeset 126

Show
Ignore:
Timestamp:
02/22/08 22:41:33 (9 months ago)
Author:
edsuom
Message:

Worker specialties, improved timouts

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynQueue/trunk/asynqueue/base.py

    r112 r126  
    2626from zope.interface import implements 
    2727from twisted.python import failure 
    28 from twisted.internet import defer, reactor, interfaces 
     28from twisted.internet import reactor, interfaces, defer 
     29# Use C Deferreds if possible, for efficiency 
     30try: 
     31    from twisted.internet import cdefer 
     32except: 
     33    pass 
     34else: 
     35    defer.Deferred = cdefer.Deferred 
    2936 
    3037import tasks 
     
    328335          task has been queued up. 
    329336 
     337        @keyword timeout: A timeout interval in seconds from when a worker gets 
     338          a task assignment for the call, after which the call will be retried. 
     339 
    330340        """ 
    331341        def oneLessPending(result): 
     
    351361        niceness = kw.pop('niceness', 0) 
    352362        series = kw.pop('series', None) 
    353         task = self._taskFactory.new(func, args, kw, niceness, series) 
     363        timeout = kw.pop('timeout', None) 
     364        task = self._taskFactory.new(func, args, kw, niceness, series, timeout) 
    354365        if kw.pop('doNext', False): 
    355366            task.priority = -1000000 
  • projects/AsynQueue/trunk/asynqueue/jobs.py

    r116 r126  
    2525from zope.interface import implements, Interface 
    2626from twisted.internet import defer, reactor 
     27from twisted.python import reflect 
     28from twisted.python.rebuild import rebuild 
    2729from twisted.python.failure import Failure 
    28 from twisted.python import reflect 
    2930from twisted.spread import pb, flavors 
     31# Use C Deferreds if possible, for efficiency 
     32try: 
     33    from twisted.internet import cdefer 
     34except: 
     35    pass 
     36else: 
     37    defer.Deferred = cdefer.Deferred 
    3038 
    3139import base, workers 
     
    8997            if cls.__module__ not in modules: 
    9098                modules.append(cls.__module__) 
    91         # Try to reload the modules for the classes in case they've changed 
     99        # Try to build the modules for the classes in case they've changed 
    92100        # since the last run 
    93101        for module in modules: 
    94102            try: 
    95                 reload(reflect.namedModule(module)
     103                rebuild(reflect.namedModule(module), doLog=False
    96104            except: 
    97105                pass 
     
    221229     
    222230    """ 
    223     maxRetries = 2 
     231    maxRetries = 1 
    224232     
    225233    def __init__(self, queue=None): 
     
    409417        All keywords except for the following are passed to the call: 
    410418 
    411           - B{timeout}: A timeout interval in seconds after which the call will 
    412             be retried. 
     419          - B{timeout}: A timeout interval in seconds from when a worker 
     420            gets a task assignment for the call, after which the call will be 
     421            retried. 
    413422           
    414423        @note: The task object generated contains the name of a callable (as a 
     
    424433                kw['doNext'] = True 
    425434            dq = self.queue.call(callName, *args, **kw) 
    426             if timeout: 
    427                 dq.setTimeout( 
    428                     timeout, 
    429                     timeoutFunc=lambda d: jobRan((False, "Timeout"))) 
    430             dq.addErrback(jobFailed) 
     435            dq.addErrback(lambda failure: (False, failure.getTraceback())) 
    431436            dq.addCallback(jobRan) 
    432          
     437 
    433438        def jobRan(result): 
    434439            status, result = result 
    435             if status and jobID in self.callsPending
    436                 if d in self.callsPending[jobID]
     440            if status
     441                if d in self.callsPending.get(jobID, [])
    437442                    del self.callsPending[jobID][d] 
    438443                    d.callback(result) 
     444            elif result == 'Timeout': 
     445                log("Timeout on job %d, retrying", jobID) 
     446                tryAgain() 
    439447            else: 
    440448                log("Error running job %d:\n%s", jobID, result) 
    441449                tryAgain() 
    442  
    443         def jobFailed(failure): 
    444             return False, failure.getTraceback() 
    445450 
    446451        def tryAgain(): 
     
    456461        if jobID not in self.jobs: 
    457462            raise ValueError("No job '%s' registered" % jobID) 
    458         timeout = kw.pop('timeout', None) 
    459463        kw['series'] = jobID 
    460464        kw['niceness'] = self.jobs[jobID][1] 
  • projects/AsynQueue/trunk/asynqueue/tasks.py

    r112 r126  
    2424# Imports 
    2525from twisted.internet import defer, reactor 
     26# Use C Deferreds if possible, for efficiency 
     27try: 
     28    from twisted.internet import cdefer 
     29except: 
     30    pass 
     31else: 
     32    defer.Deferred = cdefer.Deferred 
    2633 
    2734from workers import IWorker 
     
    4653 
    4754    """ 
    48     def __init__(self, f, args, kw, priority, series): 
     55    def __init__(self, f, args, kw, priority, series, timeout=None): 
    4956        if not isinstance(args, (tuple, list)): 
    5057            raise TypeError("Second argument 'args' isn't a sequence") 
     
    5562        self.series = series 
    5663        self.d = defer.Deferred() 
     64        self.timeout = timeout 
     65 
     66    def startTimer(self): 
     67        if self.timeout: 
     68            self.callID = reactor.callLater(self.timeout, self.timedout) 
     69        else: 
     70            self.callID = None 
    5771 
    5872    def callback(self, result): 
     73        if self.callID: 
     74            self.callID.cancel() 
     75            self.callID = None 
    5976        if not self.d.called: 
    6077            self.d.callback(result) 
    6178 
    6279    def errback(self, result): 
     80        if self.callID: 
     81            self.callID.cancel() 
     82            self.callID = None 
    6383        self.d.errback(result) 
     84 
     85    def timedout(self): 
     86        if not self.d.called: 
     87            self.d.callback((False, "Timeout")) 
     88        self.callID = None 
    6489     
    6590    def __repr__(self): 
     
    105130        self.seriesNumbers = {} 
    106131 
    107     def new(self, func, args, kw, niceness, series=None): 
     132    def new(self, func, args, kw, niceness, series=None, timeout=None): 
    108133        """ 
    109134        Call this to obtain a L{Task} instance that will run in the specified 
     
    131156        positivized = niceness + 20 
    132157        priority = self._serial(series) * (1 + (float(positivized)/10)**2) 
    133         return self.TaskClass(func, args, kw, priority, series
     158        return self.TaskClass(func, args, kw, priority, series, timeout
    134159     
    135160    def _serial(self, series): 
     
    176201        """ 
    177202        self.d.callback(None) 
     203        self.task.startTimer() 
    178204        return worker.run(self.task) 
    179205 
  • projects/AsynQueue/trunk/asynqueue/test/util.py

    r112 r126  
    3838 
    3939class MockTask(object): 
    40     def __init__(self, f, args, kw, priority, series): 
     40    def __init__(self, f, args, kw, priority, series, timeout=None): 
    4141        self.ran = False 
    4242        self.callTuple = (f, args, kw) 
     
    5252    def __str__(self): 
    5353        return str(self.callTuple[0]) 
     54 
     55    def startTimer(self): 
     56        pass 
    5457 
    5558 
  • projects/AsynQueue/trunk/asynqueue/workers.py

    r112 r126  
    118118    """ 
    119119    I implement an L{IWorker} that runs tasks in a dedicated worker thread. 
     120 
     121    You can define one or more specialties that I am qualified to handle with 
     122    string arguments. 
    120123    """ 
    121124    implements(IWorker) 
    122125    cQualified = [] 
    123126 
    124     def __init__(self): 
     127    def __init__(self, *specialties): 
    125128        import threading 
    126         self.iQualified = [] 
     129        self.iQualified = list(specialties) 
    127130        self.event = threading.Event() 
    128131        self.thread = threading.Thread(target=self._loop) 
  • projects/AsynQueue/trunk/setup.py

    r79 r126  
    3030 
    3131### Define setup options 
    32 kw = {'version':'0.2', 
     32kw = {'version':'0.3', 
    3333      'license':'GPL', 
    3434      'platforms':'OS Independent', 
     
    4747kw['keywords'] = [ 
    4848    'Twisted', 'asynchronous', 'threads', 
    49     'taskqueue', 'queue', 'priority', 'tasks', 'jobs'] 
     49    'taskqueue', 'queue', 'priority', 'tasks', 'jobs', 'nodes', 'cluster'] 
     50 
    5051 
    5152kw['classifiers'] = [ 
    5253    'Development Status :: 4 - Beta', 
     54 
    5355    'Intended Audience :: Developers', 
     56    'Intended Audience :: Science/Research', 
     57 
    5458    'License :: OSI Approved :: GNU General Public License (GPL)', 
    5559    'Operating System :: OS Independent', 
    5660    'Programming Language :: Python', 
     61 
     62    'Topic :: System :: Distributed Computing', 
     63    'Topic :: Software Development :: Object Brokering', 
    5764    'Topic :: Software Development :: Libraries :: Python Modules', 
    5865    ] 
     66 
    5967 
    6068kw['description'] = " ".join("""