sAsync: SQLAlchemy done Asynchronously
The startup Method of AccessBroker
Here is an example of creating two tables with the automatically called startup method. (For backwards compatibility, you can still use userStartup instead.)
import sqlalchemy as sa
from twisted.internet import defer
from sasync.database import AccessBroker


class DatabaseBroker(AccessBroker):
    """Provides a sAsync AccessBroker

    myBroker = DatabaseBroker('sqlite://foo.db')
    """
    def startup(self):
        users = self.table(
            'users',
            sa.Column('user_id', sa.Integer, primary_key=True),
            sa.Column('user_name', sa.String(255)),
            sa.Column('user_password', sa.String(255)),
        )
        posts = self.table(
            'posts',
            sa.Column('post_id', sa.Integer, primary_key=True),
            # Do foreign keys actually work at all with sqlite?
            sa.Column('user_id', sa.Integer),
            sa.Column('post_text', sa.String()),
        )
        # Fires once both tables are ready.
        return defer.DeferredList((users, posts))

# Carry on as usual.
From a pgasync User
Since I'm the reason this page exists, I'll try contributing to it. I can't help appending a rant about the situation, though. Unfortunately I don't yet have my PhD in semantics and programming lingo, so the general obfuscation I faced when trying to move away from pgasync was frustrating. A simple answer to "where do I start?!" should be along the lines of... When looking at a simple use case for a database API, "Joe's bakery" is better and more accessible than "Doctor Swarts theory of binary graph tree cycles"; after all, I program to try to get away from maths ;-).
# From the man who started the documentation plea
from twisted.internet import reactor
import sqlalchemy as sa
from sasync.database import AccessBroker, transact


class TestAB(AccessBroker):
    def startup(self):
        # This method can also be 'userStartup' instead of 'startup', due to
        # backwards compatibility.
        self.table(
            'accounts',
            sa.Column('id', sa.Integer, primary_key=True),
            sa.Column('name', sa.String(255)),
            sa.Column('password', sa.String(255))
        )

    @transact
    def insertUser(self, name, password):
        return self.accounts.insert().execute(name=name, password=password)

    @transact
    def getUsers(self):
        return self.accounts.select().execute().fetchall()


class ICouldBeAnything(object):
    """
    Some class - could be a Nevow page, or any sort of
    database interface object.
    """
    myBroker = TestAB('sqlite://./test.db')

    def testSelect(self):
        """ Run a test call - this is the messy way..."""
        def fooCall(queryResult):
            """A boring callback"""
            print(repr(queryResult))
            reactor.stop()
        self.myBroker.getUsers().addCallback(fooCall)

    def testInsert(self):
        self.myBroker.insertUser('colin', 'test')


stuff = ICouldBeAnything()
stuff.testInsert()
stuff.testSelect()
reactor.run()
Another User
This apparently may or may not work. What does work is what I do with Teleport, that is to create a child class of AccessBroker: http://www.slipgate.co.za/~karnaugh/teleport/Core/Database.py. This code is maintained and "Works".
One should also note that you must be careful about which version of SQLAlchemy you are using. The sAsync setup should require the appropriate version, however.
Improvements on the above example
From another user: After reading the sAsync code I've come to the following conclusions:
You must wait for startup (or userStartup, the old way) to complete successfully, using the Deferred object it returns, before issuing any commands. However, any method decorated with @transact (as all of your transactions should be) will automatically do that wait for you.
Here is my modification of the above example.
from twisted.internet import reactor
import sqlalchemy as sa
from sasync.database import AccessBroker, transact


class TestAB(AccessBroker):
    def startup(self):
        # This method can also be 'userStartup' instead of 'startup', due to
        # backwards compatibility.
        return self.table(
            'accounts',
            sa.Column('id', sa.Integer, primary_key=True),
            sa.Column('name', sa.String(255)),
            sa.Column('password', sa.String(255))
        )

    @transact
    def insertUser(self, name, password):
        return self.accounts.insert().execute(name=name, password=password)

    @transact
    def getUsers(self):
        return self.accounts.select().execute().fetchall()


def insert(ab):
    return ab.insertUser('Joe Schmortz', 'secret!')


def select(ab):
    def stopAndDisplay(queryResult):
        print(repr(queryResult))
        reactor.stop()
    return ab.getUsers().addCallback(stopAndDisplay)


def start():
    broker = TestAB("sqlite://./test.db")
    d = broker.startup()
    # Only once the AccessBroker starts up can we use it
    d = d.addCallback(lambda _: insert(broker))
    d.addCallback(lambda _: select(broker))
    # Start up the reactor
    reactor.run()


if __name__ == "__main__":
    start()
A semi-functional Twisted XML-RPC Server Example
Here's a simple example of sAsync integrated with a simple Twisted XML-RPC Server.
The aim of this is to give a simple starting point for people wishing to experiment with sAsync in a "real" example, as opposed to snippets which start and stop their own reactor.
There's a server and a client; run the server, then in a separate window run the client.
Twisted XML-RPC Server
# Install the Twisted libraries to use this, from
#
#   http://www.twistedmatrix.com
#
# Run this example with 'twistd -noy <file.py>'

from twisted.internet import defer
from twisted.web import xmlrpc

import sqlalchemy as sa

from sasync.database import AccessBroker
from sasync.database import transact


class TestAB ( AccessBroker ):
    def startup ( self ):
        # This method can also be 'userStartup' instead of 'startup', due to
        # backwards compatibility.
        return self.table (
            'accounts',
            sa.Column ( 'id', sa.Integer, primary_key = True ),
            sa.Column ( 'name', sa.String ( 255 ) ),
            sa.Column ( 'password', sa.String ( 255 ) ) )

    @transact
    def insertUser ( self, name, password ):
        return self.accounts.insert().execute (
            name = name, password = password ).fetchall()

    @transact
    def getUsers ( self ):
        return self.accounts.select().execute().fetchall()


class XMLRPCServer ( xmlrpc.XMLRPC ):
    def __init__ ( self ):
        # Initialise the base XMLRPC resource before adding our own state.
        xmlrpc.XMLRPC.__init__ ( self )
        self.broker = TestAB ( "sqlite://./test.db" )

    def xmlrpc_insertUser ( self, user, password ):
        d = self.broker.insertUser ( user, password )
        return d

    def xmlrpc_getUsers ( self ):
        d = self.broker.getUsers()
        ud = defer.Deferred()
        d.addCallback ( self.copyRetval, ud )
        return ud

    def copyRetval ( self, rp, ud ):
        # Copy each row into a plain list so that it can be serialized.
        retval = []
        for row in rp:
            print ( row )
            data = [ elem for elem in row ]
            retval.append ( data )
        ud.callback ( retval )


######################################################################
# Twisted Boilerplate code.
######################################################################
from twisted.application import service
from twisted.web import server
from twisted.application import strports

xmlServer = XMLRPCServer()
application = service.Application ( 'sasync' )
serviceCollection = service.IServiceCollection ( application )
xmlSite = server.Site ( xmlServer )
xmlService = strports.service ( "8080", xmlSite )
xmlService.setServiceParent ( application )
Twisted XML-RPC Client
from twisted.web import xmlrpc
from twisted.internet import reactor


class XMLRPCClient ( object ):
    def __init__ ( self ):
        self.proxy = xmlrpc.Proxy ( 'http://localhost:8080' )

    def start ( self ):
        d = self.proxy.callRemote ( "insertUser", 'john', 'smith' )
        d.addCallback ( self.cb1 )

    def cb1 ( self, val1 ):
        print ( "Val 1: %s" % ( val1, ) )
        d = self.proxy.callRemote ( "getUsers" )
        d.addCallback ( self.cb2 )

    def cb2 ( self, val2 ):
        print ( "Val 2: %s" % ( val2, ) )
        reactor.stop()


xrc = XMLRPCClient()
xrc.start()
reactor.run()
Note the following:
1. We don't return the result of the DB call directly to the caller. This is because it's actually a ResultProxy object, which behaves just like a list of tuples in "normal" code, but which the xmlrpclib serializer tries to serialize verbatim and trips up over. For that reason we iterate over the results and copy them to a temporary structure in copyRetval().
A nicer but more "magical" way of doing this is to have an additional decorator function:
from twisted.internet import defer


def resultCopier ( f, *args, **kwargs ):
    def _resultCopier ( rp, ud ):
        retval = []
        for row in rp:
            data = [ elem for elem in row ]
            retval.append ( data )
        ud.callback ( retval )

    def resultDecorator ( *args, **kwargs ):
        d = f ( *args, **kwargs )
        ud = defer.Deferred()
        d.addCallback ( _resultCopier, ud )
        return ud

    return resultDecorator


# Use this decorator like this:
class TestAB ( AccessBroker ):
    # ...
    @resultCopier
    @transact
    def getUsers ( self ):
        return self.accounts.select().execute().fetchall()


# Then you can use the AccessBroker method calls directly from
# your XML-RPC methods:
class XMLRPCServer ( xmlrpc.XMLRPC ):
    # ...
    def xmlrpc_getUsers ( self ):
        d = self.broker.getUsers()
        # Return directly; no need to copy!
        return d
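The serialization problem from note 1 can be reproduced with the standard library alone. This is only a minimal sketch, not sAsync code: Row is a hypothetical stand-in for a database row object (a namedtuple, not a real ResultProxy row), and copy_rows and try_marshal are illustrative helpers that do not exist in sAsync or Twisted.

```python
from collections import namedtuple

try:
    import xmlrpc.client as xmlrpclib  # Python 3
except ImportError:
    import xmlrpclib  # Python 2

# Hypothetical stand-in for a row object returned by the database layer.
Row = namedtuple('Row', ['id', 'name', 'password'])


def copy_rows(rows):
    """Copy row-like objects into plain lists, as copyRetval() does."""
    return [list(row) for row in rows]


def try_marshal(value):
    """Return the XML-RPC response payload, or None if marshalling fails."""
    try:
        return xmlrpclib.dumps((value,), methodresponse=True)
    except TypeError:
        return None


rows = [Row(1, 'john', 'smith')]
raw_payload = try_marshal(rows)                # the row objects are rejected
copied_payload = try_marshal(copy_rows(rows))  # plain lists are accepted
```

The marshaller only knows about a fixed set of built-in types, so anything row-like has to be flattened into lists (or tuples) before it is returned from an xmlrpc_ method.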
2. We call fetchall() immediately on the value returned from execute(). In other words, we do this:
@transact
def getUsers ( self ):
    # Call fetchall() immediately
    return self.accounts.select().execute().fetchall()
rather than this:
@transact
def getUsers ( self ):
    # Let the code that receives the result call fetchall()
    return self.accounts.select().execute()
This is because if you try to defer the call to fetchall() you get a threading error under SQLite that looks like this:
pysqlite2.dbapi2.ProgrammingError: SQLite objects created in a thread can only be used in that same thread.The object was created in thread id -1217213520 and this is thread id -1210480960
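The threading restriction itself is independent of sAsync and can be shown with the standard library sqlite3 module, which is derived from pysqlite. In this sketch a worker thread plays the role of the thread that runs the query, and the calling thread plays the role of the code that later receives the result. The function name, the in-memory database, and the table contents are assumptions made for illustration.

```python
import sqlite3
import threading


def fetch_in_wrong_thread():
    """Create a cursor in a worker thread, then call fetchall() on it
    from the calling thread. Returns the exception raised, or None."""
    holder = {}
    ready = threading.Event()   # set once the cursor exists
    done = threading.Event()    # set once the caller has tried fetchall()

    def worker():
        conn = sqlite3.connect(':memory:')
        conn.execute('CREATE TABLE accounts (id INTEGER PRIMARY KEY, name TEXT)')
        conn.execute("INSERT INTO accounts (name) VALUES ('joe')")
        holder['cursor'] = conn.execute('SELECT id, name FROM accounts')
        ready.set()
        done.wait()
        conn.close()  # close in the thread that opened the connection

    thread = threading.Thread(target=worker)
    thread.start()
    ready.wait()
    try:
        holder['cursor'].fetchall()  # called from a different thread
        error = None
    except sqlite3.ProgrammingError as exc:
        error = exc
    finally:
        done.set()
        thread.join()
    return error


error = fetch_in_wrong_thread()
```

Here error ends up holding a ProgrammingError about thread usage. Calling fetchall() inside worker() instead would return the rows without complaint, which matches the advice in note 2 to call fetchall() inside the @transact method.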
Daemonization Caveats
You may find that sAsync hangs if you ask twistd to daemonize. I still don't know why this happens, and I've searched quite a lot for the reason.
However, you can work around this by placing the startup call in a reactor.callWhenRunning. The problem is somewhere in the startup code: something fails when twistd forks and kills the parent. This may save you HUGE headaches!
db = Database.DatabaseBroker('sqlite:////home/karnaugh/test.db')
ServerInstance = someProtocolServer(db)

def initDatabase(db):
    def carryOn(_):
        print("Database Started %s" % (_,))
    return db.startup().addCallback(carryOn)

reactor.callWhenRunning(initDatabase, db)

# Some boiler plate startup stuff
application = app.Application('ServerThing')
application.listenSSL(1234, server.Site(ServerInstance), myServerFactory())
