Changeset 92

Show
Ignore:
Timestamp:
10/04/07 21:29:25 (1 year ago)
Author:
edsuom
Message:

Job-dispatching work

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynCluster/trunk/coreworker

    r89 r92  
    2929 
    3030# Let 'er rip... 
    31 main.BaseWorker() 
     31main.BaseManager() 
  • projects/AsynQueue/trunk/asynqueue/jobs.py

    r91 r92  
    189189    def jobTried(self, result, jobID, worker): 
    190190        """ 
    191         Callback for loading a new job or running a call on an existing one
     191        Callback from loading a new job
    192192 
    193193        If the worker's root reference raised an unexpected failure, returns 
     
    199199            log("Worker %d supplied nonconforming root reference", worker.ID) 
    200200            return False 
    201         if result[0]: 
    202             msg = "Callable objects: %s" % ", ".join(result[1]) 
    203             log("Job %d loaded OK on worker %s\n%s", jobID, worker.ID, msg) 
    204             self.queue.qualifyWorker(worker, jobID) 
    205             return True 
    206         log("Job %d failed on worker %d:\n%s", jobID, worker.ID, result[1]) 
    207         return None 
     201        if isinstance(result, (list, tuple)): 
     202            if result[0]: 
     203                msg = "Callable objects: %s" % ", ".join(result[1]) 
     204                log("Job %d loaded OK on worker %s\n%s", jobID, worker.ID, msg) 
     205                self.queue.qualifyWorker(worker, jobID) 
     206                return True 
     207            log("Job %d failed on worker %d:\n%s", jobID, worker.ID, result[1]) 
     208            return None 
     209        # Not a failure or status,result tuple, so just pass it along 
     210        return result 
    208211 
    209212    def attachChild(self, childRoot, N=3): 
     
    299302 
    300303        """ 
     304        def jobRan(result): 
     305            status, result = result 
     306            if status: 
     307                return result 
     308            log("Error running job %d:\n%s", jobID, result) 
     309             
    301310        jobID = int(jobID) 
    302311        if jobID not in self.jobs: 
     
    304313        kw['series'] = jobID 
    305314        kw['niceness'] = self.jobs[jobID][1] 
    306         return self.queue.call(callName, *args, **kw) 
     315        d = self.queue.call(callName, *args, **kw) 
     316        d.addBoth(jobRan) 
     317        return d 
    307318 
    308319    def cancel(self, jobID): 
  • projects/AsynQueue/trunk/asynqueue/test/test_base.py

    r91 r92  
    129129        return d 
    130130 
    131     def test_cancelSeries(self): 
    132         self.fail("Need to test this!") 
  • projects/AsynQueue/trunk/asynqueue/test/test_jobs.py

    r91 r92  
    201201 
    202202    def test_run_one(self): 
    203         def check(result): 
    204             self.failUnlessEqual(result, (True, 50)) 
    205          
    206203        d = self._attach() 
    207204        d.addCallback(lambda _: self.mgr.new(JOB_CODE)) 
    208205        d.addCallback(self.mgr.run, 'test', 10, 20) 
    209         d.addCallback(check
     206        d.addCallback(self.failUnlessEqual, 50
    210207        return d 
    211208 
     
    221218            yield wfd 
    222219            results.append(wfd.getResult()) 
    223         self.failUnlessEqual([x[1] for x in results], range(10)) 
     220        self.failUnlessEqual(results, range(10)) 
    224221 
    225222    @defer.deferredGenerator 
     
    236233            dList.append(d) 
    237234        yield defer.waitForDeferred(defer.DeferredList(dList)) 
    238         self.failUnlessEqual([x[1] for x in results], range(10)) 
     235        self.failUnlessEqual(results, range(10))