Changeset 150
- Timestamp:
- 04/17/08 21:17:12 (9 months ago)
- Files:
-
- projects/AsynCluster/trunk/svpmc/model.py (modified) (13 diffs)
- projects/AsynCluster/trunk/svpmc/params.py (added)
- projects/AsynCluster/trunk/svpmc/pmc.py (modified) (4 diffs)
- projects/AsynCluster/trunk/svpmc/sample.py (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynCluster/trunk/svpmc/model.py
r149 r150 31 31 from twisted_goodies.pybywire.pack import Packer, Unpacker 32 32 33 import sample, weave 33 import sample, weave, params 34 34 35 35 … … 49 49 50 50 51 class ModelCaller(object): 52 """ 53 I handle deferred calls to a L{Model} object in either local or remote 54 mode. Calls in local mode are run through an instance of L{NullQueue} in 55 the main thread. 51 class ModelManager(object): 52 """ 53 I manage a L{Model} object and a collection of L{priors.Prior} objects. 54 55 I handle calls to the model object in either local or remote mode. Calls in 56 local mode are run through an instance of L{NullQueue} in the main thread. 56 57 """ 57 58 remoteMode = False … … 60 61 nodecode = """ 61 62 ########################################################################### 62 from scipy import isfinite 63 from twisted_goodies.pybywire import pack 63 from twisted_goodies.pybywire.pack import Packer 64 64 65 65 M = None … … 69 69 M = modelObj 70 70 71 def callModel(methodName, *possiblyPackedArgs): 72 method = getattr(M, methodName) 73 return pack.packwrap(method)(*possiblyPackedArgs) 74 75 def likelihood(packedParams): 76 L = M.likelihood(list(pack.Unpacker(packedParams))) 77 return str(float(L)) 71 def likelihood(paramContainer, wiggle): 72 v, L = M.likelihood(paramContainer, wiggle) 73 return Packer(v, L)() 78 74 79 75 ########################################################################### … … 114 110 return d 115 111 116 def _gotPossiblyPackedResult(self, result): 117 if isinstance(result, str): 118 result = list(Unpacker(result)) 119 if len(result) == 1: 120 result = result[0] 121 return result 122 123 def call(self, methodName, *args, **kw): 124 """ 125 Calls my model's method identified by I{methodName} with any supplied 126 args. 127 128 If I am in local mode, runs the call in a dedicated thread of the 129 current interpreter. If in remote mode, runs the call on a cluster node 130 via my job client object. In either case, returns a deferred that fires 131 with the result. 132 133 You can force local or remote calling by setting the I{local} or 134 I{remote} keyword to C{True}, with local mode taking precedence and 135 being the default. 136 137 Remote calls have a default timeout of 40 seconds, after which the call 138 will be retried. Make sure none of your model's methods take that long, 139 or set my I{timeout} attribute to something larger. 140 """ 141 if kw.get('remote', self.remoteMode) and not kw.get('local', False): 142 if args: 143 args = (Packer(*args)(),) 144 d = self.client.run( 145 'callModel', methodName, *args, **{'timeout':self.timeout}) 146 d.addCallback(self._gotPossiblyPackedResult) 147 else: 148 d = self.queue.call(getattr(self.modelObj, methodName), *args) 149 d.addErrback(self._oops) 150 return d 151 152 def likelihood(self, paramSequence, remote=False, local=False): 153 """ 154 Returns the log likelihood of the supplied parameter sequence for my 155 model. 156 """ 157 def gotLikelihood(value): 158 if value is None: 112 def likelihood(self, paramContainer, wiggle, remote=False, local=False): 113 """ 114 Returns the log-likelihood of the parameters in the supplied 115 I{paramContainer} for my model. 116 117 Updates in-place the container's sole latent parameter, an array of 118 volatility shocks after some MCMC involved with the likelihood 119 compution. 120 """ 121 def gotPackedLikelihood(result): 122 if result is None: 159 123 # An error from the remote likelihood method call is treated as 160 124 # infinitely low likelihood 161 value = '-inf' 162 return float(value) 125 return -s.inf 126 up = pack.Unpacker(result) 127 paramContainer.v = up() 128 return up() 129 130 def gotLikelihood(result): 131 paramContainer.v = result[0] 132 return result[1] 163 133 164 134 if (remote or self.remoteMode) and not local: 165 packedParams = Packer(*paramSequence)166 135 d = self.client.run( 167 'likelihood', packedParams(), timeout=self.timeout) 136 'likelihood', paramContainer, wiggle, timeout=self.timeout) 137 d.addCallback(gotPackedLikelihood) 138 else: 139 d = self.queue.call( 140 self.modelObj.likelihood, paramContainer, wiggle) 168 141 d.addCallback(gotLikelihood) 169 else:170 d = self.queue.call(self.modelObj.likelihood, paramSequence)171 142 return d.addErrback(self._oops) 172 173 174 class Prior(Parameterized):175 """176 You must define the name of an underlying dist object from the scipy.stats177 L{distributions} module via my parameter I{dname}.178 179 For all underlying dist objects, you must also specify I{loc} and I{scale} as180 additional parameters. Some of the objects also require I{a} and I{b}.181 """182 paramNames = ['dname', 'loc', 'scale', 'a', 'b']183 184 def _get_distObj(self):185 if 'dist' not in self.cache:186 if not hasattr(distributions, self.dname):187 raise ImportError(188 "No distribution '%s' in scipy.stats.distributions" \189 % self.dname)190 distClass = getattr(distributions, self.dname)191 args = []192 for xName in ('a', 'b'):193 xValue = getattr(self, xName, None)194 if xValue is not None:195 args.append(xValue)196 kw = {'loc':self.loc, 'scale':self.scale}197 self.cache['dist'] = distClass(*args, **kw)198 return self.cache['dist']199 distObj = property(_get_distObj)200 201 def _get_rdsObj(self):202 if 'rds' not in self.cache:203 width = s.diff(self.distObj.ppf([0.15, 0.85]))[0]204 self.cache['rds'] = distributions.norm(0, width)205 return self.cache['rds']206 rdsObj = property(_get_rdsObj)207 208 def pdf(self, x):209 """210 Returns probability values of the supplied parameter value I{x}.211 """212 return self.distObj.pdf(x)213 214 def rvs(self, N):215 """216 Returns I{N} parameter values drawn from my distribution.217 """218 return self.distObj.rvs(N)219 220 def rds(self, N, wiggle):221 """222 Returns I{N} values that deviate from zero by random amounts drawn from223 a normal distribution with a pre-scaled standard deviation that is set224 to approximate the 70% probability width of my distribution. (That is225 the +/- 1 standard deviation width of a normal distribution.)226 227 The deviates are then scaled by a I{wiggle} value.228 """229 if wiggle < 0.0:230 raise ValueError(231 "Specify a positive wiggle value, '%s' obj invalid" \232 % str(wiggle))233 return wiggle * self.rdsObj.rvs(N)234 235 def pds(self, X, wiggle):236 """237 Returns the probability densities of the supplied zero-mean deviates,238 under the assumption that they were generated from my Gaussian-PDF239 L{rds} method with the specified I{wiggle}.240 241 For my Gaussian-PDF deviate generator, the probability density of a242 given deviate is inversely proportional to the scaling to its standard243 deviation imparted by I{wiggle}. Thus, the unit-normal PDF is divided244 by the I{wiggle} in the result.245 """246 return self.rdsObj.pdf(X/wiggle) / wiggle247 143 248 144 … … 281 177 observation data. 282 178 283 @ cvar M: Number of volatility draws per likelihood evaluation.179 @ivar M: Number of volatility draws per likelihood evaluation. 284 180 285 181 """ 286 182 _logU_index = -1 287 183 288 M = 100 289 keyAttrs = {'tsList':None} 290 paramNames = [ 291 # Approximately p*(3+2.5) parameters total, e.g., 125 for p=5 292 #------------------------------------------------------------ 293 'a', # Drift (p x 1) 294 295 'b', # Lag-1 cross-correlation for VAR of returns (p x p) 296 297 'c', # Innovation shock concurrent correlations (vector with 298 # r01, r02,..., r0p, r12,..., r1p,...) 299 300 'd', # Volatility offset (p x 1) 301 302 'e', # Lag-1 cross-correlations for VAR of volatilities 303 # (p x p) 304 305 'f', # Volatility shock concurrent correlations (vector with 306 # r01, r02,..., r0p, r12,..., r1p,...) 307 308 'g', # Volatility/innovation shock correlations (p x 1) 309 310 ] 311 184 keyAttrs = {'tsList':None, 'M':500} 185 paramNames = params.PARAM_NAMES 312 186 313 187 #--- Properties ----------------------------------------------------------- … … 356 230 """ 357 231 if not hasattr(self, 'h'): 358 self.h = s.empty_like(self.nw .x)232 self.h = s.empty_like(self.nw()) 359 233 self.nw.step(wiggle) 360 234 return self.inline('h', 'v', 'd', 'e', v=self.nw.correlate(self.f)) … … 397 271 log-likelihood is disregarded, C{sqrt(2*pi)}. 398 272 """ 399 mu = self.g * self.nw .x273 mu = self.g * self.nw() 400 274 sigma2 = 1 - self.g**2 401 275 logProbs = -(y - mu)**2 / (2*sigma2) - 0.5 * s.log(sigma2) … … 420 294 #--- The API -------------------------------------------------------------- 421 295 422 def likelihood(self, param Sequence, v, wiggle):423 """ 424 Call this method with a sequence of arrays I{paramSequence} defining my425 model parameters and a C{[pn]} array of volatility shocks I{v}to be296 def likelihood(self, paramContainer, wiggle): 297 """ 298 Call this method with a parameter object that defines (1) a set of 299 values for my parameters and (2) an array of volatility shocks to be 426 300 used as a starting point for another simulation draw of volatilities. 427 301 … … 435 309 436 310 The value of that log-likelihood at the end of the log-volatility 437 simulation is returned along with the withthe volatility shocks311 simulation is returned in a tuple, preceded by the volatility shocks 438 312 underlying the log-volatilities drawn at that point. 439 313 … … 442 316 calling this method. If the prior probability of any parameter is zero, 443 317 the posterior density for that parameter will also be zero no matter 444 what, making any call to this method that uses it a waste of computing 445 time. 446 """ 447 self.setParamSequence(paramSequence) 318 what, making any call to this method with it a waste of computing time. 319 """ 320 if paramContainer.v: 321 self.nw(paramContainer.v) 322 self.setParamSequence(paramContainer.paramSequence()) 448 323 # Obtain innovations as residuals of the reversed VAR(1) 449 324 x = self.var.reverse(self.a, self.b) … … 452 327 for j in xrange(self.M): 453 328 # Draw a proposal array of log-volatilities 454 h = self.draw_h( v,wiggle)329 h = self.draw_h(wiggle) 455 330 # Compute the total log-likelihood conditional on the proposed 456 331 # log-volatilities … … 459 334 if Lp > L or (Lp-L) > self.logUniform(): 460 335 L = Lp 461 vShocks = self.nw .x.copy()462 return L, vShocks336 vShocks = self.nw() 337 return vShocks, L projects/AsynCluster/trunk/svpmc/pmc.py
r135 r150 106 106 setattr(self, name, value) 107 107 108 def _get_paramNames(self):109 return self.mgr.paramNames110 paramNames = property(_get_paramNames)111 112 def _get_N_params(self):113 return len(self.mgr.paramNames)114 N_params = property(_get_N_params)115 116 108 def setupRun(self, N_chains, **kw): 117 109 """ … … 155 147 def proposals(self, X, v): 156 148 """ 157 Returns a 3-tuple of info for jump proposals on the parameters in I{X}, 158 given a jump variance I{v}. Proposals with zero probability of 159 occuring, given the parameters' prior distributions, are censored from 160 the result. (There's no point considering the likelihood of proposals 161 that cannot occur.) 162 163 The tuple contains an array of the valid proposed parameters (if any, 164 given the priors), the log-probability density of the proposals given 165 the parameters' prior distributions, and the log-probability density of 166 the proposal jumps. 149 Returns a 3-tuple of info for jump proposals on the parameter 150 containers in the supplied parameter array I{X}, given a jump variance 151 I{v}. Proposals with zero probability of occuring, given the 152 parameters' prior distributions, are censored from the result. (There's 153 no point considering the likelihood of proposals that cannot occur.) 154 155 The 3-tuple contains: 156 157 1. a parameter array of valid proposed parameter 158 sequences (if any, given the priors), 159 160 2. the log-probability density of the proposals given the 161 parameters' prior distributions, and 162 163 3. the log-probability density of the proposal jumps. 164 167 165 """ 168 166 XD = self.mgr.rProposal(len(X), v) … … 177 175 def weightedProposals(self, X, v): 178 176 """ 179 Returns a deferred that fires with a 1-D array of valid proposals on 180 the supplied parameter array I{X}, given the specified proposal 181 variance I{v}, along with log-importance weights for those proposals. 182 177 Returns a deferred that fires with a parameter array of valid proposals 178 from the parameters in the supplied parameter array I{X}, given the 179 specified proposal variance I{v}, along with log-importance weights for 180 those proposals. 181 183 182 Valid proposals are those with both non-zero prior probability and 184 183 non-zero likelihood. The censoring to only valid proposals means that 185 fewer proposals may be returned than supplied parameter s, possibly no186 p roposals at all.184 fewer proposals may be returned than supplied parameter sequences, 185 possibly no proposals at all. 187 186 188 187 The importance weights are to be combined with those from other calls … … 195 194 """ 196 195 k = 0 197 XP, W = [], []196 XP, W = params.ParamArray, [] 198 197 wfd = defer.waitForDeferred( 199 198 self.queue.call(self.proposals, X, v, series='proposals')) projects/AsynCluster/trunk/svpmc/sample.py
r149 r150 75 75 variance normal distribution. 76 76 77 @ivar x: My current random walk values.78 77 You can call me to access my current array of values, with no argument to 78 get a copy of the array or with a conforming-dimension array of new values. 79 79 """ 80 80 m = 10 … … 83 83 self.x = s.randn(rows, cols) 84 84 self.p = s.zeros_like(self.x) 85 86 def __call__(self, x=None): 87 if x is None: 88 return self.x.copy() 89 if s.shape(x) != self.x.shape: 90 raise ValueError( 91 "You can only update me with an equivalent-dimensioned array") 92 self.x = x.copy() 85 93 86 94 def step(self, wiggle):
