root/projects/sAsync/trunk/sasync/database.py

Revision 129, 20.6 kB (checked in by edsuom, 7 months ago)

VERSION 0.7: Updated to work with SA 0.4

Line 
1 # sAsync:
2 # An enhancement to the SQLAlchemy package that provides persistent
3 # dictionaries, text indexing and searching, and an access broker for
4 # conveniently managing database access, table setup, and
5 # transactions. Everything is run in an asynchronous fashion using the Twisted
6 # framework and its deferred processing capabilities.
7 #
8 # Copyright (C) 2006-2007 by Edwin A. Suominen, http://www.eepatents.com
9 #
10 # This program is free software; you can redistribute it and/or modify it under
11 # the terms of the GNU General Public License as published by the Free Software
12 # Foundation; either version 2 of the License, or (at your option) any later
13 # version.
14 #
15 # This program is distributed in the hope that it will be useful, but WITHOUT
16 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17 # FOR A PARTICULAR PURPOSE.  See the file COPYING for more details.
18 #
19 # You should have received a copy of the GNU General Public License along with
20 # this program; if not, write to the Free Software Foundation, Inc., 51
21 # Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
22
23 """
24 Asynchronous database transactions via SQLAlchemy.
25 """
26
27 import sys
28 from twisted.internet import defer
29 from twisted.python import failure
30
31 import sqlalchemy as SA
32
######################################################################
# SA 0.4 support contributed by Ricky Iacovou, based upon:
#
#   http://www.sqlalchemy.org/docs/04/intro.html#overview_migration
#
# Determine the version of SQLAlchemy used, 0.3 or 0.4, and set the
# Boolean variable "SA04" accordingly.
#
# We could also use a capability-based approach, like:
#
# try:
#     MetaData = SA.BoundMetaData
# except AttributeError:
#     MetaData = SA.MetaData
#
# However, late 0.3.x versions also supported some 0.4 constructs,
# so it is better to use an explicit 0.3.x -> 0.4.x cutoff in order to
# avoid ambiguity.
######################################################################
_sv = SA.__version__.split('.')
try:
    # Compare (major, minor) as a tuple of ints rather than folding them
    # into a float, so the comparison is exact for any minor number.
    _v = (int(_sv[0]), int(_sv[1]))
except (IndexError, ValueError):
    # Not strictly an ImportError, but close enough. Format the message
    # here; passing the value as a second argument left it unformatted.
    raise ImportError(
        "Failed to determine SQLAlchemy version: %s" % SA.__version__)
SA04 = _v >= (0, 4)
del _sv, _v
# End of version check
64
65
66 from asynqueue import ThreadQueue
67
68 import misc
69
70
class DatabaseError(Exception):
    """
    Signals that an attempt to access the database ran into a problem.
    """
