Changeset 96
- Timestamp:
- 10/18/07 17:30:38 (1 year ago)
- Files:
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynQueue/trunk/asynqueue/test/test_jobs.py
r95 r96 31 31 32 32 JOB_CODE = """ 33 G = [] 34 35 def setup(x): 36 G.append(x) 37 return G 38 33 39 def test(a, b, c=0): 34 40 return a + 2*b + 3*c … … 48 54 result = self.root.remote_newJob(JOB_ID, JOB_CODE) 49 55 self.failUnlessEqual(result[0], True) 50 self.failUnlessElementsEqual(result[1], [' test', 'bogusable'])56 self.failUnlessElementsEqual(result[1], ['setup', 'test', 'bogusable']) 51 57 52 58 def test_newJob_bogus(self): … … 190 196 191 197 def test_attachChild_withUpdate(self): 192 self.fail( 193 "Test that attached child gets updated before running any jobs") 198 self.mgr.jobs[JOB_ID] = (JOB_CODE, 0) 199 # This must run first, on attachment 200 d1 = self.mgr.update(JOB_ID, 'setup', 1) 201 # The actual attachment event chain 202 d2 = self._attach() 203 d2.addCallback(self.mgr.run, 'setup', 2) 204 d2.addCallback(self.failUnlessEqual, [1, 2]) 205 # Wait for both 206 return defer.DeferredList([d1, d2]) 194 207 195 208 def test_new(self): … … 238 251 yield defer.waitForDeferred(defer.DeferredList(dList)) 239 252 self.failUnlessEqual(results, range(10)) 240 241 def test_update(self):242 self.fail("Test updates before next job runs")
