| 1 |
# AsynQueue: |
|---|
| 2 |
# Asynchronous task queueing based on the Twisted framework, with task |
|---|
| 3 |
# prioritization and a powerful worker/manager interface. |
|---|
| 4 |
# |
|---|
| 5 |
# Copyright (C) 2006-2007 by Edwin A. Suominen, http://www.eepatents.com |
|---|
| 6 |
# |
|---|
| 7 |
# This program is free software; you can redistribute it and/or modify it under |
|---|
| 8 |
# the terms of the GNU General Public License as published by the Free Software |
|---|
| 9 |
# Foundation; either version 2 of the License, or (at your option) any later |
|---|
| 10 |
# version. |
|---|
| 11 |
# |
|---|
| 12 |
# This program is distributed in the hope that it will be useful, but WITHOUT |
|---|
| 13 |
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|---|
| 14 |
# FOR A PARTICULAR PURPOSE. See the file COPYING for more details. |
|---|
| 15 |
# |
|---|
| 16 |
# You should have received a copy of the GNU General Public License along with |
|---|
| 17 |
# this program; if not, write to the Free Software Foundation, Inc., 51 |
|---|
| 18 |
# Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA |
|---|
| 19 |
|
|---|
| 20 |
""" |
|---|
| 21 |
An IWorker implementation using Perspective Broker (PB) over STDIO. |
|---|
| 22 |
|
|---|
| 23 |
Based on coding by Konrads Smelkovs |
|---|
| 24 |
""" |
|---|
| 25 |
|
|---|
| 26 |
import sys, os.path |
|---|
| 27 |
from twisted.internet import protocol, stdio, reactor, defer |
|---|
| 28 |
from twisted.spread import pb |
|---|
| 29 |
|
|---|
| 30 |
import jobs |
|---|
| 31 |
|
|---|
| 32 |
# Bootstrap program run by each child interpreter via "python -c".
#
# ChildManager.spawnChild strips this text and joins its lines with "; " to
# form a single command string, so every line must be a simple statement and
# the text must not contain comments. The last command-line argument is the
# niceness level chosen by the parent; os.nice() applies it as an increment
# to the child's own scheduling priority before the worker starts.
CHILD_CODE = """
import os, sys
os.nice(int(sys.argv[-1]))
from asynqueue import processworker
processworker.runAsChild()
"""
|---|
| 38 |
|
|---|
| 39 |
|
|---|
| 40 |
class ProtocolWrapper(protocol.ProcessProtocol):
    """
    I wrap a L{Protocol} instance in a L{ProcessProtocol} instance, relaying
    connection and data events to the wrapped protocol.

    @ivar proto: the wrapped protocol instance.
    @ivar startCallback: if callable, called with no arguments right after
        the wrapped protocol's C{connectionMade} has been run.
    @ivar stopCallback: if callable, called with no arguments right after
        the wrapped protocol's C{connectionLost} has been run.
    """
    def __init__(self, proto, startCallback=None, stopCallback=None):
        self.stopCallback = stopCallback
        self.startCallback = startCallback
        self.proto = proto

    def makeConnection(self, transport):
        """
        Hands the I{transport} to the wrapped protocol and then announces
        the connection via my own C{connectionMade}.
        """
        self.proto.transport = transport
        self.connectionMade()

    def connectionMade(self):
        """
        Relays the event to the wrapped protocol's C{connectionMade} and
        then runs the start callback, if one was supplied.
        """
        self.proto.connectionMade()
        notify = self.startCallback
        if callable(notify):
            notify()

    def dataReceived(self, data):
        """
        Relays I{data} to the wrapped protocol's C{dataReceived}.
        """
        self.proto.dataReceived(data)

    def outReceived(self, data):
        """
        Whatever the process writes to STDOUT is given to the wrapped
        protocol as received data.
        """
        self.proto.dataReceived(data)

    def connectionLost(self, reason):
        """
        Relays the event to the wrapped protocol's C{connectionLost} and
        then runs the stop callback, if one was supplied.
        """
        self.proto.connectionLost(reason)
        notify = self.stopCallback
        if callable(notify):
            notify()

    def processEnded(self, reason):
        """
        The end of the process is treated as a lost connection.
        """
        self.connectionLost(reason)
