Changeset 90
- Timestamp:
- 10/02/07 21:02:44 (1 year ago)
- Files:
-
- projects/AsynQueue/trunk/asynqueue/base.py (modified) (2 diffs)
- projects/AsynQueue/trunk/asynqueue/jobs.py (modified) (1 diff)
- projects/AsynQueue/trunk/asynqueue/test/test_base.py (modified) (5 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynQueue/trunk/asynqueue/base.py
r79 r90 72 72 d = self.pendingGetCalls.pop() 73 73 d.callback(heapq.heappop(self.heap)) 74 75 def cancel(self, selector): 76 """ 77 Removes all pending items from the heap that the supplied I{selector} 78 function selects. The function must take an item as its sole argument 79 and return C{True} if it selects the item for queue removal. 80 """ 81 for item in self.heap: 82 if selector(item): 83 self.heap.remove(item) 84 # Fix up the possibly mangled heap list 85 heapq.heapify(self.heap) 74 86 75 87 … … 347 359 task.d.addBoth(oneLessPending) 348 360 return task.d 361 362 def cancelSeries(self, series): 363 """ 364 Cancels any pending tasks in the specified I{series}, unceremoniously 365 removing them from the queue. 366 """ 367 self.heap.cancel(lambda item: getattr(item, 'series', None) == series) 349 368 350 369 def subscribe(self, consumer): projects/AsynQueue/trunk/asynqueue/jobs.py
r88 r90 248 248 kw['niceness'] = self.jobs[jobID][1] 249 249 return self.queue.call(callName, *args, **kw) 250 251 def cancel(self, jobID): 252 """ 253 Cancels the specified I{jobID} and any jobs that may be queued for 254 it. If the job doesn't exist, no error is raised. 255 """ 256 self.queue.cancelSeries(jobID) 257 self.jobs.pop(jobID, None) 258 259 projects/AsynQueue/trunk/asynqueue/test/test_base.py
r55 r90 33 33 34 34 35 class TestTaskQueueGeneric(TestCase): 35 class Test_Priority(TestCase): 36 def setUp(self): 37 self.heap = Priority() 38 39 def test_cancel(self): 40 self.fail("Need to test this!") 41 42 43 class Test_TaskQueue_Generic(TestCase): 36 44 def setUp(self): 37 45 self.queue = base.TaskQueue() … … 40 48 return self.queue.shutdown() 41 49 42 def test OneTask(self):50 def test_oneTask(self): 43 51 worker = MockWorker(0.5) 44 52 self.queue.attachWorker(worker) … … 47 55 return d 48 56 49 def test OneWorker(self):57 def test_oneWorker(self): 50 58 N = 30 51 59 mutable = [] … … 68 76 return d 69 77 70 def test MultipleWorkers(self):78 def test_multipleWorkers(self): 71 79 N = 100 72 80 mutable = [] … … 90 98 d.addCallback(checkResults) 91 99 return d 100 101 def test_cancelSeries(self): 102 self.fail("Need to test this!")
