root/projects/AsynCluster/trunk/svpmc/pmc.py

Revision 207, 13.2 kB (checked in by edsuom, 6 months ago)

Got Rao-Blackwellized version working locally & remotely, with better but still disappointing degeneracy

Line 
1 # svpmc: Stochastic Volatility Inference via Population Monte Carlo
2 #
3 # Copyright (C) 2007-2008 by Edwin A. Suominen, http://www.eepatents.com
4 #
5 # This program is free software; you can redistribute it and/or modify it under
6 # the terms of the GNU General Public License as published by the Free Software
7 # Foundation; either version 2 of the License, or (at your option) any later
8 # version.
9 #
10 # This program is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 # FOR A PARTICULAR PURPOSE.  See the file COPYING for more details.
13 #
14 # You should have received a copy of the GNU General Public License along with
15 # this program; if not, write to the Free Software Foundation, Inc., 51
16 # Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
17
18 """
19 Population Monte Carlo sampling
20 """
21
22 import sys
23 from random import sample as sampleWOR
24
25 import scipy as s
26 from scipy import random
27 from twisted.internet import defer
28
29 import params
30 from sample import Resampler
31
32
33 class Allocator(object):
34     """
35     Construct me with a population size I{N} and an integer number of
36     allocation bins I{D}.
37
38     @ivar W: A 1-D array containing the fraction of population members, i.e.,
39       the population weight, currently allocated to each bin.
40
41     @ivar R: A 1-D array containing the number of population members currently
42       allocated to each bin.
43
44     """
45     def __init__(self, N, D):
46         if N < 2*D:
47             raise ValueError(
48                 "Population size must be at least twice the number of bins")
49         self.N = N
50         self.D = D
51         self.rMin = max([2, int(round(0.01 * N))])
52         self.rMax = N - self.rMin*(self.D-1)
53         self.updateAllocations()
54         self.II = None
55    
56     def subsetIndex(self, k):
57         """
58         Returns a subset index for population members that correspond to the
59         bin indexed by I{k}.
60         """
61         I = sampleWOR(self.Is, self.R[k])
62         self.Is = s.setdiff1d(self.Is, I)
63         return I
64    
65     def assembler(self, resultList):
66         """
67         Call this method with a reference to an empty list that will hold
68         results assembled from an allocation that I'll be (re)doing. If you
69         want to re-use the same allocation as from a previous call to this
70         method, set the I{lastAllocation} keyword C{True}.
71
72         The method will allocate the population members into my I{D} bins,
73         unless we're re-using the last allocation. For each bin, the method
74         will yield the bin index, a 1-D array of indices for the subset of my
75         population allocated to that bin, and a fresh deferred already fired
76         with C{None}. You must add a callback to the deferred for each
77         iteration that returns a tuple of 1-D arrays, each being the same
78         length as the yielded index array.
79         
80         Each returned array for each subset will be assembled back into a
81         single population-size array and placed into the supplied list, in
82         order.
83         """
84         def gotResults(results, I):
85             if not isinstance(results, tuple):
86                 results = (results,)
87             if not resultList:
88                 for result in results:
89                     if isinstance(result, list) and len(result) == len(I):
90                         resultList.append(s.empty(self.N))
91                     elif isinstance(result, s.ndarray):
92                         resultList.append(s.empty(self.N))
93                     else:
94                         resultList.append(params.FlexArray(self.N))
95             for k, result in enumerate(results):
96                 resultList[k][I] = result
97        
98         if resultList != []:
99             raise ValueError(
100                 "You must supply an empty list to hold your assembled results")
101         if self.II is None:
102             lastAllocation = False
103             self.II = [None] * self.D
104         else:
105             lastAllocation = True
106         for k in s.flipud(s.argsort(self.R)):
107             if lastAllocation:
108                 I = self.II[k]
109             else:
110                 I = self.II[k] = self.subsetIndex(k)
111             d = defer.succeed(None)
112             yield k, I, d
113             d.addCallback(gotResults, I)
114
115     def updateAllocations(self, I=None):
116         """
117         Rescales my allocations based on the supplied 1-D array I{I} of
118         resampling indices. Allocations with lots of repeated indices were
119         highly successful and those with lots of missing indices were not.
120         
121         Highly successful allocations will have a large portion of the next
122         allocation even if they last operated on only a few members of the
123         population.
124
125         If no index array is supplied, the allocations will be equalized.
126         """
127         if I is None:
128             R = s.ones(self.D) / self.D
129         elif self.II is not None:
130             R = s.array(
131                 [sum([x in self.II[k] for x in I]) for k in xrange(self.D)])
132             R = R.astype(float) / R.sum()
133             self.II = None
134         else:
135             raise AttributeError(
136                 "You can only update with an index array after completion "+\
137                 "of an assembler run")
138
139         # Turn the scaled array into an array of subsample sizes, with a
140         # minimum size of two apiece.
141         R = s.clip(s.round_(self.N*R), self.rMin, self.rMax)
142         # Twiddle the biggest one as needed to keep sum = Number of members
143         R[s.argmax(R)] += self.N - sum(R)
144         # Replace the old allocation arrays and subset index
145         self.W = R / R.sum()
146         self.R = R.astype(int)
147         self.Is = s.arange(self.N)
148
149
150 class PMC(object):
151     """
152     Population Monte Carlo with per Cappe et al.
153
154     """
155     def __init__(self, projectManager, socket=None):
156         self.socket = socket
157         self.pm = projectManager
158         self.mm = projectManager.mgr
159         self.V = projectManager.V
160         self.queue = projectManager.queue
161         self.resampler = Resampler(logWeights=True)           
162         self.allocator = Allocator(projectManager.m, len(projectManager.V))
163
164     def _oops(self, failure, **kw):
165         text = "\nFailure"
166         if kw:
167             text += " with "
168             for name, value in kw.iteritems():
169                 text += "%s=%s" % (name, value)
170         print "%s\n%s" % (text, failure.getTraceback())
171    
172     def proposals(self, *args):
173         """
174         Call this method with a 1-D FlexArray of parameter containers and a
175         jump deviation, and it will generate proposals from the existing
176         parameters.
177
178         Call it with just an integer number of proposals, and it will generate
179         new ones directly from the priors.
180
181         Returns a 1-D FlexArray of parameter containers embodying the
182         proposals.
183         """
184         if len(args) == 2:
185             new = False
186             X, v = args
187             N = len(X)
188         else:
189             new = True
190             N = args[0]
191         XP = params.FlexArray(N)
192         for k in xrange(N):
193             if new:
194                 proposalPC = self.pm.priors.new()
195             else:
196                 proposalPC = self.pm.priors.proposal(X[k], v)
197             XP[k] = proposalPC
198         return XP
199    
200     def simulations(self, X):
201         """
202         Runs a log-volatility simulation for each parameter container supplied
203         in the 1-D FlexArray I{X}. The simulations are run via the model
204         manager, which constructs a 3-D array of the normal variates underlying
205         the entire set of simulations.
206
207         Returns a deferred that fires when all the log-volatility simulations
208         are done.
209         """
210         def simulationDone(success):
211             if success:
212                 print "+",
213             else:
214                 print ".",
215
216         dList = []
217         for k, Xk in enumerate(X):
218             d = self.mm.zSimulation(Xk, k)
219             d.addCallback(simulationDone)
220             d.addErrback(self._oops, k=k)
221             dList.append(d)
222         d = defer.DeferredList(dList)
223         d.addCallback(lambda _: self.mm.nextIteration())
224         d.addErrback(self._oops)
225         return d
226    
227     def weights(self, XP, v):
228         """
229         Returns a deferred that fires with a 1-D array of log importance
230         weights for the parameter proposals in the 1-D FlexArray I{XP}.
231         
232         The importance weights are to be combined with those from other calls
233         to this method with different variance settings. The weights will be
234         used to resample the supplied proposals so that the probability of any
235         given proposal being included in the final result for this iteration is
236         proportional to its likelihood and prior probability density, and
237         inversely proportional to the probability density of the proposal
238         offset.
239         """
240         def weight(paramContainer):
241             if s.isfinite(paramContainer.Lx):
242                 L = paramContainer.Lx + paramContainer.Lp - paramContainer.Lj
243                 info = " ".join([
244                     "%+12.2f" % (getattr(paramContainer, x),)
245                     for x in ('Lx', 'Lp', 'Lj')])
246             else:
247                 L = -s.inf
248             if L < 0:
249                 print "%6.4f" % v
250             else:
251                 print "%6.4f\t%+12.2f\t%s" % (v, L, info)
252             return L
253        
254         # Evaluate the proposals (very time-consuming, done either locally in a
255         # thread or in the cluster)
256         dList = [self.mm.likelihood(XPk).addCallback(weight) for XPk in XP]
257         return defer.gatherResults(dList).addCallback(lambda W: s.array(W))
258    
259     @defer.deferredGenerator
260     def run(self, N_iter):
261         """
262         Does a PMC run with I{N_iter} iterations and the sequence of jump
263         deviations in my I{V} attribute. The portion of the population members
264         allocated to each jump variance will go up or down, depending on the
265         performance for that setting.
266
267         Returns a deferred that fires when the run is done. No output value is
268         provided via the deferred, however. Instead, everything is written to
269         my project manager's open NetCDF file.
270
271         @param N_iter: The number of iterations to produce after burn-in.
272
273         """
274         X = None
275         if self.socket:
276             wfd = defer.waitForDeferred(self.mm.setRemoteMode(self.socket))
277             yield wfd; wfd.getResult()
278         N_members = self.pm.m
279         print "\nRunning %d iterations with %d population members" \
280               % (N_iter, N_members)
281         # For each iteration...
282         for i in xrange(N_iter):
283             print "\n%03d\n%s" % (i+1, "-"*100)
284             if X is None:
285                 # No existing members, generate proposals directly from the
286                 # priors and simulate log-volatilities for them
287                 print "Generating %d proposals from priors " % N_members +\
288                       "and simulating log-volatilities for them"
289                 wfd = defer.waitForDeferred(
290                     self.queue.call(self.proposals, N_members))
291                 yield wfd
292                 XP = wfd.getResult()
293                 wfd = defer.waitForDeferred(self.simulations(XP))
294                 yield wfd
295                 wfd.getResult()
296             else:
297                 # Regular iteration, simulate log-volatilities for the last
298                 # iteration's members and use proposals generated from those
299                 # members
300                 print "\nSimulating log-volatilities for the previous "+\
301                       "population and generating %d proposals " % N_members +\
302                       "from the population:"
303                 resultList = []
304                 dList = [self.simulations(X)]
305                 for vIndex, I, d in self.allocator.assembler(resultList):
306                     d.addCallback(
307                         lambda _:
308                         self.queue.call(self.proposals, X[I], self.V[vIndex]))
309                     d.addErrback(self._oops, vIndex=vIndex)
310                     dList.append(d)
311                 # Record the previous iteration's members while the nodes work on
312                 # generating the proposals and simulations for the current iteration...
313                 if i > 0:
314                     self.pm.writeParams(X)
315                 wfd = defer.waitForDeferred(defer.DeferredList(dList))
316                 yield wfd
317                 wfd.getResult()
318                 XP = resultList[0]
319             print "\n"
320             # Evaluate the proposals
321             dList, resultList = [], []
322             for vIndex, I, d in self.allocator.assembler(resultList):
323                 d.addCallback(lambda _: self.weights(XP[I], self.V[vIndex]))
324                 d.addErrback(self._oops, vIndex=vIndex)
325                 dList.append(d)
326             wfd = defer.waitForDeferred(defer.DeferredList(dList))
327             yield wfd
328             wfd.getResult()
329             W = resultList[0]
330             I0 = s.isfinite(W).nonzero()[0]
331             if len(I0):
332                 # Resample everything together
333                 I1 = self.resampler(W[I0], N_members)
334                 X = XP[I0][I1]
335                 self.allocator.updateAllocations(I0[I1])
336             else:
337                 # No valid parameters, start over
338                 X = None
339                 self.allocator.updateAllocations()
340         wfd = defer.waitForDeferred(self.pm.done())
341         yield wfd
342         wfd.getResult()
343         print "\nPMC Run done"
344        
Note: See TracBrowser for help on using the browser.