Changeset 90

Show
Ignore:
Timestamp:
10/02/07 21:02:44 (1 year ago)
Author:
edsuom
Message:

Sketching out a task cancellation feature

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynQueue/trunk/asynqueue/base.py

    r79 r90  
    7272            d = self.pendingGetCalls.pop() 
    7373            d.callback(heapq.heappop(self.heap)) 
     74 
     75    def cancel(self, selector): 
     76        """ 
     77        Removes all pending items from the heap that the supplied I{selector} 
     78        function selects. The function must take an item as its sole argument 
     79        and return C{True} if it selects the item for queue removal. 
     80        """ 
     81        for item in self.heap: 
     82            if selector(item): 
     83                self.heap.remove(item) 
     84        # Fix up the possibly mangled heap list 
     85        heapq.heapify(self.heap) 
    7486 
    7587 
     
    347359        task.d.addBoth(oneLessPending) 
    348360        return task.d 
     361 
     362    def cancelSeries(self, series): 
     363        """ 
     364        Cancels any pending tasks in the specified I{series}, unceremoniously 
     365        removing them from the queue. 
     366        """ 
     367        self.heap.cancel(lambda item: getattr(item, 'series', None) == series) 
    349368     
    350369    def subscribe(self, consumer): 
  • projects/AsynQueue/trunk/asynqueue/jobs.py

    r88 r90  
    248248        kw['niceness'] = self.jobs[jobID][1] 
    249249        return self.queue.call(callName, *args, **kw) 
     250 
     251    def cancel(self, jobID): 
     252        """ 
     253        Cancels the specified I{jobID} and any jobs that may be queued for 
     254        it. If the job doesn't exist, no error is raised. 
     255        """ 
     256        self.queue.cancelSeries(jobID) 
     257        self.jobs.pop(jobID, None) 
     258         
     259         
  • projects/AsynQueue/trunk/asynqueue/test/test_base.py

    r55 r90  
    3333 
    3434 
    35 class TestTaskQueueGeneric(TestCase): 
     35class Test_Priority(TestCase): 
     36    def setUp(self): 
     37        self.heap = Priority() 
     38 
     39    def test_cancel(self): 
     40        self.fail("Need to test this!") 
     41 
     42 
     43class Test_TaskQueue_Generic(TestCase): 
    3644    def setUp(self): 
    3745        self.queue = base.TaskQueue() 
     
    4048        return self.queue.shutdown() 
    4149 
    42     def testOneTask(self): 
     50    def test_oneTask(self): 
    4351        worker = MockWorker(0.5) 
    4452        self.queue.attachWorker(worker) 
     
    4755        return d 
    4856 
    49     def testOneWorker(self): 
     57    def test_oneWorker(self): 
    5058        N = 30 
    5159        mutable = [] 
     
    6876        return d 
    6977 
    70     def testMultipleWorkers(self): 
     78    def test_multipleWorkers(self): 
    7179        N = 100 
    7280        mutable = [] 
     
    9098        d.addCallback(checkResults) 
    9199        return d 
     100 
     101    def test_cancelSeries(self): 
     102        self.fail("Need to test this!")