Changeset 95
- Timestamp:
- 10/18/07 01:19:48 (1 year ago)
- Files:
-
- projects/AsynQueue/trunk/asynqueue/jobs.py (modified) (7 diffs)
- projects/AsynQueue/trunk/asynqueue/test/test_jobs.py (modified) (2 diffs)
- projects/AsynQueue/trunk/asynqueue/workers.py (modified) (9 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynQueue/trunk/asynqueue/jobs.py
r93 r95 144 144 workers.RemoteCallWorker.__init__(self, *args, **kw) 145 145 146 def _runNow(self, null, task):146 def runNow(self, null, task): 147 147 funcName, args, kw = task.callTuple 148 148 d = self.remoteCaller('runJob', task.series, funcName, *args, **kw) 149 149 job = (task, d) 150 150 self.jobs.append(job) 151 d.addBoth(self. _doneTrying, job)151 d.addBoth(self.doneTrying, job) 152 152 # This task's deferred is NOT returned! 153 153 … … 174 174 def __init__(self, queue=None): 175 175 self.jobs = {} 176 self.updates = {} 176 177 if queue is None: 177 178 self.queue = base.TaskQueue() … … 228 229 if status is False: 229 230 mutable.append(None) 231 elif jobID in self.updates: 232 return self._runUpdate(jobID, worker) 230 233 231 234 def allDone(null): … … 281 284 d.addCallback(lambda _: jobID) 282 285 return d 286 287 def _runUpdate(self, jobID, worker): 288 funcName, args, kw = self.updates[jobID] 289 return worker.remoteCaller('runJob', jobID, funcName, *args, **kw) 290 291 def update(self, jobID, callName, *args, **kw): 292 """ 293 """ 294 self.updates[jobID] = callName, args, kw 295 dList = [ 296 self._runUpdate(jobID, worker) 297 for worker in self.queue.workers()] 298 return defer.DeferredList(dList) 283 299 284 300 def run(self, jobID, callName, *args, **kw): … … 307 323 return result 308 324 log("Error running job %d:\n%s", jobID, result) 309 325 326 def jobFailed(failure): 327 log("Unexpected error running job %d:\n%s", 328 jobID, failure.getTraceback()) 329 310 330 jobID = int(jobID) 311 331 if jobID not in self.jobs: … … 314 334 kw['niceness'] = self.jobs[jobID][1] 315 335 d = self.queue.call(callName, *args, **kw) 316 d.add Both(jobRan)336 d.addCallbacks(jobRan, jobFailed) 317 337 return d 318 338 … … 324 344 self.queue.cancelSeries(jobID) 325 345 self.jobs.pop(jobID, None) 326 327 346 self.updates.pop(jobID, None) 347 348 projects/AsynQueue/trunk/asynqueue/test/test_jobs.py
r92 r95 189 189 return self._attach(bogus=True).addCallback(self.failUnlessEqual, None) 190 190 191 def test_attachChild_withUpdate(self): 192 self.fail( 193 "Test that attached child gets updated before running any jobs") 194 191 195 def test_new(self): 192 196 def check(jobID): … … 234 238 yield defer.waitForDeferred(defer.DeferredList(dList)) 235 239 self.failUnlessEqual(results, range(10)) 240 241 def test_update(self): 242 self.fail("Test updates before next job runs") projects/AsynQueue/trunk/asynqueue/workers.py
r79 r95 25 25 from twisted.python import failure 26 26 from twisted.internet import defer, reactor 27 from twisted.spread import pb 27 28 28 29 import errors … … 225 226 226 227 def __init__(self, remoteReference, N=3, noTypeCheck=False): 227 from twisted.spread import pb 228 self.N = N 229 self.iQualified = [] 230 self.remoteCaller = remoteReference.callRemote 228 231 # Check supplied remote reference object 229 232 if not noTypeCheck: … … 233 236 raise TypeError( 234 237 "You must construct me with a PB RemoteReference") 238 self.startup(remoteReference) 239 240 def startup(self, remoteReference): 241 """ 242 Starts things up with the remote reference in hand. Useful to have this 243 as a separate method when you're subclassing and doing difference 244 constructor stuff. 245 """ 235 246 # Setup resignation-upon-disconnect 236 247 self.resignators = [] 237 248 self.disconnectErrors = (pb.DeadReferenceError, pb.PBConnectionLost) 238 remoteReference.notifyOnDisconnect(self._resign) 239 # Setup attributes 240 self.N = N 241 self.iQualified = [] 242 self.remoteCaller = remoteReference.callRemote 249 remoteReference.notifyOnDisconnect(self.resign) 243 250 # Prepare the run request queue 244 251 self.jobs = [] … … 247 254 self.runRequestQueue.put(None) 248 255 249 def _runNow(self, null, task):256 def runNow(self, null, task): 250 257 suffix, args, kw = task.callTuple 251 258 d = self.remoteCaller(suffix, *args, **kw) 252 259 job = (task, d) 253 260 self.jobs.append(job) 254 d.addCallback(self. _doneTrying, job)255 d.addErrback(self. _oops)261 d.addCallback(self.doneTrying, job) 262 d.addErrback(self.oops) 256 263 # The task's deferred is NOT returned! 257 264 258 def _oops(self, failure):265 def oops(self, failure): 259 266 if failure.check(*self.disconnectErrors): 260 self. _resign()267 self.resign() 261 268 else: 262 269 return failure 263 270 264 def _doneTrying(self, result, job):271 def doneTrying(self, result, job): 265 272 self.jobs.remove(job) 266 273 self.runRequestQueue.put(None) … … 268 275 task.d.callback(result) 269 276 270 def _resign(self, *null):277 def resign(self, *null): 271 278 while self.resignators: 272 279 callableObject = self.resignators.pop() … … 291 298 if getattr(self, 'isShuttingDown', False): 292 299 raise errors.QueueRunError 293 return self.runRequestQueue.get().addCallback(self. _runNow, task)300 return self.runRequestQueue.get().addCallback(self.runNow, task) 294 301 295 302 def stop(self): … … 329 336 self.suffixCache = [] 330 337 331 def _names(self, items):338 def names(self, items): 332 339 nameListing = [x.__name__ for x in items] 333 340 nameListing[-1] = "or " + nameListing[-1] … … 335 342 return joinString.join(nameListing) 336 343 337 def _checkSuffix(self, suffix):344 def checkSuffix(self, suffix): 338 345 for interface in self.interfaces: 339 346 for attrName in interface: … … 341 348 self.suffixCache.append(suffix) 342 349 return 343 names = self. _names(self.interfaces)350 names = self.names(self.interfaces) 344 351 raise AttributeError( 345 352 "No remote method *_%s provided by interface %s" % (suffix, names)) 346 353 347 def _runNow(self, null, task):354 def runNow(self, null, task): 348 355 suffix, args, kw = task.callTuple 349 356 if suffix not in self.suffixCache: 350 self. _checkSuffix(suffix)357 self.checkSuffix(suffix) 351 358 d = self.remoteCaller(suffix, *args, **kw) 352 359 job = (task, d) 353 360 self.jobs.append(job) 354 d.addBoth(self. _doneTrying, job)361 d.addBoth(self.doneTrying, job) 355 362 # The task's deferred is NOT returned!
