Changeset 166
- Timestamp:
- 04/29/08 22:46:52 (7 months ago)
- Files:
-
- projects/AsynCluster/trunk/svpmc/model.py (modified) (2 diffs)
- projects/AsynCluster/trunk/svpmc/params.py (modified) (5 diffs)
- projects/AsynCluster/trunk/svpmc/pmc.py (modified) (5 diffs)
- projects/AsynCluster/trunk/svpmc/project.py (modified) (3 diffs)
- projects/AsynCluster/trunk/svpmc/test/test_params.py (modified) (1 diff)
- projects/AsynCluster/trunk/svpmc/test/test_project.py (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynCluster/trunk/svpmc/model.py
r165 r166 123 123 undergoing some nested MCMC. 124 124 125 Returns a deferred that fires (with C{None}) when the likelihood126 computation is done and the paramContainerhas been updated.125 Returns a deferred that fires with a reference to the paramContainer 126 when the likelihood computation is done and the it has been updated. 127 127 """ 128 128 def gotPackedLikelihood(result): … … 139 139 for k, name in enumerate(('Lx', 'z', 'h')): 140 140 setattr(paramContainer, name, result[k]) 141 return paramContainer 141 142 142 143 if (remote or self.remoteMode) and not local: projects/AsynCluster/trunk/svpmc/params.py
r165 r166 31 31 32 32 33 PARAM_NAMES = [34 # Total parameters: 3*p**2 + 6*p35 #------------------------------------------------------------36 'a', # Drift (p x 1)37 38 'b', # Lag-1 cross-correlation for VAR of returns (p x p)39 40 'cr', # Innovation shock cross-correlation coefficients (vector with41 # r01, r02,..., r0p, r12,..., r1p,...)42 43 'd', # Volatility offset (p x 1)44 45 'e', # Lag-1 cross-correlations for VAR of volatilities46 # (p x p)47 48 'fs', # Volatility shock standard deviations (vector with s0, s1,..., sp)49 50 'fr', # Volatility shock coefficients (vector with51 # r01, r02,..., r0p, r12,..., r1p,...)52 53 'g', # Volatility/innovation shock correlations (p x 1)54 55 ]56 57 58 33 class FlexArray(object): 59 34 """ … … 243 218 """ 244 219 keyAttrs = {'z':[], 'Lx':None, 'Lp':None, 'Lj':None} 245 paramNames = PARAM_NAMES246 220 247 221 … … 336 310 I am a container for array-like instances of L{FlexArray} that contain 337 311 L{Prior} objects for the array elements of my model's parameters. 338 """ 339 paramNames = PARAM_NAMES 340 312 313 @ivar n: The number of samples. 314 315 @ivar p: The number of time series. 316 317 @ivar paramNames: A list of the parameter names in sequence. 318 319 """ 320 def __init__(self, p, n): 321 self.p = p 322 self.n = n 323 341 324 def priorArray(self, paramName, *shape): 342 325 array = getattr(self, paramName, None) … … 352 335 """ 353 336 Lp = 0 354 paramContainer = ParameterContainer() 337 paramContainer = ParameterContainer( 338 paramNames=self.paramNames, z=s.randn(self.p, self.n)) 355 339 for name in self.paramNames: 356 340 priorFlexArray = getattr(self, name) … … 383 367 for name in self.paramNames: 384 368 priorFlexArray = getattr(self, name) 385 jumps = priorArray.call('rJump', wiggle) 386 paramArray = getattr(paramContainer, name) + jumps 387 Lp += s.log(priorFlexArray.call('pdf', paramArray)).sum() 369 for attempt in xrange(self.attempts): 370 jumps = priorFlexArray.call('rJump', wiggle) 371 paramArray = getattr(paramContainer, name) + jumps 372 pPriors = priorFlexArray.call('pdf', paramArray) 373 if s.greater(pPriors, 0).all(): 374 break 375 else: 376 raise ValueError("Failed to generate a valid proposal") 377 Lp += s.log(pPriors).sum() 388 378 Lj += s.log(priorFlexArray.call('pJump', jumps)).sum() 389 379 setattr(newVersion, name, paramArray) projects/AsynCluster/trunk/svpmc/pmc.py
r164 r166 17 17 18 18 """ 19 Monte Carlo sampling of a distribution (typically a posterior) using various 20 methods. 19 Population Monte Carlo sampling. 21 20 """ 22 21 23 import os.path, time24 22 from random import sample as sampleWOR 25 23 26 24 import scipy as s 27 25 from scipy import random 28 from Scientific.IO import NetCDF29 26 from twisted.internet import defer 30 31 27 import asynqueue 32 from twisted_goodies.pybywire.pack import Packer, Unpacker 33 from asyncluster.master import jobs 34 35 import model, params 28 29 import params 36 30 from sample import resample 37 31 … … 40 34 """ 41 35 Population Monte Carlo with per Cappe et al. 36 42 37 """ 43 38 chunkSize = 5000 … … 45 40 46 41 def __init__(self, projectManager): 47 self.mgr = projectManager 48 49 def setupRun(self, N_iter, N_members): 50 """ 51 Set up a new run with various parameters and a restarted iteration 52 producer. 53 """ 54 self.N_iter = N_iter 55 self.N_members = N_members 56 self.iterationProducer.restart() 57 42 self.pm = projectManager 43 self.mm = projectManager.mgr 44 58 45 def _get_queue(self): 59 46 if not hasattr(self, '_queue'): … … 64 51 queue = property(_get_queue) 65 52 66 def subsetIndex(self, k): 67 """ 68 Returns a subset index for the samples in my I{X} attribute that 69 correspond to the jump variance for the supplied index I{k}. 70 """ 71 I = sampleWOR(self.Is, self.R[k]) 72 self.Is = s.setdiff1d(self.Is, I) 73 return I 53 def initialPopulation(self, N): 54 """ 55 Generates a new population of I{N} parameters from the priors and 56 returns a 1-D FlexArray of their parameter containers. 57 """ 58 X = params.FlexArray(N) 59 for k, paramContainer in enumerate(X): 60 X[k] = self.pm.priors.new() 61 return X 74 62 75 63 def proposals(self, X, v): 76 64 """ 77 Returns a 3-tuple of info for jump proposals on the parameter 78 containers in the supplied 1-D parameter array I{X}, given a jump 79 deviation I{v}. Proposals with zero probability of occuring, given the 80 parameters' prior distributions, are censored from the result. (There's 81 no point considering the likelihood of proposals that cannot occur.) 82 83 The 3-tuple contains like-dimensioned 1-D arrays, with the following 84 respective elements: 85 86 1. A valid proposed parameter container. 87 88 2. The log-probability density of the proposal given the 89 parameters' prior distributions. 90 91 3. The log-probability density of the proposal jumps. 92 93 """ 94 XD = self.mgr.rProposal(len(X), v) 95 XP = X + XD 96 pPriors = self.mgr.pPriors(XP) 97 I = s.greater(pPriors, 0).all(1).nonzero() 98 return XP[I], \ 99 s.log(pPriors[I]).sum(1), \ 100 s.log(self.mgr.pProposal(XD[I], v)).sum(1) 65 Returns a 1-D FlexArray of parameter containers resulting in proposals 66 from the supplied FlexArray of parameter containers I{X}, given the 67 specified jump deviation I{v}. 68 69 Proposals with zero probability of occuring, given the parameters' 70 prior distributions, are censored. (There's no point considering the 71 likelihood of proposals that cannot occur.) 72 """ 73 XP = params.FlexArray(len(X)) 74 for k, paramContainer in enumerate(X): 75 XP[k] = self.pm.priors.proposal(paramContainer, v) 76 return XP 101 77 102 78 @defer.deferredGenerator 103 79 def weightedProposals(self, X, v): 104 80 """ 105 Returns a deferred that fires with a parameter array of valid proposals 106 from the parameters in the supplied parameter array I{X}, given the 107 specified proposal variance I{v}, along with log-importance weights for 108 those proposals. 109 110 Valid proposals are those with both non-zero prior probability and 111 non-zero likelihood. The censoring to only valid proposals means that 112 fewer proposals may be returned than supplied parameter sequences, 113 possibly no proposals at all. 81 Returns a deferred that fires with a 1-D FlexArray of proposals from 82 the parameter containers in the supplied FlexArray I{X}, given the 83 specified proposal variance I{v}, along with a like-dimensioned array 84 of linear, fractional importance weights for the proposals. 114 85 115 86 The importance weights are to be combined with those from other calls … … 119 90 proportional to its likelihood and prior probability density, and 120 91 inversely proportional to the probability density of the proposal 121 offest. 122 """ 123 k = 0 124 XP, W = [], [] 92 offset. 93 """ 94 def weight(paramContainer): 95 logWeight = paramContainer.Lx + paramContainer.Lp - paramContainer.Lj 96 if s.isfinite(logWeight): 97 return s.exp(logWeight) 98 return 0.0 99 100 j = 0 101 N = len(X) 102 W = s.empty(len(X)) 125 103 wfd = defer.waitForDeferred( 126 104 self.queue.call(self.proposals, X, v, series='proposals')) 127 105 yield wfd; 128 XPJ = wfd.getResult() 129 fullChunks, partialChunkSize = divmod(len(XPJ[0]), self.chunkSize) 130 chunkSizes = [self.chunkSize]*fullChunks 131 if partialChunkSize: 132 chunkSizes.append(partialChunkSize) 133 for N in chunkSizes: 134 Ic = slice(k, k+N) 135 k += N 136 wfd = defer.waitForDeferred(self.mgr.likelihoods(XPJ[0][Ic])) 137 yield wfd; 138 L = wfd.getResult() 139 If = s.isfinite(L).nonzero()[0] 140 if len(If): 141 XP.append(XPJ[0][Ic][If]) 142 W.append(L[If] + XPJ[1][Ic][If] - XPJ[2][Ic][If]) 143 if XP: 144 arrayXP = params.FlexArray(len(XP)) 145 for k, thisXP in enumerate(XP): 146 arrayXP[k] = thisXP 147 yield arrayXP, s.concatenate(W) 148 else: 149 yield [], [] 150 106 XP = wfd.getResult() 107 while j < N: 108 N_this = min([self.chunkSize, N-j]) 109 # Here's where the vast majority of the CPU time is expended! 110 dList = [ 111 self.mm.likelihood(XP[k], v).addCallback(weight) 112 for k in xrange(j, j+N_this)] 113 wfd = defer.waitForDeferred(defer.gatherResults(dList)) 114 yield wfd 115 W[j:j+N_this] = wfd.getResult() 116 j += N_this 117 return XP, W 118 151 119 @defer.deferredGenerator 152 def run(self, N_ members, **kw):153 """ 154 Does a PMC run with I{N_ members} population members and jump variances155 in the supplied 1-D array I{V}. The number of population members for120 def run(self, N_iter, V=None): 121 """ 122 Does a PMC run with I{N_iter} iterations and the supplied sequence I{V} 123 of jump deviations. The portion of the population members allocated to 156 124 each jump variance will go up or down, depending on the performance for 157 125 that setting. 158 126 159 B{NOTE}: What I've been calling variances are actually computed as160 standard deviations, i.e., not squared. Should fix this either by161 relabeling or adjusting the code.162 163 127 Returns a deferred that fires when the run is done. No output value is 164 provided via the deferred, however. 165 166 Use a provider of L{interfaces.IConsumer} to get the samples as they 167 are produced. Each attached consumer is updated after every iteration 168 with my array of parameter vectors (one vector per row) for each 169 iteration. 128 provided via the deferred, however. Instead, everything is written to 129 my project manager's open NetCDF file. 170 130 171 131 @param N_iter: The number of iterations to produce after burn-in. 172 132 173 @param N_bi: The number of burn-in iterations to discard before174 producing any.175 176 133 @param V: A sequence of variance values to use instead of the default. 177 134 178 135 """ 179 def gotIndex(I, k, v): 180 II[k] = I 181 d = self.weightedProposals(X[I], v) 182 d.addCallback(gotWeightedProposals, k) 183 return d 184 185 def gotWeightedProposals(results, k): 186 XP[k] = results[0] 187 W[k] = results[1] 188 189 def normalize(R): 190 # Rescale so that the proportions are global rather than 191 # individual. Highly successful settings will have a large portion 192 # of the next sample even if they last operated on only a few of 193 # the samples. 136 N_members = self.pm.m 137 # The variance settings (default of 6) 138 if V is None: 139 V = self.V 140 allocator = Allocator(N_members, V) 141 # Initialize some arrays 142 X = self.initialPopulation(N_members) 143 # Iteration 144 for i in xrange(N_iter): 145 dList, resultList = [], [] 146 for v, I, d in allocator.assembler(resultList): 147 d.addCallback(lambda _: self.weightedProposals(X[I], v)) 148 dList.append(d) 149 # Record the previous iteration's results while the nodes work on 150 # the current iteration... 151 self.pm.writeParams(X) 152 wfd = defer.waitForDeferred(defer.DeferredList(dList)) 153 yield wfd; wfd.getResult() 154 XP, W = resultList 155 # Resample everything together 156 I = resample(W, N_members) 157 X = XP[I] 158 allocator.updateAllocations(I) 159 self.recorder.done() 160 161 162 class Allocator(object): 163 """ 164 Construct me with a population size I{N} and a sequence I{V} of jump 165 deviations. 166 """ 167 def __init__(self, N, V): 168 self.N = N 169 self.V = V 170 self.P = len(V) 171 self.rMin = max([2, int(round(0.01 * N))]) 172 self.rMax = N - self.rMin*(self.P-1) 173 self.updateAllocations() 174 175 def subsetIndex(self, k): 176 """ 177 Returns a subset index for population members that correspond to the 178 jump deviation for the supplied index I{k}. 179 """ 180 I = sampleWOR(self.Is, self.R[k]) 181 self.Is = s.setdiff1d(self.Is, I) 182 return I 183 184 def assembler(self, resultList): 185 """ 186 Call this method with a reference to an empty list that will hold 187 assembled results. 188 189 For each of my jump deviations, the method will yield the deviation 190 value, a 1-D array of indices for the subset of my population allocated 191 to that deviation value, and a fresh deferred already fired with 192 C{None}. You must add a callback to the deferred for each iteration 193 that returns one or more 1-D arrays of the same length as the yielded 194 index array. 195 196 Each returned array for each subset will be assembled back into a 197 single population-size array and placed into the supplied list, in 198 order. 199 """ 200 def gotResults(results, I): 201 if not resultList: 202 for result in results: 203 if isinstance(result, (int, float)): 204 resultList.append(s.empty(self.N)) 205 else: 206 resultList.append(params.FlexArray(self.N)) 207 for k, result in enumerate(results): 208 resultList[k][I] = result 209 210 if resultList != []: 211 raise ValueError( 212 "You must supply an empty list to hold your assembled results") 213 self.II = [None] * self.P 214 for k in s.flipud(s.argsort(self.R)): 215 I = self.II[k] = self.subsetIndex(k) 216 d = defer.succeed(None) 217 yield self.V[k], I, d 218 d.addCallback(gotResults, I) 219 220 def updateAllocations(self, I=None): 221 """ 222 Rescales my allocations based on the supplied 1-D array of resampling 223 Highly successful allocations will have a large portion of the next 224 allocation even if they last operated on only a few members of the 225 population. 226 """ 227 if I is None: 228 R = self.N * s.ones(self.P) / self.P 229 elif hasattr(self, 'II'): 230 R = s.array( 231 [sum([x in self.II[k] for x in I]) for k in xrange(self.P)]) 194 232 R = R.astype(float) / R.sum() 195 # Turn the scaled array into an array of subsample sizes, with a 196 # minimum size of two apiece. 197 R = s.clip( 198 s.round_(N_members*R), rMin, N_members-rMin*(P-1)).astype(int) 199 # Twiddle the biggest one as needed to keep sum = N_members 200 R[s.argmax(R)] += N_members - sum(R) 201 # Replace the old list and subset index 202 self.R = R 203 self.Is = s.arange(N_members) 204 205 N_iter, N_bi = self.setupRun(N_members, **kw) 206 # The variance settings (default of 6) 207 V = kw.pop('V', self.V) 208 # Some constants 209 P = len(V) 210 rMin = max([2, int(round(0.01*N_members))]) 211 # Initialize some arrays 212 X = self.mgr.rPriors(N_members) 213 normalize(s.ones(P)) 214 V = s.array(V) 215 # Iteration 216 for i in xrange(N_iter + N_bi): 217 t0 = time.time() 218 # Generate and weight some proposals 219 XP, W, II = [[None]*P for k in (1,2,3)] 220 dList = [] 221 # Get the iteration going with the biggest subpopulations first 222 for k in s.flipud(s.argsort(self.R)): 223 d = self.queue.call(self.subsetIndex, k, series='subsets') 224 d.addCallback(gotIndex, k, V[k]) 225 dList.append(d) 226 d = defer.DeferredList(dList) 227 # Provide the previous iteration's results to any consumers while 228 # the nodes work on the current iteration... 229 if i >= N_bi: 230 self.iterationProducer.gotNext(X) 231 # ...then yield until the nodes are done 232 wfd = defer.waitForDeferred(d) 233 yield wfd; wfd.getResult() 234 # Resample everything together 235 Wnz = [w for w in W if len(w)] 236 if Wnz: 237 I = resample(s.concatenate(Wnz), N_members, logWeights=True) 238 if len(I): 239 X = s.row_stack([XP[k] for k in xrange(P) if len(W[k])])[I] 240 R = s.array([ 241 sum([x in II[k] for x in I]) for k in xrange(P)]) 242 else: 243 R = s.ones(P) 244 # Normalize R to maintain a total of N_members population members 245 normalize(R) 246 # Provide some info 247 vInfo = ", ".join(["%4d" % r for r in self.R]) 248 msg = "%04d, %5.2f sec, VR={%s} : %s" % ( 249 i, time.time()-t0, vInfo, 250 ", ".join(["%9.3g" % x for x in X.mean(0)])) 251 print msg 252 self.recorder.done() 233 else: 234 raise AttributeError( 235 "You can only update with an index array after completion "+\ 236 "of an assembler run") 237 238 # Turn the scaled array into an array of subsample sizes, with a 239 # minimum size of two apiece. 240 R = s.clip(s.round_(self.N*R), self.rMin, self.rMax).astype(int) 241 # Twiddle the biggest one as needed to keep sum = Number of members 242 R[s.argmax(R)] += - sum(R) 243 # Replace the old list and subset index 244 self.R = R 245 self.Is = s.arange(self.N) 246 projects/AsynCluster/trunk/svpmc/project.py
r165 r166 46 46 @ivar m: The number of population members in each population monte carlo 47 47 iteration. 48 49 @ivar paramNames: A list of the names of parameter arrays in the parameter 50 sequences used in the project. 51 52 @ivar priors: An instance of L{params.PriorContainer} constructed in 53 accordance with my project specification. 48 54 49 55 """ … … 63 69 self.tables = self._parseSpec(specFile) 64 70 tsList, seriesTitles = self._setupTimeSeries(specDir) 71 tsData = s.row_stack([ts() for ts in tsList]) 72 self.p, self.n = tsData.shape 65 73 paramTitles, dimensions = self._setupParams() 66 74 self._setupCDF( 67 75 os.path.join(specDir, ncFileName), 68 ts List, seriesTitles, paramTitles, dimensions)76 tsData, seriesTitles, paramTitles, dimensions) 69 77 self.mgr = model.ModelManager(self.paramNames, tsList) 70 78 … … 164 172 self.paramNames = [] 165 173 titles, dimensions = [], [] 166 p = self.p = len(self.tables['series']) 167 xcorrs = self.xcorrs = int(0.5*(p**2 + p)) 168 self.paramContainer = params.PriorContainer() 174 xcorrs = self.xcorrs = int(0.5*(self.p**2 + self.p)) 175 self.priors = params.PriorContainer(self.p, self.n) 169 176 for name, dims, title in self.tables['parameter']: 170 177 shape = parseDims(dims) 171 pa = self.p aramContainer.priorArray(name, *shape)178 pa = self.priors.priorArray(name, *shape) 172 179 for priorSpec in self.tables[name]: 173 180 constructPriors(priorSpec, shape) 174 181 titles.append(title) 175 182 self.paramNames.append(name) 183 self.priors.paramNames = self.paramNames 176 184 return titles, dimensions 177 185 178 def _setupCDF(self, filePath, tsList, sTitles, pTitles, pDims): 179 """ 180 """ 181 tsData = s.row_stack([ts() for ts in tsList]) 182 self.n = tsData.shape[1] 186 def _setupCDF(self, filePath, tsData, sTitles, pTitles, pDims): 187 """ 188 """ 183 189 cdf = self.cdf = NetCDF.NetCDFFile(filePath, 'w') 184 190 cdf.title = self.tables['project'][0][0] projects/AsynCluster/trunk/svpmc/test/test_params.py
r165 r166 217 217 218 218 def setUp(self): 219 self.priorContainer = params.PriorContainer( )219 self.priorContainer = params.PriorContainer(3, 100) 220 220 self.priorContainer.paramNames = ['a', 'b', 'c'] 221 221 kw = {'dname':'norm', 'loc':0} projects/AsynCluster/trunk/svpmc/test/test_project.py
r165 r166 33 33 class Testable_ProjectManager(project.ProjectManager): 34 34 def __init__(self): 35 pass 36 35 self.p = 3 36 self.n = 937 37 37 38 38 39 class Test_ProjectManager(util.TestCase): … … 76 77 for title in titles: 77 78 self.failUnless(isinstance(title, str)) 78 container = self.mgr.p aramContainer79 container = self.mgr.priors 79 80 self.failUnlessEqual(container.a.shape, (3,)) 80 81 self.failUnlessEqual(container.b.shape, (3,3)) … … 92 93 self.mgr.m = 10000 93 94 tsList, seriesTitles = self.mgr._setupTimeSeries(self.specDir) 95 tsData = s.row_stack([ts() for ts in tsList]) 94 96 paramTitles, dimensions = self.mgr._setupParams() 95 97 cdf = self.mgr._setupCDF( 96 cdfPath, ts List, seriesTitles, paramTitles, dimensions)98 cdfPath, tsData, seriesTitles, paramTitles, dimensions) 97 99 # Make sure we can write to the open file 98 100 var = cdf.variables['a']
