Ticket #2 (new defect)

Opened 1 year ago

hangs

Reported by: imgrey Assigned to: edsuom
Priority: blocker Milestone:
Component: AsynCluster Version:
Keywords: Cc:

Description

I cannot understand why it hangs. Perhaps it is a bug?

The 'client':

from twisted.spread import pb, flavors
from twisted.internet import reactor, defer
from twisted.python.failure import Failure
from twisted.python.log import err
from asynqueue import TaskQueue
import asynqueue.workers as workers

class ChildWorker(workers.RemoteCallWorker):
    """
    Worker that forwards queued tasks to a remote child through
    C{self.remoteCaller}.

    NOTE(review): C{self.remoteCaller} is invoked directly as a callable
    here, while C{JobManager.attachChild} constructs this worker with the
    root object itself (whose remote calls are made via C{callRemote}).
    Confirm what C{RemoteCallWorker.__init__} expects as its argument.
    """

    def _runNow(self, null, task):
        """
        Dispatch C{task} to the remote side as a C{'runJob'} call.

        The task's series is sent as the first remote argument, followed
        by the function name and the call's positional and keyword
        arguments taken from C{task.callTuple}.  The C{(task, deferred)}
        pair is recorded in C{self.jobs} and handed to
        C{self._doneTrying} once the remote call finishes either way.

        The C{null} argument is ignored.
        """
        funcName, args, kw = task.callTuple
        d = self.remoteCaller('runJob', task.series, funcName, *args, **kw)
        job = (task, d)
        self.jobs.append(job)
        d.addBoth(self._doneTrying, job)
        # Only ``err`` is imported from twisted.python.log in this module;
        # the previous ``log.err`` raised NameError on every dispatch.
        d.addErrback(err)

    def stop(self):
        """
        Stop this worker, then ask the remote side to C{'exit'}.

        The remote call is issued whether or not the base class's stop
        succeeded; a failure of the remote call itself is logged.

        Returns the deferred of the whole sequence.
        """
        d = workers.RemoteCallWorker.stop(self)
        d.addBoth(lambda _: self.remoteCaller('exit'))
        d.addErrback(err)
        return d

class JobManager(TaskQueue):
    """
    A task queue that also keeps a registry of jobs, keyed by job ID, and
    pushes each job's code to the children attached to it.
    """

    def __init__(self, *args, **kw):
        """Set up the underlying queue and start with no registered jobs."""
        TaskQueue.__init__(self, *args, **kw)
        # Maps jobID -> (jobCode, niceness)
        self.jobs = {}

    def attachChild(self, childRoot):
        """
        Attach a L{ChildWorker} built from C{childRoot} and send the child
        a C{'newJob'} remote call for every job already registered.

        Returns a deferred that fires with whatever C{attachWorker}
        returned, once all of those remote calls have finished.
        """
        attached = self.attachWorker(ChildWorker(childRoot))
        pending = [
            childRoot.callRemote('newJob', jobID, jobInfo[0])
            for jobID, jobInfo in self.jobs.iteritems()]
        d = defer.DeferredList(pending)
        d.addCallback(lambda _: attached)
        return d

    def detachChild(self, childID):
        """
        Detach the worker identified by C{childID}, passing
        C{reassign=True} and C{crash=True} to C{detachWorker}.
        """
        return self.detachWorker(childID, reassign=True, crash=True)

    def new(self, jobCode, niceness=0):
        """
        Register C{jobCode} under the next job ID (a counter starting
        at 1) and send it to every current worker via a C{'newJob'} call.

        Returns a deferred that fires with the new job ID once all of the
        workers' calls have finished.
        """
        jobID = getattr(self, '_jobCounter', 0) + 1
        self._jobCounter = jobID
        self.jobs[jobID] = (jobCode, niceness)
        pending = [
            worker.remoteCaller('newJob', jobID, jobCode)
            for worker in self.workers()]
        return defer.DeferredList(pending).addCallback(lambda _: jobID)

    def run(self, jobID, callName, *args, **kw):
        """
        Queue a call to C{callName} for the job C{jobID}.

        The call's C{series} keyword is set to the job ID and its
        C{niceness} keyword to the niceness the job was registered with,
        overriding any values supplied by the caller.

        Raises C{ValueError} if no job is registered under C{jobID}.
        """
        if jobID in self.jobs:
            kw.update(series=jobID, niceness=self.jobs[jobID][1])
            return self.call(callName, *args, **kw)
        raise ValueError("No job '%s' registered" % jobID)

def p(s):
    """Print C{s} to standard output."""
    print(s)


factory = pb.PBClientFactory()
reactor.connectTCP("localhost", 8800, factory)
m = JobManager()

# Holds the value attachChild fires with, so the child can be detached at
# the end of the chain.
attached = {}


def rememberChild(childID):
    """Record the result of C{attachChild} for the later detach step."""
    attached['childID'] = childID


# Each step is followed by its own errback so that a failure is logged and
# the chain still proceeds to the final reactor.stop().
d = factory.getRootObject()
d.addCallback(m.attachChild).addCallback(rememberChild).addErrback(err)
d.addCallback(lambda _: m.new("def tst(): print 'qwerty'")).addErrback(err)
# Run the job that was just registered, using the ID new() fired with.
d.addCallback(lambda jobID: m.run(jobID, "tst", doNext=True)).addErrback(err)
# The original returned the bound method ``m.detachChild`` without calling
# it, so the child was never detached.
d.addCallback(lambda _: m.detachChild(attached['childID'])).addErrback(err)
d.addCallback(lambda _: reactor.stop()).addErrback(err)
reactor.run()

The 'server':

from twisted.spread import pb
from twisted.internet import defer, reactor
from asynqueue.base import TaskQueue
import asynqueue.workers as workers
from twisted.python.log import err

class ChildRoot(pb.Root):
	def __getattr__(self, name):
		if name == 'jobs':
			result = self.jobs = {}
		elif name == 'queue':
			result = self.queue = TaskQueue(workers.ThreadWorker())
		elif name == 'trusted':
			result = self.trusted = False
		else:
			raise AttributeError("No such attribute '%s'" % name)
		return result
	def remote_newJob(self, jobID, jobCode):
		def tryJob():
			try:
				namespace = {}
				exec jobCode in namespace
			except:
 				return False
			self.jobs[jobID] = namespace
			return True
		return self.queue.call(tryJob, niceness=-10)
	def remote_runJob(self, jobID, callName, *args, **kw):
		calledObject = getattr(self.jobs[jobID], callName, None)
		if callable(calledObject):
			return self.queue.call(calledObject, *args, **kw)
		raise AttributeError(
			"No callable object '%s' defined in namespace for job %d" \
			% (callName, jobID))
	def remote_exit(self):
		self.queue.shutdown().addCallback(lambda _: reactor.stop()).addErrback(err)

if __name__ == '__main__':
    # Serve the root configured here.  The original passed a second,
    # freshly built PBServerFactory(ChildRoot()) to listenTCP, so the
    # ``trusted = True`` root and its factory were never used.
    root = ChildRoot()
    root.trusted = True
    factory = pb.PBServerFactory(root)
    reactor.listenTCP(8800, factory)
    reactor.run()

python-asynqueue 0.1-1