Changeset 205

Show
Ignore:
Timestamp:
06/01/08 00:22:10 (6 months ago)
Author:
edsuom
Message:

Rao-Blackwellization on z looks very promising, but let's ditch it on jumps for now

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynCluster/trunk/svpmc/model.c

    r202 r205  
    182182  double c, cp; 
    183183  const double diff = b-a; 
     184  const double scaleUp = 1.06; // Increase scale value to account for EXP error 
    184185  // Compute the scale value c as the maximum possible pdf (regular normal 
    185186  // distribution) within the truncated range 
     
    187188    if(b > 0) { 
    188189      // Center is within the range, so use its pdf 
    189       c = 1
     190      c = scaleUp
    190191    } else { 
    191192      // Range is below center, so use pdf at its right edge 
    192       c = EXP(-0.5*b*b); 
     193      c = scaleUp * EXP(-0.5*b*b); 
    193194    } 
    194195  } else { 
    195196    // Range is above center, so use pdf at its left edge 
    196     c = EXP(-0.5*a*a); 
     197    c = scaleUp * EXP(-0.5*a*a); 
    197198  } 
    198199  // Accept-reject sampling for the truncated normal distribution 
     
    266267    // array lookups and products in the equation. 
    267268    // 
    268     //SPA1(S, x0, k) = exp(-0.5 * SPA1(S, h, k)) * PA1(x, k); 
    269     // 
    270     // This is very fast, but has error of about +/-2%. However, the error 
    271     // doesn't seem to make any difference in the variance of log-likelihood 
    272     // estimates. 
    273     SPA1(S, x0, k) = EXP(-0.5 * SPA1(S, h, k)) * PA1(x, k); 
     269    SPA1(S, x0, k) = exp(-0.5 * SPA1(S, h, k)) * PA1(x, k); 
     270    // Using EXP is very fast, but has error of about +/-6%. In a multivariate 
     271    // test, the resulting error was -1112.29 vs an expected -1135.15. 
     272    //SPA1(S, x0, k) = EXP(-0.5 * SPA1(S, h, k)) * PA1(x, k); 
    274273 
    275274  // Now decorrelate the equi-variance innovations in preparation for the 
     
    308307 
    309308 
    310 // Model.likelihood 
     309// Model.hybridGibbs 
    311310//  
    312311// Supplied variables 
     
    330329// sc   Constants for multivariate normal PDF 
    331330 
    332 //--- Return value (double float) --- 
    333 // The linear likelihood P(x|w) of the innovations given the parameters, 
    334 // integrated over the ni log-volatility simulations 
    335  
    336331 
    337332// Run evaluations until odds of error in selection between current and 
     
    346341int ki, ke; 
    347342int k0, k1, k2, k3; 
    348 double val, maxVal
     343double val
    349344 
    350345// Multivariate innovation for one current time-series sample 
     
    355350// first time-series sample of the evaluation interval 
    356351double he[2][Nt]; 
    357 // Log-likelihood of the innovation for the first time-series sample in current 
    358 // and proposal evalutions, working values and accumulated for each iteration 
    359 double Lxk[2], Lx[ni]; 
    360352 
    361353// A set of evaluation sample structs 
     
    380372// For each iteration... 
    381373for(ki=0; ki<ni; ki++) { 
    382  
    383   // Initialize this iteration's accumulator 
    384   Lx[ki] = 0; 
    385374 
    386375  // The "previous" log-volatility of the first time-series sample is set to 
     
    407396    for(k2=0; k2<2; k2++) 
    408397      se[k2].Lx = 0; 
    409       // Lxk[.] will be set to a value in se[.].Lx that has already been 
    410       // multivariate-accumulated, so it doesn't need clearing here 
    411398     
    412399    do { 
     
    438425        logLikelihood(&se[k2], &P, xk); 
    439426         
    440         // Save the first log-volatility and the log-likelihood based on it (and 
    441         // it alone, at this point) as the log-likelihood of the innovation given 
    442         // the log-volatility of the proposal 
     427        // Save the first log-volatility 
    443428        if(k1==k0) { 
    444429          for(k3=0; k3<Nt; k3++) 
    445430            he[k2][k3] = PA1(se[k2].h, k3); 
    446           Lxk[k2] = se[k2].Lx; 
    447431        } 
    448432      } 
     
    478462        PA1(se[k3].h, k2) = val; 
    479463    } 
    480  
    481     // The joint probability of the innovation and the variates underlying the 
    482     // log-volatility simulation, conditional on the parameters, is equal to 
    483     // the likelihood of the innovation, conditional on the simulated 
    484     // variates and the parameters, times the probability of the 
    485     // simulated variates, conditional on the parameters: 
    486     // 
    487     // P(x,z|w) = P(x|z,w) * P(z|w) 
    488     // 
    489     // P(z|w) doesn't need to be accounted for, however, because the z's are 
    490     // simulated from it. 
    491  
    492     // Accumulate Log(P(xk,zk|w) = Lxk to eventually get Log(P(x,z|w) 
    493     Lx[ki] += Lxk[ke]; 
    494464   
    495465    // Done with log-volatility draw for this time-series sample 
     
    513483} 
    514484 
    515 // Compute the result, the integrated P(x|w) over the last 3/4 of the 
    516 // simulations on z 
    517 k0 = (int)(100*ni/400); 
     485 
     486 
     487// Model.likelihood 
     488//  
     489// Supplied variables 
     490// ---------------------------------------------------------------------------- 
     491 
     492//--- Supplied variables --- 
     493// z    Independent normal variates, m previously simulated sets, [mpn] array 
     494// x    Innovation values, [pn] array 
     495 
     496//--- Model parameters, direct --- 
     497// d    Volatility offset, [p] vector 
     498// e    Lag-1 cross-correlations for VAR of volatilities, [pp] array 
     499// g    Volatility/innovation shock correlations, [p] vector 
     500 
     501//--- Model parameters, derivations --- 
     502// rz   Concurrent xcorr, innovation-volatility normals, [pp] 
     503// ri   Inverse concurrent xcorr, innovation shocks, [pp] 
     504// sc   Constants for multivariate normal PDF 
     505 
     506//--- Return value (double float) --- 
     507// The linear likelihood P(x|w) of the innovations given the parameters, 
     508// integrated over the log-volatility simulations along the first axis of z 
     509 
     510 
     511// The number of time series making up a single multivariate sample 
     512const int Nt = Nx[0]; 
     513// The number of multivariate samples 
     514const int Ns = Nx[1]; 
     515 
     516int ki; 
     517int k0, k1; 
     518double val, maxVal; 
     519 
     520// Multivariate innovation for one current time-series sample 
     521double *xk; 
     522// Log-likelihood of the innovation for the time-series sample in the current 
     523// evalution, accumulated for each iteration 
     524double Lx[Nz[0]]; 
     525 
     526// A single evaluation sample struct 
     527struct sample se; 
     528// A struct for the model parameters 
     529struct parameters P; 
     530 
     531 
     532// Generate parameter struct 
     533P.d = d_array; P.e = e_array; P.g = g_array; 
     534P.rz = rz_array; P.ri = ri_array; P.sc = sc_array; 
     535 
     536// Initialize various arrays 
     537xk = (double *) malloc(sizeof(double) * Nt); 
     538se.z = (double *) malloc(sizeof(double) * Nt); 
     539se.h = (double *) malloc(sizeof(double) * Nt); 
     540se.x0 = (double *) malloc(sizeof(double) * Nt); 
     541se.x1 = (double *) malloc(sizeof(double) * Nt); 
     542 
     543 
     544// Iterate over each set of simulated z values... 
     545for(ki=0; ki<Nz[0]; ki++) { 
     546 
     547  // Initialize this iteration's accumulator 
     548  Lx[ki] = 0; 
     549 
     550  // The "previous" log-volatility of the first time-series sample is set to 
     551  // the log-volatility offset plus a simulated, latent-parameter value. 
     552  // TODO 
     553  for(k1=0; k1<Nt; k1++) 
     554    PA1(se.h, k1) = D1(k1); 
     555 
     556  // For each time-series sample... 
     557  for(k0=0; k0<Ns; k0++) { 
     558     
     559    // For each time series... 
     560    for(k1=0; k1<Nt; k1++) { 
     561      // Prepare current-innovation vector xk... 
     562      PA1(xk, k1) = X2(k1, k0); 
     563      // ...and set z 
     564      PA1(se.z, k1) = Z3(ki, k1, k0); 
     565    } 
     566     
     567    // Compute the multivariate log-volatility... 
     568    logVol(&se, &P); 
     569    // ...and the log-likelihood of the innovation for this time-series 
     570    // sample, given the log-volatility 
     571    logLikelihood(&se, &P, xk); 
     572    // Accumulate the log-likelihood 
     573    Lx[ki] += se.Lxk; 
     574   
     575    // Done with log-likelihood evaluation for this time-series sample 
     576  } 
     577 
     578  // Done with iterations 
     579
     580 
     581// Free array memory 
     582free(xk); 
     583free(se.z); 
     584free(se.h); 
     585free(se.x0); 
     586free(se.x1); 
     587 
     588 
     589// Compute the result, the integrated P(x|w) over all the sets of simulated z 
     590// values 
    518591 
    519592// Compute the maximum log density and save it to subtract from everybody so 
    520593// that the maximum log density is normalized to zero 
    521594maxVal = -HUGE_VAL; 
    522 for(ki=k0; ki<ni; ki++) { 
     595for(ki=0; ki<Nz[0]; ki++) { 
    523596  if(Lx[ki] > maxVal) 
    524597    maxVal = Lx[ki]; 
     
    526599// Accumulate the linearized normalized densities 
    527600val = 0; 
    528 for(ki=k0; ki<ni; ki++) 
     601for(ki=0; ki<Nz[0]; ki++) 
    529602  val += exp(Lx[ki] - maxVal); 
    530603// ...and return the denormalized, log mean of the linear densities... 
    531 return_val = log(val) + maxVal - log(ni-k0); 
     604return_val = log(val) + maxVal - log(Nz[0]); 
  • projects/AsynCluster/trunk/svpmc/model.py

    r202 r205  
    2020""" 
    2121 
     22from copy import copy 
     23 
    2224import scipy as s 
    2325from scipy import linalg, stats 
    2426 
    2527from twisted.internet import defer 
     28from twisted.spread import pb 
    2629 
    2730from asyncluster.master import jobs 
     
    3235 
    3336 
     37class ZFetcher(pb.Referenceable): 
     38    """ 
     39    I provide a node worker with access to the normal variates underlying the 
     40    latest log-volatility simulations. 
     41 
     42    ???????????????????????????? 
     43    """ 
     44    chunkSize = 60000 
     45    intScaleUp = 2000 
     46    intScaleDown = 1.0 / intScaleUp 
     47     
     48    def __init__(self, m, p, n): 
     49        self.m = m 
     50        self.z = s.empty((m, p, n)) 
     51 
     52    def getZ(self, fullPrecision=False): 
     53        # Transmitting at short int cuts data size to one fourth, still offers 
     54        # full needed range with reasonable precision 
     55        result = self.z[self.successes,:,:] 
     56        if fullPrecision: 
     57            return result 
     58        return (self.intScaleUp*result).astype('h') 
     59 
     60    def _packageZ(self): 
     61        zPacked = pack.Packer(self.getZ())() 
     62        k = 0 
     63        N = len(zPacked) 
     64        self.zPackedChunks = [] 
     65        while k < N: 
     66            chunk = zPacked[k:k+self.chunkSize] 
     67            self.zPackedChunks.append(chunk) 
     68            k += self.chunkSize 
     69 
     70    def restart(self): 
     71        self.failures = [] 
     72        self.successes = [] 
     73        self.chunks = {} 
     74        self.zPackedChunks = None 
     75     
     76    def update(self, member, z=None): 
     77        """ 
     78        Call to update me with the results of a log-volatility simulation, with 
     79        integer arguments specifying the I{member} of the population for the 
     80        latest iteration of the overall PMC simulation. 
     81 
     82        If the log-volatility simulation succeeded, also supply the 2-D array 
     83        of its normal variates. Otherwise, the population member will be marked 
     84        as a failure. 
     85 
     86        Returns C{True} to indicate when this is the last update, C{False} 
     87        until then. 
     88        """ 
     89        if not isinstance(member, int) or member < 0 or member >= self.m: 
     90            raise ValueError( 
     91                "Invalid member ID '%s', must be integer between 0 and %d" \ 
     92                % (member, self.m-1)) 
     93        if z is None: 
     94            self.failures.append(member) 
     95        else: 
     96            self.successes.append(member) 
     97            self.z[member,:,:] = z 
     98        if len(self.failures) + len(self.successes) == self.m: 
     99            self._packageZ() 
     100            return True 
     101        return False 
     102     
     103    def view_get(self, perspective): 
     104        if perspective not in self.chunks: 
     105            self.chunks[perspective] = copy(self.zPackedChunks) 
     106        chunks = self.chunks[perspective] 
     107        if chunks: 
     108            return self.chunks[perspective].pop(0) 
     109 
     110 
    34111class ModelManager(object): 
    35112    """ 
     
    41118    Instantiate me with the text of a specification for the project and model 
    42119    I'm going to manage. 
     120 
     121    @ivar zFetcher: An instance of L{ZFetcher} that is supplied to my model, 
     122      through which remote (and local) instances of the model access the normal 
     123      variates underlying the latest log-volatility simulations. 
    43124     
    44125    """ 
     
    56137        M = modelObj 
    57138 
    58     def likelihood(paramContainer): 
    59         result = M.likelihood(paramContainer
    60         if result is None: 
    61             return 
    62         return Packer(*result)(
     139    def hybridGibbs(paramContainer): 
     140        return Packer(*M.hybridGibbs(paramContainer)
     141     
     142    def likelihood(paramContainer, zFilePath): 
     143        return M.likelihood(paramContainer, iteration
    63144 
    64145    ########################################################################### 
     
    67148        kw['y'] = y 
    68149        kw['paramNames'] = projectManager.paramNames 
     150        kw['zFetcher'] = self.zFetcher = ZFetcher( 
     151            projectManager.m, projectManager.p, projectManager.n) 
     152        #??????????????????????????? 
     153        #kw['zFetcherRemote'] =  
    69154        self.modelObj = Model(**kw) 
    70155        self.queue = projectManager.queue 
     156        self.iteration = -1 
    71157 
    72158    def _oops(self, failure): 
     
    107193        d.addErrback(self._oops) 
    108194        return d 
     195 
     196    def nextIteration(self): 
     197        """ 
     198        """ 
     199        self.iteration += 1 
     200        self.zFetcher.restart() 
     201 
     202    def zSimulation(self, paramContainer, member, remote=False, local=False): 
     203        """ 
     204        Runs a new log-volatily simulation for the I{paramContainer}, saving 
     205        its result in my I{z} array and updating my instance of L{ZFetcher} 
     206        with the array when it has been completely updated. 
     207 
     208        Updates the container in place with the last simulated log-volatility 
     209        value. 
     210         
     211        Returns a deferred that fires when the simulation is done, with C{True} 
     212        if it succeeded or C{False} if not. 
     213        """ 
     214        def update(z=None): 
     215            simulationsDone = self.zFetcher.update(member, z) 
     216 
     217        def unpackResult(packedResult): 
     218            if packedResult: 
     219                return list(pack.Unpacker(packedResult)) 
     220 
     221        def simulationDone(result): 
     222            if result is None: 
     223                update() 
     224                return False 
     225            update(result[0]) 
     226            paramContainer.h = result[1] 
     227            return True 
     228         
     229        if (remote or self.remoteMode) and not local: 
     230            d = self.client.run( 
     231                'hybridGibbs', paramContainer, timeout=self.timeout) 
     232            d.addCallback(unpackResult) 
     233        else: 
     234            d = self.queue.call( 
     235                self.modelObj.hybridGibbs, paramContainer) 
     236        return d.addCallbacks(simulationDone, self._oops) 
    109237     
    110238    def likelihood(self, paramContainer, remote=False, local=False): 
     
    113241        I{paramContainer} for my model. 
    114242 
    115         Updates the container in-place with the log-likelihood and an array of 
    116         volatility shocks on which the likelihood computation is based, after 
    117         undergoing some nested MCMC using the specified jump deviation 
    118         I{sigma}. 
     243        Updates the container in-place with the log-likelihood. 
    119244 
    120245        Returns a deferred that fires with a reference to the paramContainer 
     
    125250                # An error from the remote likelihood method call is treated as 
    126251                # infinitely low likelihood 
    127                 result = (-s.inf, 0, [], []) 
    128             elif isinstance(result, str): 
    129                 # Unpack string result from remote call 
    130                 result = list(pack.Unpacker(result)) 
    131             for k, name in enumerate(('Lx', 'z', 'h')): 
    132                 setattr(paramContainer, name, result[k]) 
     252                result = -s.inf 
     253            paramContainer.Lx = result 
    133254            return paramContainer 
    134255         
    135256        if (remote or self.remoteMode) and not local: 
    136257            d = self.client.run( 
    137                 'likelihood', paramContainer, timeout=self.timeout) 
     258                'likelihood', paramContainer, 
     259                self.iteration, timeout=self.timeout) 
    138260        else: 
    139261            d = self.queue.call( 
    140                 self.modelObj.likelihood, paramContainer
     262                self.modelObj.likelihood, paramContainer, self.iteration
    141263        return d.addCallbacks(gotLikelihood, self._oops) 
    142264 
     
    184306      generator. 
    185307 
    186     @ivar Ni: Number of hybrid Gibbs iterations per likelihood evaluation. 
    187      
    188     """ 
    189     keyAttrs = {'y':None, 'wiggle':1.0, 'Ni':100} 
     308    @ivar Ni: Number of hybrid Gibbs iterations per log-volatility simulation. 
     309     
     310    """ 
     311    keyAttrs = {'y':None, 'wiggle':1.0, 'Ni':50} 
    190312 
    191313    #--- Properties ----------------------------------------------------------- 
     
    228350                i += 1 
    229351        return cv 
    230      
    231  
     352 
     353    def setParams(self, paramContainer): 
     354        """ 
     355        """ 
     356        # Set my parameters 
     357        self.setParamSequence(paramContainer.paramSequence()) 
     358 
     359        # Set up array variables for the inline call, including innovations as 
     360        # residuals of my observations run through the reversed VAR(1)... 
     361        self.x = self.var.reverse(self.a, self.b) 
     362        # ...concurrent cross-correlations between volatility shocks and 
     363        # inverse of concurrent cross-correlations between innovation shocks 
     364        # (we'll bail out with no result if either is invalid), and... 
     365        try: 
     366            self.rz = linalg.cholesky( 
     367                self.covarMatrix(self.fs, self.fr), lower=True).astype('d') 
     368            self.ri = linalg.inv(linalg.cholesky(self.covarMatrix( 
     369                s.ones(self.p), self.cr), lower=True)).astype('d') 
     370        except linalg.LinAlgError: 
     371            return 
     372        # ...scaling constants for multivariate normal pdf 
     373        self.sc = s.empty((self.p, 2)) 
     374        self.sc[:,0] = 1.0 / s.sqrt(1.0 - self.g**2) 
     375        self.sc[:,1] = s.log(s.sqrt(2*s.pi) / self.sc[:,0]) 
     376 
     377    def getZ(self, zFilePath): 
     378        """ 
     379        Returns a 3-D array containing normal variates underlying simulated 
     380        multivariate log-volatilities for the N different sets of parameters in 
     381        the current iteration of the overall PMC simulation. 
     382 
     383        The array shape is (parameter set, time series, time-series sample). 
     384         
     385        """ 
     386        @defer.deferredGenerator 
     387        def getChunks(): 
     388            chunks = [] 
     389            while True: 
     390                wfd = defer.waitForDeferred( 
     391                    self.zFetcherRemote.callRemote('get')) 
     392                yield wfd 
     393                chunk = wfd.getResult() 
     394                if not chunk: 
     395                    break 
     396                chunks.append(chunk) 
     397             
     398            self._zArray = pack.Unpacker("".join(chunks))() 
     399            self._zArray = (ZFetcher.intScaleDown * self._zArray.astype('d')) 
     400            self._zID = iteration 
     401 
     402        # Local mode access 
     403        if hasattr(self, 'zFetcher'): 
     404            return defer.succeed(self.zFetcher.getZ(fullPrecision=True)) 
     405        # Remote mode access 
     406        if getattr(self, '_zID', None) != iteration: 
     407            if hasattr(self, '_d_getZ'): 
     408                d = defer.Deferred() 
     409                self._d_getZ.chainDeferred(d) 
     410            else: 
     411                d = self._d_getZ = getChunks() 
     412        else: 
     413            d = defer.succeed(None) 
     414        d.addCallback(lambda _: self._zArray) 
     415        return d 
     416     
     417     
    232418    #--- The API -------------------------------------------------------------- 
    233      
    234     def likelihood(self, paramContainer): 
    235         """ 
    236         Call this method with a parameter object that defines: 
    237  
    238             1. A set of values for my parameters. 
    239  
    240             2. A latent parameter I{z} consisting of an array of IID normal 
    241                variates to be used as a starting point for another simulation 
    242                draw of log volatilities. 
     419 
     420    def hybridGibbs(self, paramContainer): 
     421        """ 
     422        Call this method with a parameter object that defines a set of values 
     423        for my parameters. 
    243424 
    244425        A new 2-D array of log-volatilities will be drawn from a hybrid Gibbs 
     
    255436        returns a tuple containing the following: 
    256437         
    257             1. The log-likelihood of my observations given the parameters, from 
    258                an integration of the join probability of the observations and 
    259                simulated log-volatilities over the simulation rounds. 
     438            1. The IID normal variates underlying the simulated 
     439               log-volatilities after the last simulation round. 
     440 
     441            2. The multivariate log-volatility value for the last time-series 
     442               sample after the last simulation round. (Useful for 
     443               extrapolating from the fitted model.) 
     444 
     445        """ 
     446        self.setParams(paramContainer) 
     447        # Create the normal variate array with a standard starting point for 
     448        # every log-volatility simulation 
     449        z = s.zeros_like(self.x) 
     450        # This will contain the simulated log-volatility of the last 
     451        # time-series sample 
     452        h = s.empty(self.p) 
     453        self.inline( 
     454            'z', 'h', 'x', 'w', 'ni', 
     455            'd', 'e', 'g', 'rz', 'ri', 'sc', 
     456            w=self.wiggle, ni=self.Ni) 
     457        return z, h 
     458 
     459    def likelihood(self, paramContainer, zFilePath): 
     460        """ 
     461        Call this method with a parameter object that defines a set of values 
     462        for my parameters and an integer specifying which PMC iteration being 
     463        run. 
     464 
     465        If a L{linalg.LinAlgError} is raised due to an invalid correlation 
     466        matrix parameter, the method returns with no result. Otherwise, it 
     467        returns the log-likelihood of my observations given the parameters, 
     468        from an integration of the joint probability of the observations and 
     469        simulated log-volatilities over the simulation rounds. 
    260470 
    261471            2. The IID normal variates underlying the simulated 
     
    272482        what, making any call to this method with it a waste of computing time. 
    273483        """ 
    274         # Set my parameters... 
    275         self.setParamSequence(paramContainer.paramSequence()) 
    276         # ...including the latent parameter of IID normal variates 
    277         z = paramContainer.z.copy() 
    278  
    279         # Set up array variables for the inline call, including innovations as 
    280         # residuals of my observations run through the reversed VAR(1)... 
    281         x = self.var.reverse(self.a, self.b) 
    282         # ... the log-volatility of the last time-series sample... 
    283         h = s.empty(self.p) 
    284         # ...concurrent cross-correlations between volatility shocks and 
    285         # inverse of concurrent cross-correlations between innovation shocks 
    286         # (we'll bail out with no result if either is invalid), and... 
    287         try: 
    288             rz = linalg.cholesky( 
    289                 self.covarMatrix(self.fs, self.fr), lower=True).astype('d') 
    290             ri = linalg.inv(linalg.cholesky(self.covarMatrix( 
    291                 s.ones(self.p), self.cr), lower=True)).astype('d') 
    292         except linalg.LinAlgError: 
    293             return 
    294         # ...scaling constants for multivariate normal pdf 
    295         sc = s.empty((self.p, 2)) 
    296         sc[:,0] = 1.0 / s.sqrt(1.0 - self.g**2) 
    297         sc[:,1] = s.log(s.sqrt(2*s.pi) / sc[:,0]) 
    298  
    299         # Run the hybrid Gibbs rounds 
    300         Lx = self.inline( 
    301             'z', 'h', 'x', 'w', 'ni', 
    302             'd', 'e', 'g', 'rz', 'ri', 'sc', 
    303             w=self.wiggle, ni=self.Ni) 
    304         return Lx, z, h 
     484        self.setParams(paramContainer) 
     485        return self.inline( 
     486            'z', 'x', 'd', 'e', 'g', 'rz', 'ri', 'sc', 
     487            z=self.getZ(zFilePath)) 
     488     
     489                       
     490 
  • projects/AsynCluster/trunk/svpmc/params.py

    r203 r205  
    239239    In addition to the parameter arrays, I house as key attributes: 
    240240 
    241         - A latent parameter array I{z}. 
    242  
    243241        - A scalar I{Lx} with the log-likelihood of the model's data given my 
    244           parameters and the latent parameter I{z}. 
     242          parameters and some externally supplied normal variates underlying 
     243          sets of simulated log-volatilites. 
    245244 
    246245        - A scalar I{Lp} with the prior log-probability density of the 
     
    250249          resulting in my parameters. 
    251250 
    252         - A scalar I{Lh} with the log-probability density of the simulated 
    253           log-volatilities, reflected in the simulated values of I{z}
    254      
    255     """ 
    256     keyAttrs = {'z':[], 'Lx':None, 'Lp':None, 'Lj':None, 'Lh':None} 
     251        - A 1-D array I{h} containing the last multivariate log-volatility 
     252          value of any log-volatility simulation yet done on me
     253     
     254    """ 
     255    keyAttrs = {'Lx':None, 'Lp':None, 'Lj':None, 'h':None} 
    257256 
    258257 
     
    431430        """ 
    432431        Lp = 0 
    433         paramContainer = ParameterContainer( 
    434             paramNames=self.paramNames, z=s.randn(self.p, self.n)) 
     432        paramContainer = ParameterContainer(paramNames=self.paramNames) 
    435433        for name in self.paramNames: 
    436434            priorFlexArray = getattr(self, name) 
     
    439437            setattr(paramContainer, name, paramArray) 
    440438        paramContainer.Lp = Lp 
     439        paramContainer.Lj = 0 # There was no jump 
    441440        return paramContainer 
    442441 
     
    449448        deviations in the jump distribution. 
    450449 
    451         The returned parameter container object will have a copy of the 
    452         supplied instance's latent parameter array I{z} and new values of I{Lp} 
     450        The returned parameter container object will have new values of I{Lp} 
    453451        and I{Lj} corresponding to the proposal. 
    454452 
     
    464462                "You must supply a ParameterContainer as the first "+\ 
    465463                "argument, not a '%s'" % type(paramContainer)) 
    466         newVersion = ParameterContainer( 
    467             paramNames=self.paramNames, z=s.array(paramContainer.z).copy()) 
     464        newVersion = ParameterContainer(paramNames=self.paramNames) 
    468465        Lp = 0 
    469466        # Define a 1-D array holding the probability density for each jump, 
  • projects/AsynCluster/trunk/svpmc/pmc.py

    r201 r205  
    5252        self.rMax = N - self.rMin*(self.D-1) 
    5353        self.updateAllocations() 
     54        self.II = None 
    5455     
    5556    def subsetIndex(self, k): 
     
    6566        """ 
    6667        Call this method with a reference to an empty list that will hold 
    67         assembled results. 
    68  
    69         For each of my bins, the method will yield the bin index, a 1-D array 
    70         of indices for the subset of my population allocated to that bin, and a 
    71         fresh deferred already fired with C{None}. You must add a callback to 
    72         the deferred for each iteration that returns a tuple of 1-D arrays, 
    73         each being the same length as the yielded index array. 
     68        results assembled from an allocation that I'll be (re)doing. If you 
     69        want to re-use the same allocation as from a previous call to this 
     70        method, set the I{lastAllocation} keyword C{True}. 
     71 
     72        The method will allocate the population members into my I{D} bins, 
     73        unless we're re-using the last allocation. For each bin, the method 
     74        will yield the bin index, a 1-D array of indices for the subset of my 
     75        population allocated to that bin, and a fresh deferred already fired 
     76        with C{None}. You must add a callback to the deferred for each 
     77        iteration that returns a tuple of 1-D arrays, each being the same 
     78        length as the yielded index array. 
    7479         
    7580        Each returned array for each subset will be assembled back into a 
     
    9499            raise ValueError( 
    95100                "You must supply an empty list to hold your assembled results") 
    96         self.II = [None] * self.D 
     101        if self.II is None: 
     102            lastAllocation = False 
     103            self.II = [None] * self.D 
     104        else: 
     105            lastAllocation = True 
    97106        for k in s.flipud(s.argsort(self.R)): 
    98             I = self.II[k] = self.subsetIndex(k) 
     107            if lastAllocation: 
     108                I = self.II[k] 
     109            else: 
     110                I = self.II[k] = self.subsetIndex(k) 
    99111            d = defer.succeed(None) 
    100112            yield k, I, d 
     
    115127        if I is None: 
    116128            R = s.ones(self.D) / self.D 
    117         elif hasattr(self, 'II')
     129        elif self.II is not None
    118130            R = s.array( 
    119131                [sum([x in self.II[k] for x in I]) for k in xrange(self.D)]) 
    120132            R = R.astype(float) / R.sum() 
     133            self.II = None 
    121134        else: 
    122135            raise AttributeError( 
     
    140153 
    141154    """ 
    142     chunkSize = 200 
    143      
    144155    def __init__(self, projectManager, socket=None): 
    145156        self.socket = socket 
     
    149160        self.resampler = Resampler(logWeights=True)             
    150161        self.allocator = Allocator(projectManager.m, len(projectManager.V)) 
     162 
     163    def _oops(self, failure, **kw): 
     164        text = "\nFailure" 
     165        if kw: 
     166            text += " with " 
     167            for name, value in kw.iteritems(): 
     168                text += "%s=%s" % (name, value) 
     169        print "%s\n%s" % (text, failure.getTraceback()) 
    151170     
    152     def initialPopulation(self, N): 
    153         """ 
    154         Generates a new population of I{N} parameters from the priors and 
    155         returns a 1-D FlexArray of their parameter containers. 
    156         """ 
    157         print "Initializing population:" 
    158         X = params.FlexArray(N) 
    159         for k, paramContainer in enumerate(X): 
    160             X[k] = self.pm.priors.new() 
    161             print ".", 
    162             sys.stdout.flush() 
    163         return X 
    164  
    165     def proposals(self, X, vIndex): 
    166         """ 
    167         Returns a 1-D FlexArray of parameter containers resulting in proposals 
    168         from the supplied FlexArray of parameter containers I{X}, given the 
    169         jump deviation corresponding to the specified I{vIndex}. 
    170  
    171         Proposals with zero probability of occuring, given the parameters' 
    172         prior distributions, are censored. (There's no point considering the 
    173         likelihood of proposals that cannot occur.) 
    174         """ 
    175         XP = params.FlexArray(len(X)) 
    176         for k, paramContainer in enumerate(X): 
    177             XP[k] = self.pm.priors.proposal( 
    178                 paramContainer, vIndex, self.allocator.W) 
    179         return XP 
     171    def proposals(self, *args): 
     172        """ 
     173        Call this method with a 1-D FlexArray of parameter containers and an 
     174        integer index of a particular jump deviation, and it will generate 
     175        proposals from the existing parameters. 
     176 
     177        Call it with just an integer number of proposals, and it will generate 
     178        new ones directly from the priors. 
     179 
     180        Runs a log-volatility simulation for each proposal via the model 
     181        manager, which constructs a 3-D array of the normal variates underlying 
     182        the entire set of simulations. 
     183 
     184        Returns a deferred that fires when all the log-volatility simulations 
     185        are done. The deferred fires with a 1-D FlexArray of the proposals 
     186        along with an equal-length 1-D bool array of True/False values 
     187        indicating which of them resulted in valid log-volatility simulations. 
     188        """ 
     189        def simulationDone(success): 
     190            if success: 
     191                print vIndex, 
     192            else: 
     193                print ".", 
     194            I.append(success) 
     195 
     196        def allDone(null): 
     197            return XP, s.array(I) 
     198         
     199        dList = [] 
     200        I = [] 
     201        if len(args) == 2: 
     202            new = False 
     203            X, vIndex = args 
     204            N = len(X) 
     205        else: 
     206            new = True 
     207            N = args[0] 
     208            vIndex = "+" 
     209        XP = params.FlexArray(N) 
     210        print "|", 
     211        for k in xrange(N): 
     212            if new: 
     213                proposalPC = self.pm.priors.new() 
     214            else: 
     215                proposalPC = self.pm.priors.proposal( 
     216                    X[k], vIndex, self.allocator.W) 
     217            XP[k] = proposalPC 
     218            d = self.mm.zSimulation(proposalPC, k) 
     219            d.addCallback(simulationDone) 
     220            d.addErrback(self._oops, k=k) 
     221            dList.append(d) 
     222        return defer.DeferredList(dList).addCallback(allDone) 
    180223     
    181     @defer.deferredGenerator 
    182     def weightedProposals(self, X, vIndex): 
    183         """ 
    184         Returns a deferred that fires with a 1-D FlexArray of proposals from 
    185         the parameter containers in the supplied FlexArray I{X}, given the 
    186         specified proposal deviation index I{vIndex}, along with a 
    187         like-dimensioned array of linear, fractional importance weights for the 
    188         proposals. 
     224    def weights(self, XP, vIndex): 
     225        """ 
     226        Returns a deferred that fires with a 1-D array of log importance 
     227        weights for the proposals supplied in the 1-D FlexArray I{XP}. 
    189228         
    190229        The importance weights are to be combined with those from other calls 
    191230        to this method with different variance settings. The weights will be 
    192         used to resample the returned proposals so that the probability of any 
     231        used to resample the supplied proposals so that the probability of any 
    193232        given proposal being included in the final result for this iteration is 
    194233        proportional to its likelihood and prior probability density, and 
     
    207246            print "%6.4f\t%+12.2f\t%s" % (self.pm.V[vIndex], L, info) 
    208247            return L 
    209  
    210         def evaluateChunk(XP): 
    211             XP_list.append(XP) 
    212             # Here's where the vast majority of the CPU time is expended! 
    213             dList = [ 
    214                 self.mm.likelihood(XPk).addCallback(weight) for XPk in XP] 
    215             return defer.gatherResults(dList) 
    216          
    217         j = 0 
    218         N = len(X) 
    219         XP_list = [] 
    220         W = s.empty(len(X)) 
    221         while j < N: 
    222             N_this = min([self.chunkSize, N-j]) 
    223             X_this = X[j:j+N_this] 
    224             # Generate a chunk of proposals (somewhat time-consuming, done 
    225             # locally in a thread)... 
    226             d = self.queue.call(self.proposals, X_this, vIndex) 
    227             # ...and then evaluate the proposals (really time-consuming, done 
    228             # either locally in a thread or in the cluster) 
    229             d.addCallback(evaluateChunk) 
    230             wfd = defer.waitForDeferred(d) 
    231             yield wfd 
    232             W[j:j+N_this] = wfd.getResult() 
    233             j += N_this 
    234         yield params.FlexArray.concatenate(XP_list), W 
     248         
     249        # Evaluate the proposals (very time-consuming, done either locally in a 
     250        # thread or in the cluster) 
     251        dList = [self.mm.likelihood(XPk).addCallback(weight) for XPk in XP] 
     252        return defer.gatherResults(dList).addCallback(lambda W: s.array(W)) 
    235253 
    236254    @defer.deferredGenerator 
     
    249267 
    250268        """ 
     269        def gotTheseWeights(Wv, I, Iv): 
     270            """ 
     271            Returns a 1-D weights array of the same length as this allocation's 
     272            length, with infinitely negative values except for the valid 
     273            weights supplied in I{Wv}. 
     274            """ 
     275            W = -s.inf * s.ones(len(I)) 
     276            W[Iv] = Wv 
     277            return W 
     278 
     279        X = None 
    251280        if self.socket: 
    252281            wfd = defer.waitForDeferred(self.mm.setRemoteMode(self.socket)) 
    253282            yield wfd; wfd.getResult() 
    254283        N_members = self.pm.m 
    255         # Initialize some arrays 
    256         X = self.initialPopulation(N_members) 
    257         # Iteration 
    258284        print "\nRunning %d iterations with %d population members" \ 
    259285              % (N_iter, N_members) 
     286        # For each iteration... 
    260287        for i in xrange(N_iter): 
    261288            print "\n%03d\n%s" % (i+1, "-"*100) 
     289            self.mm.nextIteration() 
     290            if X is None: 
     291                print "Generating %d proposals from priors:" % N_members 
     292                # No existing members, generate proposals directly from the 
     293                # priors 
     294                wfd = defer.waitForDeferred(self.proposals(N_members)) 
     295                yield wfd 
     296                XP, validXP = wfd.getResult() 
     297            else: 
     298                print "Generating %d proposals from previous population:" \ 
     299                      % N_members 
     300                # Regular iteration, use proposals generated from the last 
     301                # iteration's members 
     302                dList, resultList = [], [] 
     303                for vIndex, I, d in self.allocator.assembler(resultList): 
     304                    d.addCallback(lambda _: self.proposals(X[I], vIndex)) 
     305                dList.append(d) 
     306                wfd = defer.waitForDeferred(defer.DeferredList(dList)) 
     307                yield wfd 
     308                wfd.getResult() 
     309                XP, validXP = resultList 
     310            print "\n" 
     311            # Evaluate the proposals 
    262312            dList, resultList = [], [] 
    263313            for vIndex, I, d in self.allocator.assembler(resultList): 
    264                 d.addCallback(lambda _: self.weightedProposals(X[I], vIndex)) 
     314                Iv = validXP[I].nonzero()[0] 
     315                d.addCallback(lambda _: self.weights(XP[I][Iv], vIndex)) 
     316                d.addCallback(gotTheseWeights, I, Iv) 
     317                d.addErrback(self._oops, vIndex=vIndex) 
    265318                dList.append(d) 
    266             # Record the previous iteration's results while the nodes work on 
    267             # the current iteration... 
    268             if i > 0
     319            # Record the previous iteration's members while the nodes work on 
     320            # evaluating the valid proposals for the current iteration... 
     321            if X is not None
    269322                self.pm.writeParams(X)