root/projects/sAsync/trunk/sasync/items.py

Revision 129, 12.3 kB (checked in by edsuom, 5 months ago)

VERSION 0.7: Updated to work with SA 0.4

Line 
1 # sAsync:
2 # An enhancement to the SQLAlchemy package that provides persistent
3 # dictionaries, text indexing and searching, and an access broker for
4 # conveniently managing database access, table setup, and
5 # transactions. Everything can be run in an asynchronous fashion using the
6 # Twisted framework and its deferred processing capabilities.
7 #
8 # Copyright (C) 2006 by Edwin A. Suominen, http://www.eepatents.com
9 #
10 # This program is free software; you can redistribute it and/or modify it under
11 # the terms of the GNU General Public License as published by the Free Software
12 # Foundation; either version 2 of the License, or (at your option) any later
13 # version.
14 #
15 # This program is distributed in the hope that it will be useful, but WITHOUT
16 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17 # FOR A PARTICULAR PURPOSE.  See the file COPYING for more details.
18 #
19 # You should have received a copy of the GNU General Public License along with
20 # this program; if not, write to the Free Software Foundation, Inc., 51
21 # Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
22
23 """
24 Dictionary-like objects with behind-the-scenes database persistence
25
26 """
27
28 # Imports
29 from twisted.internet import defer
30 import sqlalchemy as SA
31
32 from database import transact, AccessBroker
33 import search
34
35
# Value passed as the C{niceness} keyword to the update, insert and delete
# transactions that L{Items} issues through its L{Transactor}.
NICENESS_WRITE = 6
37
38
class Missing:
    """
    Placeholder value handed back in place of an item that has no database
    entry.

    @ivar group: The ID of the group that was searched.

    @ivar name: The name of the item that was not found.
    """
    def __init__(self, group, name):
        self.group = group
        self.name = name
45
46
class Transactor(AccessBroker):
    """
    I carry out the actual non-blocking database work needed to persist
    name:value items belonging to one uniquely-identified group, e.g., the
    contents of a L{PersistentDict}.

    Each of my transaction methods is wrapped with L{transact}; client code
    receives a Twisted deferred to the result instead of blocking until the
    database access has finished.

    NOTE(review): the C{self.s(...)} calls below presumably build and reuse a
    named select via L{AccessBroker}; confirm against that class.
    """
    def __init__(self, ID, *url, **kw):
        """
        Sets me up for the group identified by the integer I{ID}.

        If a database I{url} is supplied, it and any keywords are passed on to
        L{AccessBroker}; with no url, the broker is initialized without
        arguments and any keywords are ignored.

        @raise TypeError: if I{ID} is not an integer.
        """
        if isinstance(ID, int):
            self.groupID = ID
        else:
            raise TypeError("Item IDs must be integers")
        if not url:
            AccessBroker.__init__(self)
        else:
            AccessBroker.__init__(self, url[0], **kw)

    def startup(self):
        """
        Startup method, automatically called before the first transaction.
        Declares the C{sasync_items} table, keyed on (group_id, name) and
        holding a pickled value.
        """
        columns = (
            SA.Column('group_id', SA.Integer, primary_key=True),
            SA.Column('name', SA.String(40), primary_key=True),
            SA.Column('value', SA.PickleType, nullable=False),
            )
        return self.table('sasync_items', *columns)

    @transact
    def load(self, name):
        """
        Transaction that loads the value of item I{name} in my group. The
        result is a L{Missing} instance if no such row exists.
        """
        table = self.sasync_items
        if not self.s('load'):
            # The item name is a bind parameter, so only the group is fixed
            # in the statement itself.
            sameGroup = table.c.group_id == self.groupID
            sameName = table.c.name == SA.bindparam('name')
            self.s([table.c.value], SA.and_(sameGroup, sameName))
        row = self.s().execute(name=name).fetchone()
        if row:
            return row['value']
        return Missing(self.groupID, name)

    @transact
    def loadAll(self):
        """
        Transaction that loads every item of my group, returning a
        name:value dict.
        """
        table = self.sasync_items
        if not self.s('load_all'):
            self.s(
                [table.c.name, table.c.value],
                table.c.group_id == self.groupID)
        rows = self.s().execute().fetchall()
        return dict([(row['name'], row['value']) for row in rows])

    @transact
    def update(self, name, value):
        """
        Transaction that overwrites the stored value of the existing item
        I{name} with I{value}.
        """
        table = self.sasync_items
        where = SA.and_(
            table.c.group_id == self.groupID,
            table.c.name == name)
        table.update(where).execute(value=value)

    @transact
    def insert(self, name, value):
        """
        Transaction that adds a new entry I{name} = I{value} to my group.
        """
        statement = self.sasync_items.insert()
        statement.execute(group_id=self.groupID, name=name, value=value)

    @transact
    def delete(self, *names):
        """
        Transaction that deletes the entries of my group having any of the
        supplied I{names}.
        """
        table = self.sasync_items
        where = SA.and_(
            table.c.group_id == self.groupID,
            table.c.name.in_(names))
        table.delete(where).execute()

    @transact
    def names(self):
        """
        Transaction that loads the names of all items in my group, returning
        them as a list of strings.
        """
        table = self.sasync_items
        if not self.s('names'):
            self.s(
                [table.c.name],
                table.c.group_id == self.groupID)
        rows = self.s().execute().fetchall()
        return [str(row[0]) for row in rows]
