Changeset 83

Show
Ignore:
Timestamp:
08/25/07 01:16:21 (1 year ago)
Author:
edsuom
Message:

Continuing Quiotic quest to get process-based WSGI put together

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynCluster/trunk/gevolver/grammar.py

    r81 r83  
    112112        """ 
    113113        k = self._k + 1 
    114         return self.sentence[ : self._k + 1 + N] 
     114        return self.sentence[k:k+N] 
    115115 
    116116 
     
    158158    def value(self, s): 
    159159        if callable(self.valuer): 
    160             return self.valuer(s, a
    161         return value 
     160            return self.valuer(s
     161        return self.valuer 
    162162 
    163163 
     
    178178 
    179179    def openBrace(self, a): 
    180         self.closer = 'closeParen
     180        self.closer = 'closeBrace
    181181        self.next = a.next 
    182182        return Terminal(self._evalUntilClose, label='{') 
  • projects/AsynQueue/trunk/asynqueue/processworker.py

    r79 r83  
    9595        them. 
    9696        """ 
    97         self.children = {} 
     97        if not hasattr(self, 'children'): 
     98            self.children = {} 
    9899        for k in xrange(N): 
    99100            wfd = defer.waitForDeferred(self.spawnChild()) 
  • projects/Twisted-Goodies/branches/simpleserver-process-wsgi/twisted_goodies/simpleserver/http/resources/external.py

    r64 r83  
    2323""" 
    2424 
    25 import trac.web.main 
     25import textwrap 
    2626import nevow 
    2727from zope.interface import implements 
     
    5050    def __init__(self, path, env={}): 
    5151        self.path, self.env = path, env 
    52         wsgi.WSGIResource.__init__(self, self.tracApplication) 
    5352 
    54     def tracApplication(self, environ, start_response): 
     53    def appCode(self): 
     54        code = """ 
     55        import trac.web.main 
     56         
     57        def application(self, environ, start_response): 
     58            environ['trac.env_path'] = self.path 
     59            environ.update(self.env) 
     60            return trac.web.main.dispatch_request(environ, start_response) 
    5561        """ 
    56         This method is the callable object that provides access to my 
    57         particular Trac environment via WSGI. 
    58         """ 
    59         environ['trac.env_path'] = self.path 
    60         environ.update(self.env) 
    61         return trac.web.main.dispatch_request(environ, start_response) 
     62        return textwrap.dedent(code) 
    6263 
    6364 
  • projects/Twisted-Goodies/branches/simpleserver-process-wsgi/twisted_goodies/simpleserver/http/wsgi/parent.py

    r82 r83  
    2121 
    2222""" 
    23 A non-blocking container resource for WSGI web applications. 
     23A non-blocking container resource for WSGI web applications, run via a child 
     24worker process: parent code. 
    2425""" 
    2526 
     
    3031from twisted.internet import defer, interfaces, reactor 
    3132from twisted.python import log, failure 
     33from twisted.spread import pb 
    3234from twisted.web2 import http, http_headers 
    3335from twisted.web2 import iweb, server, stream, resource 
     
    3739 
    3840 
    39 MAX_PENDING = 10 
    40 IP_BAN_SECS = 30.0 
    41  
    4241 
    4342class AlreadyStartedResponse(Exception): 
     
    4544 
    4645 
    47 class WSGIMeta(type): 
    48     """ 
    49     This metaclass instantiates a child process manager and other queue-related 
    50     attributes that are common to all invoking classes and subclasses. 
    51     """ 
    52     def __new__(mcls, name, bases, dictionary): 
    53         if not hasattr(mcls, 'globalStuff'): 
    54             gs = mcls.globalStuff = {} 
    55             # Just use one worker in one thread for now 
    56             gs['mgr'] = ChildManager() 
    57             gs['pending'] = 0 
    58             gs['handlers'] = [] 
    59             gs['banned'] = {} 
    60         for name, value in mcls.globalStuff.iteritems(): 
    61             dictionary[name] = value 
    62         newClass = super(WSGIMeta, mcls).__new__(mcls, name, bases, dictionary) 
    63         return newClass 
    64  
    65  
    6646class WSGIResource(object): 
    6747    """ 
    68     A web2 Resource which wraps the given WSGI application callable. 
     48    An implementation of L{iweb.IResource} which wraps the given WSGI 
     49    application callable. 
    6950 
    7051    The WSGI application will be called in a separate process whenever a 
     
    7253    received. 
    7354 
    74     This isn't a subclass of resource.Resource, because it shouldn't do any 
    75     method-specific actions at all. All that stuff is totally up to the 
    76     contained wsgi application 
    77     """ 
    78     __metaclass__ = WSGIMeta 
    79  
    80     implements(iweb.IResource, interfaces.IFinishableConsumer) 
    81  
    82     def __init__(self, application): 
    83         self.application = application 
    84         self.mgr.queue.subscribe(self) 
     55    Your subclass must override the L{appCode} method to return a string of 
     56    python code that defines an C{application} function of the two standard 
     57    WSGI arguments, I{environ} and I{start_response}. 
     58    """ 
     59    implements(iweb.IResource) 
     60 
     61    def appCode(self): 
     62        raise NotImplementedError( 
     63            "You must override this with your own appcode-generating method") 
     64 
     65    def startChild(self): 
     66        def started(childID): 
     67            self.childID = childID 
     68            if not hasattr(self, 'jobID'): 
     69                fh = open(util.sibpath(__file__, 'child.py')) 
     70                jobCode = fh.read() 
     71                fh.close() 
     72                d = self.mgr.new(jobCode) 
     73                d.addCallback(lambda jobID: setattr(self, 'jobID', jobID)) 
     74                return d 
     75         
     76        return mgr.startup(1).addCallback(started) 
    8577 
    8678    def renderHTTP(self, req): 
     
    9082            if handler in self.handlers: 
    9183                self.handlers.remove(handler) 
    92                  
    93         # Do stuff with WSGIHandler 
    94         IP = req.remoteAddr.host 
    95         if IP in self.banned: 
    96             return self.denial() 
    97         handler = WSGIHandler(self.application, req) 
    98         self.handlers.append(handler) 
    99         # Queue it up for running in the thread 
    100         self.queue.call(handler.run).addBoth(done, IP) 
    101         # We get the result piecemeal from this method call's unique handler, 
    102         # not in some single result of the run method. So there's no need to do 
    103         # anything with the deferred from the queuing call. We can queue some 
    104         # more requests right away if we want. 
    105         return handler.responseDeferred 
    106  
    107     def banIP(self, IP): 
    108         """ 
    109         Temporarily bans the specified IP address from making connections. 
    110         """ 
    111         if IP in self.banned: 
    112             d, delayedCall = self.banned[IP] 
    113             delayedCall.cancel() 
    114             d.callback(IP) 
    115         d = defer.Deferred() 
    116         d.addCallback(self.banned.pop) 
    117         delayedCall = reactor.callLater(IP_BAN_SECS, d.callback, IP) 
    118         self.banned[IP] = d, delayedCall 
     84 
     85        def do(null): 
     86            # Do stuff with WSGIHandler 
     87            IP = req.remoteAddr.host 
     88            if IP in self.banned: 
     89                return self.denial() 
     90            handler = WSGIHandler(self.application, req) 
     91            self.handlers.append(handler) 
     92            # Queue it up for running in the child process 
     93            self.queue.call(handler.run).addBoth(done, IP) 
     94            # We get the result piecemeal from this method call's unique handler, 
     95            # not in some single result of the run method. So there's no need to do 
     96            # anything with the deferred from the queuing call. We can queue some 
     97            # more requests right away if we want. 
     98            return handler.responseDeferred 
     99 
     100        if hasattr(self, 'mgr'): 
     101            return do(None) 
     102        self.mgr = ChildManager() 
    119103     
    120     def denial(self): 
    121         title = "Access Denied" 
    122         html  = "<html>" 
    123         html += "<head><title>%s</title></head>" % title 
    124         html += "<body>" 
    125         html += "<h1>%s</h1>" % title 
    126         html += "<p>Access from your IP has been temporarily denied</p>" 
    127         html += "</body></html>" 
    128         return http.Response( 
    129             200, 
    130             {'content-type': http_headers.MimeType('text', 'html')}, html) 
     104         
     105         
     106        if hasattr(self, 'd'): 
     107            d = defer.Deferred().addCallback(do) 
     108            self.d.chainDeferred(d) 
     109            return d 
     110        return do(None) 
    131111     
    132112    def locateChild(self, request, segments): 
    133113        return self, server.StopTraversal 
    134114 
    135     def registerProducer(self, producer, streaming): 
    136         pass 
    137  
    138     def unregisterProducer(self): 
    139         pass 
    140  
    141     @classmethod 
    142     def write(cls, new): 
    143         old = cls.pending 
    144         cls.pending = new 
    145         if new > MAX_PENDING and cls.handlers: 
    146             if new > min([old, 2*MAX_PENDING]): 
    147                 oldestHandler = cls.handlers.pop(0) 
    148                 oldestHandler.stopProducing() 
    149  
    150     def finish(self): 
    151         while self.handlers: 
    152             handler = self.handlers.pop(0) 
    153             handler.inputStream.close() 
    154         del self.application 
    155  
    156  
    157 class FinishableBufferedStream(stream.BufferedStream): 
    158     """ 
    159     """ 
    160     def __init__(self, stream): 
    161         self.running = True 
    162         super(FinishableBufferedStream, self).__init__(stream) 
    163  
    164     def _readUntil(self, f): 
    165         """ 
    166         Non-borked internal helper function which repeatedly calls f each 
    167         time after more data has been received, until it returns non-None. 
    168         """ 
    169         while self.running: 
    170             r = f() 
    171             if r is not None: 
    172                 yield r; return 
    173              
    174             newdata = self.stream.read() 
    175             if isinstance(newdata, defer.Deferred): 
    176                 newdata = defer.waitForDeferred(newdata) 
    177                 yield newdata 
    178                 newdata = newdata.getResult() 
    179              
    180             if newdata is None: 
    181                 # End Of File 
    182                 newdata = self.data 
    183                 self.data = '' 
    184                 yield newdata 
    185                 return 
    186             self.data += str(newdata) 
    187     _readUntil = defer.deferredGenerator(_readUntil) 
    188      
    189     def finish(self): 
    190         self.stream.finish() 
    191         self.running = False 
    192  
    193  
    194 class InputStream(object): 
    195     """ 
    196     This class implements the 'wsgi.input' object. The methods are 
    197     expected to have the same behavior as the same-named methods for 
    198     python's builtin file object. 
    199     """ 
    200     def __init__(self, newstream): 
    201         # Called in IO thread 
    202         self.stream = FinishableBufferedStream(newstream) 
    203          
    204     def callInReactor(self, f, *args, **kw): 
    205         """ 
    206         Read a line, delimited by a newline. If the stream reaches EOF 
    207         or size bytes have been read before reaching a newline (if 
    208         size is given), the partial line is returned. 
    209  
    210         COMPATIBILITY NOTE: the size argument is excluded from the 
    211         WSGI specification, but is provided here anyhow, because 
    212         useful libraries such as python stdlib's cgi.py assume their 
    213         input file-like-object supports readline with a size 
    214         argument. If you use it, be aware your application may not be 
    215         portable to other conformant WSGI servers. 
    216         """ 
    217         def call(queue): 
    218             result = defer.maybeDeferred(f, *args, **kw) 
    219             result.addBoth(queue.put) 
    220  
    221         from twisted.internet import reactor 
    222         queue = Queue.Queue() 
    223         reactor.callFromThread(call, queue) 
    224         result = queue.get() 
    225         if isinstance(result, failure.Failure): 
    226             result.raiseException() 
    227         return result 
    228          
    229     def read(self, size=None): 
    230         """ 
    231         Read at most size bytes from the input, or less if EOF is 
    232         encountered. If size is ommitted or negative, read until EOF. 
    233         """ 
    234         # Called in application thread 
    235         if size < 0: 
    236             size = None 
    237         result = self.callInReactor(self.stream.readExactly, size) 
    238         return result 
    239  
    240     def readline(self): 
    241         # Called in application thread 
    242         line = self.callInReactor(self.stream.readline, '\n') 
    243         if line is not None: 
    244             line += '\n' 
    245         return line 
    246      
    247     def readlines(self, hint=None): 
    248         """ 
    249         Read until EOF, collecting all lines in a list, and returns 
    250         that list. The hint argument is ignored (as is allowed in the 
    251         API specification) 
    252         """ 
    253         # Called in application thread 
    254         data = self.read() 
    255         lines = data.split('\n') 
    256         last = lines.pop() 
    257         lines = [s+'\n' for s in lines] 
    258         if last != '': 
    259             lines.append(last) 
    260         return lines 
    261  
    262     def finish(self): 
    263         """ 
    264         Call this if this input stream is backing things up too much. 
    265         """ 
    266         self.stream.finish() 
    267  
    268     def __iter__(self): 
    269         """ 
    270         Returns an iterator, each iteration of which returns the 
    271         result of readline(), and stops when readline() returns an 
    272         empty string. 
    273         """ 
    274         while True: 
    275             line = self.readline() 
    276             if not line: 
    277                 return 
    278             yield line 
    279  
    280      
    281 class ErrorStream(object): 
    282     """ 
    283     This class implements the 'wsgi.error' object. 
    284     """ 
    285     def flush(self): 
    286         # Called in application thread 
    287         return 
    288  
    289     def write(self, s): 
    290         # Called in application thread 
    291         log.msg("WSGI app error: "+s, isError=True) 
    292  
    293     def writelines(self, seq): 
    294         # Called in application thread 
    295         s = ''.join(seq) 
    296         log.msg("WSGI app error: "+s, isError=True) 
     115 
    297116 
    298117