Changeset 205
- Timestamp:
- 06/01/08 00:22:10 (6 months ago)
- Files:
-
- projects/AsynCluster/trunk/svpmc/model.c (modified) (13 diffs)
- projects/AsynCluster/trunk/svpmc/model.py (modified) (12 diffs)
- projects/AsynCluster/trunk/svpmc/params.py (modified) (6 diffs)
- projects/AsynCluster/trunk/svpmc/pmc.py (modified) (10 diffs)
- projects/AsynCluster/trunk/svpmc/project.py (modified) (2 diffs)
- projects/AsynCluster/trunk/svpmc/test/test_model.py (modified) (8 diffs)
- projects/AsynCluster/trunk/svpmc/test/test_pmc.py (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynCluster/trunk/svpmc/model.c
r202 r205 182 182 double c, cp; 183 183 const double diff = b-a; 184 const double scaleUp = 1.06; // Increase scale value to account for EXP error 184 185 // Compute the scale value c as the maximum possible pdf (regular normal 185 186 // distribution) within the truncated range … … 187 188 if(b > 0) { 188 189 // Center is within the range, so use its pdf 189 c = 1;190 c = scaleUp; 190 191 } else { 191 192 // Range is below center, so use pdf at its right edge 192 c = EXP(-0.5*b*b);193 c = scaleUp * EXP(-0.5*b*b); 193 194 } 194 195 } else { 195 196 // Range is above center, so use pdf at its left edge 196 c = EXP(-0.5*a*a);197 c = scaleUp * EXP(-0.5*a*a); 197 198 } 198 199 // Accept-reject sampling for the truncated normal distribution … … 266 267 // array lookups and products in the equation. 267 268 // 268 //SPA1(S, x0, k) = exp(-0.5 * SPA1(S, h, k)) * PA1(x, k); 269 // 270 // This is very fast, but has error of about +/-2%. However, the error 271 // doesn't seem to make any difference in the variance of log-likelihood 272 // estimates. 273 SPA1(S, x0, k) = EXP(-0.5 * SPA1(S, h, k)) * PA1(x, k); 269 SPA1(S, x0, k) = exp(-0.5 * SPA1(S, h, k)) * PA1(x, k); 270 // Using EXP is very fast, but has error of about +/-6%. In a multivariate 271 // test, the resulting error was -1112.29 vs an expected -1135.15. 272 //SPA1(S, x0, k) = EXP(-0.5 * SPA1(S, h, k)) * PA1(x, k); 274 273 275 274 // Now decorrelate the equi-variance innovations in preparation for the … … 308 307 309 308 310 // Model. likelihood309 // Model.hybridGibbs 311 310 // 312 311 // Supplied variables … … 330 329 // sc Constants for multivariate normal PDF 331 330 332 //--- Return value (double float) ---333 // The linear likelihood P(x|w) of the innovations given the parameters,334 // integrated over the ni log-volatility simulations335 336 331 337 332 // Run evaluations until odds of error in selection between current and … … 346 341 int ki, ke; 347 342 int k0, k1, k2, k3; 348 double val , maxVal;343 double val; 349 344 350 345 // Multivariate innovation for one current time-series sample … … 355 350 // first time-series sample of the evaluation interval 356 351 double he[2][Nt]; 357 // Log-likelihood of the innovation for the first time-series sample in current358 // and proposal evalutions, working values and accumulated for each iteration359 double Lxk[2], Lx[ni];360 352 361 353 // A set of evaluation sample structs … … 380 372 // For each iteration... 381 373 for(ki=0; ki<ni; ki++) { 382 383 // Initialize this iteration's accumulator384 Lx[ki] = 0;385 374 386 375 // The "previous" log-volatility of the first time-series sample is set to … … 407 396 for(k2=0; k2<2; k2++) 408 397 se[k2].Lx = 0; 409 // Lxk[.] will be set to a value in se[.].Lx that has already been410 // multivariate-accumulated, so it doesn't need clearing here411 398 412 399 do { … … 438 425 logLikelihood(&se[k2], &P, xk); 439 426 440 // Save the first log-volatility and the log-likelihood based on it (and 441 // it alone, at this point) as the log-likelihood of the innovation given 442 // the log-volatility of the proposal 427 // Save the first log-volatility 443 428 if(k1==k0) { 444 429 for(k3=0; k3<Nt; k3++) 445 430 he[k2][k3] = PA1(se[k2].h, k3); 446 Lxk[k2] = se[k2].Lx;447 431 } 448 432 } … … 478 462 PA1(se[k3].h, k2) = val; 479 463 } 480 481 // The joint probability of the innovation and the variates underlying the482 // log-volatility simulation, conditional on the parameters, is equal to483 // the likelihood of the innovation, conditional on the simulated484 // variates and the parameters, times the probability of the485 // simulated variates, conditional on the parameters:486 //487 // P(x,z|w) = P(x|z,w) * P(z|w)488 //489 // P(z|w) doesn't need to be accounted for, however, because the z's are490 // simulated from it.491 492 // Accumulate Log(P(xk,zk|w) = Lxk to eventually get Log(P(x,z|w)493 Lx[ki] += Lxk[ke];494 464 495 465 // Done with log-volatility draw for this time-series sample … … 513 483 } 514 484 515 // Compute the result, the integrated P(x|w) over the last 3/4 of the 516 // simulations on z 517 k0 = (int)(100*ni/400); 485 486 487 // Model.likelihood 488 // 489 // Supplied variables 490 // ---------------------------------------------------------------------------- 491 492 //--- Supplied variables --- 493 // z Independent normal variates, m previously simulated sets, [mpn] array 494 // x Innovation values, [pn] array 495 496 //--- Model parameters, direct --- 497 // d Volatility offset, [p] vector 498 // e Lag-1 cross-correlations for VAR of volatilities, [pp] array 499 // g Volatility/innovation shock correlations, [p] vector 500 501 //--- Model parameters, derivations --- 502 // rz Concurrent xcorr, innovation-volatility normals, [pp] 503 // ri Inverse concurrent xcorr, innovation shocks, [pp] 504 // sc Constants for multivariate normal PDF 505 506 //--- Return value (double float) --- 507 // The linear likelihood P(x|w) of the innovations given the parameters, 508 // integrated over the log-volatility simulations along the first axis of z 509 510 511 // The number of time series making up a single multivariate sample 512 const int Nt = Nx[0]; 513 // The number of multivariate samples 514 const int Ns = Nx[1]; 515 516 int ki; 517 int k0, k1; 518 double val, maxVal; 519 520 // Multivariate innovation for one current time-series sample 521 double *xk; 522 // Log-likelihood of the innovation for the time-series sample in the current 523 // evalution, accumulated for each iteration 524 double Lx[Nz[0]]; 525 526 // A single evaluation sample struct 527 struct sample se; 528 // A struct for the model parameters 529 struct parameters P; 530 531 532 // Generate parameter struct 533 P.d = d_array; P.e = e_array; P.g = g_array; 534 P.rz = rz_array; P.ri = ri_array; P.sc = sc_array; 535 536 // Initialize various arrays 537 xk = (double *) malloc(sizeof(double) * Nt); 538 se.z = (double *) malloc(sizeof(double) * Nt); 539 se.h = (double *) malloc(sizeof(double) * Nt); 540 se.x0 = (double *) malloc(sizeof(double) * Nt); 541 se.x1 = (double *) malloc(sizeof(double) * Nt); 542 543 544 // Iterate over each set of simulated z values... 545 for(ki=0; ki<Nz[0]; ki++) { 546 547 // Initialize this iteration's accumulator 548 Lx[ki] = 0; 549 550 // The "previous" log-volatility of the first time-series sample is set to 551 // the log-volatility offset plus a simulated, latent-parameter value. 552 // TODO 553 for(k1=0; k1<Nt; k1++) 554 PA1(se.h, k1) = D1(k1); 555 556 // For each time-series sample... 557 for(k0=0; k0<Ns; k0++) { 558 559 // For each time series... 560 for(k1=0; k1<Nt; k1++) { 561 // Prepare current-innovation vector xk... 562 PA1(xk, k1) = X2(k1, k0); 563 // ...and set z 564 PA1(se.z, k1) = Z3(ki, k1, k0); 565 } 566 567 // Compute the multivariate log-volatility... 568 logVol(&se, &P); 569 // ...and the log-likelihood of the innovation for this time-series 570 // sample, given the log-volatility 571 logLikelihood(&se, &P, xk); 572 // Accumulate the log-likelihood 573 Lx[ki] += se.Lxk; 574 575 // Done with log-likelihood evaluation for this time-series sample 576 } 577 578 // Done with iterations 579 } 580 581 // Free array memory 582 free(xk); 583 free(se.z); 584 free(se.h); 585 free(se.x0); 586 free(se.x1); 587 588 589 // Compute the result, the integrated P(x|w) over all the sets of simulated z 590 // values 518 591 519 592 // Compute the maximum log density and save it to subtract from everybody so 520 593 // that the maximum log density is normalized to zero 521 594 maxVal = -HUGE_VAL; 522 for(ki= k0; ki<ni; ki++) {595 for(ki=0; ki<Nz[0]; ki++) { 523 596 if(Lx[ki] > maxVal) 524 597 maxVal = Lx[ki]; … … 526 599 // Accumulate the linearized normalized densities 527 600 val = 0; 528 for(ki= k0; ki<ni; ki++)601 for(ki=0; ki<Nz[0]; ki++) 529 602 val += exp(Lx[ki] - maxVal); 530 603 // ...and return the denormalized, log mean of the linear densities... 531 return_val = log(val) + maxVal - log( ni-k0);604 return_val = log(val) + maxVal - log(Nz[0]); projects/AsynCluster/trunk/svpmc/model.py
r202 r205 20 20 """ 21 21 22 from copy import copy 23 22 24 import scipy as s 23 25 from scipy import linalg, stats 24 26 25 27 from twisted.internet import defer 28 from twisted.spread import pb 26 29 27 30 from asyncluster.master import jobs … … 32 35 33 36 37 class ZFetcher(pb.Referenceable): 38 """ 39 I provide a node worker with access to the normal variates underlying the 40 latest log-volatility simulations. 41 42 ???????????????????????????? 43 """ 44 chunkSize = 60000 45 intScaleUp = 2000 46 intScaleDown = 1.0 / intScaleUp 47 48 def __init__(self, m, p, n): 49 self.m = m 50 self.z = s.empty((m, p, n)) 51 52 def getZ(self, fullPrecision=False): 53 # Transmitting at short int cuts data size to one fourth, still offers 54 # full needed range with reasonable precision 55 result = self.z[self.successes,:,:] 56 if fullPrecision: 57 return result 58 return (self.intScaleUp*result).astype('h') 59 60 def _packageZ(self): 61 zPacked = pack.Packer(self.getZ())() 62 k = 0 63 N = len(zPacked) 64 self.zPackedChunks = [] 65 while k < N: 66 chunk = zPacked[k:k+self.chunkSize] 67 self.zPackedChunks.append(chunk) 68 k += self.chunkSize 69 70 def restart(self): 71 self.failures = [] 72 self.successes = [] 73 self.chunks = {} 74 self.zPackedChunks = None 75 76 def update(self, member, z=None): 77 """ 78 Call to update me with the results of a log-volatility simulation, with 79 integer arguments specifying the I{member} of the population for the 80 latest iteration of the overall PMC simulation. 81 82 If the log-volatility simulation succeeded, also supply the 2-D array 83 of its normal variates. Otherwise, the population member will be marked 84 as a failure. 85 86 Returns C{True} to indicate when this is the last update, C{False} 87 until then. 88 """ 89 if not isinstance(member, int) or member < 0 or member >= self.m: 90 raise ValueError( 91 "Invalid member ID '%s', must be integer between 0 and %d" \ 92 % (member, self.m-1)) 93 if z is None: 94 self.failures.append(member) 95 else: 96 self.successes.append(member) 97 self.z[member,:,:] = z 98 if len(self.failures) + len(self.successes) == self.m: 99 self._packageZ() 100 return True 101 return False 102 103 def view_get(self, perspective): 104 if perspective not in self.chunks: 105 self.chunks[perspective] = copy(self.zPackedChunks) 106 chunks = self.chunks[perspective] 107 if chunks: 108 return self.chunks[perspective].pop(0) 109 110 34 111 class ModelManager(object): 35 112 """ … … 41 118 Instantiate me with the text of a specification for the project and model 42 119 I'm going to manage. 120 121 @ivar zFetcher: An instance of L{ZFetcher} that is supplied to my model, 122 through which remote (and local) instances of the model access the normal 123 variates underlying the latest log-volatility simulations. 43 124 44 125 """ … … 56 137 M = modelObj 57 138 58 def likelihood(paramContainer):59 re sult = M.likelihood(paramContainer)60 if result is None:61 return62 return Packer(*result)()139 def hybridGibbs(paramContainer): 140 return Packer(*M.hybridGibbs(paramContainer)) 141 142 def likelihood(paramContainer, zFilePath): 143 return M.likelihood(paramContainer, iteration) 63 144 64 145 ########################################################################### … … 67 148 kw['y'] = y 68 149 kw['paramNames'] = projectManager.paramNames 150 kw['zFetcher'] = self.zFetcher = ZFetcher( 151 projectManager.m, projectManager.p, projectManager.n) 152 #??????????????????????????? 153 #kw['zFetcherRemote'] = 69 154 self.modelObj = Model(**kw) 70 155 self.queue = projectManager.queue 156 self.iteration = -1 71 157 72 158 def _oops(self, failure): … … 107 193 d.addErrback(self._oops) 108 194 return d 195 196 def nextIteration(self): 197 """ 198 """ 199 self.iteration += 1 200 self.zFetcher.restart() 201 202 def zSimulation(self, paramContainer, member, remote=False, local=False): 203 """ 204 Runs a new log-volatily simulation for the I{paramContainer}, saving 205 its result in my I{z} array and updating my instance of L{ZFetcher} 206 with the array when it has been completely updated. 207 208 Updates the container in place with the last simulated log-volatility 209 value. 210 211 Returns a deferred that fires when the simulation is done, with C{True} 212 if it succeeded or C{False} if not. 213 """ 214 def update(z=None): 215 simulationsDone = self.zFetcher.update(member, z) 216 217 def unpackResult(packedResult): 218 if packedResult: 219 return list(pack.Unpacker(packedResult)) 220 221 def simulationDone(result): 222 if result is None: 223 update() 224 return False 225 update(result[0]) 226 paramContainer.h = result[1] 227 return True 228 229 if (remote or self.remoteMode) and not local: 230 d = self.client.run( 231 'hybridGibbs', paramContainer, timeout=self.timeout) 232 d.addCallback(unpackResult) 233 else: 234 d = self.queue.call( 235 self.modelObj.hybridGibbs, paramContainer) 236 return d.addCallbacks(simulationDone, self._oops) 109 237 110 238 def likelihood(self, paramContainer, remote=False, local=False): … … 113 241 I{paramContainer} for my model. 114 242 115 Updates the container in-place with the log-likelihood and an array of 116 volatility shocks on which the likelihood computation is based, after 117 undergoing some nested MCMC using the specified jump deviation 118 I{sigma}. 243 Updates the container in-place with the log-likelihood. 119 244 120 245 Returns a deferred that fires with a reference to the paramContainer … … 125 250 # An error from the remote likelihood method call is treated as 126 251 # infinitely low likelihood 127 result = (-s.inf, 0, [], []) 128 elif isinstance(result, str): 129 # Unpack string result from remote call 130 result = list(pack.Unpacker(result)) 131 for k, name in enumerate(('Lx', 'z', 'h')): 132 setattr(paramContainer, name, result[k]) 252 result = -s.inf 253 paramContainer.Lx = result 133 254 return paramContainer 134 255 135 256 if (remote or self.remoteMode) and not local: 136 257 d = self.client.run( 137 'likelihood', paramContainer, timeout=self.timeout) 258 'likelihood', paramContainer, 259 self.iteration, timeout=self.timeout) 138 260 else: 139 261 d = self.queue.call( 140 self.modelObj.likelihood, paramContainer )262 self.modelObj.likelihood, paramContainer, self.iteration) 141 263 return d.addCallbacks(gotLikelihood, self._oops) 142 264 … … 184 306 generator. 185 307 186 @ivar Ni: Number of hybrid Gibbs iterations per l ikelihood evaluation.187 188 """ 189 keyAttrs = {'y':None, 'wiggle':1.0, 'Ni': 100}308 @ivar Ni: Number of hybrid Gibbs iterations per log-volatility simulation. 309 310 """ 311 keyAttrs = {'y':None, 'wiggle':1.0, 'Ni':50} 190 312 191 313 #--- Properties ----------------------------------------------------------- … … 228 350 i += 1 229 351 return cv 230 231 352 353 def setParams(self, paramContainer): 354 """ 355 """ 356 # Set my parameters 357 self.setParamSequence(paramContainer.paramSequence()) 358 359 # Set up array variables for the inline call, including innovations as 360 # residuals of my observations run through the reversed VAR(1)... 361 self.x = self.var.reverse(self.a, self.b) 362 # ...concurrent cross-correlations between volatility shocks and 363 # inverse of concurrent cross-correlations between innovation shocks 364 # (we'll bail out with no result if either is invalid), and... 365 try: 366 self.rz = linalg.cholesky( 367 self.covarMatrix(self.fs, self.fr), lower=True).astype('d') 368 self.ri = linalg.inv(linalg.cholesky(self.covarMatrix( 369 s.ones(self.p), self.cr), lower=True)).astype('d') 370 except linalg.LinAlgError: 371 return 372 # ...scaling constants for multivariate normal pdf 373 self.sc = s.empty((self.p, 2)) 374 self.sc[:,0] = 1.0 / s.sqrt(1.0 - self.g**2) 375 self.sc[:,1] = s.log(s.sqrt(2*s.pi) / self.sc[:,0]) 376 377 def getZ(self, zFilePath): 378 """ 379 Returns a 3-D array containing normal variates underlying simulated 380 multivariate log-volatilities for the N different sets of parameters in 381 the current iteration of the overall PMC simulation. 382 383 The array shape is (parameter set, time series, time-series sample). 384 385 """ 386 @defer.deferredGenerator 387 def getChunks(): 388 chunks = [] 389 while True: 390 wfd = defer.waitForDeferred( 391 self.zFetcherRemote.callRemote('get')) 392 yield wfd 393 chunk = wfd.getResult() 394 if not chunk: 395 break 396 chunks.append(chunk) 397 398 self._zArray = pack.Unpacker("".join(chunks))() 399 self._zArray = (ZFetcher.intScaleDown * self._zArray.astype('d')) 400 self._zID = iteration 401 402 # Local mode access 403 if hasattr(self, 'zFetcher'): 404 return defer.succeed(self.zFetcher.getZ(fullPrecision=True)) 405 # Remote mode access 406 if getattr(self, '_zID', None) != iteration: 407 if hasattr(self, '_d_getZ'): 408 d = defer.Deferred() 409 self._d_getZ.chainDeferred(d) 410 else: 411 d = self._d_getZ = getChunks() 412 else: 413 d = defer.succeed(None) 414 d.addCallback(lambda _: self._zArray) 415 return d 416 417 232 418 #--- The API -------------------------------------------------------------- 233 234 def likelihood(self, paramContainer): 235 """ 236 Call this method with a parameter object that defines: 237 238 1. A set of values for my parameters. 239 240 2. A latent parameter I{z} consisting of an array of IID normal 241 variates to be used as a starting point for another simulation 242 draw of log volatilities. 419 420 def hybridGibbs(self, paramContainer): 421 """ 422 Call this method with a parameter object that defines a set of values 423 for my parameters. 243 424 244 425 A new 2-D array of log-volatilities will be drawn from a hybrid Gibbs … … 255 436 returns a tuple containing the following: 256 437 257 1. The log-likelihood of my observations given the parameters, from 258 an integration of the join probability of the observations and 259 simulated log-volatilities over the simulation rounds. 438 1. The IID normal variates underlying the simulated 439 log-volatilities after the last simulation round. 440 441 2. The multivariate log-volatility value for the last time-series 442 sample after the last simulation round. (Useful for 443 extrapolating from the fitted model.) 444 445 """ 446 self.setParams(paramContainer) 447 # Create the normal variate array with a standard starting point for 448 # every log-volatility simulation 449 z = s.zeros_like(self.x) 450 # This will contain the simulated log-volatility of the last 451 # time-series sample 452 h = s.empty(self.p) 453 self.inline( 454 'z', 'h', 'x', 'w', 'ni', 455 'd', 'e', 'g', 'rz', 'ri', 'sc', 456 w=self.wiggle, ni=self.Ni) 457 return z, h 458 459 def likelihood(self, paramContainer, zFilePath): 460 """ 461 Call this method with a parameter object that defines a set of values 462 for my parameters and an integer specifying which PMC iteration being 463 run. 464 465 If a L{linalg.LinAlgError} is raised due to an invalid correlation 466 matrix parameter, the method returns with no result. Otherwise, it 467 returns the log-likelihood of my observations given the parameters, 468 from an integration of the joint probability of the observations and 469 simulated log-volatilities over the simulation rounds. 260 470 261 471 2. The IID normal variates underlying the simulated … … 272 482 what, making any call to this method with it a waste of computing time. 273 483 """ 274 # Set my parameters... 275 self.setParamSequence(paramContainer.paramSequence()) 276 # ...including the latent parameter of IID normal variates 277 z = paramContainer.z.copy() 278 279 # Set up array variables for the inline call, including innovations as 280 # residuals of my observations run through the reversed VAR(1)... 281 x = self.var.reverse(self.a, self.b) 282 # ... the log-volatility of the last time-series sample... 283 h = s.empty(self.p) 284 # ...concurrent cross-correlations between volatility shocks and 285 # inverse of concurrent cross-correlations between innovation shocks 286 # (we'll bail out with no result if either is invalid), and... 287 try: 288 rz = linalg.cholesky( 289 self.covarMatrix(self.fs, self.fr), lower=True).astype('d') 290 ri = linalg.inv(linalg.cholesky(self.covarMatrix( 291 s.ones(self.p), self.cr), lower=True)).astype('d') 292 except linalg.LinAlgError: 293 return 294 # ...scaling constants for multivariate normal pdf 295 sc = s.empty((self.p, 2)) 296 sc[:,0] = 1.0 / s.sqrt(1.0 - self.g**2) 297 sc[:,1] = s.log(s.sqrt(2*s.pi) / sc[:,0]) 298 299 # Run the hybrid Gibbs rounds 300 Lx = self.inline( 301 'z', 'h', 'x', 'w', 'ni', 302 'd', 'e', 'g', 'rz', 'ri', 'sc', 303 w=self.wiggle, ni=self.Ni) 304 return Lx, z, h 484 self.setParams(paramContainer) 485 return self.inline( 486 'z', 'x', 'd', 'e', 'g', 'rz', 'ri', 'sc', 487 z=self.getZ(zFilePath)) 488 489 490 projects/AsynCluster/trunk/svpmc/params.py
r203 r205 239 239 In addition to the parameter arrays, I house as key attributes: 240 240 241 - A latent parameter array I{z}.242 243 241 - A scalar I{Lx} with the log-likelihood of the model's data given my 244 parameters and the latent parameter I{z}. 242 parameters and some externally supplied normal variates underlying 243 sets of simulated log-volatilites. 245 244 246 245 - A scalar I{Lp} with the prior log-probability density of the … … 250 249 resulting in my parameters. 251 250 252 - A scalar I{Lh} with the log-probability density of the simulated253 log-volatilities, reflected in the simulated values of I{z}.254 255 """ 256 keyAttrs = {' z':[], 'Lx':None, 'Lp':None, 'Lj':None, 'Lh':None}251 - A 1-D array I{h} containing the last multivariate log-volatility 252 value of any log-volatility simulation yet done on me. 253 254 """ 255 keyAttrs = {'Lx':None, 'Lp':None, 'Lj':None, 'h':None} 257 256 258 257 … … 431 430 """ 432 431 Lp = 0 433 paramContainer = ParameterContainer( 434 paramNames=self.paramNames, z=s.randn(self.p, self.n)) 432 paramContainer = ParameterContainer(paramNames=self.paramNames) 435 433 for name in self.paramNames: 436 434 priorFlexArray = getattr(self, name) … … 439 437 setattr(paramContainer, name, paramArray) 440 438 paramContainer.Lp = Lp 439 paramContainer.Lj = 0 # There was no jump 441 440 return paramContainer 442 441 … … 449 448 deviations in the jump distribution. 450 449 451 The returned parameter container object will have a copy of the 452 supplied instance's latent parameter array I{z} and new values of I{Lp} 450 The returned parameter container object will have new values of I{Lp} 453 451 and I{Lj} corresponding to the proposal. 454 452 … … 464 462 "You must supply a ParameterContainer as the first "+\ 465 463 "argument, not a '%s'" % type(paramContainer)) 466 newVersion = ParameterContainer( 467 paramNames=self.paramNames, z=s.array(paramContainer.z).copy()) 464 newVersion = ParameterContainer(paramNames=self.paramNames) 468 465 Lp = 0 469 466 # Define a 1-D array holding the probability density for each jump, projects/AsynCluster/trunk/svpmc/pmc.py
r201 r205 52 52 self.rMax = N - self.rMin*(self.D-1) 53 53 self.updateAllocations() 54 self.II = None 54 55 55 56 def subsetIndex(self, k): … … 65 66 """ 66 67 Call this method with a reference to an empty list that will hold 67 assembled results. 68 69 For each of my bins, the method will yield the bin index, a 1-D array 70 of indices for the subset of my population allocated to that bin, and a 71 fresh deferred already fired with C{None}. You must add a callback to 72 the deferred for each iteration that returns a tuple of 1-D arrays, 73 each being the same length as the yielded index array. 68 results assembled from an allocation that I'll be (re)doing. If you 69 want to re-use the same allocation as from a previous call to this 70 method, set the I{lastAllocation} keyword C{True}. 71 72 The method will allocate the population members into my I{D} bins, 73 unless we're re-using the last allocation. For each bin, the method 74 will yield the bin index, a 1-D array of indices for the subset of my 75 population allocated to that bin, and a fresh deferred already fired 76 with C{None}. You must add a callback to the deferred for each 77 iteration that returns a tuple of 1-D arrays, each being the same 78 length as the yielded index array. 74 79 75 80 Each returned array for each subset will be assembled back into a … … 94 99 raise ValueError( 95 100 "You must supply an empty list to hold your assembled results") 96 self.II = [None] * self.D 101 if self.II is None: 102 lastAllocation = False 103 self.II = [None] * self.D 104 else: 105 lastAllocation = True 97 106 for k in s.flipud(s.argsort(self.R)): 98 I = self.II[k] = self.subsetIndex(k) 107 if lastAllocation: 108 I = self.II[k] 109 else: 110 I = self.II[k] = self.subsetIndex(k) 99 111 d = defer.succeed(None) 100 112 yield k, I, d … … 115 127 if I is None: 116 128 R = s.ones(self.D) / self.D 117 elif hasattr(self, 'II'):129 elif self.II is not None: 118 130 R = s.array( 119 131 [sum([x in self.II[k] for x in I]) for k in xrange(self.D)]) 120 132 R = R.astype(float) / R.sum() 133 self.II = None 121 134 else: 122 135 raise AttributeError( … … 140 153 141 154 """ 142 chunkSize = 200143 144 155 def __init__(self, projectManager, socket=None): 145 156 self.socket = socket … … 149 160 self.resampler = Resampler(logWeights=True) 150 161 self.allocator = Allocator(projectManager.m, len(projectManager.V)) 162 163 def _oops(self, failure, **kw): 164 text = "\nFailure" 165 if kw: 166 text += " with " 167 for name, value in kw.iteritems(): 168 text += "%s=%s" % (name, value) 169 print "%s\n%s" % (text, failure.getTraceback()) 151 170 152 def initialPopulation(self, N): 153 """ 154 Generates a new population of I{N} parameters from the priors and 155 returns a 1-D FlexArray of their parameter containers. 156 """ 157 print "Initializing population:" 158 X = params.FlexArray(N) 159 for k, paramContainer in enumerate(X): 160 X[k] = self.pm.priors.new() 161 print ".", 162 sys.stdout.flush() 163 return X 164 165 def proposals(self, X, vIndex): 166 """ 167 Returns a 1-D FlexArray of parameter containers resulting in proposals 168 from the supplied FlexArray of parameter containers I{X}, given the 169 jump deviation corresponding to the specified I{vIndex}. 170 171 Proposals with zero probability of occuring, given the parameters' 172 prior distributions, are censored. (There's no point considering the 173 likelihood of proposals that cannot occur.) 174 """ 175 XP = params.FlexArray(len(X)) 176 for k, paramContainer in enumerate(X): 177 XP[k] = self.pm.priors.proposal( 178 paramContainer, vIndex, self.allocator.W) 179 return XP 171 def proposals(self, *args): 172 """ 173 Call this method with a 1-D FlexArray of parameter containers and an 174 integer index of a particular jump deviation, and it will generate 175 proposals from the existing parameters. 176 177 Call it with just an integer number of proposals, and it will generate 178 new ones directly from the priors. 179 180 Runs a log-volatility simulation for each proposal via the model 181 manager, which constructs a 3-D array of the normal variates underlying 182 the entire set of simulations. 183 184 Returns a deferred that fires when all the log-volatility simulations 185 are done. The deferred fires with a 1-D FlexArray of the proposals 186 along with an equal-length 1-D bool array of True/False values 187 indicating which of them resulted in valid log-volatility simulations. 188 """ 189 def simulationDone(success): 190 if success: 191 print vIndex, 192 else: 193 print ".", 194 I.append(success) 195 196 def allDone(null): 197 return XP, s.array(I) 198 199 dList = [] 200 I = [] 201 if len(args) == 2: 202 new = False 203 X, vIndex = args 204 N = len(X) 205 else: 206 new = True 207 N = args[0] 208 vIndex = "+" 209 XP = params.FlexArray(N) 210 print "|", 211 for k in xrange(N): 212 if new: 213 proposalPC = self.pm.priors.new() 214 else: 215 proposalPC = self.pm.priors.proposal( 216 X[k], vIndex, self.allocator.W) 217 XP[k] = proposalPC 218 d = self.mm.zSimulation(proposalPC, k) 219 d.addCallback(simulationDone) 220 d.addErrback(self._oops, k=k) 221 dList.append(d) 222 return defer.DeferredList(dList).addCallback(allDone) 180 223 181 @defer.deferredGenerator 182 def weightedProposals(self, X, vIndex): 183 """ 184 Returns a deferred that fires with a 1-D FlexArray of proposals from 185 the parameter containers in the supplied FlexArray I{X}, given the 186 specified proposal deviation index I{vIndex}, along with a 187 like-dimensioned array of linear, fractional importance weights for the 188 proposals. 224 def weights(self, XP, vIndex): 225 """ 226 Returns a deferred that fires with a 1-D array of log importance 227 weights for the proposals supplied in the 1-D FlexArray I{XP}. 189 228 190 229 The importance weights are to be combined with those from other calls 191 230 to this method with different variance settings. The weights will be 192 used to resample the returned proposals so that the probability of any231 used to resample the supplied proposals so that the probability of any 193 232 given proposal being included in the final result for this iteration is 194 233 proportional to its likelihood and prior probability density, and … … 207 246 print "%6.4f\t%+12.2f\t%s" % (self.pm.V[vIndex], L, info) 208 247 return L 209 210 def evaluateChunk(XP): 211 XP_list.append(XP) 212 # Here's where the vast majority of the CPU time is expended! 213 dList = [ 214 self.mm.likelihood(XPk).addCallback(weight) for XPk in XP] 215 return defer.gatherResults(dList) 216 217 j = 0 218 N = len(X) 219 XP_list = [] 220 W = s.empty(len(X)) 221 while j < N: 222 N_this = min([self.chunkSize, N-j]) 223 X_this = X[j:j+N_this] 224 # Generate a chunk of proposals (somewhat time-consuming, done 225 # locally in a thread)... 226 d = self.queue.call(self.proposals, X_this, vIndex) 227 # ...and then evaluate the proposals (really time-consuming, done 228 # either locally in a thread or in the cluster) 229 d.addCallback(evaluateChunk) 230 wfd = defer.waitForDeferred(d) 231 yield wfd 232 W[j:j+N_this] = wfd.getResult() 233 j += N_this 234 yield params.FlexArray.concatenate(XP_list), W 248 249 # Evaluate the proposals (very time-consuming, done either locally in a 250 # thread or in the cluster) 251 dList = [self.mm.likelihood(XPk).addCallback(weight) for XPk in XP] 252 return defer.gatherResults(dList).addCallback(lambda W: s.array(W)) 235 253 236 254 @defer.deferredGenerator … … 249 267 250 268 """ 269 def gotTheseWeights(Wv, I, Iv): 270 """ 271 Returns a 1-D weights array of the same length as this allocation's 272 length, with infinitely negative values except for the valid 273 weights supplied in I{Wv}. 274 """ 275 W = -s.inf * s.ones(len(I)) 276 W[Iv] = Wv 277 return W 278 279 X = None 251 280 if self.socket: 252 281 wfd = defer.waitForDeferred(self.mm.setRemoteMode(self.socket)) 253 282 yield wfd; wfd.getResult() 254 283 N_members = self.pm.m 255 # Initialize some arrays256 X = self.initialPopulation(N_members)257 # Iteration258 284 print "\nRunning %d iterations with %d population members" \ 259 285 % (N_iter, N_members) 286 # For each iteration... 260 287 for i in xrange(N_iter): 261 288 print "\n%03d\n%s" % (i+1, "-"*100) 289 self.mm.nextIteration() 290 if X is None: 291 print "Generating %d proposals from priors:" % N_members 292 # No existing members, generate proposals directly from the 293 # priors 294 wfd = defer.waitForDeferred(self.proposals(N_members)) 295 yield wfd 296 XP, validXP = wfd.getResult() 297 else: 298 print "Generating %d proposals from previous population:" \ 299 % N_members 300 # Regular iteration, use proposals generated from the last 301 # iteration's members 302 dList, resultList = [], [] 303 for vIndex, I, d in self.allocator.assembler(resultList): 304 d.addCallback(lambda _: self.proposals(X[I], vIndex)) 305 dList.append(d) 306 wfd = defer.waitForDeferred(defer.DeferredList(dList)) 307 yield wfd 308 wfd.getResult() 309 XP, validXP = resultList 310 print "\n" 311 # Evaluate the proposals 262 312 dList, resultList = [], [] 263 313 for vIndex, I, d in self.allocator.assembler(resultList): 264 d.addCallback(lambda _: self.weightedProposals(X[I], vIndex)) 314 Iv = validXP[I].nonzero()[0] 315 d.addCallback(lambda _: self.weights(XP[I][Iv], vIndex)) 316 d.addCallback(gotTheseWeights, I, Iv) 317 d.addErrback(self._oops, vIndex=vIndex) 265 318 dList.append(d) 266 # Record the previous iteration's results while the nodes work on267 # the current iteration...268 if i > 0:319 # Record the previous iteration's members while the nodes work on 320 # evaluating the valid proposals for the current iteration... 321 if X is not None: 269 322 self.pm.writeParams(X)
