root/projects/AsynQueue/trunk/asynqueue/processworker.py

Revision 214, 5.9 kB (checked in by edsuom, 6 months ago)

Allow worker processes to be run with lower priority

Line 
1 # AsynQueue:
2 # Asynchronous task queueing based on the Twisted framework, with task
3 # prioritization and a powerful worker/manager interface.
4 #
5 # Copyright (C) 2006-2007 by Edwin A. Suominen, http://www.eepatents.com
6 #
7 # This program is free software; you can redistribute it and/or modify it under
8 # the terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 2 of the License, or (at your option) any later
10 # version.
11 #
12 # This program is distributed in the hope that it will be useful, but WITHOUT
13 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 # FOR A PARTICULAR PURPOSE.  See the file COPYING for more details.
15 #
16 # You should have received a copy of the GNU General Public License along with
17 # this program; if not, write to the Free Software Foundation, Inc., 51
18 # Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
19
20 """
21 An IWorker implementation using Perspective Broker (PB) over STDIO.
22
23 Based on coding by Konrads Smelkovs
24 """
25
26 import sys, os.path
27 from twisted.internet import protocol, stdio, reactor, defer
28 from twisted.spread import pb
29
30 import jobs
31
# Source of the bootstrap script handed to each child interpreter via "-c".
# ChildManager.spawnChild() strips it and joins its lines with "; " to form a
# one-liner, so every line here must be a simple statement: no comments, no
# blank lines, and no compound statements.
#
# The last command-line argument is the niceness increment for the child. It
# is applied with os.nice(), which adds the increment to the process's
# niceness; the os module has no renice() function, so calling one would kill
# the child with an AttributeError before its PB server ever started.
CHILD_CODE = """
import os, sys
os.nice(int(sys.argv[-1]))
from asynqueue import processworker
processworker.runAsChild()
"""
38
39
class ProtocolWrapper(protocol.ProcessProtocol):
    """
    I wrap a regular protocol instance in a L{ProcessProtocol} so that the
    same protocol can be handed both to C{reactor.spawnProcess} (parent side)
    and to C{stdio.StandardIO} (child side), as this module does.

    Connection and data events that reach me are forwarded to the wrapped
    protocol. Optional callbacks let the owner know when the connection has
    been made and when it has been lost.
    """
    def __init__(self, proto, startCallback=None, stopCallback=None):
        """
        @param proto: The protocol instance to forward events to.

        @param startCallback: If callable, called with no arguments after the
          wrapped protocol's C{connectionMade} has run.

        @param stopCallback: If callable, called with no arguments after the
          wrapped protocol's C{connectionLost} has run.
        """
        self.proto = proto
        self.startCallback = startCallback
        self.stopCallback = stopCallback

    # --- Connection setup ----------------------------------------------------

    def makeConnection(self, transport):
        """
        Gives the supplied I{transport} to the wrapped protocol and then runs
        my own C{connectionMade}.
        """
        self.proto.transport = transport
        self.connectionMade()

    def connectionMade(self):
        """
        Forwards to the wrapped protocol's C{connectionMade} and then fires
        the start callback, if a callable one was supplied.
        """
        self.proto.connectionMade()
        notify = self.startCallback
        if not callable(notify):
            return
        notify()

    # --- Incoming data -------------------------------------------------------

    def outReceived(self, data):
        """
        Data arriving from the process via STDOUT is delivered to the wrapped
        protocol as received data.
        """
        self.proto.dataReceived(data)

    def dataReceived(self, data):
        """
        Forwards to the wrapped protocol's C{dataReceived}.
        """
        self.proto.dataReceived(data)

    # --- Connection teardown -------------------------------------------------

    def processEnded(self, reason):
        """
        The end of the process is treated as a lost connection.
        """
        self.connectionLost(reason)

    def connectionLost(self, reason):
        """
        Forwards to the wrapped protocol's C{connectionLost} and then fires
        the stop callback, if a callable one was supplied.
        """
        self.proto.connectionLost(reason)
        notify = self.stopCallback
        if not callable(notify):
            return
        notify()