|---|
| 89 |
|
|---|
| 90 |
|
|---|
| 91 |
class ChildManager(jobs.JobManager):
    """
    I am a L{jobs.JobManager} that manages a pool of one or more child python
    interpreters as at least some of its workers.

    @ivar children: a dict, created on first access, that maps the ID of each
        attached child worker to the process object spawned for it.
    """
    def _get_children(self):
        # Created lazily on first access rather than in a constructor.
        if not hasattr(self, '_children'):
            self._children = {}
        return self._children
    children = property(_get_children)

    @defer.deferredGenerator
    def startup(self, N=1, niceness=0):
        """
        Starts I{N} child interpreters and attaches instances of
        L{jobs.ChildWorker} for each of them to my queue.

        The workers are set to accept just one job run at a time because
        network latency isn't an issue. The PB connection is via STDIO.

        @param N: the number of child interpreters to start.
        @param niceness: the niceness level, an integer 0-19, passed to
            L{spawnChild} for every child.

        @return: a deferred that fires with a list of the IDs for the
            interpreters when they have all been started and the queue is
            ready to accept jobs for them.
        """
        # The children are started strictly one after another; each
        # iteration waits for both the spawn and the attachment to finish.
        for _ in range(N):
            wfd = defer.waitForDeferred(self.spawnChild(niceness))
            yield wfd
            process, root = wfd.getResult()
            # The second argument is presumably the number of jobs the
            # worker may run at once -- confirm against jobs.JobManager.
            wfd = defer.waitForDeferred(self.attachChild(root, 1))
            yield wfd
            childID = wfd.getResult()
            self.children[childID] = process
        # A real list (not a dict view) on every Python version.
        yield list(self.children)

    def spawnChild(self, niceness=0):
        """
        Connects my factory through STDIO to a child python interpreter
        process.

        The child process should be enabled through a STDIO-wrapped version
        of its server-side PB broker.

        @param niceness: the niceness level for the child, an integer 0-19.
            It is passed to the child as its last command-line argument.

        @return: a deferred that fires with a tuple of the child's process
            and PB root objects.

        @raise TypeError: if I{niceness} is not an integer in the range
            0-19.
        """
        if not isinstance(niceness, int) or niceness < 0 or niceness > 19:
            raise TypeError("Niceness level must be an integer 0-19")
        factory = pb.PBClientFactory()
        # The broker talks over the child's STDIO, so the address given
        # here is only a placeholder.
        broker = factory.buildProtocol(('127.0.0.1',))
        d = defer.Deferred()
        wrappedProtocol = ProtocolWrapper(
            broker, startCallback=lambda: d.callback(None))
        # "python -c" takes the whole program as one argument, so the lines
        # of CHILD_CODE are joined into a single line of statements.
        code = "; ".join(CHILD_CODE.strip().split("\n"))
        args = (sys.executable, '-c', code, str(niceness))
        process = reactor.spawnProcess(
            wrappedProtocol, sys.executable, args=args, env=None)
        # If the start callback has already fired by now, these callbacks
        # run immediately; either way 'process' is bound before they do.
        d.addCallback(lambda _: factory.getRootObject())
        d.addCallback(lambda root: (process, root))
        return d

    def shutdown(self):
        """
        Runs L{jobs.JobManager.shutdown} and, once that is done, closes the
        connection to every child process and forgets all of them.

        @return: the deferred from the base class shutdown, with the child
            cleanup added as a callback.
        """
        def wrapUp(null):
            # Iterate over a snapshot so the dict can safely change while
            # the connections are being closed.
            for process in list(self.children.values()):
                process.loseConnection()
            self.children.clear()

        return jobs.JobManager.shutdown(self).addCallback(wrapUp)

    def detachChild(self, childID):
        """
        Runs L{jobs.JobManager.detachChild} for I{childID} and, if that ID
        belongs to one of my child processes, closes the connection to the
        process and forgets it.

        @return: whatever the base class C{detachChild} returned.
        """
        result = jobs.JobManager.detachChild(self, childID)
        process = self.children.pop(childID, None)
        if process is not None:
            process.loseConnection()
        return result
|---|
| 164 |
|
|---|
| 165 |
|
|---|
| 166 |
def runAsChild():
    """
    Sets up the current Python interpreter as a child process worker: a
    trusted L{jobs.ChildRoot} is served by a PB server factory whose broker
    is hooked up to STDIO, and then the reactor is run.
    """
    childRoot = jobs.ChildRoot()
    childRoot.trusted = True
    serverFactory = pb.PBServerFactory(childRoot)
    # The broker is wrapped so that it can be driven by standard I/O.
    stdio.StandardIO(
        ProtocolWrapper(serverFactory.buildProtocol(('127.0.0.1',))))
    reactor.run()
|---|