| 1 |
class SmartBroker(AccessBroker): |
|---|
| 2 |
""" |
|---|
| 3 |
Broker for asynchronous database access that allows for external schema and |
|---|
| 4 |
migrations, as well as a configurable thread pool size. |
|---|
| 5 |
""" |
|---|
| 6 |
|
|---|
| 7 |
def __init__(self, url, schema, **kw): |
|---|
| 8 |
self.schema = schema |
|---|
| 9 |
if kw.has_key('threads'): |
|---|
| 10 |
self.threads = kw['threads'] |
|---|
| 11 |
del kw['threads'] |
|---|
| 12 |
else: |
|---|
| 13 |
warn('Thread pool size not specified, using a single thread!') |
|---|
| 14 |
self.threads = 1 |
|---|
| 15 |
AccessBroker.__init__(self, url, **kw) |
|---|
| 16 |
|
|---|
| 17 |
def _getQueue(self): |
|---|
| 18 |
""" |
|---|
| 19 |
Returns a threaded task queue that is dedicated to my database |
|---|
| 20 |
connection. Creates the queue if the first time the property is |
|---|
| 21 |
accessed. |
|---|
| 22 |
""" |
|---|
| 23 |
def newQueue(): |
|---|
| 24 |
queue = asynqueue.ThreadQueue(self.threads) |
|---|
| 25 |
self.running = True |
|---|
| 26 |
self.queues[key] = queue |
|---|
| 27 |
return queue |
|---|
| 28 |
|
|---|
| 29 |
if hasattr(self, '_currentQueue'): |
|---|
| 30 |
return self._currentQueue |
|---|
| 31 |
url, kw = self.engineParams |
|---|
| 32 |
key = hash((url,) + tuple(kw.items())) |
|---|
| 33 |
if key in self.queues: |
|---|
| 34 |
queue = self.queues[key] |
|---|
| 35 |
if not queue.isRunning(): |
|---|
| 36 |
queue = newQueue() |
|---|
| 37 |
else: |
|---|
| 38 |
queue = newQueue() |
|---|
| 39 |
self._currentQueue = queue |
|---|
| 40 |
return queue |
|---|
| 41 |
|
|---|
| 42 |
q = property(_getQueue, doc=""" |
|---|
| 43 |
Accessing the 'q' attribute will always return a running queue object |
|---|
| 44 |
that is dedicated to this instance's connection parameters |
|---|
| 45 |
""") |
|---|
| 46 |
|
|---|
| 47 |
def startup(self): |
|---|
| 48 |
""" Connects db, sets up tables in schema for later use """ |
|---|
| 49 |
def post_startup(result, dfrd): |
|---|
| 50 |
if not hasattr(self, '_engine'): |
|---|
| 51 |
err = Exception("I have no database engine!") |
|---|
| 52 |
dfrd.errback(err) |
|---|
| 53 |
metadata = sa.BoundMetaData(self._engine) |
|---|
| 54 |
boundschema = self.schema(metadata) |
|---|
| 55 |
for table in boundschema.tables: |
|---|
| 56 |
setattr(self, table.name, table) |
|---|
| 57 |
dfrd.callback(True) |
|---|
| 58 |
d = defer.Deferred() |
|---|
| 59 |
self.connect().addBoth(post_startup, d) |
|---|
| 60 |
return d |
|---|
| 61 |
|
|---|
| 62 |
# vim:set ai sw=4 ts=4 tw=0 expandtab: |
|---|