75
76
def transact(f):
    """
    Use this function as a decorator to wrap the supplied method I{f} of
    L{AccessBroker} so that each call runs C{f(*args, **kw)} in its own
    transaction.

    Immediately returns an instance of L{twisted.internet.defer.Deferred} that
    will eventually have its callback called with the result of the
    transaction. Inspired by and largely copied from Valentino Volonghi's
    C{makeTransactWith} code.

    A method named C{first} is returned undecorated, because L{AccessBroker}
    already runs C{first} as a transaction of its own right after startup.

    You can add the following keyword options to your function call. The
    scheduling keywords are passed through to the queue's C{call} method:

    @keyword niceness: Scheduling niceness, an integer between -20 and 20,
      with lower numbers having higher scheduling priority as in UNIX C{nice}
      and C{renice}.

    @keyword doNext: Set C{True} to assign highest possible priority, even
      higher than with niceness = -20.

    @keyword doLast: Set C{True} to assign lowest possible priority, even
      lower than with niceness = 20.

    @keyword session: Set this option to C{True} to get a I{session} attribute
      for use within the transaction, which will be flushed at the end of the
      transaction.

    @type session: Boolean option, default C{False}

    @keyword ignore: Set this option to C{True} to have an exception raised
      by the transaction function rolled back quietly instead of re-raised.
      The transaction then returns a L{failure.Failure} wrapping the
      exception rather than raising it.

    @type ignore: Boolean option, default C{False}

    """
    def relay(result, waiting):
        """
        Fires the Deferred I{waiting} with I{result}, then passes I{result}
        on unchanged.

        This is added with C{addBoth} in place of C{chainDeferred}.
        C{chainDeferred}'s C{waiting.callback}/C{errback} returns C{None},
        which would leave C{None} as the result seen by any callbacks added
        later to the Deferred it was attached to. That Deferred is the one
        returned to the caller who triggered startup.
        """
        if isinstance(result, failure.Failure):
            waiting.errback(result)
        else:
            waiting.callback(result)
        return result

    def substituteFunction(self, *args, **kw):
        """
        Puts the original function in the synchronous task queue and returns a
        deferred to its result when it is eventually run.

        This function is given the same name and docstring as the original
        function so that it can masquerade as that function. The threaded
        call to the original function, made inside its C{transaction}
        sub-function, receives the arguments meant for that original
        function. (The caller is actually calling this substitute function,
        but it won't know that.)

        The original function should be a method of a L{AccessBroker} subclass
        instance, and the queue for that instance will be used to run it.
        """
        def transaction(usingSession, func, *t_args, **t_kw):
            """
            Everything making up a transaction, and everything run in the
            thread, is contained within this little function, including of
            course a call to C{func}.
            """
            if not usingSession:
                trans = self.connection.begin()
            if not hasattr(func, 'im_self'):
                # A plain (undecorated-at-class-time) function needs the
                # broker instance passed explicitly; bound methods such as
                # self.first already carry it.
                t_args = (self,) + t_args
            try:
                result = func(*t_args, **t_kw)
            except Exception:
                # Capture the exception and its traceback right away, before
                # the rollback can disturb the interpreter's exception state.
                err = failure.Failure()
                if not usingSession:
                    trans.rollback()
                if not ignore:
                    # Re-raise with the original traceback preserved, which
                    # a plain "raise e" would have discarded.
                    err.raiseException()
                return err
            if usingSession:
                self.session.flush()
            else:
                trans.commit()
            return result

        def doTransaction(usingSession):
            """
            Queues up the transaction and immediately returns a deferred to
            its eventual result.
            """
            if isNested():
                # Already running inside a transaction in the queue's thread,
                # so call f directly and return its result (not a Deferred).
                return f(self, *args, **kw)
            return self.q.call(transaction, usingSession, f, *args, **kw)

        def started(null):
            # Startup is done: connect (or get a session) and run the
            # broker's "first" method as the very first transaction.
            self.ranStart = True
            del self._transactionStartupDeferred
            if useSession:
                d = self.getSession()
            else:
                d = self.connect()
            d.addCallback(lambda _: self.q.call(
                transaction, False, self.first, doNext=True))
            return d

        def isNested():
            # True if any calling frame is running a "transaction" function.
            frame = sys._getframe()
            while True:
                frame = frame.f_back
                if frame is None:
                    return False
                if frame.f_code == transaction.func_code:
                    return True

        ignore = kw.pop('ignore', False)
        useSession = kw.pop('session', False)
        if hasattr(self, 'connection') and getattr(self, 'ranStart', False):
            # We already have a connection, let's get right to the transaction
            if useSession:
                d = self.getSession()
                d.addCallback(lambda _: doTransaction(True))
            else:
                d = doTransaction(False)
        elif hasattr(self, '_transactionStartupDeferred') and \
             not self._transactionStartupDeferred.called:
            # Startup is in progress, make a new Deferred to the start of the
            # transaction and fire it when the startup Deferred fires, without
            # disturbing the startup Deferred's own result.
            d = defer.Deferred()
            if useSession:
                d.addCallback(lambda _: self.getSession())
            d.addCallback(lambda _: doTransaction(useSession))
            self._transactionStartupDeferred.addBoth(relay, d)
        else:
            # We need to start things up before doing this first transaction
            d = defer.maybeDeferred(self.startup)
            self._transactionStartupDeferred = d
            d.addCallback(started)
            d.addCallback(lambda _: doTransaction(useSession))
        # Return whatever Deferred we've got
        return d

    if f.__name__ == 'first':
        return f
    substituteFunction.__name__ = f.__name__
    substituteFunction.__doc__ = f.__doc__
    return substituteFunction
