Changeset 118
- Timestamp:
- 12/22/07 22:16:40 (1 year ago)
- Files:
-
- projects/AsynCluster/trunk/doc/example.py (modified) (13 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynCluster/trunk/doc/example.py
r117 r118 23 23 """ 24 24 This script shows how Bayesian inference can be run on a cluster of nodes to 25 quickly identify the mean and variance of a Gaussian random process. 25 identify the location (median) and scale (half-width at half-maximum) of a 26 Cauchy-distributed random process. 26 27 """ 27 28 … … 32 33 from scipy import stats 33 34 34 from twisted.internet import defer, reactor 35 from twisted.internet import defer, reactor, threads 35 36 36 37 from asynqueue import ThreadQueue … … 166 167 return self.client.startup().addCallback(maybeStarted) 167 168 168 def likelihood(self, X ):169 def likelihood(self, X, runLocally=False): 169 170 """ 170 171 Returns the log-likelihood of my data given the Cauchy distribution of … … 176 177 example a 'non-informative uniform prior' is used. 177 178 """ 179 def localLikelihood(): 180 distObj = stats.distributions.cauchy(loc=X[0], scale=X[1]) 181 return s.log(distObj.pdf(self.data)).sum() 182 178 183 def gotResult(L_packed): 179 184 return pack.Unpacker(L_packed)() 180 181 X_packed = pack.Packer(X)() 182 d = self.client.run('likelihood', X_packed, **{'timeout':20}) 183 d.addCallbacks(gotResult, self._oops) 185 186 if runLocally: 187 d = threads.deferToThread(localLikelihood) 188 else: 189 X_packed = pack.Packer(X)() 190 d = self.client.run('likelihood', X_packed, **{'timeout':20}) 191 d.addCallbacks(gotResult, self._oops) 184 192 return d 185 193 … … 199 207 variance of the data set. 200 208 """ 209 runLocally = False 201 210 socket="/tmp/.ndm" 202 V = [0. 75, 0.3, 0.1, 0.025]211 V = [0.10, 0.02, 0.005, 0.001] 203 212 204 213 def __init__(self, data): … … 246 255 # the most time-consuming step in real-life usage. 247 256 XD = self.proposer.r(len(X), v) 248 dList = [self.caller.likelihood(Xp ) for Xp in X+XD]257 dList = [self.caller.likelihood(Xp, self.runLocally) for Xp in X+XD] 249 258 d = defer.gatherResults(dList) 250 259 d.addCallback(gotLikelihoods) … … 252 261 253 262 @defer.deferredGenerator 254 def run(self, N_ bi, N_iter, N_chains):263 def run(self, N_iter, N_chains): 255 264 """ 256 265 Does a PMC run with I{N_chains} population members and jump variances … … 266 275 provided via the deferred, however. 267 276 268 Use a provider of L{interfaces.IConsumer} to get the samples as they269 are produced. Each attached consumer is updated after every iteration270 with my array of parameter vectors (one vector per row) for each271 iteration.272 273 277 @param N_iter: The number of iterations to produce after burn-in. 274 275 @param N_bi: The number of burn-in iterations to discard before276 producing any.277 278 278 279 @param N_chains: The number of Monte Carlo chains run at each … … 311 312 X = self.proposer.r(N_chains, self.V[0]) 312 313 # The iteration loop 313 i = 0 314 while i < N_iter + N_bi: 314 for i in xrange(N_iter): 315 315 t0 = time.time() 316 316 # Generate and weight some proposals … … 329 329 X = s.row_stack(XP)[I] 330 330 R = s.array([sum([x in II[k] for x in I]) for k in xrange(P)]) 331 if i >= N_bi:332 # Write the parameters for this iteration to the output file333 for Xk in X:334 self.fh.write(", ".join(["%.3g" % x for x in Xk]))331 # Write the parameters for this iteration to the output file 332 for Xk in X: 333 rowString = ", ".join(["%f" % x for x in Xk]) 334 self.fh.write(rowString + "\n") 335 335 # Normalize R to maintain a total of N_chains population members 336 336 normalize(R) … … 341 341 ", ".join(["%9.3g" % x for x in X.mean(0)])) 342 342 print msg 343 # Ready for the next iteration344 i += 1345 343 # All done 346 344 self.fh.close() … … 350 348 if __name__ == '__main__': 351 349 # Parameters for the example 352 N_obs = 5000 350 N_obs = 50000 353 351 N_chains = 500 354 N_ bi, N_iter = 100,500352 N_iter = 500 355 353 loc, scale = 0.1, 0.4 356 354 … … 358 356 distObj = stats.distributions.cauchy(loc=loc, scale=scale) 359 357 data = distObj.rvs(N_obs) 360 358 fh = open('data.csv', 'w') 359 for x in data: 360 fh.write("%f\n" % x) 361 fh.close() 362 361 363 # Create the Monte Carlo runner and set up the run 362 364 pmc = Population_Monte_Carlo(data) 363 reactor.callWhenRunning(pmc.run, N_ bi, N_iter, N_chains)365 reactor.callWhenRunning(pmc.run, N_iter, N_chains) 364 366 365 367 # Run!
