Changeset 118

Show
Ignore:
Timestamp:
12/22/07 22:16:40 (1 year ago)
Author:
edsuom
Message:

Usage example done

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynCluster/trunk/doc/example.py

    r117 r118  
    2323""" 
    2424This script shows how Bayesian inference can be run on a cluster of nodes to 
    25 quickly identify the mean and variance of a Gaussian random process. 
     25identify the location (median) and scale (half-width at half-maximum) of a 
     26Cauchy-distributed random process. 
    2627""" 
    2728 
     
    3233from scipy import stats 
    3334 
    34 from twisted.internet import defer, reactor 
     35from twisted.internet import defer, reactor, threads 
    3536 
    3637from asynqueue import ThreadQueue 
     
    166167        return self.client.startup().addCallback(maybeStarted) 
    167168     
    168     def likelihood(self, X): 
     169    def likelihood(self, X, runLocally=False): 
    169170        """ 
    170171        Returns the log-likelihood of my data given the Cauchy distribution of 
     
    176177        example a 'non-informative uniform prior' is used. 
    177178        """ 
     179        def localLikelihood(): 
     180            distObj = stats.distributions.cauchy(loc=X[0], scale=X[1]) 
     181            return s.log(distObj.pdf(self.data)).sum() 
     182         
    178183        def gotResult(L_packed): 
    179184            return pack.Unpacker(L_packed)() 
    180          
    181         X_packed = pack.Packer(X)() 
    182         d = self.client.run('likelihood', X_packed, **{'timeout':20}) 
    183         d.addCallbacks(gotResult, self._oops) 
     185 
     186        if runLocally: 
     187            d = threads.deferToThread(localLikelihood) 
     188        else: 
     189            X_packed = pack.Packer(X)() 
     190            d = self.client.run('likelihood', X_packed, **{'timeout':20}) 
     191            d.addCallbacks(gotResult, self._oops) 
    184192        return d 
    185193 
     
    199207    variance of the data set. 
    200208    """ 
     209    runLocally = False 
    201210    socket="/tmp/.ndm" 
    202     V = [0.75, 0.3, 0.1, 0.025
     211    V = [0.10, 0.02, 0.005, 0.001
    203212 
    204213    def __init__(self, data): 
     
    246255        # the most time-consuming step in real-life usage. 
    247256        XD = self.proposer.r(len(X), v) 
    248         dList = [self.caller.likelihood(Xp) for Xp in X+XD] 
     257        dList = [self.caller.likelihood(Xp, self.runLocally) for Xp in X+XD] 
    249258        d = defer.gatherResults(dList) 
    250259        d.addCallback(gotLikelihoods) 
     
    252261     
    253262    @defer.deferredGenerator 
    254     def run(self, N_bi, N_iter, N_chains): 
     263    def run(self, N_iter, N_chains): 
    255264        """ 
    256265        Does a PMC run with I{N_chains} population members and jump variances 
     
    266275        provided via the deferred, however. 
    267276 
    268         Use a provider of L{interfaces.IConsumer} to get the samples as they 
    269         are produced. Each attached consumer is updated after every iteration 
    270         with my array of parameter vectors (one vector per row) for each 
    271         iteration. 
    272  
    273277        @param N_iter: The number of iterations to produce after burn-in. 
    274  
    275         @param N_bi: The number of burn-in iterations to discard before 
    276           producing any. 
    277278 
    278279        @param N_chains: The number of Monte Carlo chains run at each 
     
    311312        X = self.proposer.r(N_chains, self.V[0]) 
    312313        # The iteration loop 
    313         i = 0 
    314         while i < N_iter + N_bi: 
     314        for i in xrange(N_iter): 
    315315            t0 = time.time() 
    316316            # Generate and weight some proposals 
     
    329329                X = s.row_stack(XP)[I] 
    330330            R = s.array([sum([x in II[k] for x in I]) for k in xrange(P)]) 
    331             if i >= N_bi: 
    332                 # Write the parameters for this iteration to the output file 
    333                 for Xk in X: 
    334                     self.fh.write(", ".join(["%.3g" % x for x in Xk])
     331            # Write the parameters for this iteration to the output file 
     332            for Xk in X: 
     333                rowString = ", ".join(["%f" % x for x in Xk]) 
     334                self.fh.write(rowString + "\n"
    335335            # Normalize R to maintain a total of N_chains population members 
    336336            normalize(R) 
     
    341341                ", ".join(["%9.3g" % x for x in X.mean(0)])) 
    342342            print msg 
    343             # Ready for the next iteration 
    344             i += 1 
    345343        # All done 
    346344        self.fh.close() 
     
    350348if __name__ == '__main__': 
    351349    # Parameters for the example 
    352     N_obs = 5000 
     350    N_obs = 50000 
    353351    N_chains = 500 
    354     N_bi, N_iter = 100, 500 
     352    N_iter = 500 
    355353    loc, scale = 0.1, 0.4 
    356354 
     
    358356    distObj = stats.distributions.cauchy(loc=loc, scale=scale) 
    359357    data = distObj.rvs(N_obs) 
    360  
     358    fh = open('data.csv', 'w') 
     359    for x in data: 
     360        fh.write("%f\n" % x) 
     361    fh.close() 
     362     
    361363    # Create the Monte Carlo runner and set up the run 
    362364    pmc = Population_Monte_Carlo(data) 
    363     reactor.callWhenRunning(pmc.run, N_bi, N_iter, N_chains) 
     365    reactor.callWhenRunning(pmc.run, N_iter, N_chains) 
    364366 
    365367    # Run!