root/projects/AsynCluster/trunk/svpmc/model.py

Revision 207, 14.0 kB (checked in by edsuom, 6 months ago)

Got Rao-Blackwellized version working locally & remotely, with better but still disappointing degeneracy

Line 
1 # svPMC: Stochastic Volatility Inference via Population Monte Carlo
2 #
3 # Copyright (C) 2007-2008 by Edwin A. Suominen, http://www.eepatents.com
4 #
5 # This program is free software; you can redistribute it and/or modify it under
6 # the terms of the GNU General Public License as published by the Free Software
7 # Foundation; either version 2 of the License, or (at your option) any later
8 # version.
9 #
10 # This program is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 # FOR A PARTICULAR PURPOSE.  See the file COPYING for more details.
13 #
14 # You should have received a copy of the GNU General Public License along with
15 # this program; if not, write to the Free Software Foundation, Inc., 51
16 # Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
17
18 """
19 The stochastic volatility model and its manager
20 """
21
22 from copy import copy
23
24 import scipy as s
25 from scipy import linalg, stats
26
27 from twisted.internet import defer
28 from twisted.spread import pb
29
30 from asyncluster.master import jobs
31 from twisted_goodies.pybywire.params import Parameterized
32 from twisted_goodies.pybywire import pack
33
34 import weave
35
36
37 INT_SCALE = 2000
38
39
40 class ZFetcher(pb.Referenceable):
41     """
42     I provide a node worker with access to the normal variates underlying the
43     latest log-volatility simulations.
44
45     @ivar z: The full-precision 3-D array of normal variates underlying the
46       current set of valid log-volatility simulations.
47
48     @ivar zPackedChunks: A list of strings containing a chunked, packed version
49       of z.
50     
51     """
52     chunkSize = 60000
53    
54     def __init__(self, m, p, n):
55         self.m = m
56         self._z = s.empty((m, p, n))
57         self.restart()
58
59     def _get_z(self):
60         return self._z[self.successes,:,:]
61     z = property(_get_z)
62    
63     def restart(self):
64         self.failures = []
65         self.successes = []
66         self.zPackedChunks = None
67    
68     def update(self, member, z=None):
69         """
70         Call to update me with the results of a log-volatility simulation, with
71         integer arguments specifying the I{member} of the population for the
72         latest iteration of the overall PMC simulation.
73
74         If the log-volatility simulation succeeded, also supply the 2-D array
75         of its normal variates. Otherwise, the population member will be marked
76         as a failure.
77
78         Returns C{True} to indicate when this is the last update, C{False}
79         until then.
80         """
81         if not isinstance(member, int) or member < 0 or member >= self.m:
82             raise ValueError(
83                 "Invalid member ID '%s', must be integer between 0 and %d" \
84                 % (member, self.m-1))
85         if z is None:
86             self.failures.append(member)
87         else:
88             self.successes.append(member)
89             self._z[member,:,:] = z
90         if len(self.failures) + len(self.successes) == self.m:
91             zInt = (INT_SCALE * self.z).astype('h')
92             zPacked = pack.Packer(zInt)()
93             k = 0
94             N = len(zPacked)
95             self.zPackedChunks = []
96             while k < N:
97                 chunk = zPacked[k:k+self.chunkSize]
98                 self.zPackedChunks.append(chunk)
99                 k += self.chunkSize
100             return True
101         return False
102    
103
104 class ModelManager(object):
105     """
106     I manage a L{Model} object and a collection of L{priors.Prior} objects.
107
108     I handle calls to the model object in either local or remote mode. Calls in
109     local mode are run through my project manager's thread queue.
110
111     Instantiate me with the text of a specification for the project and model
112     I'm going to manage.
113
114     """
115     remoteMode = False
116     timeout = 80
117
118     nodecode = """
119     ###########################################################################
120     from twisted_goodies.pybywire import pack
121     
122     MODEL = None
123     CHUNKS = []
124
125     def newModel(modelObj):
126         global MODEL
127         MODEL = modelObj
128         MODEL.z = None
129
130     def newZ(zPacked, k, N):
131         global CHUNKS
132         if k == 0:
133             CHUNKS = []
134             MODEL.z = None
135         CHUNKS.append(zPacked)
136         if k == N-1 and len(CHUNKS) == N:
137             zInt = pack.Unpacker(''.join(CHUNKS))()
138             MODEL.z = %f * zInt.astype('d')
139     
140     def hybridGibbs(paramContainer):
141         if MODEL is None:
142             raise RuntimeError('No model available')
143         return pack.Packer(MODEL.hybridGibbs(paramContainer))()
144     
145     def likelihood(paramContainer):
146         if MODEL is None:
147             raise RuntimeError('No model available')
148         if MODEL.z is None:
149             raise RuntimeError(
150                 'Model not initialized with simulated log-volatility data')
151         return MODEL.likelihood(paramContainer)
152
153     ###########################################################################
154     """ % (1.0/INT_SCALE,)
155    
156     def __init__(self, projectManager, y, **kw):
157         kw['y'] = y
158         kw['paramNames'] = projectManager.paramNames
159         self.zFetcher = ZFetcher(
160             projectManager.m, projectManager.p, projectManager.n)
161         self.modelObj = Model(**kw)
162         self.queue = projectManager.queue
163
164     def _oops(self, failure):
165         print "FAILURE:\n%s" % failure.getTraceback()
166
167     def shutdown(self):
168         """
169         Call this when I'm done being used. Returns a deferred that fires when
170         everything's shut down.
171         """
172         if hasattr(self, 'client'):
173             d = self.client.shutdown()
174         else:
175             d = defer.succeed(None)
176         return d
177
178     def setLocalMode(self):
179         """
180         Sets me to run the models locally in the current interpreter.
181         """
182         self.remoteMode = False
183    
184     def setRemoteMode(self, socket):
185         """
186         Sets me to run the named model remotely on the asyncluster nodes via
187         the supplied UNIX domain I{socket}. Returns a deferred that fires when
188         remotely-run operations can commence.
189         """
190         if hasattr(self, 'client'):
191             return defer.succeed(None)
192         self.client = jobs.JobClient(socket, codeString=self.nodecode)
193         d = self.client.startup()
194         d.addCallback(
195             lambda _: self.client.registerClasses(*Parameterized.registry))
196         d.addCallback(lambda _: self.client.update('newModel', self.modelObj))
197         d.addCallback(lambda _: setattr(self, 'remoteMode', True))
198         d.addErrback(self._oops)
199         return d
200
201     @defer.deferredGenerator
202     def nextIteration(self, remote=False, local=False):
203         """
204         """
205         self.modelObj.z = self.zFetcher.z
206         if (remote or self.remoteMode) and not local:
207             chunks = self.zFetcher.zPackedChunks
208             N = len(chunks)
209             for k, chunk in enumerate(chunks):
210                 wfd = defer.waitForDeferred(
211                     self.client.update('newZ', chunk, k, N, ephemeral=True))
212                 yield wfd
213                 wfd.getResult()
214         self.zFetcher.restart()
215    
216     def zSimulation(self, paramContainer, member, remote=False, local=False):
217         """
218         Runs a new log-volatily simulation for the I{paramContainer}, saving
219         its result in my I{z} array and updating my instance of L{ZFetcher}
220         with the array when it has been completely updated.
221
222         Updates the container in place with the last simulated log-volatility
223         value.
224         
225         Returns a deferred that fires when the simulation is done, with C{True}
226         if it succeeded or C{False} if not.
227         """
228         def update(z=None):
229             simulationsDone = self.zFetcher.update(member, z)
230
231         def unpackResult(packedResult):
232             if packedResult:
233                 return pack.Unpacker(packedResult)()
234        
235         def simulationDone(result):
236             if result is None:
237                 update()
238                 return False
239             update(result)
240             return True
241        
242         if (remote or self.remoteMode) and not local:
243             d = self.client.run(
244                 'hybridGibbs', paramContainer, timeout=self.timeout)
245             d.addCallback(unpackResult)
246         else:
247             d = self.queue.call(
248                 self.modelObj.hybridGibbs, paramContainer)
249         return d.addCallbacks(simulationDone, self._oops)
250    
251     def likelihood(self, paramContainer, remote=False, local=False):
252         """
253         Computes the log-likelihood of the parameters in the supplied
254         I{paramContainer} for my model.
255
256         Updates the container in-place with the log-likelihood.
257
258         Returns a deferred that fires with a reference to the paramContainer
259         when the likelihood computation is done and it has been updated.
260         """
261         def gotLikelihood(result):
262             if result is None:
263                 # An error from the remote likelihood method call is treated as
264                 # infinitely low likelihood
265                 result = -s.inf
266             paramContainer.Lx = result
267             return paramContainer
268
269         # DEBUG
270         if (remote or self.remoteMode) and not local:
271             d = self.client.run(
272                 'likelihood', paramContainer, timeout=self.timeout)
273         else:
274             d = self.queue.call(
275                 self.modelObj.likelihood, paramContainer)
276         return d.addCallbacks(gotLikelihood, self._oops)
277
278
279 class VAR(weave.Weaver):
280     """
281     Construct me with a 2-D array of time series values, one time series per
282     row.
283     
284     Then you can call me with a C{p x 1} drift vector I{a} and a C{p x p}
285     VAR(1) cross-correlation matrix I{b}, to get a C{p x n} vector of residuals
286     from my observation data after application of the VAR(1) model::
287     
288         y[:,t] = a + b*y[:,t-1]
289
290     @ivar y: Observations from I{n} intersecting dates of I{p} time series,
291       C{p,n} array.
292     
293     """
294     def __init__(self, y):
295         self.y = y
296    
297     def reverse(self, a, b):
298         v = s.empty_like(self.y)
299         self.inline('v', 'y', 'a', 'b')
300         return v
301
302     def forward(self, v, a, b):
303         y = s.empty_like(v)
304         self.inline('y', 'v', 'a', 'b')
305         return y
306
307
308 class Model(Parameterized, weave.Weaver):
309     """
310     I am a sophisticated econometric model wherein the residuals of a
311     linear-drift, VAR(1) process follow a multivariate stochastic volatility
312     process with correlated volatility (v) and return (w) shocks.
313
314     @ivar y: A 2-D array containing my observation data.
315
316     @ivar wiggle: The +/- range of the random-walk uniform proposal on z for
317       each hybrid Gibbs proposal. Use +/-1.0 for balance between fast
318       convergence and reasonable accept-reject efficiency in z proposal
319       generator.
320
321     @ivar Ni: Number of hybrid Gibbs iterations per log-volatility simulation.
322     
323     """
324     keyAttrs = {'y':None, 'wiggle':1.0, 'Ni':50}
325
326     #--- Properties -----------------------------------------------------------
327
328     def _get_p(self):
329         return self.y.shape[0]
330     p = property(_get_p)
331    
332     def _get_n(self):
333         return self.y.shape[1]
334     n = property(_get_n)
335
336     def _get_var(self):
337         if 'var' not in self.cache:
338             self.cache['var'] = VAR(self.y)
339         return self.cache['var']
340     var = property(_get_var)
341
342
343     #--- Support --------------------------------------------------------------
344
345     def setParams(self, paramContainer):
346         """
347         Sets up array variables for an inline call made by the caller,
348         including innovations as residuals of my observations run through the
349         reversed VAR(1).
350         """
351         self.setParamSequence(paramContainer.paramSequence())
352         self.x = self.var.reverse(self.a, self.b)
353
354    
355     #--- The API --------------------------------------------------------------
356
357     def hybridGibbs(self, paramContainer):
358         """
359         Call this method with a parameter object that defines a set of values
360         for my parameters.
361
362         A new 2-D array of log-volatilities will be drawn from a hybrid Gibbs
363         simulation, with each Gibbs step driven by a short importance-sampling
364         simulation of the IID variates, followed by cross-correlation to form
365         volatility shocks, and VAR(1) to form the log-volatilities. The
366         importance-sampling proposals are scaled based on the supplied
367         fractional I{sigma}. The simulation is governed by the log-likelihood
368         of my observation data given the supplied parameters and the simulated
369         log-volatilities.
370
371         If a L{linalg.LinAlgError} is raised due to an invalid correlation
372         matrix parameter, the method returns with no result. Otherwise, it
373         returns the IID normal variates underlying the simulated
374         log-volatilities after the last simulation round.
375
376         """
377         self.setParams(paramContainer)
378         # Create the normal variate array with a standard starting point for
379         # every log-volatility simulation
380         z = s.zeros_like(self.x)
381         self.inline(
382             'z', 'x', 'w', 'ni',
383             'd', 'e', 'g', 'rz', 'ri', 'sc',
384             w=self.wiggle, ni=self.Ni)
385         return z
386
387     def likelihood(self, paramContainer):
388         """
389         Call this method with a parameter object that defines a set of values
390         for my parameters.
391
392         If a L{linalg.LinAlgError} is raised due to an invalid correlation
393         matrix parameter, the method returns with no result. Otherwise, it
394         returns the log-likelihood of my observations given the parameters,
395         from an integration of the joint probability of the observations and
396         log-volatilities simulated from all the other parameters in the current
397         PMC population.
398         
399         The returned likelihood does not consider the prior probability of the
400         parameters. You have to account for that yourself, preferably before
401         calling this method. If the prior probability of any parameter is zero,
402         the posterior density for that parameter will also be zero no matter
403         what, making any call to this method with it a waste of computing time.
404         """
405         self.setParams(paramContainer)
406         return self.inline('z', 'x', 'd', 'e', 'g', 'rz', 'ri', 'sc')
407    
408                      
Note: See TracBrowser for help on using the browser.