Changeset 172
- Timestamp:
- 05/01/08 21:33:08 (7 months ago)
- Files:
-
- projects/AsynCluster/trunk/svpmc/model.py (modified) (6 diffs)
- projects/AsynCluster/trunk/svpmc/params.py (modified) (2 diffs)
- projects/AsynCluster/trunk/svpmc/pmc.py (modified) (6 diffs)
- projects/AsynCluster/trunk/svpmc/project.py (modified) (4 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynCluster/trunk/svpmc/model.py
r171 r172 32 32 33 33 34 class NullQueue(object):35 """36 I act like an AsynQueue except that I run everything in the main37 thread. I'm mostly useful for debugging.38 """39 def __init__(self, *args):40 pass41 42 def call(self, f, *args):43 return defer.succeed(f(*args))44 45 def shutdown(self):46 return defer.succeed(None)47 48 49 34 class ModelManager(object): 50 35 """ … … 52 37 53 38 I handle calls to the model object in either local or remote mode. Calls in 54 local mode are run through an instance of L{NullQueue} in the main thread.39 local mode are run through my project manager's thread queue. 55 40 56 41 Instantiate me with the text of a specification for the project and model … … 79 64 ########################################################################### 80 65 """ 81 def __init__(self, paramNames, y): 82 self.modelObj = Model(paramNames=paramNames, y=y) 83 # DEBUG 84 self.queue = NullQueue(1) 66 def __init__(self, projectManager, y): 67 self.modelObj = Model(paramNames=projectManager.paramNames, y=y) 68 self.queue = projectManager.queue 85 69 86 70 def _oops(self, failure): … … 96 80 else: 97 81 d = defer.succeed(None) 98 d.addCallback(lambda _: self.queue.shutdown())99 82 return d 100 83 … … 201 184 _logU_index = -1 202 185 203 # Mv=20, Mz=30 204 keyAttrs = {'y':None, 'Mv':5, 'Mz':10} 186 keyAttrs = {'y':None, 'Mv':20, 'Mz':30} 205 187 206 188 #--- Properties ----------------------------------------------------------- … … 368 350 # If the likelihood at this point is so bad that the parameter is 369 351 # guaranteed not to be resampled, just bail out now and save time 370 #if L_total < -10000:371 # return L_total, s.zeros_like(h0)352 if L_total < -1E20: 353 return -s.inf, s.zeros_like(h0) 372 354 return L_total, h0 373 355 projects/AsynCluster/trunk/svpmc/params.py
r171 r172 318 318 319 319 """ 320 attempts = 20320 attempts = 30 321 321 typeChecking = True 322 322 … … 379 379 break 380 380 else: 381 raise ValueError("Failed to generate a valid proposal") 381 print "\nWARNING: Failed to generate a valid proposal" 382 return paramContainer 382 383 Lp += s.log(pPriors).sum() 383 384 Lj += s.log(priorFlexArray.call('pJump', jumps, wiggle)).sum() projects/AsynCluster/trunk/svpmc/pmc.py
r171 r172 26 26 from scipy import random 27 27 from twisted.internet import defer 28 import asynqueue29 28 30 29 import params … … 131 130 132 131 """ 133 chunkSize = 500134 V = [0.5, 0.1, 0.0 5] # 0.01132 chunkSize = 100 133 V = [0.5, 0.1, 0.02, 0.004] 135 134 136 135 def __init__(self, projectManager, socket=None): … … 138 137 self.pm = projectManager 139 138 self.mm = projectManager.mgr 139 self.queue = projectManager.queue 140 140 self.resampler = Resampler(logWeights=True) 141 142 def _get_queue(self):143 if not hasattr(self, '_queue'):144 self._queue = asynqueue.TaskQueue()145 for specialty in ('subsets', 'proposals'):146 self._queue.attachWorker(asynqueue.ThreadWorker(specialty))147 return self._queue148 queue = property(_get_queue)149 141 150 142 def initialPopulation(self, N): … … 205 197 N = len(X) 206 198 W = s.empty(len(X)) 207 wfd = defer.waitForDeferred( 208 self.queue.call(self.proposals, X, v, series='proposals')) 199 wfd = defer.waitForDeferred(self.queue.call(self.proposals, X, v)) 209 200 yield wfd 210 201 XP = wfd.getResult() … … 252 243 % (N_iter, N_members) 253 244 for i in xrange(N_iter): 254 print " %03d:" % (i+1,),245 print "\n%03d:" % (i+1,), 255 246 dList, resultList = [], [] 256 247 for v, I, d in allocator.assembler(resultList): … … 265 256 wfd.getResult() 266 257 XP, W = resultList 258 #print "\n %s" % \ 259 # (" ".join(["%9d" % ii for ii in s.arange(N_members)]),) 260 #print "W: %s" % (" ".join(["%+9.2g" % w for w in W]),) 267 261 W = W[s.isfinite(W).nonzero()[0]] 268 print "\nW: %s" % (" ".join(["%+9.2g" % w for w in W]),)269 262 if len(W): 270 263 # Resample everything together 271 264 I = self.resampler(W, N_members) 272 265 X = XP[I] 273 print "I: %s\n" % (" ".join(["%9d" % ii for ii in I]),)266 #print "I: %s\n" % (" ".join(["%9d" % ii for ii in I]),) 274 267 allocator.updateAllocations(I) 275 268 else: 276 269 # Nothing in the proposed population had any plausibility 277 270 allocator.updateAllocations() 278 print "I []"271 #print "I []" 279 272 wfd = defer.waitForDeferred(self.pm.done()) 280 273 yield wfd projects/AsynCluster/trunk/svpmc/project.py
r171 r172 23 23 import scipy as s 24 24 from Scientific.IO import NetCDF 25 import asynqueue 25 26 26 27 import tseries, params, model 28 29 30 class NullQueue(object): 31 """ 32 I act like an AsynQueue except that I run everything in the main 33 thread. I'm mostly useful for debugging. 34 """ 35 def __init__(self, *args): 36 from twisted.internet import defer 37 self.defer = defer 38 39 def call(self, f, *args): 40 return self.defer.succeed(f(*args)) 41 42 def shutdown(self): 43 return self.defer.succeed(None) 27 44 28 45 … … 66 83 self.m = m 67 84 self.iteration = 0 85 # Use the NullQueue for debugging 86 self.queue = NullQueue() 87 #self.queue = asynqueue.ThreadQueue(1) 68 88 specDir = os.path.dirname(specFile) 69 89 self.tables = self._parseSpec(specFile) … … 74 94 os.path.join(specDir, ncFileName), 75 95 tsData, seriesTitles, paramTitles, dimensions) 76 self.mgr = model.ModelManager(self .paramNames, tsData)96 self.mgr = model.ModelManager(self, tsData) 77 97 78 98 def _parseSpec(self, filePath): … … 248 268 """ 249 269 d = self.mgr.shutdown() 270 d.addCallback(lambda _: self.queue.shutdown()) 250 271 d.addCallback(lambda _: self.cdf.close()) 251 272 return d