156
157
class Items(object):
    """
    I provide a public interface for non-blocking database access to
    persistently stored name:value items within a uniquely-identified group,
    e.g., for a persistent dictionary using L{PersistentDict}.

    Before you use any instance of me, you must specify the parameters for
    creating an SQLAlchemy database engine. A single argument is used, which
    specifies a connection to a database via an RFC-1738 url. Keyword options
    may be supplied as well; they are listed in the API docs for L{sasync}
    and L{sasync.database.AccessBroker}.

    You can set an engine globally, for all instances of me via the
    L{sasync.engine} package-level function, or via the L{AccessBroker.engine}
    class method. Alternatively, you can specify an engine for one particular
    instance by supplying the parameters to my constructor.

    B{IMPORTANT}: Make sure you call my L{shutdown} method for an instance of
    me that you're done with before allowing that instance to be deleted.

    @cvar search: The search object used for full-text indexing of written
        values, or C{None} (the default) if no indexing is done.
    """
    search = None

    def __init__(self, ID, *url, **kw):
        """
        Instantiates me for the items of a particular group uniquely identified
        by the supplied hashable I{ID}.

        In addition to any engine-specifying keywords supplied, the following
        are particular to this constructor:

        @param ID: A hashable object that is used as my unique identifier. Its
            C{hash()} value becomes the integer group ID stored in the
            database. NOTE: on interpreters with hash randomization enabled,
            the hash of a string differs from one process to the next, so use
            an ID whose hash is stable (e.g., an integer) if the items must be
            found again by a later process.

        @keyword nameType: A C{type} object defining the type that each name
            will be coerced to after being loaded as a string from the
            database. Defaults to C{str}.

        @keyword search: Intended to be set C{True} if text indexing is to be
            performed on items as they are written. Currently a placeholder:
            the keyword is consumed but no search object is created, so no
            indexing takes place.

        A database url, if supplied, is passed to my L{Transactor} along with
        the remaining keywords. Without a url, the remaining keywords are not
        passed on.

        @raise TypeError: if I{ID} is not hashable.
        """
        try:
            self.groupID = hash(ID)
        except TypeError:
            # Only the "unhashable" error is translated; anything else
            # (including KeyboardInterrupt) propagates unchanged.
            raise TypeError("Item IDs must be hashable")
        if kw.pop('search', False):
            # No search object, worry about searching later
            self.search = None
        self.nameType = kw.pop('nameType', str)
        if url:
            self.t = Transactor(self.groupID, url[0], **kw)
        else:
            self.t = Transactor(self.groupID)

    def shutdown(self, *null):
        """
        Shuts down my database L{Transactor}, returning whatever its own
        C{shutdown} method returns. Any arguments are ignored, so I can be
        used directly as a deferred callback.
        """
        return self.t.shutdown()

    def write(self, funcName, name, value, niceness=0):
        """
        Performs a database write transaction, returning a deferred to its
        completion.

        @param funcName: The name of the L{Transactor} method to call, e.g.,
            C{'update'} or C{'insert'}.

        @param name: The name of the item being written.

        @param value: The value being written.

        @param niceness: Passed to the transaction as its C{niceness} keyword.

        If we are updating the search index, there's a nuance to the
        deferred processing. In that case, when the write is done, the
        deferred is fired and processing separately proceeds with indexing
        of the written value. Here's how it works:

            1. Create a clean deferred B{d1} to return to the caller, whose
            callback(s) will be fired from the callback to the transaction's
            own deferred B{d2}.

            2. Start the write transaction and assign the C{writeDone} function
            as the callback to its deferred B{d2}. The indexing deferred
            B{d3} is not waited on; B{d1} fires as soon as indexing has been
            started.

            3. If the transaction (or the start of indexing) fails, the
            failure is passed on to B{d1} so that the caller is not left
            waiting on a deferred that never fires.

        """
        def writeDone(noneResult, d1):
            d3 = self.search.index(
                value, document=self.groupID, section=hash(name))
            d3.addCallback(self.search.ready)
            d1.callback(None)

        func = getattr(self.t, funcName)
        if self.search is None:
            return func(name, value, niceness=niceness)
        d1 = defer.Deferred()
        self.search.busy()
        d2 = func(name, value, niceness=niceness)
        d2.addCallback(writeDone, d1)
        # NOTE(review): on failure the search object is left marked busy, as
        # before; confirm whether self.search.ready should be called here.
        d2.addErrback(d1.errback)
        return d1

    def load(self, name):
        """
        Loads item I{name} from the database, returning a deferred to the
        loaded value. A L{Missing} object represents the value of a missing
        item.
        """
        return self.t.load(name)

    def loadAll(self):
        """
        Loads all items in my group from the database, returning a deferred
        to a dict of the loaded values. The keys of the dict are coerced to the
        type of my I{nameType} attribute.
        """
        def loaded(valueDict):
            newDict = {}
            # items() rather than iteritems() so that this also runs where
            # dicts have no iteritems method.
            for name, value in valueDict.items():
                newDict[self.nameType(name)] = value
            return newDict

        d = self.t.loadAll()
        d.addCallback(loaded)
        return d

    def update(self, name, value):
        """
        Updates the database entry for item I{name} = I{value}, returning a
        deferred that fires when the transaction is done.
        """
        return self.write('update', name, value, niceness=NICENESS_WRITE)

    def insert(self, name, value):
        """
        Inserts a database entry for item I{name} = I{value}, returning a
        deferred that fires when the transaction is done.
        """
        return self.write('insert', name, value, niceness=NICENESS_WRITE)

    def delete(self, *names):
        """
        Deletes the database entries for the items having the supplied
        I{*names}, returning a deferred that fires when the transaction is
        done.

        If we are updating the search index, there's a nuance to the
        deferred processing. In that case, when the deletions are done, the
        deferred is fired and processing separately proceeds with dropping
        index entries for the deleted values. Here's how it works:

            1. Create a clean deferred B{d1} to return to the caller, whose
            callback(s) will be fired from the callback to the transaction's
            own deferred B{d2}.

            2. Start the delete transaction and assign the C{deleteDone}
            function as the callback to its deferred B{d2}. The deferred
            list B{d3} of index drops is not waited on; B{d1} fires as soon
            as the drops have been started.

            3. If the transaction (or the start of the index drops) fails,
            the failure is passed on to B{d1} so that the caller is not left
            waiting on a deferred that never fires.

        """
        def deleteDone(noneResult, d1):
            dList = []
            for name in names:
                # Use my search object, as write() does, rather than the
                # imported search module.
                dList.append(self.search.drop(
                    document=self.groupID, section=hash(name)))
            d3 = defer.DeferredList(dList)
            d3.addCallback(self.search.ready)
            d1.callback(None)

        kw = {'niceness':NICENESS_WRITE}
        if self.search is None:
            return self.t.delete(*names, **kw)
        d1 = defer.Deferred()
        self.search.busy()
        d2 = self.t.delete(*names, **kw)
        d2.addCallback(deleteDone, d1)
        # NOTE(review): on failure the search object is left marked busy, as
        # before; confirm whether self.search.ready should be called here.
        d2.addErrback(d1.errback)
        return d1

    def names(self):
        """
        Returns a deferred that fires with a list of the names of all items
        currently defined in my group, each coerced to the type of my
        I{nameType} attribute.
        """
        def gotNames(names):
            return [self.nameType(x) for x in names]

        d = self.t.names()
        d.addCallback(gotNames)
        return d
349
350
# Names exported by a star-import of this module; Transactor and
# NICENESS_WRITE are not in the list.
__all__ = ['Missing', 'Items']
Note: See TracBrowser for help on using the browser.