Changeset 95

Show
Ignore:
Timestamp:
10/18/07 01:19:48 (1 year ago)
Author:
edsuom
Message:

Prototype code for job updating...needs testing

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynQueue/trunk/asynqueue/jobs.py

    r93 r95  
    144144        workers.RemoteCallWorker.__init__(self, *args, **kw) 
    145145 
    146     def _runNow(self, null, task): 
     146    def runNow(self, null, task): 
    147147        funcName, args, kw = task.callTuple 
    148148        d = self.remoteCaller('runJob', task.series, funcName, *args, **kw) 
    149149        job = (task, d) 
    150150        self.jobs.append(job) 
    151         d.addBoth(self._doneTrying, job) 
     151        d.addBoth(self.doneTrying, job) 
    152152        # This task's deferred is NOT returned! 
    153153 
     
    174174    def __init__(self, queue=None): 
    175175        self.jobs  = {} 
     176        self.updates = {} 
    176177        if queue is None: 
    177178            self.queue = base.TaskQueue() 
     
    228229            if status is False: 
    229230                mutable.append(None) 
     231            elif jobID in self.updates: 
     232                return self._runUpdate(jobID, worker) 
    230233         
    231234        def allDone(null): 
     
    281284        d.addCallback(lambda _: jobID) 
    282285        return d 
     286 
     287    def _runUpdate(self, jobID, worker): 
     288        funcName, args, kw = self.updates[jobID] 
     289        return worker.remoteCaller('runJob', jobID, funcName, *args, **kw) 
     290 
     291    def update(self, jobID, callName, *args, **kw): 
     292        """ 
     293        """ 
     294        self.updates[jobID] = callName, args, kw 
     295        dList = [ 
     296            self._runUpdate(jobID, worker) 
     297            for worker in self.queue.workers()] 
     298        return defer.DeferredList(dList) 
    283299     
    284300    def run(self, jobID, callName, *args, **kw): 
     
    307323                return result 
    308324            log("Error running job %d:\n%s", jobID, result) 
    309              
     325 
     326        def jobFailed(failure): 
     327            log("Unexpected error running job %d:\n%s", 
     328                jobID, failure.getTraceback()) 
     329         
    310330        jobID = int(jobID) 
    311331        if jobID not in self.jobs: 
     
    314334        kw['niceness'] = self.jobs[jobID][1] 
    315335        d = self.queue.call(callName, *args, **kw) 
    316         d.addBoth(jobRan
     336        d.addCallbacks(jobRan, jobFailed
    317337        return d 
    318338 
     
    324344        self.queue.cancelSeries(jobID) 
    325345        self.jobs.pop(jobID, None) 
    326          
    327          
     346        self.updates.pop(jobID, None) 
     347         
     348         
  • projects/AsynQueue/trunk/asynqueue/test/test_jobs.py

    r92 r95  
    189189        return self._attach(bogus=True).addCallback(self.failUnlessEqual, None) 
    190190 
     191    def test_attachChild_withUpdate(self): 
     192        self.fail( 
     193            "Test that attached child gets updated before running any jobs") 
     194 
    191195    def test_new(self): 
    192196        def check(jobID): 
     
    234238        yield defer.waitForDeferred(defer.DeferredList(dList)) 
    235239        self.failUnlessEqual(results, range(10)) 
     240 
     241    def test_update(self): 
     242        self.fail("Test updates before next job runs") 
  • projects/AsynQueue/trunk/asynqueue/workers.py

    r79 r95  
    2525from twisted.python import failure 
    2626from twisted.internet import defer, reactor 
     27from twisted.spread import pb 
    2728 
    2829import errors 
     
    225226 
    226227    def __init__(self, remoteReference, N=3, noTypeCheck=False): 
    227         from twisted.spread import pb 
     228        self.N = N 
     229        self.iQualified = [] 
     230        self.remoteCaller = remoteReference.callRemote 
    228231        # Check supplied remote reference object 
    229232        if not noTypeCheck: 
     
    233236                raise TypeError( 
    234237                    "You must construct me with a PB RemoteReference") 
     238        self.startup(remoteReference) 
     239 
     240    def startup(self, remoteReference): 
     241        """ 
     242        Starts things up with the remote reference in hand. Useful to have this 
     243        as a separate method when you're subclassing and doing difference 
     244        constructor stuff. 
     245        """ 
    235246        # Setup resignation-upon-disconnect 
    236247        self.resignators = [] 
    237248        self.disconnectErrors = (pb.DeadReferenceError, pb.PBConnectionLost) 
    238         remoteReference.notifyOnDisconnect(self._resign) 
    239         # Setup attributes 
    240         self.N = N 
    241         self.iQualified = [] 
    242         self.remoteCaller = remoteReference.callRemote 
     249        remoteReference.notifyOnDisconnect(self.resign) 
    243250        # Prepare the run request queue 
    244251        self.jobs = [] 
     
    247254            self.runRequestQueue.put(None) 
    248255 
    249     def _runNow(self, null, task): 
     256    def runNow(self, null, task): 
    250257        suffix, args, kw = task.callTuple 
    251258        d = self.remoteCaller(suffix, *args, **kw) 
    252259        job = (task, d) 
    253260        self.jobs.append(job) 
    254         d.addCallback(self._doneTrying, job) 
    255         d.addErrback(self._oops) 
     261        d.addCallback(self.doneTrying, job) 
     262        d.addErrback(self.oops) 
    256263        # The task's deferred is NOT returned! 
    257264 
    258     def _oops(self, failure): 
     265    def oops(self, failure): 
    259266        if failure.check(*self.disconnectErrors): 
    260             self._resign() 
     267            self.resign() 
    261268        else: 
    262269            return failure 
    263270     
    264     def _doneTrying(self, result, job): 
     271    def doneTrying(self, result, job): 
    265272        self.jobs.remove(job) 
    266273        self.runRequestQueue.put(None) 
     
    268275        task.d.callback(result) 
    269276     
    270     def _resign(self, *null): 
     277    def resign(self, *null): 
    271278        while self.resignators: 
    272279            callableObject = self.resignators.pop() 
     
    291298        if getattr(self, 'isShuttingDown', False): 
    292299            raise errors.QueueRunError 
    293         return self.runRequestQueue.get().addCallback(self._runNow, task) 
     300        return self.runRequestQueue.get().addCallback(self.runNow, task) 
    294301     
    295302    def stop(self): 
     
    329336        self.suffixCache = [] 
    330337 
    331     def _names(self, items): 
     338    def names(self, items): 
    332339        nameListing = [x.__name__ for x in items] 
    333340        nameListing[-1] = "or " + nameListing[-1] 
     
    335342        return joinString.join(nameListing) 
    336343 
    337     def _checkSuffix(self, suffix): 
     344    def checkSuffix(self, suffix): 
    338345        for interface in self.interfaces: 
    339346            for attrName in interface: 
     
    341348                    self.suffixCache.append(suffix) 
    342349                    return 
    343         names = self._names(self.interfaces) 
     350        names = self.names(self.interfaces) 
    344351        raise AttributeError( 
    345352            "No remote method *_%s provided by interface %s" % (suffix, names)) 
    346353 
    347     def _runNow(self, null, task): 
     354    def runNow(self, null, task): 
    348355        suffix, args, kw = task.callTuple 
    349356        if suffix not in self.suffixCache: 
    350             self._checkSuffix(suffix) 
     357            self.checkSuffix(suffix) 
    351358        d = self.remoteCaller(suffix, *args, **kw) 
    352359        job = (task, d) 
    353360        self.jobs.append(job) 
    354         d.addBoth(self._doneTrying, job) 
     361        d.addBoth(self.doneTrying, job) 
    355362        # The task's deferred is NOT returned!