Changeset 172

Show
Ignore:
Timestamp:
05/01/08 21:33:08 (7 months ago)
Author:
edsuom
Message:

Trying to get an actual PMC run going

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynCluster/trunk/svpmc/model.py

    r171 r172  
    3232 
    3333 
    34 class NullQueue(object): 
    35     """ 
    36     I act like an AsynQueue except that I run everything in the main 
    37     thread. I'm mostly useful for debugging. 
    38     """ 
    39     def __init__(self, *args): 
    40         pass 
    41  
    42     def call(self, f, *args): 
    43         return defer.succeed(f(*args)) 
    44  
    45     def shutdown(self): 
    46         return defer.succeed(None) 
    47  
    48  
    4934class ModelManager(object): 
    5035    """ 
     
    5237 
    5338    I handle calls to the model object in either local or remote mode. Calls in 
    54     local mode are run through an instance of L{NullQueue} in the main thread
     39    local mode are run through my project manager's thread queue
    5540 
    5641    Instantiate me with the text of a specification for the project and model 
     
    7964    ########################################################################### 
    8065    """ 
    81     def __init__(self, paramNames, y): 
    82         self.modelObj = Model(paramNames=paramNames, y=y) 
    83         # DEBUG 
    84         self.queue = NullQueue(1) 
     66    def __init__(self, projectManager, y): 
     67        self.modelObj = Model(paramNames=projectManager.paramNames, y=y) 
     68        self.queue = projectManager.queue 
    8569 
    8670    def _oops(self, failure): 
     
    9680        else: 
    9781            d = defer.succeed(None) 
    98         d.addCallback(lambda _: self.queue.shutdown()) 
    9982        return d 
    10083 
     
    201184    _logU_index = -1 
    202185 
    203     # Mv=20, Mz=30 
    204     keyAttrs = {'y':None, 'Mv':5, 'Mz':10} 
     186    keyAttrs = {'y':None, 'Mv':20, 'Mz':30} 
    205187 
    206188    #--- Properties ----------------------------------------------------------- 
     
    368350            # If the likelihood at this point is so bad that the parameter is 
    369351            # guaranteed not to be resampled, just bail out now and save time 
    370             #if L_total < -10000: 
    371             #    return L_total, s.zeros_like(h0) 
     352            if L_total < -1E20: 
     353                return -s.inf, s.zeros_like(h0) 
    372354        return L_total, h0 
    373355     
  • projects/AsynCluster/trunk/svpmc/params.py

    r171 r172  
    318318 
    319319    """ 
    320     attempts = 2
     320    attempts = 3
    321321    typeChecking = True 
    322322     
     
    379379                    break 
    380380            else: 
    381                 raise ValueError("Failed to generate a valid proposal") 
     381                print "\nWARNING: Failed to generate a valid proposal" 
     382                return paramContainer 
    382383            Lp += s.log(pPriors).sum() 
    383384            Lj += s.log(priorFlexArray.call('pJump', jumps, wiggle)).sum() 
  • projects/AsynCluster/trunk/svpmc/pmc.py

    r171 r172  
    2626from scipy import random 
    2727from twisted.internet import defer 
    28 import asynqueue 
    2928 
    3029import params 
     
    131130 
    132131    """ 
    133     chunkSize = 500 
    134     V = [0.5, 0.1, 0.05] # 0.01 
     132    chunkSize = 100 
     133    V = [0.5, 0.1, 0.02, 0.004] 
    135134     
    136135    def __init__(self, projectManager, socket=None): 
     
    138137        self.pm = projectManager 
    139138        self.mm = projectManager.mgr 
     139        self.queue = projectManager.queue 
    140140        self.resampler = Resampler(logWeights=True)             
    141      
    142     def _get_queue(self): 
    143         if not hasattr(self, '_queue'): 
    144             self._queue = asynqueue.TaskQueue() 
    145             for specialty in ('subsets', 'proposals'): 
    146                 self._queue.attachWorker(asynqueue.ThreadWorker(specialty)) 
    147         return self._queue 
    148     queue = property(_get_queue) 
    149141     
    150142    def initialPopulation(self, N): 
     
    205197        N = len(X) 
    206198        W = s.empty(len(X)) 
    207         wfd = defer.waitForDeferred( 
    208             self.queue.call(self.proposals, X, v, series='proposals')) 
     199        wfd = defer.waitForDeferred(self.queue.call(self.proposals, X, v)) 
    209200        yield wfd 
    210201        XP = wfd.getResult() 
     
    252243              % (N_iter, N_members) 
    253244        for i in xrange(N_iter): 
    254             print "%03d:" % (i+1,), 
     245            print "\n%03d:" % (i+1,), 
    255246            dList, resultList = [], [] 
    256247            for v, I, d in allocator.assembler(resultList): 
     
    265256            wfd.getResult() 
    266257            XP, W = resultList 
     258            #print "\n   %s" % \ 
     259            #      (" ".join(["%9d" % ii for ii in s.arange(N_members)]),) 
     260            #print "W: %s" % (" ".join(["%+9.2g" % w for w in W]),) 
    267261            W = W[s.isfinite(W).nonzero()[0]] 
    268             print "\nW: %s" % (" ".join(["%+9.2g" % w for w in W]),) 
    269262            if len(W): 
    270263                # Resample everything together 
    271264                I = self.resampler(W, N_members) 
    272265                X = XP[I] 
    273                 print "I: %s\n" % (" ".join(["%9d" % ii for ii in I]),) 
     266                #print "I: %s\n" % (" ".join(["%9d" % ii for ii in I]),) 
    274267                allocator.updateAllocations(I) 
    275268            else: 
    276269                # Nothing in the proposed population had any plausibility 
    277270                allocator.updateAllocations() 
    278                 print "I []" 
     271                #print "I []" 
    279272        wfd = defer.waitForDeferred(self.pm.done()) 
    280273        yield wfd 
  • projects/AsynCluster/trunk/svpmc/project.py

    r171 r172  
    2323import scipy as s 
    2424from Scientific.IO import NetCDF 
     25import asynqueue 
    2526 
    2627import tseries, params, model 
     28 
     29 
     30class NullQueue(object): 
     31    """ 
     32    I act like an AsynQueue except that I run everything in the main 
     33    thread. I'm mostly useful for debugging. 
     34    """ 
     35    def __init__(self, *args): 
     36        from twisted.internet import defer 
     37        self.defer = defer 
     38 
     39    def call(self, f, *args): 
     40        return self.defer.succeed(f(*args)) 
     41 
     42    def shutdown(self): 
     43        return self.defer.succeed(None) 
    2744 
    2845 
     
    6683        self.m = m 
    6784        self.iteration = 0 
     85        # Use the NullQueue for debugging 
     86        self.queue = NullQueue() 
     87        #self.queue = asynqueue.ThreadQueue(1) 
    6888        specDir = os.path.dirname(specFile) 
    6989        self.tables = self._parseSpec(specFile) 
     
    7494            os.path.join(specDir, ncFileName), 
    7595            tsData, seriesTitles, paramTitles, dimensions) 
    76         self.mgr = model.ModelManager(self.paramNames, tsData) 
     96        self.mgr = model.ModelManager(self, tsData) 
    7797     
    7898    def _parseSpec(self, filePath): 
     
    248268        """ 
    249269        d = self.mgr.shutdown() 
     270        d.addCallback(lambda _: self.queue.shutdown()) 
    250271        d.addCallback(lambda _: self.cdf.close()) 
    251272        return d