Changeset 206

Show
Ignore:
Timestamp:
06/01/08 02:06:56 (6 months ago)
Author:
edsuom
Message:

Got Rao-Blackwellized (on z only) version running locally, not yet remotely nor checked out for performance

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynCluster/trunk/asyncluster/master/control.py

    r175 r206  
    380380        to the callable object specified in the job's namespace on each node 
    381381        before it runs another call for that job. 
     382 
     383        If you don't want the task saved for future workers, but only run on 
     384        the workers currently attached, set the I{ephemeral} keyword C{True}. 
    382385        """ 
    383386        return self.ctl.jobber.update(jobID, callName, *args, **kw) 
  • projects/AsynCluster/trunk/svpmc/model.py

    r205 r206  
    3535 
    3636 
     37INT_SCALE = 2000 
     38 
     39 
    3740class ZFetcher(pb.Referenceable): 
    3841    """ 
     
    4043    latest log-volatility simulations. 
    4144 
    42     ???????????????????????????? 
     45    @ivar z: The full-precision 3-D array of normal variates underlying the 
     46      current set of valid log-volatility simulations. 
     47 
     48    @ivar zPackedChunks: A list of strings containing a chunked, packed version 
     49      of z. 
     50     
    4351    """ 
    4452    chunkSize = 60000 
    45     intScaleUp = 2000 
    46     intScaleDown = 1.0 / intScaleUp 
    4753     
    4854    def __init__(self, m, p, n): 
    4955        self.m = m 
    50         self.z = s.empty((m, p, n)) 
    51  
    52     def getZ(self, fullPrecision=False): 
    53         # Transmitting at short int cuts data size to one fourth, still offers 
    54         # full needed range with reasonable precision 
    55         result = self.z[self.successes,:,:] 
    56         if fullPrecision: 
    57             return result 
    58         return (self.intScaleUp*result).astype('h') 
    59  
    60     def _packageZ(self): 
    61         zPacked = pack.Packer(self.getZ())() 
    62         k = 0 
    63         N = len(zPacked) 
    64         self.zPackedChunks = [] 
    65         while k < N: 
    66             chunk = zPacked[k:k+self.chunkSize] 
    67             self.zPackedChunks.append(chunk) 
    68             k += self.chunkSize 
    69  
     56        self._z = s.empty((m, p, n)) 
     57        self.restart() 
     58 
     59    def _get_z(self): 
     60        return self._z[self.successes,:,:] 
     61    z = property(_get_z) 
     62     
    7063    def restart(self): 
    7164        self.failures = [] 
    7265        self.successes = [] 
    73         self.chunks = {} 
    7466        self.zPackedChunks = None 
    7567     
     
    9587        else: 
    9688            self.successes.append(member) 
    97             self.z[member,:,:] = z 
     89            self._z[member,:,:] = z 
    9890        if len(self.failures) + len(self.successes) == self.m: 
    99             self._packageZ() 
     91            zPacked = pack.Packer(self.z)() 
     92            k = 0 
     93            N = len(zPacked) 
     94            self.zPackedChunks = [] 
     95            while k < N: 
     96                chunk = zPacked[k:k+self.chunkSize] 
     97                self.zPackedChunks.append(chunk) 
     98                k += self.chunkSize 
    10099            return True 
    101100        return False 
    102101     
    103     def view_get(self, perspective): 
    104         if perspective not in self.chunks: 
    105             self.chunks[perspective] = copy(self.zPackedChunks) 
    106         chunks = self.chunks[perspective] 
    107         if chunks: 
    108             return self.chunks[perspective].pop(0) 
    109  
    110102 
    111103class ModelManager(object): 
     
    119111    I'm going to manage. 
    120112 
    121     @ivar zFetcher: An instance of L{ZFetcher} that is supplied to my model, 
    122       through which remote (and local) instances of the model access the normal 
    123       variates underlying the latest log-volatility simulations. 
    124      
    125113    """ 
    126114    remoteMode = False 
     
    129117    nodecode = """ 
    130118    ########################################################################### 
    131     from twisted_goodies.pybywire.pack import Packer 
    132      
    133     M = None 
     119    from twisted_goodies.pybywire import pack 
     120     
     121    MODEL = None 
     122    CHUNKS = [] 
    134123 
    135124    def newModel(modelObj): 
    136         global M 
    137         M = modelObj 
    138  
     125        global MODEL 
     126        MODEL = modelObj 
     127 
     128    def newZ(zPacked, k, N): 
     129        global CHUNKS 
     130        if k == 0: 
     131            CHUNKS = [] 
     132        CHUNKS.append(zPacked) 
     133        if k == N-1: 
     134            MODEL.z = %f * pack.Unpacker(''.join(CHUNKS)).astype('d') 
     135     
    139136    def hybridGibbs(paramContainer): 
    140         return Packer(*M.hybridGibbs(paramContainer)) 
    141      
    142     def likelihood(paramContainer, zFilePath): 
    143         return M.likelihood(paramContainer, iteration
     137        return pack.Packer(*MODEL.hybridGibbs(paramContainer)) 
     138     
     139    def likelihood(paramContainer): 
     140        return MODEL.likelihood(paramContainer
    144141 
    145142    ########################################################################### 
    146     """ 
     143    """ % (1.0/INT_SCALE,) 
     144     
    147145    def __init__(self, projectManager, y, **kw): 
    148146        kw['y'] = y 
    149147        kw['paramNames'] = projectManager.paramNames 
    150         kw['zFetcher'] = self.zFetcher = ZFetcher( 
     148        self.zFetcher = ZFetcher( 
    151149            projectManager.m, projectManager.p, projectManager.n) 
    152         #??????????????????????????? 
    153         #kw['zFetcherRemote'] =  
    154150        self.modelObj = Model(**kw) 
    155151        self.queue = projectManager.queue 
    156         self.iteration = -1 
    157152 
    158153    def _oops(self, failure): 
     
    188183        d.addCallback( 
    189184            lambda _: self.client.registerClasses(*Parameterized.registry)) 
    190         d.addCallback( 
    191             lambda _: self.client.update('newModel', self.modelObj)) 
     185        d.addCallback(lambda _: self.client.update('newModel', self.modelObj)) 
    192186        d.addCallback(lambda _: setattr(self, 'remoteMode', True)) 
    193187        d.addErrback(self._oops) 
    194188        return d 
    195189 
    196     def nextIteration(self): 
    197         """ 
    198         """ 
    199         self.iteration += 1 
     190    @defer.deferredGenerator 
     191    def nextIteration(self, remote=False, local=False): 
     192        """ 
     193        """ 
     194        self.modelObj.z = self.zFetcher.z 
     195        if (remote or self.remoteMode) and not local: 
     196            chunks = self.zFetcher.zPackedChunks 
     197            N = len(chunks) 
     198            for k, chunk in enumerate(chunks): 
     199                wfd = defer.waitForDeferred( 
     200                    self.client.update('newZ', chunk, k, N, ephemeral=True)) 
     201                yield wfd 
     202                wfd.getResult() 
    200203        self.zFetcher.restart() 
    201  
     204     
    202205    def zSimulation(self, paramContainer, member, remote=False, local=False): 
    203206        """ 
     
    253256            paramContainer.Lx = result 
    254257            return paramContainer 
    255          
     258 
    256259        if (remote or self.remoteMode) and not local: 
    257260            d = self.client.run( 
    258                 'likelihood', paramContainer, 
    259                 self.iteration, timeout=self.timeout) 
     261                'likelihood', paramContainer, timeout=self.timeout) 
    260262        else: 
    261263            d = self.queue.call( 
    262                 self.modelObj.likelihood, paramContainer, self.iteration
     264                self.modelObj.likelihood, paramContainer
    263265        return d.addCallbacks(gotLikelihood, self._oops) 
    264266 
     
    374376        self.sc[:,0] = 1.0 / s.sqrt(1.0 - self.g**2) 
    375377        self.sc[:,1] = s.log(s.sqrt(2*s.pi) / self.sc[:,0]) 
    376  
    377     def getZ(self, zFilePath): 
    378         """ 
    379         Returns a 3-D array containing normal variates underlying simulated 
    380         multivariate log-volatilities for the N different sets of parameters in 
    381         the current iteration of the overall PMC simulation. 
    382  
    383         The array shape is (parameter set, time series, time-series sample). 
    384          
    385         """ 
    386         @defer.deferredGenerator 
    387         def getChunks(): 
    388             chunks = [] 
    389             while True: 
    390                 wfd = defer.waitForDeferred( 
    391                     self.zFetcherRemote.callRemote('get')) 
    392                 yield wfd 
    393                 chunk = wfd.getResult() 
    394                 if not chunk: 
    395                     break 
    396                 chunks.append(chunk) 
    397              
    398             self._zArray = pack.Unpacker("".join(chunks))() 
    399             self._zArray = (ZFetcher.intScaleDown * self._zArray.astype('d')) 
    400             self._zID = iteration 
    401  
    402         # Local mode access 
    403         if hasattr(self, 'zFetcher'): 
    404             return defer.succeed(self.zFetcher.getZ(fullPrecision=True)) 
    405         # Remote mode access 
    406         if getattr(self, '_zID', None) != iteration: 
    407             if hasattr(self, '_d_getZ'): 
    408                 d = defer.Deferred() 
    409                 self._d_getZ.chainDeferred(d) 
    410             else: 
    411                 d = self._d_getZ = getChunks() 
    412         else: 
    413             d = defer.succeed(None) 
    414         d.addCallback(lambda _: self._zArray) 
    415         return d 
    416378     
    417379     
     
    457419        return z, h 
    458420 
    459     def likelihood(self, paramContainer, zFilePath): 
     421    def likelihood(self, paramContainer): 
    460422        """ 
    461423        Call this method with a parameter object that defines a set of values 
    462         for my parameters and an integer specifying which PMC iteration being 
    463         run. 
     424        for my parameters. 
    464425 
    465426        If a L{linalg.LinAlgError} is raised due to an invalid correlation 
     
    467428        returns the log-likelihood of my observations given the parameters, 
    468429        from an integration of the joint probability of the observations and 
    469         simulated log-volatilities over the simulation rounds. 
    470  
    471             2. The IID normal variates underlying the simulated 
    472                log-volatilities after the last simulation round. 
    473  
    474             3. The multivariate log-volatility value for the last time-series 
    475                sample after the last simulation round. (Useful for 
    476                extrapolating from the fitted model.) 
     430        log-volatilities simulated from all the other parameters in the current 
     431        PMC population. 
    477432         
    478433        The returned likelihood does not consider the prior probability of the 
     
    483438        """ 
    484439        self.setParams(paramContainer) 
    485         return self.inline( 
    486             'z', 'x', 'd', 'e', 'g', 'rz', 'ri', 'sc', 
    487             z=self.getZ(zFilePath)) 
     440        return self.inline('z', 'x', 'd', 'e', 'g', 'rz', 'ri', 'sc') 
    488441     
    489442                       
  • projects/AsynCluster/trunk/svpmc/params.py

    r205 r206  
    284284 
    285285    class Jump(object): 
    286         __slots__ = ('x', 'px', 'pV') 
    287         def __init__(self, x, px, pV): 
     286        __slots__ = ('x', 'px', 'pJump') 
     287        def __init__(self, x, px, pJump): 
    288288            self.x = x 
    289289            self.px = px 
    290             self.pV = pV 
     290            self.pJump = pJump 
    291291     
    292292    def _get_distObj(self): 
     
    320320        D = len(self.V) 
    321321        rJumps = self.rdsObj.rvs(self.N_draw) 
    322         # Right side 
    323         V = self.V[::-1] 
    324         pJumpsL = self.rdsObj.pdf( 
    325             s.kron(rJumps, V).reshape(self.N_draw, D)) 
    326         # Left side 
    327         V = 1.0/V[:0:-1] 
    328         pJumpsR = self.rdsObj.pdf( 
    329             s.kron(rJumps, V).reshape(self.N_draw, D-1)) 
    330         pJumps = s.column_stack([pJumpsL, pJumpsR]) 
     322        pJumps = self.rdsObj.pdf(rJumps) 
    331323        return {'k':-1, 'N':self.N_draw, 'R':rJumps, 'P':pJumps} 
    332324     
     
    340332                draw = self.cache['draw'] = self._newDraw() 
    341333        k = draw['k'] 
    342         return draw['R'][k], draw['P'][k,:
     334        return draw['R'][k], draw['P'][k
    343335     
    344336    def pdf(self, x): 
     
    367359        standard deviation width of a normal distribution.) 
    368360         
    369         Returns a tiny object with two attributes, conveniently accessible as 
     361        Returns a tiny object with three attributes, conveniently accessible as 
    370362        part of a L{FlexArray}: 
    371363 
    372364            - B{x}: the value after the jump, and 
    373365 
    374             - B{pV}: a 1-D array of probabilities of the jump for each jump 
    375               deviation in my array I{V}. 
     366            - B{px}: the probability density of I{x}, given the prior 
     367              distribution. 
     368 
     369            - B{pj}: the probability density of the jump, given the jump 
     370              deviation used. 
    376371         
    377372        For my Gaussian-PDF deviate generator, the probability density of a 
     
    380375        each possible jump deviation in the I{p} array of the result. 
    381376        """ 
    382         D = len(self.V) 
     377        v = self.V[vIndex] 
     378        vInverse = 1.0 / v 
    383379        for k in xrange(self.N_attempts): 
    384             jumpValue, P = self._rJumpDraw() 
    385             jumpValue *= self.V[vIndex] 
    386             pV = P[D-vIndex-1:2*D-vIndex-1] / self.V 
    387             x = startValue + jumpValue 
     380            rJump, pJump = self._rJumpDraw() 
     381            rJump *= v 
     382            pJump *= vInverse 
     383            x = startValue + rJump 
    388384            px = self.pdf(x) 
    389385            if px > 0: 
     
    395391            x = self.rvs() 
    396392            px = self.pdf(x) 
    397             pV = px * s.ones(D) 
    398         return self.Jump(x, px, pV
     393            pJump = 1.0  # No jump involved 
     394        return self.Jump(x, px, pJump
    399395 
    400396 
     
    463459                "argument, not a '%s'" % type(paramContainer)) 
    464460        newVersion = ParameterContainer(paramNames=self.paramNames) 
    465         Lp =
     461        Lp, Lj = 0,
    466462        # Define a 1-D array holding the probability density for each jump, 
    467463        # joint across all parameters. Since the densities will be scaled by 
    468464        # the weight for each jump deviation, just start the array with the 
    469465        # weights. 
    470         pJumps = vWeights.copy() 
    471466        for name in self.paramNames: 
    472467            priorsFA = getattr(self, name) 
    473468            startValues = getattr(paramContainer, name) 
    474469            jumpsFA = priorsFA.call('rJump', startValues, vIndex) 
     470            setattr(newVersion, name, jumpsFA.x) 
    475471            Lp += s.log(jumpsFA.px).sum() 
    476             for k in xrange(len(vWeights)): 
    477                 pJumps[k] *= jumpsFA.pV.call('item', k).prod() 
    478             setattr(newVersion, name, jumpsFA.x) 
     472            Lj += s.log(jumpsFA.pJump).sum() 
    479473        newVersion.Lp = Lp 
    480         newVersion.Lj = s.log(pJumps.sum()) 
     474        newVersion.Lj = Lj 
    481475        return newVersion 
  • projects/AsynCluster/trunk/svpmc/pmc.py

    r205 r206  
    208208            vIndex = "+" 
    209209        XP = params.FlexArray(N) 
    210         print "|", 
    211210        for k in xrange(N): 
    212211            if new: 
  • projects/AsynCluster/trunk/svpmc/project.py

    r205 r206  
    126126            setattr(obs, "series_%02d" % (k,), title) 
    127127        # ...and construct a queue and model manager for a simulation 
    128         #self.queue = asynqueue.ThreadQueue(1) 
    129         self.queue = NullQueue() 
     128        self.queue = asynqueue.ThreadQueue(1) 
     129        #self.queue = NullQueue() 
    130130        self.mgr = model.ModelManager( 
    131131            self, tsData, wiggle=self.wiggle, Ni=self.Ni)