I cannot understand why this hangs. Perhaps it is a bug?
The 'client':
from twisted.spread import pb, flavors
from twisted.internet import reactor, defer
from twisted.python.failure import Failure
from twisted.python.log import err
from asynqueue import TaskQueue
import asynqueue.workers as workers
class ChildWorker(workers.RemoteCallWorker):
    """Worker that forwards queued tasks to a remote child process.

    Tasks are sent through ``self.remoteCaller`` (presumably supplied by
    ``RemoteCallWorker`` -- confirm against asynqueue) as ``runJob`` calls.
    """

    def _runNow(self, null, task):
        """Send *task* to the child as a remote ``runJob`` call.

        The task's ``series`` is passed as the first remote argument,
        followed by the function name and the call arguments unpacked from
        ``task.callTuple``.  The ``null`` argument is ignored.
        """
        funcName, args, kw = task.callTuple
        d = self.remoteCaller('runJob', task.series, funcName, *args, **kw)
        job = (task, d)
        # Track the in-flight job so that _doneTrying can be handed the
        # same (task, deferred) pair once the remote call finishes.
        self.jobs.append(job)
        d.addBoth(self._doneTrying, job)
        # Only ``err`` is imported from twisted.python.log; the previous
        # ``log.err`` raised NameError here on every dispatched task.
        d.addErrback(err)

    def stop(self):
        """Stop this worker, then ask the remote child to exit.

        The remote ``exit`` call is made whether the base-class stop
        succeeded or failed; any resulting failure is logged.

        Returns the deferred obtained from ``RemoteCallWorker.stop``.
        """
        d = workers.RemoteCallWorker.stop(self)
        d.addBoth(lambda _: self.remoteCaller('exit'))
        d.addErrback(err)
        return d
class JobManager(TaskQueue):
    """Task queue that registers named jobs on every attached child.

    ``self.jobs`` maps each job ID to a ``(jobCode, niceness)`` tuple.
    """

    def __init__(self, *args, **kw):
        """Initialise the underlying queue and an empty job registry."""
        TaskQueue.__init__(self, *args, **kw)
        self.jobs = {}

    def attachChild(self, childRoot):
        """Attach a worker for *childRoot* and send it all known jobs.

        Returns a deferred that fires with whatever ``attachWorker``
        returned once every remote ``newJob`` call has completed.
        """
        attached = self.attachWorker(ChildWorker(childRoot))
        registrations = [
            childRoot.callRemote('newJob', jobID, self.jobs[jobID][0])
            for jobID in self.jobs
        ]
        allRegistered = defer.DeferredList(registrations)
        allRegistered.addCallback(lambda _: attached)
        return allRegistered

    def detachChild(self, childID):
        """Detach the worker identified by *childID*.

        Delegates to ``detachWorker`` with ``reassign=True`` and
        ``crash=True`` and returns its result.
        """
        return self.detachWorker(childID, reassign=True, crash=True)

    def new(self, jobCode, niceness=0):
        """Register *jobCode* as a new job and send it to every worker.

        Job IDs are allocated from a per-instance counter starting at 1.

        Returns a deferred that fires with the new job ID after all the
        remote ``newJob`` calls have completed.
        """
        jobID = getattr(self, '_jobCounter', 0) + 1
        self._jobCounter = jobID
        self.jobs[jobID] = (jobCode, niceness)
        broadcasts = [
            worker.remoteCaller('newJob', jobID, jobCode)
            for worker in self.workers()
        ]
        return defer.DeferredList(broadcasts).addCallback(lambda _: jobID)

    def run(self, jobID, callName, *args, **kw):
        """Queue a call to *callName* within the job *jobID*.

        The ``series`` and ``niceness`` keywords are overwritten with the
        job ID and the niceness the job was registered with.

        Raises ValueError if *jobID* has not been registered.
        """
        if jobID not in self.jobs:
            raise ValueError("No job '%s' registered" % jobID)
        kw.update(series=jobID, niceness=self.jobs[jobID][1])
        return self.call(callName, *args, **kw)
def p(s):
    """Print *s* to standard output."""
    print(s)
# Client driver: connect to the child, attach it as a worker, register one
# job, run it, detach the child again and stop the reactor.
factory = pb.PBClientFactory()
reactor.connectTCP("localhost", 8800, factory)
d = factory.getRootObject()
m = JobManager()

# Result of attachChild, kept so it can later be handed to detachChild.
# NOTE(review): assumes attachWorker's return value is the ID that
# detachWorker expects -- confirm against asynqueue.
attached = []

d.addCallback(m.attachChild)
d.addCallback(attached.append)
d.addCallback(lambda _: m.new("def tst(): print 'qwerty'"))
# Run the job under the ID that new() produced rather than a hard-coded 1.
d.addCallback(lambda jobID: m.run(jobID, "tst", doNext=True))
# Previously this callback returned the bound method ``m.detachChild``
# without calling it, so the child was never detached.
d.addCallback(lambda _: m.detachChild(attached[0]))
# One errback at the end: a failing step skips the remaining steps instead
# of running them against a broken state, and the failure is logged once.
d.addErrback(err)
# Stop the reactor whether the chain succeeded or failed.
d.addBoth(lambda _: reactor.stop())
reactor.run()
The 'server':
from twisted.spread import pb
from twisted.internet import defer, reactor
from asynqueue.base import TaskQueue
import asynqueue.workers as workers
from twisted.python.log import err
class ChildRoot(pb.Root):
def __getattr__(self, name):
if name == 'jobs':
result = self.jobs = {}
elif name == 'queue':
result = self.queue = TaskQueue(workers.ThreadWorker())
elif name == 'trusted':
result = self.trusted = False
else:
raise AttributeError("No such attribute '%s'" % name)
return result
def remote_newJob(self, jobID, jobCode):
def tryJob():
try:
namespace = {}
exec jobCode in namespace
except:
return False
self.jobs[jobID] = namespace
return True
return self.queue.call(tryJob, niceness=-10)
def remote_runJob(self, jobID, callName, *args, **kw):
calledObject = getattr(self.jobs[jobID], callName, None)
if callable(calledObject):
return self.queue.call(calledObject, *args, **kw)
raise AttributeError(
"No callable object '%s' defined in namespace for job %d" \
% (callName, jobID))
def remote_exit(self):
self.queue.shutdown().addCallback(lambda _: reactor.stop()).addErrback(err)
if __name__ == '__main__':
    # Serve a single root object, marked as trusted, on TCP port 8800.
    root = ChildRoot()
    root.trusted = True
    factory = pb.PBServerFactory(root)
    # Listen with the factory built above.  Previously a second factory
    # wrapping a fresh ChildRoot() was passed here, so the configured
    # ``root`` (and its trusted flag) was never actually served.
    reactor.listenTCP(8800, factory)
    reactor.run()
Package version: python-asynqueue 0.1-1