root/projects/sAsync/branches/smartbroker.py

Revision 92, 2.1 kB (checked in by edsuom, 1 year ago)

Job-dispatching work

Line 
1 class SmartBroker(AccessBroker):
2     """
3     Broker for asynchronous database access that allows for external schema and
4     migrations, as well as a configurable thread pool size.
5     """
6
7     def __init__(self, url, schema, **kw):
8         self.schema = schema
9         if kw.has_key('threads'):
10             self.threads = kw['threads']
11             del kw['threads']
12         else:
13             warn('Thread pool size not specified, using a single thread!')
14             self.threads = 1
15         AccessBroker.__init__(self, url, **kw)
16
17     def _getQueue(self):
18         """
19         Returns a threaded task queue that is dedicated to my database
20         connection. Creates the queue if the first time the property is
21         accessed.
22         """
23         def newQueue():
24             queue = asynqueue.ThreadQueue(self.threads)
25             self.running = True
26             self.queues[key] = queue
27             return queue
28
29         if hasattr(self, '_currentQueue'):
30             return self._currentQueue
31         url, kw = self.engineParams
32         key = hash((url,) + tuple(kw.items()))
33         if key in self.queues:
34             queue = self.queues[key]
35             if not queue.isRunning():
36                 queue = newQueue()
37         else:
38             queue = newQueue()
39         self._currentQueue = queue
40         return queue
41
42     q = property(_getQueue, doc="""
43         Accessing the 'q' attribute will always return a running queue object
44         that is dedicated to this instance's connection parameters
45         """)
46
47     def startup(self):
48         """ Connects db, sets up tables in schema for later use """
49         def post_startup(result, dfrd):
50             if not hasattr(self, '_engine'):
51                 err = Exception("I have no database engine!")
52                 dfrd.errback(err)
53             metadata = sa.BoundMetaData(self._engine)
54             boundschema = self.schema(metadata)
55             for table in boundschema.tables:
56                 setattr(self, table.name, table)
57             dfrd.callback(True)
58         d = defer.Deferred()
59         self.connect().addBoth(post_startup, d)
60         return d
61    
62 # vim:set ai sw=4 ts=4 tw=0 expandtab:
Note: See TracBrowser for help on using the browser.