| 74 | | This isn't a subclass of resource.Resource, because it shouldn't do any |
|---|
| 75 | | method-specific actions at all. All that stuff is totally up to the |
|---|
| 76 | | contained wsgi application |
|---|
| 77 | | """ |
|---|
| 78 | | __metaclass__ = WSGIMeta |
|---|
| 79 | | |
|---|
| 80 | | implements(iweb.IResource, interfaces.IFinishableConsumer) |
|---|
| 81 | | |
|---|
| 82 | | def __init__(self, application): |
|---|
| 83 | | self.application = application |
|---|
| 84 | | self.mgr.queue.subscribe(self) |
|---|
| | 55 | Your subclass must override the L{appCode} method to return a string of |
|---|
| | 56 | python code that defines an C{application} function of the two standard |
|---|
| | 57 | WSGI arguments, I{environ} and I{start_response}. |
|---|
| | 58 | """ |
|---|
| | 59 | implements(iweb.IResource) |
|---|
| | 60 | |
|---|
| | 61 | def appCode(self): |
|---|
| | 62 | raise NotImplementedError( |
|---|
| | 63 | "You must override this with your own appcode-generating method") |
|---|
| | 64 | |
|---|
| | 65 | def startChild(self): |
|---|
| | 66 | def started(childID): |
|---|
| | 67 | self.childID = childID |
|---|
| | 68 | if not hasattr(self, 'jobID'): |
|---|
| | 69 | fh = open(util.sibpath(__file__, 'child.py')) |
|---|
| | 70 | jobCode = fh.read() |
|---|
| | 71 | fh.close() |
|---|
| | 72 | d = self.mgr.new(jobCode) |
|---|
| | 73 | d.addCallback(lambda jobID: setattr(self, 'jobID', jobID)) |
|---|
| | 74 | return d |
|---|
| | 75 | |
|---|
| | 76 | return mgr.startup(1).addCallback(started) |
|---|
| 92 | | |
|---|
| 93 | | # Do stuff with WSGIHandler |
|---|
| 94 | | IP = req.remoteAddr.host |
|---|
| 95 | | if IP in self.banned: |
|---|
| 96 | | return self.denial() |
|---|
| 97 | | handler = WSGIHandler(self.application, req) |
|---|
| 98 | | self.handlers.append(handler) |
|---|
| 99 | | # Queue it up for running in the thread |
|---|
| 100 | | self.queue.call(handler.run).addBoth(done, IP) |
|---|
| 101 | | # We get the result piecemeal from this method call's unique handler, |
|---|
| 102 | | # not in some single result of the run method. So there's no need to do |
|---|
| 103 | | # anything with the deferred from the queuing call. We can queue some |
|---|
| 104 | | # more requests right away if we want. |
|---|
| 105 | | return handler.responseDeferred |
|---|
| 106 | | |
|---|
| 107 | | def banIP(self, IP): |
|---|
| 108 | | """ |
|---|
| 109 | | Temporarily bans the specified IP address from making connections. |
|---|
| 110 | | """ |
|---|
| 111 | | if IP in self.banned: |
|---|
| 112 | | d, delayedCall = self.banned[IP] |
|---|
| 113 | | delayedCall.cancel() |
|---|
| 114 | | d.callback(IP) |
|---|
| 115 | | d = defer.Deferred() |
|---|
| 116 | | d.addCallback(self.banned.pop) |
|---|
| 117 | | delayedCall = reactor.callLater(IP_BAN_SECS, d.callback, IP) |
|---|
| 118 | | self.banned[IP] = d, delayedCall |
|---|
| | 84 | |
|---|
| | 85 | def do(null): |
|---|
| | 86 | # Do stuff with WSGIHandler |
|---|
| | 87 | IP = req.remoteAddr.host |
|---|
| | 88 | if IP in self.banned: |
|---|
| | 89 | return self.denial() |
|---|
| | 90 | handler = WSGIHandler(self.application, req) |
|---|
| | 91 | self.handlers.append(handler) |
|---|
| | 92 | # Queue it up for running in the child process |
|---|
| | 93 | self.queue.call(handler.run).addBoth(done, IP) |
|---|
| | 94 | # We get the result piecemeal from this method call's unique handler, |
|---|
| | 95 | # not in some single result of the run method. So there's no need to do |
|---|
| | 96 | # anything with the deferred from the queuing call. We can queue some |
|---|
| | 97 | # more requests right away if we want. |
|---|
| | 98 | return handler.responseDeferred |
|---|
| | 99 | |
|---|
| | 100 | if hasattr(self, 'mgr'): |
|---|
| | 101 | return do(None) |
|---|
| | 102 | self.mgr = ChildManager() |
|---|
| 135 | | def registerProducer(self, producer, streaming): |
|---|
| 136 | | pass |
|---|
| 137 | | |
|---|
| 138 | | def unregisterProducer(self): |
|---|
| 139 | | pass |
|---|
| 140 | | |
|---|
| 141 | | @classmethod |
|---|
| 142 | | def write(cls, new): |
|---|
| 143 | | old = cls.pending |
|---|
| 144 | | cls.pending = new |
|---|
| 145 | | if new > MAX_PENDING and cls.handlers: |
|---|
| 146 | | if new > min([old, 2*MAX_PENDING]): |
|---|
| 147 | | oldestHandler = cls.handlers.pop(0) |
|---|
| 148 | | oldestHandler.stopProducing() |
|---|
| 149 | | |
|---|
| 150 | | def finish(self): |
|---|
| 151 | | while self.handlers: |
|---|
| 152 | | handler = self.handlers.pop(0) |
|---|
| 153 | | handler.inputStream.close() |
|---|
| 154 | | del self.application |
|---|
| 155 | | |
|---|
| 156 | | |
|---|
| 157 | | class FinishableBufferedStream(stream.BufferedStream): |
|---|
| 158 | | """ |
|---|
| 159 | | """ |
|---|
| 160 | | def __init__(self, stream): |
|---|
| 161 | | self.running = True |
|---|
| 162 | | super(FinishableBufferedStream, self).__init__(stream) |
|---|
| 163 | | |
|---|
| 164 | | def _readUntil(self, f): |
|---|
| 165 | | """ |
|---|
| 166 | | Non-borked internal helper function which repeatedly calls f each |
|---|
| 167 | | time after more data has been received, until it returns non-None. |
|---|
| 168 | | """ |
|---|
| 169 | | while self.running: |
|---|
| 170 | | r = f() |
|---|
| 171 | | if r is not None: |
|---|
| 172 | | yield r; return |
|---|
| 173 | | |
|---|
| 174 | | newdata = self.stream.read() |
|---|
| 175 | | if isinstance(newdata, defer.Deferred): |
|---|
| 176 | | newdata = defer.waitForDeferred(newdata) |
|---|
| 177 | | yield newdata |
|---|
| 178 | | newdata = newdata.getResult() |
|---|
| 179 | | |
|---|
| 180 | | if newdata is None: |
|---|
| 181 | | # End Of File |
|---|
| 182 | | newdata = self.data |
|---|
| 183 | | self.data = '' |
|---|
| 184 | | yield newdata |
|---|
| 185 | | return |
|---|
| 186 | | self.data += str(newdata) |
|---|
| 187 | | _readUntil = defer.deferredGenerator(_readUntil) |
|---|
| 188 | | |
|---|
| 189 | | def finish(self): |
|---|
| 190 | | self.stream.finish() |
|---|
| 191 | | self.running = False |
|---|
| 192 | | |
|---|
| 193 | | |
|---|
| 194 | | class InputStream(object): |
|---|
| 195 | | """ |
|---|
| 196 | | This class implements the 'wsgi.input' object. The methods are |
|---|
| 197 | | expected to have the same behavior as the same-named methods for |
|---|
| 198 | | python's builtin file object. |
|---|
| 199 | | """ |
|---|
| 200 | | def __init__(self, newstream): |
|---|
| 201 | | # Called in IO thread |
|---|
| 202 | | self.stream = FinishableBufferedStream(newstream) |
|---|
| 203 | | |
|---|
| 204 | | def callInReactor(self, f, *args, **kw): |
|---|
| 205 | | """ |
|---|
| 206 | | Read a line, delimited by a newline. If the stream reaches EOF |
|---|
| 207 | | or size bytes have been read before reaching a newline (if |
|---|
| 208 | | size is given), the partial line is returned. |
|---|
| 209 | | |
|---|
| 210 | | COMPATIBILITY NOTE: the size argument is excluded from the |
|---|
| 211 | | WSGI specification, but is provided here anyhow, because |
|---|
| 212 | | useful libraries such as python stdlib's cgi.py assume their |
|---|
| 213 | | input file-like-object supports readline with a size |
|---|
| 214 | | argument. If you use it, be aware your application may not be |
|---|
| 215 | | portable to other conformant WSGI servers. |
|---|
| 216 | | """ |
|---|
| 217 | | def call(queue): |
|---|
| 218 | | result = defer.maybeDeferred(f, *args, **kw) |
|---|
| 219 | | result.addBoth(queue.put) |
|---|
| 220 | | |
|---|
| 221 | | from twisted.internet import reactor |
|---|
| 222 | | queue = Queue.Queue() |
|---|
| 223 | | reactor.callFromThread(call, queue) |
|---|
| 224 | | result = queue.get() |
|---|
| 225 | | if isinstance(result, failure.Failure): |
|---|
| 226 | | result.raiseException() |
|---|
| 227 | | return result |
|---|
| 228 | | |
|---|
| 229 | | def read(self, size=None): |
|---|
| 230 | | """ |
|---|
| 231 | | Read at most size bytes from the input, or less if EOF is |
|---|
| 232 | | encountered. If size is ommitted or negative, read until EOF. |
|---|
| 233 | | """ |
|---|
| 234 | | # Called in application thread |
|---|
| 235 | | if size < 0: |
|---|
| 236 | | size = None |
|---|
| 237 | | result = self.callInReactor(self.stream.readExactly, size) |
|---|
| 238 | | return result |
|---|
| 239 | | |
|---|
| 240 | | def readline(self): |
|---|
| 241 | | # Called in application thread |
|---|
| 242 | | line = self.callInReactor(self.stream.readline, '\n') |
|---|
| 243 | | if line is not None: |
|---|
| 244 | | line += '\n' |
|---|
| 245 | | return line |
|---|
| 246 | | |
|---|
| 247 | | def readlines(self, hint=None): |
|---|
| 248 | | """ |
|---|
| 249 | | Read until EOF, collecting all lines in a list, and returns |
|---|
| 250 | | that list. The hint argument is ignored (as is allowed in the |
|---|
| 251 | | API specification) |
|---|
| 252 | | """ |
|---|
| 253 | | # Called in application thread |
|---|
| 254 | | data = self.read() |
|---|
| 255 | | lines = data.split('\n') |
|---|
| 256 | | last = lines.pop() |
|---|
| 257 | | lines = [s+'\n' for s in lines] |
|---|
| 258 | | if last != '': |
|---|
| 259 | | lines.append(last) |
|---|
| 260 | | return lines |
|---|
| 261 | | |
|---|
| 262 | | def finish(self): |
|---|
| 263 | | """ |
|---|
| 264 | | Call this if this input stream is backing things up too much. |
|---|
| 265 | | """ |
|---|
| 266 | | self.stream.finish() |
|---|
| 267 | | |
|---|
| 268 | | def __iter__(self): |
|---|
| 269 | | """ |
|---|
| 270 | | Returns an iterator, each iteration of which returns the |
|---|
| 271 | | result of readline(), and stops when readline() returns an |
|---|
| 272 | | empty string. |
|---|
| 273 | | """ |
|---|
| 274 | | while True: |
|---|
| 275 | | line = self.readline() |
|---|
| 276 | | if not line: |
|---|
| 277 | | return |
|---|
| 278 | | yield line |
|---|
| 279 | | |
|---|
| 280 | | |
|---|
| 281 | | class ErrorStream(object): |
|---|
| 282 | | """ |
|---|
| 283 | | This class implements the 'wsgi.error' object. |
|---|
| 284 | | """ |
|---|
| 285 | | def flush(self): |
|---|
| 286 | | # Called in application thread |
|---|
| 287 | | return |
|---|
| 288 | | |
|---|
| 289 | | def write(self, s): |
|---|
| 290 | | # Called in application thread |
|---|
| 291 | | log.msg("WSGI app error: "+s, isError=True) |
|---|
| 292 | | |
|---|
| 293 | | def writelines(self, seq): |
|---|
| 294 | | # Called in application thread |
|---|
| 295 | | s = ''.join(seq) |
|---|
| 296 | | log.msg("WSGI app error: "+s, isError=True) |
|---|
| | 115 | |
|---|