Changeset 214 for projects

Show
Ignore:
Timestamp:
07/09/08 21:15:09 (5 months ago)
Author:
edsuom
Message:

Allow worker processes to be run with lower priority

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynQueue/trunk/asynqueue/processworker.py

    r88 r214  
    2929 
    3030import jobs 
     31 
     32CHILD_CODE = """ 
     33import os, sys 
     34os.renice(int(sys.argv[-1])) 
     35from asynqueue import processworker 
     36processworker.runAsChild() 
     37""" 
    3138 
    3239 
     
    8794    interpreters as at least some of its workers. 
    8895    """ 
     96    def _get_children(self): 
     97        if not hasattr(self, '_children'): 
     98            self._children = {} 
     99        return self._children 
     100    children = property(_get_children) 
     101     
    89102    @defer.deferredGenerator 
    90     def startup(self, N=1): 
     103    def startup(self, N=1, niceness=0): 
    91104        """ 
    92105        Starts I{N} child interpreters and attaches instances of 
     
    100113        accept jobs for them. 
    101114        """ 
    102         if not hasattr(self, 'children'): 
    103             self.children = {} 
    104115        for k in xrange(N): 
    105             wfd = defer.waitForDeferred(self.spawnChild()) 
     116            wfd = defer.waitForDeferred(self.spawnChild(niceness)) 
    106117            yield wfd 
    107118            process, root = wfd.getResult() 
     
    112123        yield self.children.keys() 
    113124     
    114     def spawnChild(self): 
     125    def spawnChild(self, niceness=0): 
    115126        """ 
    116127        Connects my factory through STDIO to a child python interpreter process. 
     
    122133        objects. 
    123134        """ 
     135        if not isinstance(niceness, int) or niceness < 0 or niceness > 19: 
     136            raise TypeError("Niceness level must be an integer 0-19") 
    124137        factory = pb.PBClientFactory() 
    125138        broker = factory.buildProtocol(('127.0.0.1',)) 
     
    127140        wrappedProtocol = ProtocolWrapper( 
    128141            broker, startCallback=lambda: d.callback(None)) 
    129         args = (sys.executable, os.path.abspath(__file__)) 
     142        code = "; ".join(CHILD_CODE.strip().split("\n")) 
     143        args = (sys.executable, '-c', code, str(niceness)) 
    130144        process = reactor.spawnProcess( 
    131145            wrappedProtocol, sys.executable, args=args, env=None) 
     
    133147        d.addCallback(lambda root: (process, root)) 
    134148        return d 
    135  
     149     
    136150    def shutdown(self): 
    137151        def wrapUp(null): 
     
    162176    stdio.StandardIO(wrappedProtocol) 
    163177    reactor.run() 
    164  
    165  
    166 if __name__ == '__main__': 
    167     runAsChild() 
  • projects/AsynQueue/trunk/asynqueue/test/test_processworker.py

    r213 r214  
    4444        def check(result): 
    4545            process, root = result 
    46             self.mgr.children = {1:process} 
     46            self.mgr._children = {1:process} 
    4747            d = root.callRemote('newJob', JOB_ID, JOB_CODE) 
    4848            d.addCallback(self.failUnlessEqual, (True, ['test'])) 
     
    5656        yield wfd 
    5757        process, root = wfd.getResult() 
    58         self.mgr.children = {1:process} 
     58        self.mgr._children = {1:process} 
    5959        wfd = defer.waitForDeferred( 
    6060            root.callRemote('newJob', JOB_ID, JOB_CODE))