89
90
class ChildManager(jobs.JobManager):
    """
    I am a L{jobs.JobManager} whose workers include a pool of one or more
    child python interpreters, each reached via PB over STDIO.

    I keep a registry, C{children}, that maps the ID of each attached child
    to the process object it runs in.
    """
    def _get_children(self):
        # The registry is created lazily, on first access.
        try:
            return self._children
        except AttributeError:
            self._children = {}
            return self._children
    children = property(_get_children)

    @defer.deferredGenerator
    def startup(self, N=1, niceness=0):
        """
        Spawns I{N} child interpreters, one after another, and attaches each
        of them to my queue.

        Each child is attached with a limit of one job run at a time, because
        network latency isn't an issue with a PB connection over STDIO.

        @param N: The number of child interpreters to start.

        @param niceness: Passed to L{spawnChild} for every child.

        @return: A deferred that fires with a list of the IDs in my
          C{children} registry once all of the children have been started
          and attached.
        """
        for _ in xrange(N):
            spawning = defer.waitForDeferred(self.spawnChild(niceness))
            yield spawning
            childProcess, childRoot = spawning.getResult()
            attaching = defer.waitForDeferred(self.attachChild(childRoot, 1))
            yield attaching
            newID = attaching.getResult()
            self.children[newID] = childProcess
        # The last value yielded is the result of the deferred generator.
        yield self.children.keys()

    def spawnChild(self, niceness=0):
        """
        Runs a child python interpreter and connects a PB client factory to
        it through the child's STDIO.

        The child is started with L{CHILD_CODE}, collapsed into a one-line
        C{-c} script, and receives the niceness level as its last
        command-line argument.

        @param niceness: An integer from 0 to 19.

        @return: A deferred that fires with a tuple of the child's process
          object and its PB root object.

        @raise TypeError: If I{niceness} is not an integer in the range 0-19.
        """
        acceptable = isinstance(niceness, int) and 0 <= niceness <= 19
        if not acceptable:
            raise TypeError("Niceness level must be an integer 0-19")

        clientFactory = pb.PBClientFactory()
        clientBroker = clientFactory.buildProtocol(('127.0.0.1',))
        d = defer.Deferred()

        def connected():
            d.callback(None)

        def fetchRoot(_):
            return clientFactory.getRootObject()

        def pairWithProcess(root):
            return (process, root)

        wrapper = ProtocolWrapper(clientBroker, startCallback=connected)
        statements = CHILD_CODE.strip().split("\n")
        oneLiner = "; ".join(statements)
        argv = (sys.executable, '-c', oneLiner, str(niceness))
        process = reactor.spawnProcess(
            wrapper, sys.executable, args=argv, env=None)
        # The callbacks are added only after the process has been spawned.
        d.addCallback(fetchRoot)
        d.addCallback(pairWithProcess)
        return d

    def shutdown(self):
        """
        Performs the regular L{jobs.JobManager} shutdown and, once that is
        done, drops the connection to every registered child process and
        empties my C{children} registry.

        @return: The deferred from the L{jobs.JobManager} shutdown, with the
          child cleanup added to its callback chain.
        """
        def closeAll(_):
            registry = self.children
            for childProcess in registry.itervalues():
                childProcess.loseConnection()
            registry.clear()

        finished = jobs.JobManager.shutdown(self)
        return finished.addCallback(closeAll)

    def detachChild(self, childID):
        """
        Detaches the child with the specified I{childID} via
        L{jobs.JobManager} and, if I have a process registered under that
        ID, forgets the process and drops its connection.

        @return: Whatever the L{jobs.JobManager} version of this method
          returned.
        """
        outcome = jobs.JobManager.detachChild(self, childID)
        try:
            childProcess = self.children.pop(childID)
        except KeyError:
            # Nothing registered under this ID, so nothing to close.
            pass
        else:
            childProcess.loseConnection()
        return outcome
164
165
def runAsChild():
    """
    Turns the running Python interpreter into a child process worker.

    A trusted L{jobs.ChildRoot} is served by a PB server factory whose
    broker, wrapped in a L{ProtocolWrapper}, is connected to this process's
    standard I/O. The reactor is then run; this call does not return until
    the reactor stops.
    """
    childRoot = jobs.ChildRoot()
    childRoot.trusted = True
    serverFactory = pb.PBServerFactory(childRoot)
    serverBroker = serverFactory.buildProtocol(('127.0.0.1',))
    # PB traffic for this worker goes over STDIN/STDOUT.
    stdio.StandardIO(ProtocolWrapper(serverBroker))
    reactor.run()
Note: See TracBrowser for help on using the browser.