root/projects/AsynCluster/trunk/svpmc/project.py

Revision 207, 11.9 kB (checked in by edsuom, 6 months ago)

Got Rao-Blackwellized version working locally & remotely, with better but still disappointing degeneracy

Line 
1 # svpmc: Stochastic Volatility Inference via Population Monte Carlo
2 #
3 # Copyright (C) 2008 by Edwin A. Suominen, http://www.eepatents.com
4 #
5 # This program is free software; you can redistribute it and/or modify it under
6 # the terms of the GNU General Public License as published by the Free Software
7 # Foundation; either version 2 of the License, or (at your option) any later
8 # version.
9 #
10 # This program is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 # FOR A PARTICULAR PURPOSE.  See the file COPYING for more details.
13 #
14 # You should have received a copy of the GNU General Public License along with
15 # this program; if not, write to the Free Software Foundation, Inc., 51
16 # Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
17
18 """
19 Management of an overall SVpmc run.
20 """
21
22 import os.path, re
23 import scipy as s
24 from Scientific.IO import NetCDF
25 import asynqueue
26
27 import tseries, params, model
28
29
30 class NullQueue(object):
31     """
32     I act like an AsynQueue except that I run everything in the main
33     thread. I'm mostly useful for debugging.
34     """
35     def __init__(self, *args):
36         from twisted.internet import defer
37         self.defer = defer
38
39     def call(self, f, *args):
40         result = f(*args)
41         if isinstance(result, self.defer.Deferred):
42             return result
43         return self.defer.succeed(result)
44
45     def shutdown(self):
46         return self.defer.succeed(None)
47
48
49 class ProjectManager(object):
50     """
51     Instantiate me with the path of a file within a project directory that
52     defines a project specification and the name of a NetCDF file to be created
53     in the project directory. If no NetCDF file is specified, one will be
54     created with the name 'svpmc.nc'.
55
56     You can specify the PMC population size with the keyword I{m}. It defaults
57     to 10,000.
58
59     @ivar n: The number of samples.
60     
61     @ivar p: The number of time series.
62
63     @ivar tables: A dict whose entries contain rows of tables parsed from my
64       project specification.
65
66     @ivar xcorrs: The number of cross-correlations between series, computed as
67       C{(p**2 + p)/2}
68
69     @ivar m: The number of population members in each population monte carlo
70       iteration.
71
72     @ivar paramNames: A list of the names of arrays in the parameter containers
73       used in the project for both primary and secondary parameters.
74
75     @ivar D: The number of jump deviations used by the D-kernel PMC
76       inference engine that will be running the project.
77
78     @ivar Df: The value of a given jump deviation as a fraction of the previous
79       one (the first value is 1.0)
80
81     @ivar priors: An instance of L{params.PriorContainer} constructed in
82       accordance with my project specification.
83     
84     """
85     re_dashes = re.compile("^\-{10,}$")
86     re_xy = re.compile("([0-9]+),([0-9]+)")
87     dimensions = {
88         'n':        'sample',
89         'p':        'series',               
90         'xcorrs':   'cross_correlation',
91         'm':        'member',   
92         }
93     def __init__(self, specFile, ncFile="svpmc.nc", m=1000, readOnly=False):
94         self.m = m
95         self.iteration = 0
96         specDir = os.path.dirname(specFile)
97         self.tables = self._parseSpec(specFile)
98         # Set some simulation parameters
99         for name, value, null in self.tables['variable']:
100             if "." in value:
101                 value = float(value)
102             else:
103                 value = int(value)
104             setattr(self, name, value)
105         V = [self.Ds]
106         for k in xrange(self.D-1):
107             V.append(self.Df * V[-1])
108         self.V = s.array(V)
109         # Observations
110         tsData, seriesTitles = self._setupTimeSeries(specDir)
111         self.p, self.n = tsData.shape
112         # Parameters
113         paramTitles, dimensions = self._setupParams()
114         # Open the NetCDF file
115         self.cdf = NetCDF.NetCDFFile(
116             os.path.join(specDir, ncFile), ('w', 'r')[int(readOnly)])
117         # If in read-only mode, we're done now
118         if readOnly:
119             return
120         # Write mode, write some initial stuff to the NetCDF file...
121         self._setupCDF(tsData, seriesTitles, paramTitles, dimensions)
122         for k, title in enumerate(seriesTitles):
123             obs = self.cdf.variables['observations']
124             obs[k,:] = tsData[k,:].astype('f')
125             setattr(obs, "series_%02d" % (k,), title)
126         # ...and construct a queue and model manager for a simulation
127         self.queue = asynqueue.ThreadQueue(1)
128         #self.queue = NullQueue()
129         self.mgr = model.ModelManager(
130             self, tsData, wiggle=self.wiggle, Ni=self.Ni)
131    
132     def _parseSpec(self, filePath):
133         tables = {}
134         Nf = None
135         linePrev = ""
136         fh = open(filePath)
137         for line in fh:
138             line = line.strip()
139             if line.startswith('#'):
140                 continue
141             if not line:
142                 Nf = None
143             elif Nf:
144                 fieldList = line.split()
145                 fieldList += [''] * (Nf - len(fieldList))
146                 fieldList[Nf-1] = " ".join(fieldList[Nf-1:])
147                 rows.append(fieldList[:Nf])
148             elif self.re_dashes.match(line) and linePrev:
149                 fields = linePrev.split()
150                 Nf = len(fields)
151                 rows = tables[fields[0].lower()] = []
152             else:
153                 linePrev = line
154         fh.close()
155         return tables
156    
157     def _setupTimeSeries(self, fileDir):
158         """
159         Given the specification defined in my I{tables} dict, loads a list of
160         L{tseries.TimeSeries} objects from '.dat' files in the specified
161         directory.
162
163         Returns a 2-D array of the time series data, samples across columns and
164         time series across rows, along with a list of the time series titles.
165         """
166         prevObj = None
167         tsList, titles = [], []
168         for series, transform, title in self.tables['series']:
169             filePath = os.path.join(fileDir, series) + ".dat"
170             tsObject = tseries.TimeSeries(title, filePath)
171             for methodName in transform.split(','):
172                 getattr(tsObject, methodName)()
173             if prevObj:
174                 tsObject = tsObject.intersect(prevObj)
175             tsList.append(tsObject)
176             titles.append(title)
177             prevObj = tsObject
178         return s.row_stack([ts() for ts in tsList]), titles
179
180     def _setupParams(self):
181         """
182         Given the specification defined in my I{tables} dict, constructs an
183         instance of L{params.PriorContainer} with L{params.FlexArray} objects
184         loaded with a L{params.Prior} object for each array element of each of
185         my parameters.
186
187         Sets my I{priorContainer} attribute to the container and returns a list
188         of the parameter titles and lists of dimensions compatible with my
189         NetCDF specification.
190         """
191         def parseDims(dims):
192             shape = []
193             dimSequence = []
194             for dim in dims.split(','):
195                 if dim not in self.dimensions:
196                     raise ValueError(
197                         "Invalid dimension specification '%s'" % dims)
198                 shape.append(getattr(self, dim))
199                 dimSequence.append(self.dimensions[dim])
200             dimensions.append(dimSequence)
201             return tuple(shape)
202
203         def constructPriors(priorSpec, shape):
204             index = priorSpec[0]
205             kw = {'dname':priorSpec[1], 'V':self.V}
206             for k, name in enumerate(('loc', 'scale', 'a', 'b')):
207                 stringValue = priorSpec[k+2]
208                 if stringValue != '':
209                     kw[name] = float(stringValue)
210             for j in xrange(shape[0]):
211                 if len(shape) == 1:
212                     if index.isdigit() and j != int(index):
213                         continue
214                     pa[j] = params.Prior(**kw)
215                 else:
216                     for k in xrange(shape[1]):
217                         if index == "I" and j != k:
218                             continue
219                         if index != ":":
220                             match = self.re_xy.match(index)
221                             if match and match.group(0) != "%d,%d" % (j,k):
222                                 continue
223                         pa[j,k] = params.Prior(**kw)
224        
225         self.primaryParamNames = []
226         titles, dimensions = [], []
227         self.paramInfo = {}
228         xcorrs = self.xcorrs = int(0.5*(self.p**2 + self.p))
229         self.priors = params.PriorContainer(self.p, self.n)
230         for name, dims, title in self.tables['parameter']:
231             shape = parseDims(dims)
232             self.paramInfo[name] = shape, title
233             pa = self.priors.priorArray(name, *shape)
234             for priorSpec in self.tables[name]:
235                 constructPriors(priorSpec, shape)
236             titles.append(title)
237             self.primaryParamNames.append(name)
238         self.priors.primaryParamNames = self.primaryParamNames
239         self.derivedParamNames = []
240         for name, sourceParams, description in self.tables['derivation']:
241             self.derivedParamNames.append(name)
242         self.priors.derivedParamNames = self.derivedParamNames
243         self.paramNames = self.primaryParamNames + self.derivedParamNames
244         return titles, dimensions
245
246     def _setupCDF(self, tsData, sTitles, pTitles, pDims):
247         """
248         """
249         cdf = self.cdf
250         cdf.title = self.tables['project'][0][0]
251         # Dimensions
252         for attrName, name in self.dimensions.iteritems():
253             cdf.createDimension(name, getattr(self, attrName))
254         cdf.createDimension("iteration", None)
255         # Time Series
256         obs = cdf.createVariable(
257             "observations", 'f', ('series', 'sample'))
258         obs.long_name = "Observations, %d time series " % self.p +\
259                         "with %d samples each" % self.n
260         # Parameters
261         for k, name in enumerate(self.primaryParamNames):
262             pDim = ['iteration', 'member'] + pDims[k]
263             param = cdf.createVariable(name, 'f', tuple(pDim))
264             param.long_name = pTitles[k]
265         # Log-Likelihood, log-density of priors and jumps
266         L = cdf.createVariable("Lx", 'f', ('iteration', 'member'))
267         L.long_name = "Log-Likelihood of Observations Given Parameters"
268         L = cdf.createVariable("Lp", 'f', ('iteration', 'member'))
269         L.long_name = "Log-Density of Parameters from Prior Distributions"
270         L = cdf.createVariable("Lj", 'f', ('iteration', 'member'))
271         L.long_name = "Log-Density of Proposal Jumps"
272         # Done setting up, save what we've got thus far
273         cdf.sync()
274
275     def writeParams(self, parameters):
276         """
277         Write the supplied population of I{parameters}, housed in a
278         L{param.FlexArray} instance containing L{param.ParameterContainer}
279         objects, as the latest iteration of the open CDF record.
280         """
281         if len(parameters) != self.m:
282             raise ValueError(
283                 "Each iteration record must have %d parameters" % self.m)
284         # Write the parameters to their variables
285         for name in self.primaryParamNames:
286             var = self.cdf.variables[name]
287             # Iterate over a 1-D FlexArray of the population's parameter arrays
288             for member, paramArray in enumerate(getattr(parameters, name)):
289                 var[self.iteration, member, :] = paramArray.astype('f')
290         # Write the log-volatility, prior and jump log-density values for each
291         # population member
292         for name in ('Lx', 'Lp', 'Lj'):
293             var = self.cdf.variables[name]
294             var[self.iteration, :] = getattr(parameters, name).astype('f')
295         # Done writing this iteration of the CDF record
296         self.cdf.sync()
297         self.iteration += 1
298
299     def done(self):
300         """
301         Call this when the project is done.
302         """
303         if hasattr(self, 'mgr'):
304             d = self.mgr.shutdown()
305             d.addCallback(lambda _: self.queue.shutdown())
306             d.addCallback(lambda _: self.cdf.close())
307             return d
308         self.cdf.close()
Note: See TracBrowser for help on using the browser.