Changeset 73

Show
Ignore:
Timestamp:
08/08/07 16:33:30 (1 year ago)
Author:
edsuom
Message:

Some initial tinkering with multi-threaded database transactions

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/sAsync/branches/multithread/sasync/database.py

    r57 r73  
    2929from twisted.python import failure 
    3030import sqlalchemy as SA 
    31 from asynqueue import ThreadQueue 
    3231 
    3332import misc 
     33from transactions import TransactionQueue 
    3434 
    3535 
     
    196196    U{http://www.sqlalchemy.org/docs/dbengine.myt}. 
    197197 
     198    @cvar numThreads: An integer specifying the number of threads to use for 
     199      transactions (default 1) 
     200 
    198201    @ivar dt: A property-generated reference to a deferred tracker that you can 
    199202      use to wait for database writes. See L{misc.DeferredTracker}. 
     
    206209     
    207210    """ 
     211    numThreads = 1 
    208212    globalParams = ('', {}) 
    209213    queues = {} 
     
    242246        """ 
    243247        Returns a threaded task queue that is dedicated to my database 
    244         connection. Creates the queue if the first time the property is 
    245         accessed. 
     248        connection. Creates the queue the first time the property is accessed. 
    246249        """ 
    247250        def newQueue(): 
    248             queue = ThreadQueue(1
     251            queue = TransactionQueue(self.numThreads
    249252            self.running = True 
    250253            self.queues[key] = queue 
     
    269272    """) 
    270273 
    271     def connect(self, forceNew=False): 
    272         """ 
    273         Generates and returns a singleton connection object. 
     274    def connect(self, threadID, forceNew=False): 
     275        """ 
     276        Generates and returns a connection object that serves all transations 
     277        within a given thread. 
     278 
     279        Runs in a transaction thread. 
    274280        """ 
    275281        def getEngine(): 
    276             if hasattr(self, '_dEngine'): 
    277                 d = defer.Deferred() 
    278                 d.addCallback(lambda _: getConnection()) 
    279                 self._dEngine.chainDeferred(d) 
    280             else: 
    281                 d = self._dEngine = \ 
    282                     self.q.call(createEngine, doNext=True) 
    283                 d.addCallback(gotEngine) 
    284             return d 
    285  
    286         def createEngine(): 
    287282            url, kw = self.engineParams 
    288283            kw['strategy'] = 'threadlocal' 
    289             return SA.create_engine(url, **kw) 
    290          
    291         def gotEngine(engine): 
    292             del self._dEngine 
    293             self._engine = engine 
    294             return getConnection() 
     284            self._engine = SA.create_engine(url, **kw) 
     285            self.connections = {} 
    295286 
    296287        def getConnection(): 
     
    313304 
    314305        # After all these function definitions, the method begins here 
    315         if hasattr(self, '_engine'): 
    316             return getConnection() 
    317         else: 
    318             return getEngine() 
     306        if not hasattr(self, '_engine'): 
     307            getEngine() 
     308        return getConnection() 
    319309 
    320310    def _sessionClose(self):