| 1 |
# sAsync: |
|---|
| 2 |
# An enhancement to the SQLAlchemy package that provides persistent |
|---|
| 3 |
# dictionaries, text indexing and searching, and an access broker for |
|---|
| 4 |
# conveniently managing database access, table setup, and |
|---|
| 5 |
# transactions. Everything is run in an asynchronous fashion using the Twisted |
|---|
| 6 |
# framework and its deferred processing capabilities. |
|---|
| 7 |
# |
|---|
| 8 |
# Copyright (C) 2006-2007 by Edwin A. Suominen, http://www.eepatents.com |
|---|
| 9 |
# |
|---|
| 10 |
# This program is free software; you can redistribute it and/or modify it under |
|---|
| 11 |
# the terms of the GNU General Public License as published by the Free Software |
|---|
| 12 |
# Foundation; either version 2 of the License, or (at your option) any later |
|---|
| 13 |
# version. |
|---|
| 14 |
# |
|---|
| 15 |
# This program is distributed in the hope that it will be useful, but WITHOUT |
|---|
| 16 |
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|---|
| 17 |
# FOR A PARTICULAR PURPOSE. See the file COPYING for more details. |
|---|
| 18 |
# |
|---|
| 19 |
# You should have received a copy of the GNU General Public License along with |
|---|
| 20 |
# this program; if not, write to the Free Software Foundation, Inc., 51 |
|---|
| 21 |
# Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA |
|---|
| 22 |
|
|---|
| 23 |
""" |
|---|
| 24 |
Asynchronous database transactions via SQLAlchemy. |
|---|
| 25 |
""" |
|---|
| 26 |
|
|---|
| 27 |
import sys |
|---|
| 28 |
from twisted.internet import defer |
|---|
| 29 |
from twisted.python import failure |
|---|
| 30 |
|
|---|
| 31 |
import sqlalchemy as SA |
|---|
| 32 |
|
|---|
| 33 |
###################################################################### |
|---|
| 34 |
# SA 0.4 support contributed by Ricky Iacovou, based upon: |
|---|
| 35 |
# |
|---|
| 36 |
# http://www.sqlalchemy.org/docs/04/intro.html#overview_migration |
|---|
| 37 |
# |
|---|
| 38 |
# Determine the version of SQLAlchemy used, 0.3 or 0.4, and set the |
|---|
| 39 |
# Boolean variable "SA04" accordingly. |
|---|
| 40 |
# |
|---|
| 41 |
# We could also use a capability-based approach, like: |
|---|
| 42 |
# |
|---|
| 43 |
# try: |
|---|
| 44 |
# MetaData = SA.BoundMetaData |
|---|
| 45 |
# except AttributeError: |
|---|
| 46 |
# MetaData = SA.MetaData |
|---|
| 47 |
# |
|---|
| 48 |
# However, late 0.3.x versions also supported some 0.4 constructs, |
|---|
| 49 |
# so better use an explicit 0.3.x -> 0.4.x cutoff in order to avoid |
|---|
| 50 |
# ambiguity. |
|---|
| 51 |
###################################################################### |
|---|
| 52 |
_sv = SA.__version__.split ('.') |
|---|
| 53 |
try: |
|---|
| 54 |
_v = int (_sv[0]) + (int(_sv[1]) / 10.0) |
|---|
| 55 |
except: |
|---|
| 56 |
# Not strictly an Import Error, but close enough. |
|---|
| 57 |
raise ImportError("Failed to determine SQLAlchemy version: %s", _sv) |
|---|
| 58 |
if _v >= 0.4: |
|---|
| 59 |
SA04 = True |
|---|
| 60 |
else: |
|---|
| 61 |
SA04 = False |
|---|
| 62 |
del _sv, _v |
|---|
| 63 |
# End of version check |
|---|
| 64 |
|
|---|
| 65 |
|
|---|
| 66 |
from asynqueue import ThreadQueue |
|---|
| 67 |
|
|---|
| 68 |
import misc |
|---|
| 69 |
|
|---|
| 70 |
|
|---|
| 71 |
class DatabaseError(Exception): |
|---|
| 72 |
""" |
|---|
| 73 |
A problem occured when trying to access the database. |
|---|
| 74 |
""" |
|---|
| 75 |
|
|---|
| 76 |
|
|---|
| 77 |
def transact(f): |
|---|
| 78 |
""" |
|---|
| 79 |
Use this function as a decorator to wrap the supplied method I{f} of |
|---|
| 80 |
L{AccessBroker} in a transaction that runs C{f(*args, **kw)} in its own |
|---|
| 81 |
transaction. |
|---|
| 82 |
|
|---|
| 83 |
Immediately returns an instance of L{twisted.internet.defer.Deferred} that |
|---|
| 84 |
will eventually have its callback called with the result of the |
|---|
| 85 |
transaction. Inspired by and largely copied from Valentino Volonghi's |
|---|
| 86 |
C{makeTransactWith} code. |
|---|
| 87 |
|
|---|
| 88 |
You can add the following keyword options to your function call: |
|---|
| 89 |
|
|---|
| 90 |
@keyword niceness: Scheduling niceness, an integer between -20 and 20, |
|---|
| 91 |
with lower numbers having higher scheduling priority as in UNIX C{nice} |
|---|
| 92 |
and C{renice}. |
|---|
| 93 |
|
|---|
| 94 |
@keyword doNext: Set C{True} to assign highest possible priority, even |
|---|
| 95 |
higher than with niceness = -20. |
|---|
| 96 |
|
|---|
| 97 |
@keyword doLast: Set C{True} to assign lower possible priority, even |
|---|
| 98 |
lower than with niceness = 20. |
|---|
| 99 |
|
|---|
| 100 |
@keyword session: Set this option to C{True} to get a I{session} attribute |
|---|
| 101 |
for use within the transaction, which will be flushed at the end of the |
|---|
| 102 |
transaction. |
|---|
| 103 |
|
|---|
| 104 |
@type session: Boolean option, default C{False} |
|---|
| 105 |
|
|---|
| 106 |
@keyword ignore: Set this option to C{True} to have errors in the |
|---|
| 107 |
transaction function ignored and just do the rollback quietly. |
|---|
| 108 |
|
|---|
| 109 |
@type ignore: Boolean option, default C{False} |
|---|
| 110 |
|
|---|
| 111 |
""" |
|---|
| 112 |
def substituteFunction(self, *args, **kw): |
|---|
| 113 |
""" |
|---|
| 114 |
Puts the original function in the synchronous task queue and returns a |
|---|
| 115 |
deferred to its result when it is eventually run. |
|---|
| 116 |
|
|---|
| 117 |
This function will be given the same name as the original function so |
|---|
| 118 |
that it can be asked to masquerade as the original function. As a |
|---|
| 119 |
result, the threaded call to the original function that it makes inside |
|---|
| 120 |
its C{transaction} sub-function will be able to use the arguments for |
|---|
| 121 |
that original function. (The caller will actually be calling this |
|---|
| 122 |
substitute function, but it won't know that.) |
|---|
| 123 |
|
|---|
| 124 |
The original function should be a method of a L{AccessBroker} subclass |
|---|
| 125 |
instance, and the queue for that instance will be used to run it. |
|---|
| 126 |
""" |
|---|
| 127 |
def transaction(usingSession, func, *t_args, **t_kw): |
|---|
| 128 |
""" |
|---|
| 129 |
Everything making up a transaction, and everything run in the |
|---|
| 130 |
thread, is contained within this little function, including of |
|---|
| 131 |
course a call to C{func}. |
|---|
| 132 |
""" |
|---|
| 133 |
if not usingSession: |
|---|
| 134 |
trans = self.connection.begin() |
|---|
| 135 |
if not hasattr(func, 'im_self'): |
|---|
| 136 |
t_args = (self,) + t_args |
|---|
| 137 |
try: |
|---|
| 138 |
result = func(*t_args, **t_kw) |
|---|
| 139 |
except Exception, e: |
|---|
| 140 |
if not usingSession: |
|---|
| 141 |
trans.rollback() |
|---|
| 142 |
if not ignore: |
|---|
| 143 |
raise e |
|---|
| 144 |
else: |
|---|
| 145 |
if usingSession: |
|---|
| 146 |
self.session.flush() |
|---|
| 147 |
else: |
|---|
| 148 |
trans.commit() |
|---|
| 149 |
return result |
|---|
| 150 |
return failure.Failure() |
|---|
| 151 |
|
|---|
| 152 |
def doTransaction(usingSession): |
|---|
| 153 |
""" |
|---|
| 154 |
Queues up the transaction and immediately returns a deferred to |
|---|
| 155 |
its eventual result. |
|---|
| 156 |
""" |
|---|
| 157 |
if isNested(): |
|---|
| 158 |
return f(self, *args, **kw) |
|---|
| 159 |
return self.q.call(transaction, usingSession, f, *args, **kw) |
|---|
| 160 |
|
|---|
| 161 |
def started(null): |
|---|
| 162 |
self.ranStart = True |
|---|
| 163 |
del self._transactionStartupDeferred |
|---|
| 164 |
if useSession: |
|---|
| 165 |
d = self.getSession() |
|---|
| 166 |
else: |
|---|
| 167 |
d = self.connect() |
|---|
| 168 |
d.addCallback(lambda _: self.q.call( |
|---|
| 169 |
transaction, False, self.first, doNext=True)) |
|---|
| 170 |
return d |
|---|
| 171 |
|
|---|
| 172 |
def isNested(): |
|---|
| 173 |
frame = sys._getframe() |
|---|
| 174 |
while True: |
|---|
| 175 |
frame = frame.f_back |
|---|
| 176 |
if frame is None: |
|---|
| 177 |
return False |
|---|
| 178 |
if frame.f_code == transaction.func_code: |
|---|
| 179 |
return True |
|---|
| 180 |
|
|---|
| 181 |
ignore = kw.pop('ignore', False) |
|---|
| 182 |
useSession = kw.pop('session', False) |
|---|
| 183 |
if hasattr(self, 'connection') and getattr(self, 'ranStart', False): |
|---|
| 184 |
# We already have a connection, let's get right to the transaction |
|---|
| 185 |
if useSession: |
|---|
| 186 |
d = self.getSession() |
|---|
| 187 |
d.addCallback(lambda _: doTransaction(True)) |
|---|
| 188 |
else: |
|---|
| 189 |
d = doTransaction(False) |
|---|
| 190 |
elif hasattr(self, '_transactionStartupDeferred') and \ |
|---|
| 191 |
not self._transactionStartupDeferred.called: |
|---|
| 192 |
# Startup is in progress, make a new Deferred to the start of the |
|---|
| 193 |
# transaction and chain it to the startup Deferred. |
|---|
| 194 |
d = defer.Deferred() |
|---|
| 195 |
if useSession: |
|---|
| 196 |
d.addCallback(lambda _: self.getSession()) |
|---|
| 197 |
d.addCallback(lambda _: doTransaction(useSession)) |
|---|
| 198 |
self._transactionStartupDeferred.chainDeferred(d) |
|---|
| 199 |
else: |
|---|
| 200 |
# We need to start things up before doing this first transaction |
|---|
| 201 |
d = defer.maybeDeferred(self.startup) |
|---|
| 202 |
self._transactionStartupDeferred = d |
|---|
| 203 |
d.addCallback(started) |
|---|
| 204 |
d.addCallback(lambda _: doTransaction(useSession)) |
|---|
| 205 |
# Return whatever Deferred we've got |
|---|
| 206 |
return d |
|---|
| 207 |
|
|---|
| 208 |
if f.func_name == 'first': |
|---|
| 209 |
return f |
|---|
| 210 |
substituteFunction.func_name = f.func_name |
|---|
| 211 |
return substituteFunction |
|---|
| 212 |
|
|---|
| 213 |
|
|---|
| 214 |
class AccessBroker(object): |
|---|
| 215 |
""" |
|---|
| 216 |
I manage asynchronous access to a database. |
|---|
| 217 |
|
|---|
| 218 |
Before you use any instance of me, you must specify the parameters for |
|---|
| 219 |
creating an SQLAlchemy database engine. A single argument is used, which |
|---|
| 220 |
specifies a connection to a database via an RFC-1738 url. In addition, the |
|---|
| 221 |
following keyword options can be employed, which are listed below with |
|---|
| 222 |
their default values. |
|---|
| 223 |
|
|---|
| 224 |
You can set an engine globally, for all instances of me via the |
|---|
| 225 |
L{sasync.engine} package-level function, or via my L{engine} class |
|---|
| 226 |
method. Alternatively, you can specify an engine for one particular |
|---|
| 227 |
instance by supplying the parameters to the constructor. |
|---|
| 228 |
|
|---|
| 229 |
SQLAlchemy has excellent documentation, which describes the engine |
|---|
| 230 |
parameters in plenty of detail. See |
|---|
| 231 |
U{http://www.sqlalchemy.org/docs/dbengine.myt}. |
|---|
| 232 |
|
|---|
| 233 |
@ivar dt: A property-generated reference to a deferred tracker that you can |
|---|
| 234 |
use to wait for database writes. See L{misc.DeferredTracker}. |
|---|
| 235 |
|
|---|
| 236 |
@ivar q: A property-generated reference to a threaded task queue that is |
|---|
| 237 |
dedicated to my database connection. |
|---|
| 238 |
|
|---|
| 239 |
@ivar connection: The current SQLAlchemy connection object, if |
|---|
| 240 |
any yet exists. Generated by my L{connect} method. |
|---|
| 241 |
|
|---|
| 242 |
""" |
|---|
| 243 |
globalParams = ('', {}) |
|---|
| 244 |
queues = {} |
|---|
| 245 |
|
|---|
| 246 |
def __init__(self, *url, **kw): |
|---|
| 247 |
""" |
|---|
| 248 |
Constructs an instance of me, optionally specifying parameters for an |
|---|
| 249 |
SQLAlchemy engine object that serves this instance only. |
|---|
| 250 |
""" |
|---|
| 251 |
self.selects = {} |
|---|
| 252 |
if url: |
|---|
| 253 |
self.engineParams = (url[0], kw) |
|---|
| 254 |
else: |
|---|
| 255 |
self.engineParams = self.globalParams |
|---|
| 256 |
self.running = False |
|---|
| 257 |
|
|---|
| 258 |
@classmethod |
|---|
| 259 |
def engine(cls, url, **kw): |
|---|
| 260 |
""" |
|---|
| 261 |
Sets default connection parameters for all instances of me. |
|---|
| 262 |
""" |
|---|
| 263 |
cls.globalParams = (url, kw) |
|---|
| 264 |
|
|---|
| 265 |
def _getDeferredTracker(self): |
|---|
| 266 |
""" |
|---|
| 267 |
Returns an instance of L{misc.DeferredTracker} that is dedicated to the |
|---|
| 268 |
bound method's instance of me. Creates the deferred tracker the first |
|---|
| 269 |
time this method is called for a given instance of me. |
|---|
| 270 |
""" |
|---|
| 271 |
if not hasattr(self, '_deferredTracker'): |
|---|
| 272 |
self._deferredTracker = misc.DeferredTracker() |
|---|
| 273 |
return self._deferredTracker |
|---|
| 274 |
dt = property(_getDeferredTracker) |
|---|
| 275 |
|
|---|
| 276 |
def _getQueue(self): |
|---|
| 277 |
""" |
|---|
| 278 |
Returns a threaded task queue that is dedicated to my database |
|---|
| 279 |
connection. Creates the queue the first time the property is accessed. |
|---|
| 280 |
""" |
|---|
| 281 |
def newQueue(): |
|---|
| 282 |
queue = ThreadQueue(1) |
|---|
| 283 |
self.running = True |
|---|
| 284 |
self.queues[key] = queue |
|---|
| 285 |
return queue |
|---|
| 286 |
|
|---|
| 287 |
if hasattr(self, '_currentQueue'): |
|---|
| 288 |
return self._currentQueue |
|---|
| 289 |
url, kw = self.engineParams |
|---|
| 290 |
key = hash((url,) + tuple(kw.items())) |
|---|
| 291 |
if key in self.queues: |
|---|
| 292 |
queue = self.queues[key] |
|---|
| 293 |
if not queue.isRunning(): |
|---|
| 294 |
queue = newQueue() |
|---|
| 295 |
else: |
|---|
| 296 |
queue = newQueue() |
|---|
| 297 |
self._currentQueue = queue |
|---|
| 298 |
return queue |
|---|
| 299 |
|
|---|
| 300 |
q = property(_getQueue, doc=""" |
|---|
| 301 |
Accessing the 'q' attribute will always return a running queue object that |
|---|
| 302 |
is dedicated to this instance's connection parameters |
|---|
| 303 |
""") |
|---|
| 304 |
|
|---|
| 305 |
def connect(self, forceNew=False): |
|---|
| 306 |
""" |
|---|
| 307 |
Generates and returns a singleton connection object. |
|---|
| 308 |
""" |
|---|
| 309 |
def getEngine(): |
|---|
| 310 |
if hasattr(self, '_dEngine'): |
|---|
| 311 |
d = defer.Deferred() |
|---|
| 312 |
d.addCallback(lambda _: getConnection()) |
|---|
| 313 |
self._dEngine.chainDeferred(d) |
|---|
| 314 |
else: |
|---|
| 315 |
d = self._dEngine = \ |
|---|
| 316 |
self.q.call(createEngine, doNext=True) |
|---|
| 317 |
d.addCallback(gotEngine) |
|---|
| 318 |
return d |
|---|
| 319 |
|
|---|
| 320 |
def createEngine(): |
|---|
| 321 |
url, kw = self.engineParams |
|---|
| 322 |
# The 'threadlocal' keyword value is unchanged from SA 0.3 to 0.4 |
|---|
| 323 |
kw['strategy'] = 'threadlocal' |
|---|
| 324 |
return SA.create_engine(url, **kw) |
|---|
| 325 |
|
|---|
| 326 |
def gotEngine(engine): |
|---|
| 327 |
del self._dEngine |
|---|
| 328 |
self._engine = engine |
|---|
| 329 |
return getConnection() |
|---|
| 330 |
|
|---|
| 331 |
def getConnection(): |
|---|
| 332 |
if not forceNew and hasattr(self, 'connection'): |
|---|
| 333 |
d = defer.succeed(self.connection) |
|---|
| 334 |
elif not forceNew and hasattr(self, '_dConnect'): |
|---|
| 335 |
d = defer.Deferred().addCallback(lambda _: self.connection) |
|---|
| 336 |
self._dConnect.chainDeferred(d) |
|---|
| 337 |
else: |
|---|
| 338 |
d = self._dConnect = \ |
|---|
| 339 |
self.q.call(self._engine.contextual_connect, doNext=True) |
|---|
| 340 |
d.addCallback(gotConnection) |
|---|
| 341 |
return d |
|---|
| 342 |
|
|---|
| 343 |
def gotConnection(connection): |
|---|
| 344 |
if hasattr(self, '_dConnect'): |
|---|
| 345 |
del self._dConnect |
|---|
| 346 |
self.connection = connection |
|---|
| 347 |
return connection |
|---|
| 348 |
|
|---|
| 349 |
# After all these function definitions, the method begins here |
|---|
| 350 |
if hasattr(self, '_engine'): |
|---|
| 351 |
return getConnection() |
|---|
| 352 |
else: |
|---|
| 353 |
return getEngine() |
|---|
| 354 |
|
|---|
| 355 |
def _sessionClose(self): |
|---|
| 356 |
""" |
|---|
| 357 |
Replacement C{close} method for session objects. |
|---|
| 358 |
""" |
|---|
| 359 |
self.isActive = False |
|---|
| 360 |
return self.session._close() |
|---|
| 361 |
|
|---|
| 362 |
def getSession(self): |
|---|
| 363 |
""" |
|---|
| 364 |
Get a commitable session object |
|---|
| 365 |
""" |
|---|
| 366 |
def gotConnection(connection): |
|---|
| 367 |
if SA04: |
|---|
| 368 |
d = self.q.call( |
|---|
| 369 |
SA.orm.create_session, bind=connection, doNext=True) |
|---|
| 370 |
else: |
|---|
| 371 |
d = self.q.call( |
|---|
| 372 |
SA.create_session, bind_to=connection, doNext=True) |
|---|
| 373 |
d.addCallback(gotSession) |
|---|
| 374 |
return d |
|---|
| 375 |
|
|---|
| 376 |
def gotSession(session): |
|---|
| 377 |
session.isActive = True |
|---|
| 378 |
session._close = session.close |
|---|
| 379 |
session.close = self._sessionClose |
|---|
| 380 |
self.session = session |
|---|
| 381 |
return session |
|---|
| 382 |
|
|---|
| 383 |
if hasattr(self, 'session') and self.session.isActive: |
|---|
| 384 |
return defer.succeed(self.session) |
|---|
| 385 |
return self.connect(forceNew=True).addCallback(gotConnection) |
|---|
| 386 |
|
|---|
| 387 |
def table(self, name, *cols, **kw): |
|---|
| 388 |
""" |
|---|
| 389 |
Instantiates a new table object, creating it in the transaction thread |
|---|
| 390 |
as needed. |
|---|
| 391 |
|
|---|
| 392 |
One or more indexes other than the primary key can be defined |
|---|
| 393 |
via a keyword prefixed with I{index_} or I{unique_} and having |
|---|
| 394 |
the index name as the suffix. Use the I{unique_} prefix if the |
|---|
| 395 |
index is to be a unique one. The value of the keyword is a |
|---|
| 396 |
list or tuple containing the names of all columns in the |
|---|
| 397 |
index. |
|---|
| 398 |
""" |
|---|
| 399 |
def _table(): |
|---|
| 400 |
if not hasattr(self, '_meta'): |
|---|
| 401 |
if SA04: |
|---|
| 402 |
self._meta = SA.MetaData(self._engine) |
|---|
| 403 |
else: |
|---|
| 404 |
self._meta = SA.BoundMetaData(self._engine) |
|---|
| 405 |
indexes = {} |
|---|
| 406 |
for key in kw.keys(): |
|---|
| 407 |
if key.startswith('index_'): |
|---|
| 408 |
unique = False |
|---|
| 409 |
elif key.startswith('unique_'): |
|---|
| 410 |
unique = True |
|---|
| 411 |
else: |
|---|
| 412 |
continue |
|---|
| 413 |
indexes[key] = kw.pop(key), unique |
|---|
| 414 |
kw.setdefault('useexisting', True) |
|---|
| 415 |
table = SA.Table(name, self._meta, *cols, **kw) |
|---|
| 416 |
table.create(checkfirst=True) |
|---|
| 417 |
setattr(self, name, table) |
|---|
| 418 |
return table, indexes |
|---|
| 419 |
|
|---|
| 420 |
def _index(tableInfo): |
|---|
| 421 |
table, indexes = tableInfo |
|---|
| 422 |
for key, info in indexes.iteritems(): |
|---|
| 423 |
kwIndex = {'unique':info[1]} |
|---|
| 424 |
try: |
|---|
| 425 |
# This is stupid. Why can't I see if the index |
|---|
| 426 |
# already exists and only create it if needed? |
|---|
| 427 |
index = SA.Index(key, *[ |
|---|
| 428 |
getattr(table.c, x) for x in info[0] |
|---|
| 429 |
], **kwIndex) |
|---|
| 430 |
index.create() |
|---|
| 431 |
except: |
|---|
| 432 |
pass |
|---|
| 433 |
|
|---|
| 434 |
if hasattr(self, name): |
|---|
| 435 |
d = defer.succeed(False) |
|---|
| 436 |
else: |
|---|
| 437 |
d = self.connect() |
|---|
| 438 |
d.addCallback(lambda _: self.q.call(_table, doNext=True)) |
|---|
| 439 |
d.addCallback(lambda x: self.q.call(_index, x, doNext=True)) |
|---|
| 440 |
return d |
|---|
| 441 |
|
|---|
| 442 |
def startup(self): |
|---|
| 443 |
""" |
|---|
| 444 |
This method runs before the first transaction to start my synchronous |
|---|
| 445 |
task queue. B{Override it} to get whatever pre-transaction stuff you |
|---|
| 446 |
have run. |
|---|
| 447 |
|
|---|
| 448 |
Alternatively, with legacy support for the old API, your |
|---|
| 449 |
pre-transaction code can reside in a L{userStartup} method of your |
|---|
| 450 |
subclass. |
|---|
| 451 |
""" |
|---|
| 452 |
userStartup = getattr(self, 'userStartup', None) |
|---|
| 453 |
if callable(userStartup): |
|---|
| 454 |
return defer.maybeDeferred(userStartup) |
|---|
| 455 |
|
|---|
| 456 |
def userStartup(self): |
|---|
| 457 |
""" |
|---|
| 458 |
If this method is defined and L{startup} is not overridden in your |
|---|
| 459 |
subclass, however, this method will be run as the first callback in the |
|---|
| 460 |
deferred processing chain, after my synchronous task queue is safely |
|---|
| 461 |
underway. |
|---|
| 462 |
|
|---|
| 463 |
The method should return either an immediate result or a deferred to |
|---|
| 464 |
an eventual result. |
|---|
| 465 |
|
|---|
| 466 |
B{Deprecated}: Instead of defining this method, your subclass should |
|---|
| 467 |
simply override L{startup} with your custom startup stuff. |
|---|
| 468 |
|
|---|
| 469 |
""" |
|---|
| 470 |
|
|---|
| 471 |
def first(self): |
|---|
| 472 |
""" |
|---|
| 473 |
This method automatically runs as the first transaction after |
|---|
| 474 |
completion of L{startup} (or L{userStartup}). B{Override it} to define |
|---|
| 475 |
table contents or whatever else you want as a first transaction that |
|---|
| 476 |
immediately follows your pre-transaction stuff. |
|---|
| 477 |
|
|---|
| 478 |
You don't need to decorate the method with C{@transact}, but it doesn't |
|---|
| 479 |
break anything if you do. |
|---|
| 480 |
""" |
|---|
| 481 |
|
|---|
| 482 |
def shutdown(self, *null): |
|---|
| 483 |
""" |
|---|
| 484 |
Shuts down my database transaction functionality and threaded task |
|---|
| 485 |
queue, returning a deferred that fires when all queued tasks are |
|---|
| 486 |
done and the shutdown is complete. |
|---|
| 487 |
""" |
|---|
| 488 |
def finalTask(): |
|---|
| 489 |
if hasattr(self, 'connection'): |
|---|
| 490 |
self.connection.close() |
|---|
| 491 |
self.running = False |
|---|
| 492 |
|
|---|
| 493 |
if self.running: |
|---|
| 494 |
d = self.q.call(finalTask) |
|---|
| 495 |
d.addBoth(lambda _: self.q.shutdown()) |
|---|
| 496 |
else: |
|---|
| 497 |
d = defer.succeed(None) |
|---|
| 498 |
if hasattr(self, '_deferredTracker'): |
|---|
| 499 |
d.addCallback(lambda _: self._deferredTracker.deferToAll()) |
|---|
| 500 |
return d |
|---|
| 501 |
|
|---|
| 502 |
def s(self, *args, **kw): |
|---|
| 503 |
""" |
|---|
| 504 |
Polymorphic method for working with C{select} instances within a cached |
|---|
| 505 |
selection subcontext. |
|---|
| 506 |
|
|---|
| 507 |
- When called with a single argument (the select object's name as a |
|---|
| 508 |
string) and no keywords, this method indicates if the named |
|---|
| 509 |
select object already exists and sets its selection subcontext to |
|---|
| 510 |
I{name}. |
|---|
| 511 |
|
|---|
| 512 |
- With multiple arguments or any keywords, the method acts like a |
|---|
| 513 |
call to C{sqlalchemy.select(...).compile()}, except that nothing |
|---|
| 514 |
is returned. Instead, the resulting select object is stored in |
|---|
| 515 |
the current selection subcontext. |
|---|
| 516 |
|
|---|
| 517 |
- With no arguments or keywords, the method returns the select |
|---|
| 518 |
object for the current selection subcontext. |
|---|
| 519 |
|
|---|
| 520 |
""" |
|---|
| 521 |
if kw or (len(args) > 1): |
|---|
| 522 |
# It's a compilation. |
|---|
| 523 |
context = getattr(self, 'context', None) |
|---|
| 524 |
self.selects[context] = SA.select(*args, **kw).compile() |
|---|
| 525 |
elif len(args) == 1: |
|---|
| 526 |
# It's a lookup to see if the select has been previously |
|---|
| 527 |
# seen and compiled; return True or False. |
|---|
| 528 |
self.context = args[0] |
|---|
| 529 |
return self.context in self.selects |
|---|
| 530 |
else: |
|---|
| 531 |
# It's a retrieval of a compiled selection object, keyed off |
|---|
| 532 |
# the most recently mentioned context. |
|---|
| 533 |
context = getattr(self, 'context', None) |
|---|
| 534 |
return self.selects.get(context) |
|---|
| 535 |
|
|---|
| 536 |
def queryToList(self, **kw): |
|---|
| 537 |
""" |
|---|
| 538 |
Executes my current select object with the bind parameters supplied as |
|---|
| 539 |
keywords, returning a list containing the first element of each row in |
|---|
| 540 |
the result. |
|---|
| 541 |
""" |
|---|
| 542 |
rows = self.s().execute(**kw).fetchall() |
|---|
| 543 |
if rows is None: |
|---|
| 544 |
return [] |
|---|
| 545 |
return [row[0] for row in rows] |
|---|
| 546 |
|
|---|
| 547 |
def deferToQueue(self, func, *args, **kw): |
|---|
| 548 |
""" |
|---|
| 549 |
Dispatches I{callable(*args, **kw)} as a task via the like-named method |
|---|
| 550 |
of my synchronous queue, returning a deferred to its eventual result. |
|---|
| 551 |
|
|---|
| 552 |
Scheduling of the task is impacted by the I{niceness} keyword that can |
|---|
| 553 |
be included in I{**kw}. As with UNIX niceness, the value should be an |
|---|
| 554 |
integer where 0 is normal scheduling, negative numbers are higher |
|---|
| 555 |
priority, and positive numbers are lower priority. |
|---|
| 556 |
|
|---|
| 557 |
@keyword niceness: Scheduling niceness, an integer between -20 and 20, |
|---|
| 558 |
with lower numbers having higher scheduling priority as in UNIX |
|---|
| 559 |
C{nice} and C{renice}. |
|---|
| 560 |
|
|---|
| 561 |
""" |
|---|
| 562 |
return self.q.call(func, *args, **kw) |
|---|
| 563 |
|
|---|
| 564 |
|
|---|
| 565 |
__all__ = ['transact', 'AccessBroker', 'SA'] |
|---|