212
213
class AccessBroker(object):
    """
    I manage asynchronous access to a database.

    Before you use any instance of me, you must specify the parameters for
    creating an SQLAlchemy database engine. A single argument is used, which
    specifies a connection to a database via an RFC-1738 url. Any keyword
    options are passed along to C{sqlalchemy.create_engine}, except that the
    C{strategy} option is always set to C{'threadlocal'}.

    You can set an engine globally, for all instances of me via the
    L{sasync.engine} package-level function, or via my L{engine} class
    method. Alternatively, you can specify an engine for one particular
    instance by supplying the parameters to the constructor.

    SQLAlchemy has excellent documentation, which describes the engine
    parameters in plenty of detail. See
    U{http://www.sqlalchemy.org/docs/dbengine.myt}.

    @ivar dt: A property-generated reference to a deferred tracker that you can
      use to wait for database writes. See L{misc.DeferredTracker}.

    @ivar q: A property-generated reference to a threaded task queue that is
      dedicated to my database connection.

    @ivar connection: The current SQLAlchemy connection object, if
      any yet exists. Generated by my L{connect} method.

    """
    # Default (url, keywords) engine parameters; replaced by engine().
    globalParams = ('', {})
    # Task queues shared by every instance, keyed by engine parameters.
    queues = {}

    def __init__(self, *url, **kw):
        """
        Constructs an instance of me, optionally specifying parameters for an
        SQLAlchemy engine object that serves this instance only.
        """
        self.selects = {}
        if url:
            self.engineParams = (url[0], kw)
        else:
            self.engineParams = self.globalParams
        self.running = False

    @classmethod
    def engine(cls, url, **kw):
        """
        Sets default connection parameters for all instances of me.
        """
        cls.globalParams = (url, kw)

    @staticmethod
    def _relayTo(result, waiting):
        """
        Fires the Deferred I{waiting} with I{result} and returns I{result}
        unchanged.

        This is used with C{addBoth} instead of C{chainDeferred}, which would
        leave C{None} as the result of the Deferred it is attached to for
        any callbacks that Deferred's owner adds afterwards.
        """
        if isinstance(result, failure.Failure):
            waiting.errback(result)
        else:
            waiting.callback(result)
        return result

    def _getDeferredTracker(self):
        """
        Returns an instance of L{misc.DeferredTracker} that is dedicated to the
        bound method's instance of me. Creates the deferred tracker the first
        time this method is called for a given instance of me.
        """
        if not hasattr(self, '_deferredTracker'):
            self._deferredTracker = misc.DeferredTracker()
        return self._deferredTracker
    dt = property(_getDeferredTracker)

    def _getQueue(self):
        """
        Returns a threaded task queue that is dedicated to my database
        connection parameters.

        On the first access, this reuses a still-running queue that another
        instance opened with the same parameters, or else starts a new one.
        Later accesses return that same queue.
        """
        if hasattr(self, '_currentQueue'):
            return self._currentQueue
        url, kw = self.engineParams
        # Sort the keyword items so that equal parameter sets always produce
        # the same key, regardless of dict iteration order.
        key = hash((url,) + tuple(sorted(kw.items())))
        queue = self.queues.get(key, None)
        if queue is None or not queue.isRunning():
            queue = ThreadQueue(1)
            self.running = True
            self.queues[key] = queue
        self._currentQueue = queue
        return queue

    q = property(_getQueue, doc="""
    Accessing the 'q' attribute returns the threaded task queue dedicated to
    this instance's connection parameters. The first access finds or starts a
    running queue; later accesses return that same queue.
    """)

    def connect(self, forceNew=False):
        """
        Returns a Deferred that fires with my connection object, creating the
        engine and the connection (in my queue's thread) as needed.

        @param forceNew: Set C{True} to open a new connection even if one
          already exists; the new one replaces my I{connection} attribute.
        """
        def getEngine():
            if hasattr(self, '_dEngine'):
                # Engine creation is already under way; get a connection
                # once it is done, leaving the pending Deferred's result
                # intact for its owner.
                d = defer.Deferred()
                d.addCallback(lambda _: getConnection())
                self._dEngine.addBoth(self._relayTo, d)
            else:
                d = self._dEngine = \
                    self.q.call(createEngine, doNext=True)
                d.addCallback(gotEngine)
            return d

        def createEngine():
            url, kw = self.engineParams
            # Work on a copy: engineParams may be the class-wide globalParams,
            # and adding 'strategy' in place would leak into every instance
            # (and change the queue key computed from those parameters).
            # The 'threadlocal' keyword value is unchanged from SA 0.3 to 0.4.
            kw = dict(kw)
            kw['strategy'] = 'threadlocal'
            return SA.create_engine(url, **kw)

        def gotEngine(engine):
            del self._dEngine
            self._engine = engine
            return getConnection()

        def getConnection():
            if not forceNew and hasattr(self, 'connection'):
                return defer.succeed(self.connection)
            if not forceNew and hasattr(self, '_dConnect'):
                # A connection is already being made; wait for it.
                d = defer.Deferred().addCallback(lambda _: self.connection)
                self._dConnect.addBoth(self._relayTo, d)
                return d
            d = self._dConnect = \
                self.q.call(self._engine.contextual_connect, doNext=True)
            d.addCallback(gotConnection)
            return d

        def gotConnection(connection):
            if hasattr(self, '_dConnect'):
                del self._dConnect
            self.connection = connection
            return connection

        # After all these function definitions, the method begins here
        if hasattr(self, '_engine'):
            return getConnection()
        return getEngine()

    def _sessionClose(self):
        """
        Replacement C{close} method for session objects.

        Marks my current session inactive, so that L{getSession} will make a
        new one, and then calls the session's original C{close} method.
        """
        self.session.isActive = False
        return self.session._close()

    def getSession(self):
        """
        Returns a Deferred that fires with a committable session object.

        If my current session is still active it is reused. Otherwise a new
        session is bound to a freshly opened connection.
        """
        def gotConnection(connection):
            if SA04:
                d = self.q.call(
                    SA.orm.create_session, bind=connection, doNext=True)
            else:
                d = self.q.call(
                    SA.create_session, bind_to=connection, doNext=True)
            d.addCallback(gotSession)
            return d

        def gotSession(session):
            # Flag the session active and route its close() through
            # _sessionClose, so that closing it clears the flag.
            session.isActive = True
            session._close = session.close
            session.close = self._sessionClose
            self.session = session
            return session

        if hasattr(self, 'session') and self.session.isActive:
            return defer.succeed(self.session)
        return self.connect(forceNew=True).addCallback(gotConnection)

    def table(self, name, *cols, **kw):
        """
        Instantiates a new table object, creating it in the transaction thread
        as needed, and stores it as my attribute I{name}.

        One or more indexes other than the primary key can be defined
        via a keyword prefixed with I{index_} or I{unique_}. Use the
        I{unique_} prefix if the index is to be a unique one. The whole
        keyword, prefix included, is used as the index name. The value of
        the keyword is a list or tuple containing the names of all columns
        in the index.

        Returns a Deferred that fires with C{False} right away if I already
        have an attribute I{name}. Otherwise it fires once the table and its
        indexes have been set up.
        """
        def _table():
            if not hasattr(self, '_meta'):
                if SA04:
                    self._meta = SA.MetaData(self._engine)
                else:
                    self._meta = SA.BoundMetaData(self._engine)
            indexes = {}
            # Iterate over a snapshot of the keys, since matching entries
            # are popped from kw inside the loop.
            for key in list(kw):
                if key.startswith('index_'):
                    unique = False
                elif key.startswith('unique_'):
                    unique = True
                else:
                    continue
                indexes[key] = kw.pop(key), unique
            kw.setdefault('useexisting', True)
            table = SA.Table(name, self._meta, *cols, **kw)
            table.create(checkfirst=True)
            setattr(self, name, table)
            return table, indexes

        def _index(tableInfo):
            table, indexes = tableInfo
            for key, (colNames, unique) in indexes.items():
                try:
                    index = SA.Index(
                        key, *[getattr(table.c, x) for x in colNames],
                        **{'unique': unique})
                    index.create()
                except Exception:
                    # Best effort: there is no simple way here to check
                    # whether the index already exists, so a failed
                    # creation (e.g. a duplicate index) is ignored.
                    pass

        if hasattr(self, name):
            return defer.succeed(False)
        d = self.connect()
        d.addCallback(lambda _: self.q.call(_table, doNext=True))
        d.addCallback(lambda x: self.q.call(_index, x, doNext=True))
        return d

    def startup(self):
        """
        This method runs before the first transaction to start my synchronous
        task queue. B{Override it} to get whatever pre-transaction stuff you
        have run.

        Alternatively, with legacy support for the old API, your
        pre-transaction code can reside in a L{userStartup} method of your
        subclass.
        """
        userStartup = getattr(self, 'userStartup', None)
        if callable(userStartup):
            return defer.maybeDeferred(userStartup)

    def userStartup(self):
        """
        If this method is defined and L{startup} is not overridden in your
        subclass, however, this method will be run as the first callback in the
        deferred processing chain, after my synchronous task queue is safely
        underway.

        The method should return either an immediate result or a deferred to
        an eventual result.

        B{Deprecated}: Instead of defining this method, your subclass should
        simply override L{startup} with your custom startup stuff.

        """

    def first(self):
        """
        This method automatically runs as the first transaction after
        completion of L{startup} (or L{userStartup}). B{Override it} to define
        table contents or whatever else you want as a first transaction that
        immediately follows your pre-transaction stuff.

        You don't need to decorate the method with C{@transact}, but it doesn't
        break anything if you do.
        """

    def shutdown(self, *null):
        """
        Shuts down my database transaction functionality and threaded task
        queue, returning a deferred that fires when all queued tasks are
        done and the shutdown is complete.
        """
        def finalTask():
            if hasattr(self, 'connection'):
                self.connection.close()
            self.running = False

        if self.running:
            d = self.q.call(finalTask)
            d.addBoth(lambda _: self.q.shutdown())
        else:
            d = defer.succeed(None)
        if hasattr(self, '_deferredTracker'):
            d.addCallback(lambda _: self._deferredTracker.deferToAll())
        return d

    def s(self, *args, **kw):
        """
        Polymorphic method for working with C{select} instances within a cached
        selection subcontext.

            - When called with a single argument (the select object's name as a
              string) and no keywords, this method indicates if the named
              select object already exists and sets its selection subcontext to
              I{name}.

            - With multiple arguments or any keywords, the method acts like a
              call to C{sqlalchemy.select(...).compile()}, except that nothing
              is returned. Instead, the resulting select object is stored in
              the current selection subcontext.

            - With no arguments or keywords, the method returns the select
              object for the current selection subcontext.

        """
        if kw or (len(args) > 1):
            # It's a compilation.
            context = getattr(self, 'context', None)
            self.selects[context] = SA.select(*args, **kw).compile()
        elif len(args) == 1:
            # It's a lookup to see if the select has been previously
            # seen and compiled; return True or False.
            self.context = args[0]
            return self.context in self.selects
        else:
            # It's a retrieval of a compiled selection object, keyed off
            # the most recently mentioned context.
            context = getattr(self, 'context', None)
            return self.selects.get(context)

    def queryToList(self, **kw):
        """
        Executes my current select object with the bind parameters supplied as
        keywords, returning a list containing the first element of each row in
        the result.
        """
        rows = self.s().execute(**kw).fetchall()
        if rows is None:
            return []
        return [row[0] for row in rows]

    def deferToQueue(self, func, *args, **kw):
        """
        Dispatches I{callable(*args, **kw)} as a task via the like-named method
        of my synchronous queue, returning a deferred to its eventual result.

        Scheduling of the task is impacted by the I{niceness} keyword that can
        be included in I{**kw}. As with UNIX niceness, the value should be an
        integer where 0 is normal scheduling, negative numbers are higher
        priority, and positive numbers are lower priority.

        @keyword niceness: Scheduling niceness, an integer between -20 and 20,
            with lower numbers having higher scheduling priority as in UNIX
            C{nice} and C{renice}.

        """
        return self.q.call(func, *args, **kw)
563
564
# Public names exported by "from sasync.database import *".
__all__ = [
    'transact',
    'AccessBroker',
    'SA',
]
Note: See TracBrowser for help on using the browser.