Changeset 166

Show
Ignore:
Timestamp:
04/29/08 22:46:52 (7 months ago)
Author:
edsuom
Message:

Got pmc module cleanly prototyped, now need to unit test; then we should be ready to start running stuff, finally!

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynCluster/trunk/svpmc/model.py

    r165 r166  
    123123        undergoing some nested MCMC. 
    124124 
    125         Returns a deferred that fires (with C{None}) when the likelihood 
    126         computation is done and the paramContainer has been updated. 
     125        Returns a deferred that fires with a reference to the paramContainer 
     126        when the likelihood computation is done and the it has been updated. 
    127127        """ 
    128128        def gotPackedLikelihood(result): 
     
    139139            for k, name in enumerate(('Lx', 'z', 'h')): 
    140140                setattr(paramContainer, name, result[k]) 
     141            return paramContainer 
    141142         
    142143        if (remote or self.remoteMode) and not local: 
  • projects/AsynCluster/trunk/svpmc/params.py

    r165 r166  
    3131 
    3232 
    33 PARAM_NAMES = [ 
    34     # Total parameters: 3*p**2 + 6*p 
    35     #------------------------------------------------------------ 
    36     'a',    # Drift (p x 1) 
    37      
    38     'b',    # Lag-1 cross-correlation for VAR of returns (p x p) 
    39  
    40     'cr',   # Innovation shock cross-correlation coefficients (vector with 
    41             # r01, r02,..., r0p, r12,..., r1p,...) 
    42      
    43     'd',    # Volatility offset (p x 1) 
    44      
    45     'e',    # Lag-1 cross-correlations for VAR of volatilities 
    46             # (p x p) 
    47  
    48     'fs',   # Volatility shock standard deviations (vector with s0, s1,..., sp) 
    49      
    50     'fr',    # Volatility shock coefficients (vector with 
    51              # r01, r02,..., r0p, r12,..., r1p,...) 
    52  
    53     'g',    # Volatility/innovation shock correlations (p x 1) 
    54  
    55     ] 
    56  
    57  
    5833class FlexArray(object): 
    5934    """ 
     
    243218    """ 
    244219    keyAttrs = {'z':[], 'Lx':None, 'Lp':None, 'Lj':None} 
    245     paramNames = PARAM_NAMES 
    246220 
    247221 
     
    336310    I am a container for array-like instances of L{FlexArray} that contain 
    337311    L{Prior} objects for the array elements of my model's parameters. 
    338     """ 
    339     paramNames = PARAM_NAMES 
    340  
     312 
     313    @ivar n: The number of samples. 
     314     
     315    @ivar p: The number of time series. 
     316 
     317    @ivar paramNames: A list of the parameter names in sequence. 
     318 
     319    """ 
     320    def __init__(self, p, n): 
     321        self.p = p 
     322        self.n = n 
     323     
    341324    def priorArray(self, paramName, *shape): 
    342325        array = getattr(self, paramName, None) 
     
    352335        """ 
    353336        Lp = 0 
    354         paramContainer = ParameterContainer() 
     337        paramContainer = ParameterContainer( 
     338            paramNames=self.paramNames, z=s.randn(self.p, self.n)) 
    355339        for name in self.paramNames: 
    356340            priorFlexArray = getattr(self, name) 
     
    383367        for name in self.paramNames: 
    384368            priorFlexArray = getattr(self, name) 
    385             jumps = priorArray.call('rJump', wiggle) 
    386             paramArray = getattr(paramContainer, name) + jumps 
    387             Lp += s.log(priorFlexArray.call('pdf', paramArray)).sum() 
     369            for attempt in xrange(self.attempts): 
     370                jumps = priorFlexArray.call('rJump', wiggle) 
     371                paramArray = getattr(paramContainer, name) + jumps 
     372                pPriors = priorFlexArray.call('pdf', paramArray) 
     373                if s.greater(pPriors, 0).all(): 
     374                    break 
     375            else: 
     376                raise ValueError("Failed to generate a valid proposal") 
     377            Lp += s.log(pPriors).sum() 
    388378            Lj += s.log(priorFlexArray.call('pJump', jumps)).sum() 
    389379            setattr(newVersion, name, paramArray) 
  • projects/AsynCluster/trunk/svpmc/pmc.py

    r164 r166  
    1717 
    1818""" 
    19 Monte Carlo sampling of a distribution (typically a posterior) using various 
    20 methods. 
     19Population Monte Carlo sampling. 
    2120""" 
    2221 
    23 import os.path, time 
    2422from random import sample as sampleWOR 
    2523 
    2624import scipy as s 
    2725from scipy import random 
    28 from Scientific.IO import NetCDF 
    2926from twisted.internet import defer 
    30  
    3127import asynqueue 
    32 from twisted_goodies.pybywire.pack import Packer, Unpacker 
    33 from asyncluster.master import jobs 
    34  
    35 import model, params 
     28 
     29import params 
    3630from sample import resample 
    3731 
     
    4034    """ 
    4135    Population Monte Carlo with per Cappe et al. 
     36 
    4237    """ 
    4338    chunkSize = 5000 
     
    4540     
    4641    def __init__(self, projectManager): 
    47         self.mgr = projectManager 
    48      
    49     def setupRun(self, N_iter, N_members): 
    50         """ 
    51         Set up a new run with various parameters and a restarted iteration 
    52         producer. 
    53         """ 
    54         self.N_iter = N_iter 
    55         self.N_members = N_members 
    56         self.iterationProducer.restart() 
    57  
     42        self.pm = projectManager 
     43        self.mm = projectManager.mgr 
     44     
    5845    def _get_queue(self): 
    5946        if not hasattr(self, '_queue'): 
     
    6451    queue = property(_get_queue) 
    6552     
    66     def subsetIndex(self, k): 
    67         """ 
    68         Returns a subset index for the samples in my I{X} attribute that 
    69         correspond to the jump variance for the supplied index I{k}. 
    70         """ 
    71         I = sampleWOR(self.Is, self.R[k]) 
    72         self.Is = s.setdiff1d(self.Is, I) 
    73         return I 
     53    def initialPopulation(self, N): 
     54        """ 
     55        Generates a new population of I{N} parameters from the priors and 
     56        returns a 1-D FlexArray of their parameter containers. 
     57        """ 
     58        X = params.FlexArray(N) 
     59        for k, paramContainer in enumerate(X): 
     60            X[k] = self.pm.priors.new() 
     61        return X 
    7462 
    7563    def proposals(self, X, v): 
    7664        """ 
    77         Returns a 3-tuple of info for jump proposals on the parameter 
    78         containers in the supplied 1-D parameter array I{X}, given a jump 
    79         deviation I{v}. Proposals with zero probability of occuring, given the 
    80         parameters' prior distributions, are censored from the result. (There's 
    81         no point considering the likelihood of proposals that cannot occur.) 
    82          
    83         The 3-tuple contains like-dimensioned 1-D arrays, with the following 
    84         respective elements: 
    85  
    86             1. A valid proposed parameter container. 
    87              
    88             2. The log-probability density of the proposal given the 
    89                parameters' prior distributions. 
    90  
    91             3. The log-probability density of the proposal jumps. 
    92          
    93         """ 
    94         XD = self.mgr.rProposal(len(X), v) 
    95         XP = X + XD 
    96         pPriors = self.mgr.pPriors(XP) 
    97         I = s.greater(pPriors, 0).all(1).nonzero() 
    98         return XP[I], \ 
    99                s.log(pPriors[I]).sum(1), \ 
    100                s.log(self.mgr.pProposal(XD[I], v)).sum(1) 
     65        Returns a 1-D FlexArray of parameter containers resulting in proposals 
     66        from the supplied FlexArray of parameter containers I{X}, given the 
     67        specified jump deviation I{v}. 
     68 
     69        Proposals with zero probability of occuring, given the parameters' 
     70        prior distributions, are censored. (There's no point considering the 
     71        likelihood of proposals that cannot occur.) 
     72        """ 
     73        XP = params.FlexArray(len(X)) 
     74        for k, paramContainer in enumerate(X): 
     75            XP[k] = self.pm.priors.proposal(paramContainer, v) 
     76        return XP 
    10177 
    10278    @defer.deferredGenerator 
    10379    def weightedProposals(self, X, v): 
    10480        """ 
    105         Returns a deferred that fires with a parameter array of valid proposals 
    106         from the parameters in the supplied parameter array I{X}, given the 
    107         specified proposal variance I{v}, along with log-importance weights for 
    108         those proposals. 
    109          
    110         Valid proposals are those with both non-zero prior probability and 
    111         non-zero likelihood. The censoring to only valid proposals means that 
    112         fewer proposals may be returned than supplied parameter sequences, 
    113         possibly no proposals at all. 
     81        Returns a deferred that fires with a 1-D FlexArray of proposals from 
     82        the parameter containers in the supplied FlexArray I{X}, given the 
     83        specified proposal variance I{v}, along with a like-dimensioned array 
     84        of linear, fractional importance weights for the proposals. 
    11485         
    11586        The importance weights are to be combined with those from other calls 
     
    11990        proportional to its likelihood and prior probability density, and 
    12091        inversely proportional to the probability density of the proposal 
    121         offest. 
    122         """ 
    123         k = 0 
    124         XP, W = [], [] 
     92        offset. 
     93        """ 
     94        def weight(paramContainer): 
     95            logWeight = paramContainer.Lx + paramContainer.Lp - paramContainer.Lj 
     96            if s.isfinite(logWeight): 
     97                return s.exp(logWeight) 
     98            return 0.0 
     99         
     100        j = 0 
     101        N = len(X) 
     102        W = s.empty(len(X)) 
    125103        wfd = defer.waitForDeferred( 
    126104            self.queue.call(self.proposals, X, v, series='proposals')) 
    127105        yield wfd; 
    128         XPJ = wfd.getResult() 
    129         fullChunks, partialChunkSize = divmod(len(XPJ[0]), self.chunkSize) 
    130         chunkSizes = [self.chunkSize]*fullChunks 
    131         if partialChunkSize: 
    132             chunkSizes.append(partialChunkSize) 
    133         for N in chunkSizes: 
    134             Ic = slice(k, k+N) 
    135             k += N 
    136             wfd = defer.waitForDeferred(self.mgr.likelihoods(XPJ[0][Ic])) 
    137             yield wfd; 
    138             L = wfd.getResult() 
    139             If = s.isfinite(L).nonzero()[0] 
    140             if len(If): 
    141                 XP.append(XPJ[0][Ic][If]) 
    142                 W.append(L[If] + XPJ[1][Ic][If] - XPJ[2][Ic][If]) 
    143         if XP: 
    144             arrayXP = params.FlexArray(len(XP)) 
    145             for k, thisXP in enumerate(XP): 
    146                 arrayXP[k] = thisXP 
    147             yield arrayXP, s.concatenate(W) 
    148         else: 
    149             yield [], [] 
    150      
     106        XP = wfd.getResult() 
     107        while j < N: 
     108            N_this = min([self.chunkSize, N-j]) 
     109            # Here's where the vast majority of the CPU time is expended! 
     110            dList = [ 
     111                self.mm.likelihood(XP[k], v).addCallback(weight) 
     112                for k in xrange(j, j+N_this)] 
     113            wfd = defer.waitForDeferred(defer.gatherResults(dList)) 
     114            yield wfd 
     115            W[j:j+N_this] = wfd.getResult() 
     116            j += N_this 
     117        return XP, W 
     118 
    151119    @defer.deferredGenerator 
    152     def run(self, N_members, **kw): 
    153         """ 
    154         Does a PMC run with I{N_members} population members and jump variances 
    155         in the supplied 1-D array I{V}. The number of population members for 
     120    def run(self, N_iter, V=None): 
     121        """ 
     122        Does a PMC run with I{N_iter} iterations and the supplied sequence I{V} 
     123        of jump deviations. The portion of the population members allocated to 
    156124        each jump variance will go up or down, depending on the performance for 
    157125        that setting. 
    158126 
    159         B{NOTE}: What I've been calling variances are actually computed as 
    160         standard deviations, i.e., not squared. Should fix this either by 
    161         relabeling or adjusting the code. 
    162          
    163127        Returns a deferred that fires when the run is done. No output value is 
    164         provided via the deferred, however. 
    165  
    166         Use a provider of L{interfaces.IConsumer} to get the samples as they 
    167         are produced. Each attached consumer is updated after every iteration 
    168         with my array of parameter vectors (one vector per row) for each 
    169         iteration. 
     128        provided via the deferred, however. Instead, everything is written to 
     129        my project manager's open NetCDF file. 
    170130 
    171131        @param N_iter: The number of iterations to produce after burn-in. 
    172132 
    173         @param N_bi: The number of burn-in iterations to discard before 
    174           producing any. 
    175  
    176133        @param V: A sequence of variance values to use instead of the default. 
    177134         
    178135        """ 
    179         def gotIndex(I, k, v): 
    180             II[k] = I 
    181             d = self.weightedProposals(X[I], v) 
    182             d.addCallback(gotWeightedProposals, k) 
    183             return d 
    184  
    185         def gotWeightedProposals(results, k): 
    186             XP[k] = results[0] 
    187             W[k] = results[1] 
    188  
    189         def normalize(R): 
    190             # Rescale so that the proportions are global rather than 
    191             # individual. Highly successful settings will have a large portion 
    192             # of the next sample even if they last operated on only a few of 
    193             # the samples. 
     136        N_members = self.pm.m 
     137        # The variance settings (default of 6) 
     138        if V is None: 
     139            V = self.V 
     140        allocator = Allocator(N_members, V) 
     141        # Initialize some arrays 
     142        X = self.initialPopulation(N_members) 
     143        # Iteration 
     144        for i in xrange(N_iter): 
     145            dList, resultList = [], [] 
     146            for v, I, d in allocator.assembler(resultList): 
     147                d.addCallback(lambda _: self.weightedProposals(X[I], v)) 
     148                dList.append(d) 
     149            # Record the previous iteration's results while the nodes work on 
     150            # the current iteration... 
     151            self.pm.writeParams(X) 
     152            wfd = defer.waitForDeferred(defer.DeferredList(dList)) 
     153            yield wfd; wfd.getResult() 
     154            XP, W = resultList 
     155            # Resample everything together 
     156            I = resample(W, N_members) 
     157            X = XP[I] 
     158            allocator.updateAllocations(I) 
     159        self.recorder.done() 
     160 
     161 
     162class Allocator(object): 
     163    """ 
     164    Construct me with a population size I{N} and a sequence I{V} of jump 
     165    deviations. 
     166    """ 
     167    def __init__(self, N, V): 
     168        self.N = N 
     169        self.V = V 
     170        self.P = len(V) 
     171        self.rMin = max([2, int(round(0.01 * N))]) 
     172        self.rMax = N - self.rMin*(self.P-1) 
     173        self.updateAllocations() 
     174     
     175    def subsetIndex(self, k): 
     176        """ 
     177        Returns a subset index for population members that correspond to the 
     178        jump deviation for the supplied index I{k}. 
     179        """ 
     180        I = sampleWOR(self.Is, self.R[k]) 
     181        self.Is = s.setdiff1d(self.Is, I) 
     182        return I 
     183     
     184    def assembler(self, resultList): 
     185        """ 
     186        Call this method with a reference to an empty list that will hold 
     187        assembled results. 
     188 
     189        For each of my jump deviations, the method will yield the deviation 
     190        value, a 1-D array of indices for the subset of my population allocated 
     191        to that deviation value, and a fresh deferred already fired with 
     192        C{None}. You must add a callback to the deferred for each iteration 
     193        that returns one or more 1-D arrays of the same length as the yielded 
     194        index array. 
     195         
     196        Each returned array for each subset will be assembled back into a 
     197        single population-size array and placed into the supplied list, in 
     198        order. 
     199        """ 
     200        def gotResults(results, I): 
     201            if not resultList: 
     202                for result in results: 
     203                    if isinstance(result, (int, float)): 
     204                        resultList.append(s.empty(self.N)) 
     205                    else: 
     206                        resultList.append(params.FlexArray(self.N)) 
     207            for k, result in enumerate(results): 
     208                resultList[k][I] = result 
     209         
     210        if resultList != []: 
     211            raise ValueError( 
     212                "You must supply an empty list to hold your assembled results") 
     213        self.II = [None] * self.P 
     214        for k in s.flipud(s.argsort(self.R)): 
     215            I = self.II[k] = self.subsetIndex(k) 
     216            d = defer.succeed(None) 
     217            yield self.V[k], I, d 
     218            d.addCallback(gotResults, I) 
     219 
     220    def updateAllocations(self, I=None): 
     221        """ 
     222        Rescales my allocations based on the supplied 1-D array of resampling 
     223        Highly successful allocations will have a large portion of the next 
     224        allocation even if they last operated on only a few members of the 
     225        population. 
     226        """ 
     227        if I is None: 
     228            R = self.N * s.ones(self.P) / self.P 
     229        elif hasattr(self, 'II'): 
     230            R = s.array( 
     231                [sum([x in self.II[k] for x in I]) for k in xrange(self.P)]) 
    194232            R = R.astype(float) / R.sum() 
    195             # Turn the scaled array into an array of subsample sizes, with a 
    196             # minimum size of two apiece. 
    197             R = s.clip( 
    198                 s.round_(N_members*R), rMin, N_members-rMin*(P-1)).astype(int) 
    199             # Twiddle the biggest one as needed to keep sum = N_members 
    200             R[s.argmax(R)] += N_members - sum(R) 
    201             # Replace the old list and subset index 
    202             self.R = R 
    203             self.Is = s.arange(N_members) 
    204  
    205         N_iter, N_bi = self.setupRun(N_members, **kw) 
    206         # The variance settings (default of 6) 
    207         V = kw.pop('V', self.V) 
    208         # Some constants 
    209         P = len(V) 
    210         rMin = max([2, int(round(0.01*N_members))]) 
    211         # Initialize some arrays 
    212         X = self.mgr.rPriors(N_members) 
    213         normalize(s.ones(P)) 
    214         V = s.array(V) 
    215         # Iteration 
    216         for i in xrange(N_iter + N_bi): 
    217             t0 = time.time() 
    218             # Generate and weight some proposals 
    219             XP, W, II = [[None]*P for k in (1,2,3)] 
    220             dList = [] 
    221             # Get the iteration going with the biggest subpopulations first 
    222             for k in s.flipud(s.argsort(self.R)): 
    223                 d = self.queue.call(self.subsetIndex, k, series='subsets') 
    224                 d.addCallback(gotIndex, k, V[k]) 
    225                 dList.append(d) 
    226             d = defer.DeferredList(dList) 
    227             # Provide the previous iteration's results to any consumers while 
    228             # the nodes work on the current iteration... 
    229             if i >= N_bi: 
    230                 self.iterationProducer.gotNext(X) 
    231             # ...then yield until the nodes are done 
    232             wfd = defer.waitForDeferred(d) 
    233             yield wfd; wfd.getResult() 
    234             # Resample everything together 
    235             Wnz = [w for w in W if len(w)] 
    236             if Wnz: 
    237                 I = resample(s.concatenate(Wnz), N_members, logWeights=True) 
    238                 if len(I): 
    239                     X = s.row_stack([XP[k] for k in xrange(P) if len(W[k])])[I] 
    240                     R = s.array([ 
    241                         sum([x in II[k] for x in I]) for k in xrange(P)]) 
    242             else: 
    243                 R = s.ones(P) 
    244             # Normalize R to maintain a total of N_members population members 
    245             normalize(R) 
    246             # Provide some info 
    247             vInfo = ", ".join(["%4d" % r for r in self.R]) 
    248             msg = "%04d, %5.2f sec, VR={%s} :  %s" % ( 
    249                 i, time.time()-t0, vInfo, 
    250                 ", ".join(["%9.3g" % x for x in X.mean(0)])) 
    251             print msg 
    252         self.recorder.done() 
     233        else: 
     234            raise AttributeError( 
     235                "You can only update with an index array after completion "+\ 
     236                "of an assembler run") 
     237 
     238        # Turn the scaled array into an array of subsample sizes, with a 
     239        # minimum size of two apiece. 
     240        R = s.clip(s.round_(self.N*R), self.rMin, self.rMax).astype(int) 
     241        # Twiddle the biggest one as needed to keep sum = Number of members 
     242        R[s.argmax(R)] +=  - sum(R) 
     243        # Replace the old list and subset index 
     244        self.R = R 
     245        self.Is = s.arange(self.N) 
     246 
  • projects/AsynCluster/trunk/svpmc/project.py

    r165 r166  
    4646    @ivar m: The number of population members in each population monte carlo 
    4747      iteration. 
     48 
     49    @ivar paramNames: A list of the names of parameter arrays in the parameter 
     50      sequences used in the project. 
     51 
     52    @ivar priors: An instance of L{params.PriorContainer} constructed in 
     53      accordance with my project specification. 
    4854     
    4955    """ 
     
    6369        self.tables = self._parseSpec(specFile) 
    6470        tsList, seriesTitles = self._setupTimeSeries(specDir) 
     71        tsData = s.row_stack([ts() for ts in tsList]) 
     72        self.p, self.n = tsData.shape 
    6573        paramTitles, dimensions = self._setupParams() 
    6674        self._setupCDF( 
    6775            os.path.join(specDir, ncFileName), 
    68             tsList, seriesTitles, paramTitles, dimensions) 
     76            tsData, seriesTitles, paramTitles, dimensions) 
    6977        self.mgr = model.ModelManager(self.paramNames, tsList) 
    7078     
     
    164172        self.paramNames = [] 
    165173        titles, dimensions = [], [] 
    166         p = self.p = len(self.tables['series']) 
    167         xcorrs = self.xcorrs = int(0.5*(p**2 + p)) 
    168         self.paramContainer = params.PriorContainer() 
     174        xcorrs = self.xcorrs = int(0.5*(self.p**2 + self.p)) 
     175        self.priors = params.PriorContainer(self.p, self.n) 
    169176        for name, dims, title in self.tables['parameter']: 
    170177            shape = parseDims(dims) 
    171             pa = self.paramContainer.priorArray(name, *shape) 
     178            pa = self.priors.priorArray(name, *shape) 
    172179            for priorSpec in self.tables[name]: 
    173180                constructPriors(priorSpec, shape) 
    174181            titles.append(title) 
    175182            self.paramNames.append(name) 
     183        self.priors.paramNames = self.paramNames 
    176184        return titles, dimensions 
    177185 
    178     def _setupCDF(self, filePath, tsList, sTitles, pTitles, pDims): 
    179         """ 
    180         """ 
    181         tsData = s.row_stack([ts() for ts in tsList]) 
    182         self.n = tsData.shape[1] 
     186    def _setupCDF(self, filePath, tsData, sTitles, pTitles, pDims): 
     187        """ 
     188        """ 
    183189        cdf = self.cdf = NetCDF.NetCDFFile(filePath, 'w') 
    184190        cdf.title = self.tables['project'][0][0] 
  • projects/AsynCluster/trunk/svpmc/test/test_params.py

    r165 r166  
    217217     
    218218    def setUp(self): 
    219         self.priorContainer = params.PriorContainer(
     219        self.priorContainer = params.PriorContainer(3, 100
    220220        self.priorContainer.paramNames = ['a', 'b', 'c'] 
    221221        kw = {'dname':'norm', 'loc':0} 
  • projects/AsynCluster/trunk/svpmc/test/test_project.py

    r165 r166  
    3333class Testable_ProjectManager(project.ProjectManager): 
    3434    def __init__(self): 
    35         pass 
    36      
     35        self.p = 3 
     36        self.n = 937 
     37 
    3738 
    3839class Test_ProjectManager(util.TestCase): 
     
    7677        for title in titles: 
    7778            self.failUnless(isinstance(title, str)) 
    78         container = self.mgr.paramContainer 
     79        container = self.mgr.priors 
    7980        self.failUnlessEqual(container.a.shape, (3,)) 
    8081        self.failUnlessEqual(container.b.shape, (3,3)) 
     
    9293        self.mgr.m = 10000 
    9394        tsList, seriesTitles = self.mgr._setupTimeSeries(self.specDir) 
     95        tsData = s.row_stack([ts() for ts in tsList]) 
    9496        paramTitles, dimensions = self.mgr._setupParams() 
    9597        cdf = self.mgr._setupCDF( 
    96             cdfPath, tsList, seriesTitles, paramTitles, dimensions) 
     98            cdfPath, tsData, seriesTitles, paramTitles, dimensions) 
    9799        # Make sure we can write to the open file 
    98100        var = cdf.variables['a']