- Timestamp:
- 07/09/08 21:15:09 (5 months ago)
- Files:
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynQueue/trunk/asynqueue/processworker.py
r88 r214 29 29 30 30 import jobs 31 32 CHILD_CODE = """ 33 import os, sys 34 os.renice(int(sys.argv[-1])) 35 from asynqueue import processworker 36 processworker.runAsChild() 37 """ 31 38 32 39 … … 87 94 interpreters as at least some of its workers. 88 95 """ 96 def _get_children(self): 97 if not hasattr(self, '_children'): 98 self._children = {} 99 return self._children 100 children = property(_get_children) 101 89 102 @defer.deferredGenerator 90 def startup(self, N=1 ):103 def startup(self, N=1, niceness=0): 91 104 """ 92 105 Starts I{N} child interpreters and attaches instances of … … 100 113 accept jobs for them. 101 114 """ 102 if not hasattr(self, 'children'):103 self.children = {}104 115 for k in xrange(N): 105 wfd = defer.waitForDeferred(self.spawnChild( ))116 wfd = defer.waitForDeferred(self.spawnChild(niceness)) 106 117 yield wfd 107 118 process, root = wfd.getResult() … … 112 123 yield self.children.keys() 113 124 114 def spawnChild(self ):125 def spawnChild(self, niceness=0): 115 126 """ 116 127 Connects my factory through STDIO to a child python interpreter process. … … 122 133 objects. 123 134 """ 135 if not isinstance(niceness, int) or niceness < 0 or niceness > 19: 136 raise TypeError("Niceness level must be an integer 0-19") 124 137 factory = pb.PBClientFactory() 125 138 broker = factory.buildProtocol(('127.0.0.1',)) … … 127 140 wrappedProtocol = ProtocolWrapper( 128 141 broker, startCallback=lambda: d.callback(None)) 129 args = (sys.executable, os.path.abspath(__file__)) 142 code = "; ".join(CHILD_CODE.strip().split("\n")) 143 args = (sys.executable, '-c', code, str(niceness)) 130 144 process = reactor.spawnProcess( 131 145 wrappedProtocol, sys.executable, args=args, env=None) … … 133 147 d.addCallback(lambda root: (process, root)) 134 148 return d 135 149 136 150 def shutdown(self): 137 151 def wrapUp(null): … … 162 176 stdio.StandardIO(wrappedProtocol) 163 177 reactor.run() 164 165 166 if __name__ == '__main__':167 runAsChild()projects/AsynQueue/trunk/asynqueue/test/test_processworker.py
r213 r214 44 44 def check(result): 45 45 process, root = result 46 self.mgr. children = {1:process}46 self.mgr._children = {1:process} 47 47 d = root.callRemote('newJob', JOB_ID, JOB_CODE) 48 48 d.addCallback(self.failUnlessEqual, (True, ['test'])) … … 56 56 yield wfd 57 57 process, root = wfd.getResult() 58 self.mgr. children = {1:process}58 self.mgr._children = {1:process} 59 59 wfd = defer.waitForDeferred( 60 60 root.callRemote('newJob', JOB_ID, JOB_CODE))
