Changeset 84

Show
Ignore:
Timestamp:
09/14/07 14:27:32 (1 year ago)
Author:
edsuom
Message:

Processworker-related tweaks

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynQueue/trunk/asynqueue/jobs.py

    r79 r84  
    2424 
    2525from zope.interface import implements, Interface 
    26 from twisted.internet import defer 
     26from twisted.internet import defer, reactor 
    2727from twisted.python.failure import Failure 
    2828from twisted.spread import pb, flavors 
     
    4242        if name == 'jobs': 
    4343            result = self.jobs = {} 
    44         elif name == 'queue': 
    45             result = self.queue = base.TaskQueue(workers.ThreadWorker()) 
    4644        elif name == 'trusted': 
    4745            result = self.trusted = False 
     
    5553        represented by Python code contained in the string I{jobCode}. 
    5654 
     55        The namespace can include a no-argument function C{shutdown}, which may 
     56        return a deferred. If it is present, the function will be called before 
     57        my process exits. 
     58 
    5759        Returns C{True} if the I{jobCode} was accepted and executed OK, or 
    5860        C{False} otherwise. 
    5961        """ 
    60         def tryJob(): 
    61             try: 
    62                 namespace = {} 
    63                 exec jobCode in namespace 
    64             except: 
    65                 return False 
    66             self.jobs[jobID] = namespace 
    67             return True 
    68  
    6962        if not self.trusted: 
    7063            return False 
    71         return self.queue.call(tryJob, niceness=-10) 
     64        try: 
     65            namespace = {} 
     66            exec jobCode in namespace 
     67        except: 
     68            return False 
     69        self.jobs[jobID] = namespace 
     70        return True 
    7271     
    7372    def remote_runJob(self, jobID, callName, *args, **kw): 
    7473        """ 
    75         Runs the specified callable object with any supplied arguments and 
    76         keywords in the namespace of the specified I{jobID}. 
    77          
    78         @param jobID: An integer uniquely specifying an already-registered job. 
    79  
    80         @param callName: A string with the name of a callable object in the 
    81           job's namespace. 
    82  
    83         @args: Any arguments for the call. 
    84  
    85         @kw: Any keywords for the call. 
    86          
    87         @return: A deferred to the eventual result of the call. 
    88          
     74        Calls the object that is present in the namespace of I{jobID} under the 
     75        specified I{callName}, with any supplied arguments and 
     76        keywords. Returns the result of the call, which may be a deferred. 
    8977        """ 
    9078        if not self.trusted: 
     
    9280        calledObject = getattr(self.jobs[jobID], callName, None) 
    9381        if callable(calledObject): 
    94             return self.queue.call(calledObject, *args, **kw) 
     82            return calledObject(*args, **kw) 
    9583        raise AttributeError( 
    9684            "No callable object '%s' defined in namespace for job %d" \ 
     
    9987    def remote_exit(self): 
    10088        """ 
    101         Terminates the child worker process. 
    102         """ 
    103         self.queue.shutdown().addCallback(lambda _: reactor.stop()) 
     89        Terminates my child worker process, calling and waiting for any 
     90        C{shutdown} methods present in my jobs' namespaces before stopping my 
     91        reactor. 
     92        """ 
     93        dList = [] 
     94        for namespace in self.jobs.itervalues(): 
     95            possibleShutdownFunction = namespace.get('shutdown', None) 
     96            if callable(possibleShutdownFunction): 
     97                dList.append(defer.maybeDeferred(possibleShutdownFunction)) 
     98        return defer.DeferredList(dList).addCallback(lambda _: reactor.stop()) 
    10499 
    105100 
     
    158153        return self.queue.shutdown() 
    159154 
    160     def attachChild(self, childRoot): 
    161         """ 
    162         Attaches a new child interpreter worker. 
    163          
    164         @return: A deferred that fires with a unique ID for the interpreter 
    165           when all of the currently registered jobs have been tried on it. 
    166          
     155    def attachChild(self, childRoot, N=3): 
     156        """ 
     157        Attaches a new child interpreter worker using the supplied I{childRoot} 
     158        PB root reference. 
     159 
     160        The default number (3) of job runs that the worker is willing to queue 
     161        up on its end can be overridden with the I{N} keyword. 
     162         
     163        Returns a deferred that fires with a unique ID for the interpreter when 
     164        all of the currently registered jobs have been tried on it. 
    167165        """ 
    168166        def tried(success, jobID): 
     
    175173                    TypeError("Supplied root reference doesn't conform")) 
    176174 
    177         worker = ChildWorker(childRoot
     175        worker = ChildWorker(childRoot, N
    178176        result = [self.queue.attachWorker(worker)] 
    179177        dList = [] 
  • projects/AsynQueue/trunk/asynqueue/processworker.py

    r83 r84  
    9090    def startup(self, N=1): 
    9191        """ 
    92         Starts I{N} child interpreters and attaches them to my queue. Returns a 
    93         deferred that fires with a list of the IDs for the interpreters when 
    94         they have all been started and the queue is ready to accept jobs for 
    95         them. 
     92        Starts I{N} child interpreters and attaches instances of 
     93        L{jobs.ChildWorker} for each of them to my queue. 
     94 
     95        The workers are set to accept just one job run at a time because 
     96        network latency isn't an issue. The PB connection is via STDIO. 
     97 
     98        Returns a deferred that fires with a list of the IDs for the 
     99        interpreters when they have all been started and the queue is ready to 
     100        accept jobs for them. 
    96101        """ 
    97102        if not hasattr(self, 'children'): 
     
    101106            yield wfd 
    102107            process, root = wfd.getResult() 
    103             wfd = defer.waitForDeferred(self.attachChild(root)) 
     108            wfd = defer.waitForDeferred(self.attachChild(root, 1)) 
    104109            yield wfd 
    105110            childID = wfd.getResult() 
  • projects/Twisted-Goodies/branches/simpleserver-process-wsgi/twisted_goodies/simpleserver/http/wsgi/parent.py

    r83 r84  
    7070                jobCode = fh.read() 
    7171                fh.close() 
     72                jobCode += '\n' + self.appCode() 
    7273                d = self.mgr.new(jobCode) 
    7374                d.addCallback(lambda jobID: setattr(self, 'jobID', jobID))