sAsync:   SQLAlchemy done Asynchronously

The startup Method of AccessBroker

Here is an example that creates two tables in the automatically called startup method. (For backwards compatibility, you can still use userStartup instead.)

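from twisted.internet import defer
import sqlalchemy as sa
from sasync.database import AccessBroker

class DatabaseBroker(AccessBroker):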
    """Provides a sAsync AccessBroker
       myBroker = DatabaseBroker('sqlite://foo.db')
    """
    def startup(self):
        users = self.table('users',
            sa.Column('user_id', sa.Integer, primary_key=True),
            sa.Column('user_name', sa.String(255)),
            sa.Column('user_password', sa.String(255)),
        )

        posts = self.table('posts',
            sa.Column('post_id', sa.Integer, primary_key=True),
            sa.Column('user_id', sa.Integer), # Do foreign keys actually work at all with sqlite?
            sa.Column('post_text', sa.String()),
        )
        return defer.DeferredList((users, posts))

    # Carry on as usual.

From a pgasync User

Since I'm the reason this page exists, I'll try to contribute to it. I can't help appending a rant, though. Unfortunately I don't yet have my PhD in semantics and programming lingo, so the general obfuscation I faced when trying to move away from pgasync was frustrating. There should be a simple answer to "where do I start?!". As a simple use case for a database API, "Joe's bakery" is better and more accessible than "Doctor Swarts theory of binary graph tree cycles"; after all, I program to try to get away from maths ;-).

# From the man who started the documentation plea

from twisted.internet import reactor
import sqlalchemy as sa
from sasync.database import AccessBroker, transact

class TestAB(AccessBroker):
    def startup(self):
        # This method can also be 'userStartup' instead of 'startup', due to
        # backwards compatibility.
        self.table('accounts',
            sa.Column('id', sa.Integer, primary_key=True),
            sa.Column('name', sa.String(255)),
            sa.Column('password', sa.String(255))
        )

    @transact
    def insertUser(self, name, password):
        return self.accounts.insert().execute(name=name, password=password)

    @transact
    def getUsers(self):
        return self.accounts.select().execute().fetchall()


class ICouldBeAnything(object):
    """
    Some class - could be a Nevow page, or any sort of database interface object.
    """
    myBroker = TestAB('sqlite://./test.db')

    def testSelect(self):
        """ Run a test call - this is the messy way..."""

        def fooCall(queryResult):
            """A boring callback"""
            print repr(queryResult)
            reactor.stop()

        self.myBroker.getUsers().addCallback(fooCall)

    def testInsert(self):
        self.myBroker.insertUser('colin', 'test')


stuff = ICouldBeAnything()
stuff.testInsert()
stuff.testSelect()
reactor.run()

Another User

This apparently may or may not work. What does work is what I do with Teleport, that is to create a child class of AccessBroker: http://www.slipgate.co.za/~karnaugh/teleport/Core/Database.py. This code is maintained and "Works".

One should also note that you must be careful about which version of SQLAlchemy you are using. The sAsync setup should require the appropriate version, however.
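
A quick way to see which SQLAlchemy version is actually installed is sketched below. The helper name sqlalchemy_version is made up for this page; the only library feature it relies on is the __version__ attribute of the sqlalchemy package. Which versions a given sAsync release accepts is not stated here, so compare the result against the requirements of the sAsync release you installed.

```python
def sqlalchemy_version():
    """Return the installed SQLAlchemy version string, or None if it is absent.

    Hypothetical helper for illustration; not part of sAsync.
    """
    try:
        import sqlalchemy
    except ImportError:
        # SQLAlchemy is not installed in this environment.
        return None
    return getattr(sqlalchemy, "__version__", None)


print(sqlalchemy_version())
```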

Improvements on the above example

From another user: After reading the sAsync code I've come to the following conclusions:

You must wait for startup (or userStartup, the old name) to complete successfully, using the Deferred it returns, before issuing any commands. However, any method decorated with @transact (as all of your transactions should be) does that wait for you automatically.

Here is my modification of the above example.

from twisted.internet import reactor
import sqlalchemy as sa
from sasync.database import AccessBroker, transact

class TestAB(AccessBroker):
    def startup(self):
        # This method can also be 'userStartup' instead of 'startup', due to
        # backwards compatibility.
        return self.table('accounts',
            sa.Column('id', sa.Integer, primary_key=True),
            sa.Column('name', sa.String(255)),
            sa.Column('password', sa.String(255))
        )

    @transact
    def insertUser(self, name, password):
        return self.accounts.insert().execute(name=name, password=password)

    @transact
    def getUsers(self):
        return self.accounts.select().execute().fetchall()

def insert(ab):    
    return ab.insertUser('Joe Schmortz', 'secret!')
       
def select(ab):
    def stopAndDisplay(queryResult):        
        print repr(queryResult)
        reactor.stop()
        
    return ab.getUsers().addCallback(stopAndDisplay)
    
def start():
    broker = TestAB("sqlite://./test.db")
    d = broker.startup()
    # Only once the AccessBroker has started up can we use it
    d = d.addCallback(lambda _: insert(broker))    
    d.addCallback(lambda _: select(broker))            
    # Start up the reactor
    reactor.run()
    
if __name__ == "__main__":
    start()

A semi-functional Twisted XML-RPC Server Example

Here's a simple example of sAsync integrated with a simple Twisted XML-RPC Server.

The aim of this is to give a simple starting point for people wishing to experiment with sAsync in a "real" example, as opposed to snippets which start and stop their own reactor.

There's a server and a client; run the server, then in a separate window run the client.

Twisted XML-RPC Server


# Install the Twisted libraries to use this, from
#
#   http://www.twistedmatrix.com
#
# Run this example with 'twistd -noy <file.py>'

from   twisted.internet import defer, reactor   # defer is needed by xmlrpc_getUsers
from   twisted.web      import xmlrpc
import sqlalchemy       as sa
from   sasync.database  import AccessBroker
from   sasync.database  import transact

class TestAB ( AccessBroker ):
    def startup(self):
        # This method can also be 'userStartup' instead of 'startup', due to
        # backwards compatibility.
        return self.table ( 'accounts',
                            sa.Column ( 'id', sa.Integer, primary_key = True ),
                            sa.Column ( 'name', sa.String ( 255 ) ),
                            sa.Column ( 'password', sa.String ( 255 ) )
                            )

    @transact
    def insertUser ( self, name, password ):
        return self.accounts.insert().execute (
            name = name,
            password = password
            ).fetchall()

    @transact
    def getUsers ( self ):
        return self.accounts.select().execute().fetchall()
    

class XMLRPCServer ( xmlrpc.XMLRPC ):    
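    def __init__ ( self ):
        # Initialise the XMLRPC resource itself before adding our own state
        xmlrpc.XMLRPC.__init__ ( self )
        self.broker = TestAB ( "sqlite://./test.db" )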

    def xmlrpc_insertUser ( self, user, password ):
        d = self.broker.insertUser ( user, password )
        return d

    def xmlrpc_getUsers ( self ):
        d = self.broker.getUsers()
        ud = defer.Deferred()
        d.addCallback ( self.copyRetval, ud )
        return ud

    def copyRetval ( self, rp, ud ):
        retval = []
        for row in rp:
            print row
            data = [ elem for elem in row ]
            retval.append ( data )
        ud.callback ( retval )


######################################################################
# Twisted Boilerplate code.
######################################################################
    
from twisted.application                import service
from twisted.web                        import server
from twisted.application                import strports

xmlServer         = XMLRPCServer()
application       = service.Application ( 'sasync' )
serviceCollection = service.IServiceCollection ( application )
xmlSite           = server.Site ( xmlServer )
xmlServer         = strports.service ( "8080", xmlSite )
xmlServer.setServiceParent ( application )

Twisted XML-RPC Client


from twisted.web      import xmlrpc
from twisted.internet import reactor

class XMLRPCClient ( object ):
    
    def __init__ ( self ):
        self.proxy = xmlrpc.Proxy ( 'http://localhost:8080' )


    def start ( self ):
        d = self.proxy.callRemote ( "insertUser", 'john', 'smith' )
        d.addCallback ( self.cb1 )
        
    def cb1 ( self, val1 ):
        print "Val 1: ", val1
        d = self.proxy.callRemote ( "getUsers" )
        d.addCallback ( self.cb2 )
        
    def cb2 ( self, val2 ):
        print "Val 2: ", val2
        reactor.stop()
        
xrc = XMLRPCClient()
xrc.start()
reactor.run()

Note the following:

1. We don't return the result of the DB call directly to the caller. It is actually a ResultProxy object, which behaves just like a list of tuples in "normal" code, but which the xmlrpclib serializer tries to serialize verbatim and trips over. For that reason we iterate over the results and copy them to a temporary structure in copyRetval().

A nicer but more "magical" way of doing this is to have an additional decorator function:

from   twisted.internet import defer

def resultCopier ( f, *args, **kwargs ):

    def _resultCopier ( rp, ud ):
        retval = []
        for row in rp:
            data = [ elem for elem in row ]
            retval.append ( data )
        ud.callback ( retval )

    def resultDecorator ( *args, **kwargs ):
        d = f ( *args, **kwargs )
        ud = defer.Deferred()
        d.addCallback ( _resultCopier, ud )
        return ud

    return resultDecorator

# Use this decorator like this:
class TestAB ( AccessBroker ):

    # ...

    @resultCopier
    @transact
    def getUsers ( self ):
        return self.accounts.select().execute().fetchall()

# Then you can use the AccessBroker method calls directly from
# your XML-RPC methods:

class XMLRPCServer ( xmlrpc.XMLRPC ):    
    # ...
    def xmlrpc_getUsers ( self ):
        d = self.broker.getUsers()
        # Return directly; no need to copy!
        return d

2. We call fetchall() immediately on the value returned from execute(). In other words, we do this:

    @transact
    def getUsers ( self ):
        # Call fetchall() immediately
        return self.accounts.select().execute().fetchall()

rather than this:

    @transact
    def getUsers ( self ):
        # Let the code that receives the result call fetchall()
        return self.accounts.select().execute()

This is because if you try to defer the call to fetchall() you get a threading error under SQLite that looks like this:

pysqlite2.dbapi2.ProgrammingError: SQLite objects created in a thread
can only be used in that same thread.The object was created in thread id
-1217213520 and this is thread id -1210480960
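
The same thread restriction can be reproduced without sAsync or Twisted. The sketch below is only an illustration under stated assumptions: it uses the standard-library sqlite3 module (the successor of pysqlite2) with an in-memory database, and the helper names run_in_thread, query_eager and query_lazy are made up for this page. A plain worker thread stands in for the thread in which the transaction runs.

```python
import sqlite3
import threading


def run_in_thread(func):
    """Run func in a worker thread and hand its return value back."""
    box = {}

    def target():
        box["result"] = func()

    worker = threading.Thread(target=target)
    worker.start()
    worker.join()
    return box["result"]


def _make_accounts():
    # Each call builds its own private in-memory database.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, name TEXT)")
    conn.execute("INSERT INTO accounts (name) VALUES ('joe')")
    return conn


def query_eager():
    # fetchall() runs in the same thread that created the connection.
    return _make_accounts().execute("SELECT name FROM accounts").fetchall()


def query_lazy():
    # Returns the cursor; the rows have not been fetched yet.
    return _make_accounts().execute("SELECT name FROM accounts")


rows = run_in_thread(query_eager)      # plain list of rows, safe to pass around

lazy_error = None
cursor = run_in_thread(query_lazy)
try:
    cursor.fetchall()                  # called from a different thread
except sqlite3.ProgrammingError as exc:
    lazy_error = exc

print(rows)
print(lazy_error)
```

Fetching inside the worker hands back ordinary data, whereas the cursor stays tied to the thread that created its connection; that is the reason for calling fetchall() inside the @transact method.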

Daemonization Caveats

You may find that sAsync hangs if you ask twistd to daemonize. I still don't know why this happens, and I've searched quite a lot for the reason.

However, you can work around this by placing the startup call in a reactor.callWhenRunning. The problem is somewhere in the startup code: something fails when twistd forks and kills the parent. This may save you HUGE headaches!

db = Database.DatabaseBroker('sqlite:////home/karnaugh/test.db')

ServerInstance = someProtocolServer(db)

def initDatabase(db):
    def carryOn(_):
        print "Database Started ", _
    return db.startup().addCallback(carryOn)

reactor.callWhenRunning(initDatabase, db)

# Some boiler plate startup stuff
application = app.Application('ServerThing')
application.listenSSL(1234, server.Site(ServerInstance), myServerFactory())