Changeset 171
- Timestamp:
- 05/01/08 15:10:42 (7 months ago)
- Files:
-
- projects/AsynCluster/trunk/doc/svpmc/example/svpmc.conf (moved) (moved from projects/AsynCluster/trunk/doc/svpmc/example/project-spec.txt)
- projects/AsynCluster/trunk/doc/svpmc/example/sv_pmc.py (added)
- projects/AsynCluster/trunk/setup.py (modified) (5 diffs)
- projects/AsynCluster/trunk/svpmc/model.py (modified) (7 diffs)
- projects/AsynCluster/trunk/svpmc/params.py (modified) (1 diff)
- projects/AsynCluster/trunk/svpmc/pmc.py (modified) (9 diffs)
- projects/AsynCluster/trunk/svpmc/project.py (modified) (1 diff)
- projects/AsynCluster/trunk/svpmc/sample.py (modified) (2 diffs)
- projects/AsynCluster/trunk/svpmc/test/svpmc.conf (moved) (moved from projects/AsynCluster/trunk/svpmc/test/project-spec.txt)
- projects/AsynCluster/trunk/svpmc/test/test_params.py (modified) (2 diffs)
- projects/AsynCluster/trunk/svpmc/weave.py (modified) (1 diff)
- projects/AsynCluster/trunk/sv_pmc (added)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynCluster/trunk/setup.py
r119 r171 40 40 41 41 ### Define setup options 42 kw = {'version':'0. 3',42 kw = {'version':'0.4', 43 43 'license':'GPL', 44 44 'platforms':'OS Independent', … … 55 55 'scripts':['ndm', 'console', 'coreworker'], 56 56 57 'zip_safe':True 57 'zip_safe':True, 58 'include_package_data':True, 58 59 } 59 60 … … 62 63 'taskqueue', 'queue', 'priority', 'tasks', 'jobs', 63 64 'cluster', 'clustering', 'parallel', 'grid', 64 ' genetic', 'evolution', 'evolutionary computing', 'GE', 'GA', 'GP']65 'Monte Carlo', 'stochastic', 'stochastic volatility', 'modeling'] 65 66 66 67 kw['classifiers'] = [ … … 79 80 80 81 kw['description'] = " ".join(""" 81 Asynchronous cluster management based on the Twisted framework, with 82 evolutionary computing tools for asynchronous node processing.82 Asynchronous cluster management based on the Twisted framework, with a 83 Population Monte Carlo package included as a usage example. 83 84 """.split("\n")) 84 85 … … 86 87 Asynchronous operation of a computing cluster with a Node Display Manager (NDM) 87 88 that allows regular workstation usage of cluster nodes with computing jobs 88 running behind the scenes. Includes evolutionary computing tools (under 89 construction) that make effective use of the asynchronous node processing 90 capabilities that are provided. 89 running behind the scenes. As a usage example, a package 'svpmc' for Population 90 Monte Carlo inference of multivariate stochastic volatility models is included. 91 91 """.split("\n")) 92 92 projects/AsynCluster/trunk/svpmc/model.py
r168 r171 23 23 from scipy import linalg 24 24 25 from twisted.internet import defer , reactor25 from twisted.internet import defer 26 26 27 27 from asyncluster.master import jobs … … 81 81 def __init__(self, paramNames, y): 82 82 self.modelObj = Model(paramNames=paramNames, y=y) 83 # DEBUG 83 84 self.queue = NullQueue(1) 84 reactor.addSystemEventTrigger(85 'before', 'shutdown', self.queue.shutdown)86 85 87 86 def _oops(self, failure): 88 87 print "FAILURE:\n%s" % failure.getTraceback() 88 89 def shutdown(self): 90 """ 91 Call this when I'm done being used. Returns a deferred that fires when 92 everything's shut down. 93 """ 94 if hasattr(self, 'client'): 95 d = self.client.shutdown() 96 else: 97 d = defer.succeed(None) 98 d.addCallback(lambda _: self.queue.shutdown()) 99 return d 89 100 90 101 def setLocalMode(self): … … 103 114 return defer.succeed(None) 104 115 self.client = jobs.JobClient(socket, codeString=self.nodecode) 105 reactor.addSystemEventTrigger(106 'before', 'shutdown', self.client.shutdown)107 116 d = self.client.startup() 108 117 d.addCallback( … … 121 130 Updates the container in-place with the log-likelihood and an array of 122 131 volatility shocks on which the likelihood computation is based, after 123 undergoing some nested MCMC. 132 undergoing some nested MCMC using the specified jump deviation 133 I{sigma}. 124 134 125 135 Returns a deferred that fires with a reference to the paramContainer … … 191 201 _logU_index = -1 192 202 193 keyAttrs = {'y':None, 'Mv':50, 'Mz':50} 203 # Mv=20, Mz=30 204 keyAttrs = {'y':None, 'Mv':5, 'Mz':10} 194 205 195 206 #--- Properties ----------------------------------------------------------- … … 355 366 h0 = LV[2][:,0] 356 367 L_total += LV[0] 368 # If the likelihood at this point is so bad that the parameter is 369 # guaranteed not to be resampled, just bail out now and save time 370 #if L_total < -10000: 371 # return L_total, s.zeros_like(h0) 357 372 return L_total, h0 358 373 … … 394 409 395 410 3. The IID normal variates underlying the log-volatilities. 396 397 411 398 412 The returned likelihood does not consider the prior probability of the projects/AsynCluster/trunk/svpmc/params.py
r168 r171 367 367 "You must supply a ParameterContainer as the first "+\ 368 368 "argument, not a '%s'" % type(paramContainer)) 369 newVersion = ParameterContainer(z=s.array(paramContainer.z).copy()) 369 newVersion = ParameterContainer( 370 paramNames=self.paramNames, z=s.array(paramContainer.z).copy()) 370 371 Lp, Lj = 0, 0 371 372 for name in self.paramNames: projects/AsynCluster/trunk/svpmc/pmc.py
r169 r171 20 20 """ 21 21 22 import sys 22 23 from random import sample as sampleWOR 23 24 … … 37 38 """ 38 39 def __init__(self, N, V): 40 if N < 2*len(V): 41 raise ValueError( 42 "Population size must be at least twice the number "+\ 43 "of jump deviations.") 39 44 self.N = N 40 45 self.V = V … … 126 131 127 132 """ 128 chunkSize = 5000 129 V = [1.0, 0.5, 0.1, 0.05, 0.01] 130 131 def __init__(self, projectManager): 133 chunkSize = 500 134 V = [0.5, 0.1, 0.05] # 0.01 135 136 def __init__(self, projectManager, socket=None): 137 self.socket = socket 132 138 self.pm = projectManager 133 139 self.mm = projectManager.mgr 134 self.resampler = Resampler( )140 self.resampler = Resampler(logWeights=True) 135 141 136 142 def _get_queue(self): … … 147 153 returns a 1-D FlexArray of their parameter containers. 148 154 """ 155 print "Initializing population:" 149 156 X = params.FlexArray(N) 150 157 for k, paramContainer in enumerate(X): 151 158 X[k] = self.pm.priors.new() 159 print ".", 160 sys.stdout.flush() 152 161 return X 153 162 … … 184 193 """ 185 194 def weight(paramContainer): 186 logWeight = paramContainer.Lx + paramContainer.Lp - paramContainer.Lj 187 if s.isfinite(logWeight): 188 return s.exp(logWeight) 189 return 0.0 195 if s.isfinite(paramContainer.Lx): 196 L = paramContainer.Lx + paramContainer.Lp - paramContainer.Lj 197 print "-+"[int(L>-1000)], 198 else: 199 print "|", 200 L = -s.inf 201 sys.stdout.flush() 202 return L 190 203 191 204 j = 0 … … 194 207 wfd = defer.waitForDeferred( 195 208 self.queue.call(self.proposals, X, v, series='proposals')) 196 yield wfd ;209 yield wfd 197 210 XP = wfd.getResult() 198 211 while j < N: … … 225 238 226 239 """ 240 if self.socket: 241 wfd = defer.waitForDeferred(self.mm.setRemoteMode(self.socket)) 242 yield wfd; wfd.getResult() 227 243 N_members = self.pm.m 228 244 # The variance settings (default of 6) … … 233 249 X = self.initialPopulation(N_members) 234 250 # Iteration 251 print "\nRunning %d iterations with %d population members" \ 252 % (N_iter, N_members) 235 253 for i in xrange(N_iter): 254 print "%03d:" % (i+1,), 236 255 dList, resultList = [], [] 237 256 for v, I, d in allocator.assembler(resultList): … … 240 259 # Record the previous iteration's results while the nodes work on 241 260 # the current iteration... 242 self.pm.writeParams(X) 261 if i > 0: 262 self.pm.writeParams(X) 243 263 wfd = defer.waitForDeferred(defer.DeferredList(dList)) 244 yield wfd; wfd.getResult() 264 yield wfd 265 wfd.getResult() 245 266 XP, W = resultList 246 # Resample everything together 247 I = self.resampler(W, N_members) 248 X = XP[I] 249 allocator.updateAllocations(I) 250 self.pm.done() 251 252 267 W = W[s.isfinite(W).nonzero()[0]] 268 print "\nW: %s" % (" ".join(["%+9.2g" % w for w in W]),) 269 if len(W): 270 # Resample everything together 271 I = self.resampler(W, N_members) 272 X = XP[I] 273 print "I: %s\n" % (" ".join(["%9d" % ii for ii in I]),) 274 allocator.updateAllocations(I) 275 else: 276 # Nothing in the proposed population had any plausibility 277 allocator.updateAllocations() 278 print "I []" 279 wfd = defer.waitForDeferred(self.pm.done()) 280 yield wfd 281 wfd.getResult() 282 print "\nPMC Run done" 283 284 285 projects/AsynCluster/trunk/svpmc/project.py
r168 r171 247 247 Call this when the project is done. 248 248 """ 249 self.cdf.close() 250 249 d = self.mgr.shutdown() 250 d.addCallback(lambda _: self.cdf.close()) 251 return d 252 projects/AsynCluster/trunk/svpmc/sample.py
r168 r171 51 51 if self.logWeights: 52 52 W = s.exp(W - W.max()) 53 if not W.sum(): 54 raise ValueError("Undefined result with all zero weights") 55 if not s.isfinite(W).all(): 56 W = 1 - s.isfinite(W) 53 57 W = W.astype(float) / sum(W) 54 58 I0 = (s.isfinite(W) * s.greater(W, 1E-7)).nonzero()[0] … … 59 63 return [] 60 64 if K == 1: 61 return [0]*N65 return I0.repeat(N) 62 66 x = s.zeros((K,2)) 63 67 x[:,0] = K*W[I0] projects/AsynCluster/trunk/svpmc/test/test_params.py
r168 r171 227 227 228 228 def test_init(self): 229 for k, name in enumerate(self.priorContainer.paramNames): 229 for k, name in enumerate(util.PARAM_NAMES): 230 self.failUnlessEqual(name, self.priorContainer.paramNames[k]) 230 231 priorArray = getattr(self.priorContainer, name) 231 232 self.failUnlessEqual(priorArray.shape, self.MPC.paramShapes[k]) … … 275 276 276 277 def test_proposal_dist(self): 277 N = 5000278 N = 1000 278 279 sigma = 0.12 279 280 paramArray = params.FlexArray(N) projects/AsynCluster/trunk/svpmc/weave.py
r164 r171 71 71 def _get_code(self): 72 72 if not hasattr(self, '_code'): 73 moduleName = self.__class__.__module__.split(".")[-1] 73 74 self._code = self._parseCode( 74 resource_string(__name__, "%s.c" % self.__class__.__module__))75 resource_string(__name__, "%s.c" % moduleName)) 75 76 return self._code 76 77 code = property(_get_code)
