Changeset 206
- Timestamp:
- 06/01/08 02:06:56 (6 months ago)
- Files:
-
- projects/AsynCluster/trunk/asyncluster/master/control.py (modified) (1 diff)
- projects/AsynCluster/trunk/svpmc/model.py (modified) (11 diffs)
- projects/AsynCluster/trunk/svpmc/params.py (modified) (7 diffs)
- projects/AsynCluster/trunk/svpmc/pmc.py (modified) (1 diff)
- projects/AsynCluster/trunk/svpmc/project.py (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynCluster/trunk/asyncluster/master/control.py
r175 r206 380 380 to the callable object specified in the job's namespace on each node 381 381 before it runs another call for that job. 382 383 If you don't want the task saved for future workers, but only run on 384 the workers currently attached, set the I{ephemeral} keyword C{True}. 382 385 """ 383 386 return self.ctl.jobber.update(jobID, callName, *args, **kw) projects/AsynCluster/trunk/svpmc/model.py
r205 r206 35 35 36 36 37 INT_SCALE = 2000 38 39 37 40 class ZFetcher(pb.Referenceable): 38 41 """ … … 40 43 latest log-volatility simulations. 41 44 42 ???????????????????????????? 45 @ivar z: The full-precision 3-D array of normal variates underlying the 46 current set of valid log-volatility simulations. 47 48 @ivar zPackedChunks: A list of strings containing a chunked, packed version 49 of z. 50 43 51 """ 44 52 chunkSize = 60000 45 intScaleUp = 200046 intScaleDown = 1.0 / intScaleUp47 53 48 54 def __init__(self, m, p, n): 49 55 self.m = m 50 self.z = s.empty((m, p, n)) 51 52 def getZ(self, fullPrecision=False): 53 # Transmitting at short int cuts data size to one fourth, still offers 54 # full needed range with reasonable precision 55 result = self.z[self.successes,:,:] 56 if fullPrecision: 57 return result 58 return (self.intScaleUp*result).astype('h') 59 60 def _packageZ(self): 61 zPacked = pack.Packer(self.getZ())() 62 k = 0 63 N = len(zPacked) 64 self.zPackedChunks = [] 65 while k < N: 66 chunk = zPacked[k:k+self.chunkSize] 67 self.zPackedChunks.append(chunk) 68 k += self.chunkSize 69 56 self._z = s.empty((m, p, n)) 57 self.restart() 58 59 def _get_z(self): 60 return self._z[self.successes,:,:] 61 z = property(_get_z) 62 70 63 def restart(self): 71 64 self.failures = [] 72 65 self.successes = [] 73 self.chunks = {}74 66 self.zPackedChunks = None 75 67 … … 95 87 else: 96 88 self.successes.append(member) 97 self. z[member,:,:] = z89 self._z[member,:,:] = z 98 90 if len(self.failures) + len(self.successes) == self.m: 99 self._packageZ() 91 zPacked = pack.Packer(self.z)() 92 k = 0 93 N = len(zPacked) 94 self.zPackedChunks = [] 95 while k < N: 96 chunk = zPacked[k:k+self.chunkSize] 97 self.zPackedChunks.append(chunk) 98 k += self.chunkSize 100 99 return True 101 100 return False 102 101 103 def view_get(self, perspective):104 if perspective not in self.chunks:105 self.chunks[perspective] = copy(self.zPackedChunks)106 chunks = self.chunks[perspective]107 if chunks:108 return self.chunks[perspective].pop(0)109 110 102 111 103 class ModelManager(object): … … 119 111 I'm going to manage. 120 112 121 @ivar zFetcher: An instance of L{ZFetcher} that is supplied to my model,122 through which remote (and local) instances of the model access the normal123 variates underlying the latest log-volatility simulations.124 125 113 """ 126 114 remoteMode = False … … 129 117 nodecode = """ 130 118 ########################################################################### 131 from twisted_goodies.pybywire.pack import Packer 132 133 M = None 119 from twisted_goodies.pybywire import pack 120 121 MODEL = None 122 CHUNKS = [] 134 123 135 124 def newModel(modelObj): 136 global M 137 M = modelObj 138 125 global MODEL 126 MODEL = modelObj 127 128 def newZ(zPacked, k, N): 129 global CHUNKS 130 if k == 0: 131 CHUNKS = [] 132 CHUNKS.append(zPacked) 133 if k == N-1: 134 MODEL.z = %f * pack.Unpacker(''.join(CHUNKS)).astype('d') 135 139 136 def hybridGibbs(paramContainer): 140 return Packer(*M.hybridGibbs(paramContainer))141 142 def likelihood(paramContainer , zFilePath):143 return M .likelihood(paramContainer, iteration)137 return pack.Packer(*MODEL.hybridGibbs(paramContainer)) 138 139 def likelihood(paramContainer): 140 return MODEL.likelihood(paramContainer) 144 141 145 142 ########################################################################### 146 """ 143 """ % (1.0/INT_SCALE,) 144 147 145 def __init__(self, projectManager, y, **kw): 148 146 kw['y'] = y 149 147 kw['paramNames'] = projectManager.paramNames 150 kw['zFetcher'] =self.zFetcher = ZFetcher(148 self.zFetcher = ZFetcher( 151 149 projectManager.m, projectManager.p, projectManager.n) 152 #???????????????????????????153 #kw['zFetcherRemote'] =154 150 self.modelObj = Model(**kw) 155 151 self.queue = projectManager.queue 156 self.iteration = -1157 152 158 153 def _oops(self, failure): … … 188 183 d.addCallback( 189 184 lambda _: self.client.registerClasses(*Parameterized.registry)) 190 d.addCallback( 191 lambda _: self.client.update('newModel', self.modelObj)) 185 d.addCallback(lambda _: self.client.update('newModel', self.modelObj)) 192 186 d.addCallback(lambda _: setattr(self, 'remoteMode', True)) 193 187 d.addErrback(self._oops) 194 188 return d 195 189 196 def nextIteration(self): 197 """ 198 """ 199 self.iteration += 1 190 @defer.deferredGenerator 191 def nextIteration(self, remote=False, local=False): 192 """ 193 """ 194 self.modelObj.z = self.zFetcher.z 195 if (remote or self.remoteMode) and not local: 196 chunks = self.zFetcher.zPackedChunks 197 N = len(chunks) 198 for k, chunk in enumerate(chunks): 199 wfd = defer.waitForDeferred( 200 self.client.update('newZ', chunk, k, N, ephemeral=True)) 201 yield wfd 202 wfd.getResult() 200 203 self.zFetcher.restart() 201 204 202 205 def zSimulation(self, paramContainer, member, remote=False, local=False): 203 206 """ … … 253 256 paramContainer.Lx = result 254 257 return paramContainer 255 258 256 259 if (remote or self.remoteMode) and not local: 257 260 d = self.client.run( 258 'likelihood', paramContainer, 259 self.iteration, timeout=self.timeout) 261 'likelihood', paramContainer, timeout=self.timeout) 260 262 else: 261 263 d = self.queue.call( 262 self.modelObj.likelihood, paramContainer , self.iteration)264 self.modelObj.likelihood, paramContainer) 263 265 return d.addCallbacks(gotLikelihood, self._oops) 264 266 … … 374 376 self.sc[:,0] = 1.0 / s.sqrt(1.0 - self.g**2) 375 377 self.sc[:,1] = s.log(s.sqrt(2*s.pi) / self.sc[:,0]) 376 377 def getZ(self, zFilePath):378 """379 Returns a 3-D array containing normal variates underlying simulated380 multivariate log-volatilities for the N different sets of parameters in381 the current iteration of the overall PMC simulation.382 383 The array shape is (parameter set, time series, time-series sample).384 385 """386 @defer.deferredGenerator387 def getChunks():388 chunks = []389 while True:390 wfd = defer.waitForDeferred(391 self.zFetcherRemote.callRemote('get'))392 yield wfd393 chunk = wfd.getResult()394 if not chunk:395 break396 chunks.append(chunk)397 398 self._zArray = pack.Unpacker("".join(chunks))()399 self._zArray = (ZFetcher.intScaleDown * self._zArray.astype('d'))400 self._zID = iteration401 402 # Local mode access403 if hasattr(self, 'zFetcher'):404 return defer.succeed(self.zFetcher.getZ(fullPrecision=True))405 # Remote mode access406 if getattr(self, '_zID', None) != iteration:407 if hasattr(self, '_d_getZ'):408 d = defer.Deferred()409 self._d_getZ.chainDeferred(d)410 else:411 d = self._d_getZ = getChunks()412 else:413 d = defer.succeed(None)414 d.addCallback(lambda _: self._zArray)415 return d416 378 417 379 … … 457 419 return z, h 458 420 459 def likelihood(self, paramContainer , zFilePath):421 def likelihood(self, paramContainer): 460 422 """ 461 423 Call this method with a parameter object that defines a set of values 462 for my parameters and an integer specifying which PMC iteration being 463 run. 424 for my parameters. 464 425 465 426 If a L{linalg.LinAlgError} is raised due to an invalid correlation … … 467 428 returns the log-likelihood of my observations given the parameters, 468 429 from an integration of the joint probability of the observations and 469 simulated log-volatilities over the simulation rounds. 470 471 2. The IID normal variates underlying the simulated 472 log-volatilities after the last simulation round. 473 474 3. The multivariate log-volatility value for the last time-series 475 sample after the last simulation round. (Useful for 476 extrapolating from the fitted model.) 430 log-volatilities simulated from all the other parameters in the current 431 PMC population. 477 432 478 433 The returned likelihood does not consider the prior probability of the … … 483 438 """ 484 439 self.setParams(paramContainer) 485 return self.inline( 486 'z', 'x', 'd', 'e', 'g', 'rz', 'ri', 'sc', 487 z=self.getZ(zFilePath)) 440 return self.inline('z', 'x', 'd', 'e', 'g', 'rz', 'ri', 'sc') 488 441 489 442 projects/AsynCluster/trunk/svpmc/params.py
r205 r206 284 284 285 285 class Jump(object): 286 __slots__ = ('x', 'px', 'p V')287 def __init__(self, x, px, p V):286 __slots__ = ('x', 'px', 'pJump') 287 def __init__(self, x, px, pJump): 288 288 self.x = x 289 289 self.px = px 290 self.p V = pV290 self.pJump = pJump 291 291 292 292 def _get_distObj(self): … … 320 320 D = len(self.V) 321 321 rJumps = self.rdsObj.rvs(self.N_draw) 322 # Right side 323 V = self.V[::-1] 324 pJumpsL = self.rdsObj.pdf( 325 s.kron(rJumps, V).reshape(self.N_draw, D)) 326 # Left side 327 V = 1.0/V[:0:-1] 328 pJumpsR = self.rdsObj.pdf( 329 s.kron(rJumps, V).reshape(self.N_draw, D-1)) 330 pJumps = s.column_stack([pJumpsL, pJumpsR]) 322 pJumps = self.rdsObj.pdf(rJumps) 331 323 return {'k':-1, 'N':self.N_draw, 'R':rJumps, 'P':pJumps} 332 324 … … 340 332 draw = self.cache['draw'] = self._newDraw() 341 333 k = draw['k'] 342 return draw['R'][k], draw['P'][k ,:]334 return draw['R'][k], draw['P'][k] 343 335 344 336 def pdf(self, x): … … 367 359 standard deviation width of a normal distribution.) 368 360 369 Returns a tiny object with t woattributes, conveniently accessible as361 Returns a tiny object with three attributes, conveniently accessible as 370 362 part of a L{FlexArray}: 371 363 372 364 - B{x}: the value after the jump, and 373 365 374 - B{pV}: a 1-D array of probabilities of the jump for each jump 375 deviation in my array I{V}. 366 - B{px}: the probability density of I{x}, given the prior 367 distribution. 368 369 - B{pj}: the probability density of the jump, given the jump 370 deviation used. 376 371 377 372 For my Gaussian-PDF deviate generator, the probability density of a … … 380 375 each possible jump deviation in the I{p} array of the result. 381 376 """ 382 D = len(self.V) 377 v = self.V[vIndex] 378 vInverse = 1.0 / v 383 379 for k in xrange(self.N_attempts): 384 jumpValue, P= self._rJumpDraw()385 jumpValue *= self.V[vIndex]386 p V = P[D-vIndex-1:2*D-vIndex-1] / self.V387 x = startValue + jumpValue380 rJump, pJump = self._rJumpDraw() 381 rJump *= v 382 pJump *= vInverse 383 x = startValue + rJump 388 384 px = self.pdf(x) 389 385 if px > 0: … … 395 391 x = self.rvs() 396 392 px = self.pdf(x) 397 p V = px * s.ones(D)398 return self.Jump(x, px, p V)393 pJump = 1.0 # No jump involved 394 return self.Jump(x, px, pJump) 399 395 400 396 … … 463 459 "argument, not a '%s'" % type(paramContainer)) 464 460 newVersion = ParameterContainer(paramNames=self.paramNames) 465 Lp =0461 Lp, Lj = 0, 0 466 462 # Define a 1-D array holding the probability density for each jump, 467 463 # joint across all parameters. Since the densities will be scaled by 468 464 # the weight for each jump deviation, just start the array with the 469 465 # weights. 470 pJumps = vWeights.copy()471 466 for name in self.paramNames: 472 467 priorsFA = getattr(self, name) 473 468 startValues = getattr(paramContainer, name) 474 469 jumpsFA = priorsFA.call('rJump', startValues, vIndex) 470 setattr(newVersion, name, jumpsFA.x) 475 471 Lp += s.log(jumpsFA.px).sum() 476 for k in xrange(len(vWeights)): 477 pJumps[k] *= jumpsFA.pV.call('item', k).prod() 478 setattr(newVersion, name, jumpsFA.x) 472 Lj += s.log(jumpsFA.pJump).sum() 479 473 newVersion.Lp = Lp 480 newVersion.Lj = s.log(pJumps.sum())474 newVersion.Lj = Lj 481 475 return newVersion projects/AsynCluster/trunk/svpmc/pmc.py
r205 r206 208 208 vIndex = "+" 209 209 XP = params.FlexArray(N) 210 print "|",211 210 for k in xrange(N): 212 211 if new: projects/AsynCluster/trunk/svpmc/project.py
r205 r206 126 126 setattr(obs, "series_%02d" % (k,), title) 127 127 # ...and construct a queue and model manager for a simulation 128 #self.queue = asynqueue.ThreadQueue(1)129 self.queue = NullQueue()128 self.queue = asynqueue.ThreadQueue(1) 129 #self.queue = NullQueue() 130 130 self.mgr = model.ModelManager( 131 131 self, tsData, wiggle=self.wiggle, Ni=self.Ni)
