Changeset 171

Show
Ignore:
Timestamp:
05/01/08 15:10:42 (7 months ago)
Author:
edsuom
Message:

Finally doing some inital runs & debugging

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynCluster/trunk/setup.py

    r119 r171  
    4040 
    4141### Define setup options 
    42 kw = {'version':'0.3', 
     42kw = {'version':'0.4', 
    4343      'license':'GPL', 
    4444      'platforms':'OS Independent', 
     
    5555      'scripts':['ndm', 'console', 'coreworker'], 
    5656       
    57       'zip_safe':True 
     57      'zip_safe':True, 
     58      'include_package_data':True, 
    5859      } 
    5960 
     
    6263    'taskqueue', 'queue', 'priority', 'tasks', 'jobs', 
    6364    'cluster', 'clustering', 'parallel', 'grid', 
    64     'genetic', 'evolution', 'evolutionary computing', 'GE', 'GA', 'GP'] 
     65    'Monte Carlo', 'stochastic', 'stochastic volatility', 'modeling'] 
    6566 
    6667kw['classifiers'] = [ 
     
    7980 
    8081kw['description'] = " ".join(""" 
    81 Asynchronous cluster management based on the Twisted framework, with 
    82 evolutionary computing tools for asynchronous node processing
     82Asynchronous cluster management based on the Twisted framework, with a 
     83Population Monte Carlo package included as a usage example
    8384""".split("\n")) 
    8485 
     
    8687Asynchronous operation of a computing cluster with a Node Display Manager (NDM) 
    8788that allows regular workstation usage of cluster nodes with computing jobs 
    88 running behind the scenes. Includes evolutionary computing tools (under 
    89 construction) that make effective use of the asynchronous node processing 
    90 capabilities that are provided. 
     89running behind the scenes. As a usage example, a package 'svpmc' for Population 
     90Monte Carlo inference of multivariate stochastic volatility models is included. 
    9191""".split("\n")) 
    9292 
  • projects/AsynCluster/trunk/svpmc/model.py

    r168 r171  
    2323from scipy import linalg 
    2424 
    25 from twisted.internet import defer, reactor 
     25from twisted.internet import defer 
    2626 
    2727from asyncluster.master import jobs 
     
    8181    def __init__(self, paramNames, y): 
    8282        self.modelObj = Model(paramNames=paramNames, y=y) 
     83        # DEBUG 
    8384        self.queue = NullQueue(1) 
    84         reactor.addSystemEventTrigger( 
    85             'before', 'shutdown',  self.queue.shutdown) 
    8685 
    8786    def _oops(self, failure): 
    8887        print "FAILURE:\n%s" % failure.getTraceback() 
     88 
     89    def shutdown(self): 
     90        """ 
     91        Call this when I'm done being used. Returns a deferred that fires when 
     92        everything's shut down. 
     93        """ 
     94        if hasattr(self, 'client'): 
     95            d = self.client.shutdown() 
     96        else: 
     97            d = defer.succeed(None) 
     98        d.addCallback(lambda _: self.queue.shutdown()) 
     99        return d 
    89100 
    90101    def setLocalMode(self): 
     
    103114            return defer.succeed(None) 
    104115        self.client = jobs.JobClient(socket, codeString=self.nodecode) 
    105         reactor.addSystemEventTrigger( 
    106             'before', 'shutdown',  self.client.shutdown) 
    107116        d = self.client.startup() 
    108117        d.addCallback( 
     
    121130        Updates the container in-place with the log-likelihood and an array of 
    122131        volatility shocks on which the likelihood computation is based, after 
    123         undergoing some nested MCMC. 
     132        undergoing some nested MCMC using the specified jump deviation 
     133        I{sigma}. 
    124134 
    125135        Returns a deferred that fires with a reference to the paramContainer 
     
    191201    _logU_index = -1 
    192202 
    193     keyAttrs = {'y':None, 'Mv':50, 'Mz':50} 
     203    # Mv=20, Mz=30 
     204    keyAttrs = {'y':None, 'Mv':5, 'Mz':10} 
    194205 
    195206    #--- Properties ----------------------------------------------------------- 
     
    355366            h0 = LV[2][:,0] 
    356367            L_total += LV[0] 
     368            # If the likelihood at this point is so bad that the parameter is 
     369            # guaranteed not to be resampled, just bail out now and save time 
     370            #if L_total < -10000: 
     371            #    return L_total, s.zeros_like(h0) 
    357372        return L_total, h0 
    358373     
     
    394409 
    395410            3. The IID normal variates underlying the log-volatilities. 
    396  
    397411         
    398412        The returned likelihood does not consider the prior probability of the 
  • projects/AsynCluster/trunk/svpmc/params.py

    r168 r171  
    367367                "You must supply a ParameterContainer as the first "+\ 
    368368                "argument, not a '%s'" % type(paramContainer)) 
    369         newVersion = ParameterContainer(z=s.array(paramContainer.z).copy()) 
     369        newVersion = ParameterContainer( 
     370            paramNames=self.paramNames, z=s.array(paramContainer.z).copy()) 
    370371        Lp, Lj = 0, 0 
    371372        for name in self.paramNames: 
  • projects/AsynCluster/trunk/svpmc/pmc.py

    r169 r171  
    2020""" 
    2121 
     22import sys 
    2223from random import sample as sampleWOR 
    2324 
     
    3738    """ 
    3839    def __init__(self, N, V): 
     40        if N < 2*len(V): 
     41            raise ValueError( 
     42                "Population size must be at least twice the number "+\ 
     43                "of jump deviations.") 
    3944        self.N = N 
    4045        self.V = V 
     
    126131 
    127132    """ 
    128     chunkSize = 5000 
    129     V = [1.0, 0.5, 0.1, 0.05, 0.01] 
    130      
    131     def __init__(self, projectManager): 
     133    chunkSize = 500 
     134    V = [0.5, 0.1, 0.05] # 0.01 
     135     
     136    def __init__(self, projectManager, socket=None): 
     137        self.socket = socket 
    132138        self.pm = projectManager 
    133139        self.mm = projectManager.mgr 
    134         self.resampler = Resampler() 
     140        self.resampler = Resampler(logWeights=True)             
    135141     
    136142    def _get_queue(self): 
     
    147153        returns a 1-D FlexArray of their parameter containers. 
    148154        """ 
     155        print "Initializing population:" 
    149156        X = params.FlexArray(N) 
    150157        for k, paramContainer in enumerate(X): 
    151158            X[k] = self.pm.priors.new() 
     159            print ".", 
     160            sys.stdout.flush() 
    152161        return X 
    153162 
     
    184193        """ 
    185194        def weight(paramContainer): 
    186             logWeight = paramContainer.Lx + paramContainer.Lp - paramContainer.Lj 
    187             if s.isfinite(logWeight): 
    188                 return s.exp(logWeight) 
    189             return 0.0 
     195            if s.isfinite(paramContainer.Lx): 
     196                L = paramContainer.Lx + paramContainer.Lp - paramContainer.Lj 
     197                print "-+"[int(L>-1000)], 
     198            else: 
     199                print "|", 
     200                L = -s.inf 
     201            sys.stdout.flush() 
     202            return L 
    190203         
    191204        j = 0 
     
    194207        wfd = defer.waitForDeferred( 
    195208            self.queue.call(self.proposals, X, v, series='proposals')) 
    196         yield wfd; 
     209        yield wfd 
    197210        XP = wfd.getResult() 
    198211        while j < N: 
     
    225238         
    226239        """ 
     240        if self.socket: 
     241            wfd = defer.waitForDeferred(self.mm.setRemoteMode(self.socket)) 
     242            yield wfd; wfd.getResult() 
    227243        N_members = self.pm.m 
    228244        # The variance settings (default of 6) 
     
    233249        X = self.initialPopulation(N_members) 
    234250        # Iteration 
     251        print "\nRunning %d iterations with %d population members" \ 
     252              % (N_iter, N_members) 
    235253        for i in xrange(N_iter): 
     254            print "%03d:" % (i+1,), 
    236255            dList, resultList = [], [] 
    237256            for v, I, d in allocator.assembler(resultList): 
     
    240259            # Record the previous iteration's results while the nodes work on 
    241260            # the current iteration... 
    242             self.pm.writeParams(X) 
     261            if i > 0: 
     262                self.pm.writeParams(X) 
    243263            wfd = defer.waitForDeferred(defer.DeferredList(dList)) 
    244             yield wfd; wfd.getResult() 
     264            yield wfd 
     265            wfd.getResult() 
    245266            XP, W = resultList 
    246             # Resample everything together 
    247             I = self.resampler(W, N_members) 
    248             X = XP[I] 
    249             allocator.updateAllocations(I) 
    250         self.pm.done() 
    251  
    252  
     267            W = W[s.isfinite(W).nonzero()[0]] 
     268            print "\nW: %s" % (" ".join(["%+9.2g" % w for w in W]),) 
     269            if len(W): 
     270                # Resample everything together 
     271                I = self.resampler(W, N_members) 
     272                X = XP[I] 
     273                print "I: %s\n" % (" ".join(["%9d" % ii for ii in I]),) 
     274                allocator.updateAllocations(I) 
     275            else: 
     276                # Nothing in the proposed population had any plausibility 
     277                allocator.updateAllocations() 
     278                print "I []" 
     279        wfd = defer.waitForDeferred(self.pm.done()) 
     280        yield wfd 
     281        wfd.getResult() 
     282        print "\nPMC Run done" 
     283         
     284 
     285 
  • projects/AsynCluster/trunk/svpmc/project.py

    r168 r171  
    247247        Call this when the project is done. 
    248248        """ 
    249         self.cdf.close() 
    250  
     249        d = self.mgr.shutdown() 
     250        d.addCallback(lambda _: self.cdf.close()) 
     251        return d 
     252 
  • projects/AsynCluster/trunk/svpmc/sample.py

    r168 r171  
    5151        if self.logWeights: 
    5252            W = s.exp(W - W.max()) 
     53        if not W.sum(): 
     54            raise ValueError("Undefined result with all zero weights") 
     55        if not s.isfinite(W).all(): 
     56            W = 1 - s.isfinite(W) 
    5357        W = W.astype(float) / sum(W) 
    5458        I0 = (s.isfinite(W) * s.greater(W, 1E-7)).nonzero()[0] 
     
    5963            return [] 
    6064        if K == 1: 
    61             return [0]*N             
     65            return I0.repeat(N) 
    6266        x = s.zeros((K,2)) 
    6367        x[:,0] = K*W[I0] 
  • projects/AsynCluster/trunk/svpmc/test/test_params.py

    r168 r171  
    227227 
    228228    def test_init(self): 
    229         for k, name in enumerate(self.priorContainer.paramNames): 
     229        for k, name in enumerate(util.PARAM_NAMES): 
     230            self.failUnlessEqual(name, self.priorContainer.paramNames[k]) 
    230231            priorArray = getattr(self.priorContainer, name) 
    231232            self.failUnlessEqual(priorArray.shape, self.MPC.paramShapes[k]) 
     
    275276     
    276277    def test_proposal_dist(self): 
    277         N = 5000 
     278        N = 1000 
    278279        sigma = 0.12 
    279280        paramArray = params.FlexArray(N) 
  • projects/AsynCluster/trunk/svpmc/weave.py

    r164 r171  
    7171    def _get_code(self): 
    7272        if not hasattr(self, '_code'): 
     73            moduleName = self.__class__.__module__.split(".")[-1] 
    7374            self._code = self._parseCode( 
    74                 resource_string(__name__, "%s.c" % self.__class__.__module__)) 
     75                resource_string(__name__, "%s.c" % moduleName)) 
    7576        return self._code 
    7677    code = property(_get_code)