| 1 |
# svpmc: Stochastic Volatility Inference via Population Monte Carlo |
|---|
| 2 |
# |
|---|
| 3 |
# Copyright (C) 2007-2008 by Edwin A. Suominen, http://www.eepatents.com |
|---|
| 4 |
# |
|---|
| 5 |
# This program is free software; you can redistribute it and/or modify it under |
|---|
| 6 |
# the terms of the GNU General Public License as published by the Free Software |
|---|
| 7 |
# Foundation; either version 2 of the License, or (at your option) any later |
|---|
| 8 |
# version. |
|---|
| 9 |
# |
|---|
| 10 |
# This program is distributed in the hope that it will be useful, but WITHOUT |
|---|
| 11 |
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|---|
| 12 |
# FOR A PARTICULAR PURPOSE. See the file COPYING for more details. |
|---|
| 13 |
# |
|---|
| 14 |
# You should have received a copy of the GNU General Public License along with |
|---|
| 15 |
# this program; if not, write to the Free Software Foundation, Inc., 51 |
|---|
| 16 |
# Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA |
|---|
| 17 |
|
|---|
| 18 |
""" |
|---|
| 19 |
Population Monte Carlo sampling |
|---|
| 20 |
""" |
|---|
| 21 |
|
|---|
| 22 |
import sys |
|---|
| 23 |
from random import sample as sampleWOR |
|---|
| 24 |
|
|---|
| 25 |
import scipy as s |
|---|
| 26 |
from scipy import random |
|---|
| 27 |
from twisted.internet import defer |
|---|
| 28 |
|
|---|
| 29 |
import params |
|---|
| 30 |
from sample import Resampler |
|---|
| 31 |
|
|---|
| 32 |
|
|---|
| 33 |
class Allocator(object): |
|---|
| 34 |
""" |
|---|
| 35 |
Construct me with a population size I{N} and an integer number of |
|---|
| 36 |
allocation bins I{D}. |
|---|
| 37 |
|
|---|
| 38 |
@ivar W: A 1-D array containing the fraction of population members, i.e., |
|---|
| 39 |
the population weight, currently allocated to each bin. |
|---|
| 40 |
|
|---|
| 41 |
@ivar R: A 1-D array containing the number of population members currently |
|---|
| 42 |
allocated to each bin. |
|---|
| 43 |
|
|---|
| 44 |
""" |
|---|
| 45 |
def __init__(self, N, D): |
|---|
| 46 |
if N < 2*D: |
|---|
| 47 |
raise ValueError( |
|---|
| 48 |
"Population size must be at least twice the number of bins") |
|---|
| 49 |
self.N = N |
|---|
| 50 |
self.D = D |
|---|
| 51 |
self.rMin = max([2, int(round(0.01 * N))]) |
|---|
| 52 |
self.rMax = N - self.rMin*(self.D-1) |
|---|
| 53 |
self.updateAllocations() |
|---|
| 54 |
self.II = None |
|---|
| 55 |
|
|---|
| 56 |
def subsetIndex(self, k): |
|---|
| 57 |
""" |
|---|
| 58 |
Returns a subset index for population members that correspond to the |
|---|
| 59 |
bin indexed by I{k}. |
|---|
| 60 |
""" |
|---|
| 61 |
I = sampleWOR(self.Is, self.R[k]) |
|---|
| 62 |
self.Is = s.setdiff1d(self.Is, I) |
|---|
| 63 |
return I |
|---|
| 64 |
|
|---|
| 65 |
def assembler(self, resultList): |
|---|
| 66 |
""" |
|---|
| 67 |
Call this method with a reference to an empty list that will hold |
|---|
| 68 |
results assembled from an allocation that I'll be (re)doing. If you |
|---|
| 69 |
want to re-use the same allocation as from a previous call to this |
|---|
| 70 |
method, set the I{lastAllocation} keyword C{True}. |
|---|
| 71 |
|
|---|
| 72 |
The method will allocate the population members into my I{D} bins, |
|---|
| 73 |
unless we're re-using the last allocation. For each bin, the method |
|---|
| 74 |
will yield the bin index, a 1-D array of indices for the subset of my |
|---|
| 75 |
population allocated to that bin, and a fresh deferred already fired |
|---|
| 76 |
with C{None}. You must add a callback to the deferred for each |
|---|
| 77 |
iteration that returns a tuple of 1-D arrays, each being the same |
|---|
| 78 |
length as the yielded index array. |
|---|
| 79 |
|
|---|
| 80 |
Each returned array for each subset will be assembled back into a |
|---|
| 81 |
single population-size array and placed into the supplied list, in |
|---|
| 82 |
order. |
|---|
| 83 |
""" |
|---|
| 84 |
def gotResults(results, I): |
|---|
| 85 |
if not isinstance(results, tuple): |
|---|
| 86 |
results = (results,) |
|---|
| 87 |
if not resultList: |
|---|
| 88 |
for result in results: |
|---|
| 89 |
if isinstance(result, list) and len(result) == len(I): |
|---|
| 90 |
resultList.append(s.empty(self.N)) |
|---|
| 91 |
elif isinstance(result, s.ndarray): |
|---|
| 92 |
resultList.append(s.empty(self.N)) |
|---|
| 93 |
else: |
|---|
| 94 |
resultList.append(params.FlexArray(self.N)) |
|---|
| 95 |
for k, result in enumerate(results): |
|---|
| 96 |
resultList[k][I] = result |
|---|
| 97 |
|
|---|
| 98 |
if resultList != []: |
|---|
| 99 |
raise ValueError( |
|---|
| 100 |
"You must supply an empty list to hold your assembled results") |
|---|
| 101 |
if self.II is None: |
|---|
| 102 |
lastAllocation = False |
|---|
| 103 |
self.II = [None] * self.D |
|---|
| 104 |
else: |
|---|
| 105 |
lastAllocation = True |
|---|
| 106 |
for k in s.flipud(s.argsort(self.R)): |
|---|
| 107 |
if lastAllocation: |
|---|
| 108 |
I = self.II[k] |
|---|
| 109 |
else: |
|---|
| 110 |
I = self.II[k] = self.subsetIndex(k) |
|---|
| 111 |
d = defer.succeed(None) |
|---|
| 112 |
yield k, I, d |
|---|
| 113 |
d.addCallback(gotResults, I) |
|---|
| 114 |
|
|---|
| 115 |
def updateAllocations(self, I=None): |
|---|
| 116 |
""" |
|---|
| 117 |
Rescales my allocations based on the supplied 1-D array I{I} of |
|---|
| 118 |
resampling indices. Allocations with lots of repeated indices were |
|---|
| 119 |
highly successful and those with lots of missing indices were not. |
|---|
| 120 |
|
|---|
| 121 |
Highly successful allocations will have a large portion of the next |
|---|
| 122 |
allocation even if they last operated on only a few members of the |
|---|
| 123 |
population. |
|---|
| 124 |
|
|---|
| 125 |
If no index array is supplied, the allocations will be equalized. |
|---|
| 126 |
""" |
|---|
| 127 |
if I is None: |
|---|
| 128 |
R = s.ones(self.D) / self.D |
|---|
| 129 |
elif self.II is not None: |
|---|
| 130 |
R = s.array( |
|---|
| 131 |
[sum([x in self.II[k] for x in I]) for k in xrange(self.D)]) |
|---|
| 132 |
R = R.astype(float) / R.sum() |
|---|
| 133 |
self.II = None |
|---|
| 134 |
else: |
|---|
| 135 |
raise AttributeError( |
|---|
| 136 |
"You can only update with an index array after completion "+\ |
|---|
| 137 |
"of an assembler run") |
|---|
| 138 |
|
|---|
| 139 |
# Turn the scaled array into an array of subsample sizes, with a |
|---|
| 140 |
# minimum size of two apiece. |
|---|
| 141 |
R = s.clip(s.round_(self.N*R), self.rMin, self.rMax) |
|---|
| 142 |
# Twiddle the biggest one as needed to keep sum = Number of members |
|---|
| 143 |
R[s.argmax(R)] += self.N - sum(R) |
|---|
| 144 |
# Replace the old allocation arrays and subset index |
|---|
| 145 |
self.W = R / R.sum() |
|---|
| 146 |
self.R = R.astype(int) |
|---|
| 147 |
self.Is = s.arange(self.N) |
|---|
| 148 |
|
|---|
| 149 |
|
|---|
| 150 |
class PMC(object): |
|---|
| 151 |
""" |
|---|
| 152 |
Population Monte Carlo with per Cappe et al. |
|---|
| 153 |
|
|---|
| 154 |
""" |
|---|
| 155 |
def __init__(self, projectManager, socket=None): |
|---|
| 156 |
self.socket = socket |
|---|
| 157 |
self.pm = projectManager |
|---|
| 158 |
self.mm = projectManager.mgr |
|---|
| 159 |
self.V = projectManager.V |
|---|
| 160 |
self.queue = projectManager.queue |
|---|
| 161 |
self.resampler = Resampler(logWeights=True) |
|---|
| 162 |
self.allocator = Allocator(projectManager.m, len(projectManager.V)) |
|---|
| 163 |
|
|---|
| 164 |
def _oops(self, failure, **kw): |
|---|
| 165 |
text = "\nFailure" |
|---|
| 166 |
if kw: |
|---|
| 167 |
text += " with " |
|---|
| 168 |
for name, value in kw.iteritems(): |
|---|
| 169 |
text += "%s=%s" % (name, value) |
|---|
| 170 |
print "%s\n%s" % (text, failure.getTraceback()) |
|---|
| 171 |
|
|---|
| 172 |
def proposals(self, *args): |
|---|
| 173 |
""" |
|---|
| 174 |
Call this method with a 1-D FlexArray of parameter containers and a |
|---|
| 175 |
jump deviation, and it will generate proposals from the existing |
|---|
| 176 |
parameters. |
|---|
| 177 |
|
|---|
| 178 |
Call it with just an integer number of proposals, and it will generate |
|---|
| 179 |
new ones directly from the priors. |
|---|
| 180 |
|
|---|
| 181 |
Returns a 1-D FlexArray of parameter containers embodying the |
|---|
| 182 |
proposals. |
|---|
| 183 |
""" |
|---|
| 184 |
if len(args) == 2: |
|---|
| 185 |
new = False |
|---|
| 186 |
X, v = args |
|---|
| 187 |
N = len(X) |
|---|
| 188 |
else: |
|---|
| 189 |
new = True |
|---|
| 190 |
N = args[0] |
|---|
| 191 |
XP = params.FlexArray(N) |
|---|
| 192 |
for k in xrange(N): |
|---|
| 193 |
if new: |
|---|
| 194 |
proposalPC = self.pm.priors.new() |
|---|
| 195 |
else: |
|---|
| 196 |
proposalPC = self.pm.priors.proposal(X[k], v) |
|---|
| 197 |
XP[k] = proposalPC |
|---|
| 198 |
return XP |
|---|
| 199 |
|
|---|
| 200 |
def simulations(self, X): |
|---|
| 201 |
""" |
|---|
| 202 |
Runs a log-volatility simulation for each parameter container supplied |
|---|
| 203 |
in the 1-D FlexArray I{X}. The simulations are run via the model |
|---|
| 204 |
manager, which constructs a 3-D array of the normal variates underlying |
|---|
| 205 |
the entire set of simulations. |
|---|
| 206 |
|
|---|
| 207 |
Returns a deferred that fires when all the log-volatility simulations |
|---|
| 208 |
are done. |
|---|
| 209 |
""" |
|---|
| 210 |
def simulationDone(success): |
|---|
| 211 |
if success: |
|---|
| 212 |
print "+", |
|---|
| 213 |
else: |
|---|
| 214 |
print ".", |
|---|
| 215 |
|
|---|
| 216 |
dList = [] |
|---|
| 217 |
for k, Xk in enumerate(X): |
|---|
| 218 |
d = self.mm.zSimulation(Xk, k) |
|---|
| 219 |
d.addCallback(simulationDone) |
|---|
| 220 |
d.addErrback(self._oops, k=k) |
|---|
| 221 |
dList.append(d) |
|---|
| 222 |
d = defer.DeferredList(dList) |
|---|
| 223 |
d.addCallback(lambda _: self.mm.nextIteration()) |
|---|
| 224 |
d.addErrback(self._oops) |
|---|
| 225 |
return d |
|---|
| 226 |
|
|---|
| 227 |
def weights(self, XP, v): |
|---|
| 228 |
""" |
|---|
| 229 |
Returns a deferred that fires with a 1-D array of log importance |
|---|
| 230 |
weights for the parameter proposals in the 1-D FlexArray I{XP}. |
|---|
| 231 |
|
|---|
| 232 |
The importance weights are to be combined with those from other calls |
|---|
| 233 |
to this method with different variance settings. The weights will be |
|---|
| 234 |
used to resample the supplied proposals so that the probability of any |
|---|
| 235 |
given proposal being included in the final result for this iteration is |
|---|
| 236 |
proportional to its likelihood and prior probability density, and |
|---|
| 237 |
inversely proportional to the probability density of the proposal |
|---|
| 238 |
offset. |
|---|
| 239 |
""" |
|---|
| 240 |
def weight(paramContainer): |
|---|
| 241 |
if s.isfinite(paramContainer.Lx): |
|---|
| 242 |
L = paramContainer.Lx + paramContainer.Lp - paramContainer.Lj |
|---|
| 243 |
info = " ".join([ |
|---|
| 244 |
"%+12.2f" % (getattr(paramContainer, x),) |
|---|
| 245 |
for x in ('Lx', 'Lp', 'Lj')]) |
|---|
| 246 |
else: |
|---|
| 247 |
L = -s.inf |
|---|
| 248 |
if L < 0: |
|---|
| 249 |
print "%6.4f" % v |
|---|
| 250 |
else: |
|---|
| 251 |
print "%6.4f\t%+12.2f\t%s" % (v, L, info) |
|---|
| 252 |
return L |
|---|
| 253 |
|
|---|
| 254 |
# Evaluate the proposals (very time-consuming, done either locally in a |
|---|
| 255 |
# thread or in the cluster) |
|---|
| 256 |
dList = [self.mm.likelihood(XPk).addCallback(weight) for XPk in XP] |
|---|
| 257 |
return defer.gatherResults(dList).addCallback(lambda W: s.array(W)) |
|---|
| 258 |
|
|---|
| 259 |
@defer.deferredGenerator |
|---|
| 260 |
def run(self, N_iter): |
|---|
| 261 |
""" |
|---|
| 262 |
Does a PMC run with I{N_iter} iterations and the sequence of jump |
|---|
| 263 |
deviations in my I{V} attribute. The portion of the population members |
|---|
| 264 |
allocated to each jump variance will go up or down, depending on the |
|---|
| 265 |
performance for that setting. |
|---|
| 266 |
|
|---|
| 267 |
Returns a deferred that fires when the run is done. No output value is |
|---|
| 268 |
provided via the deferred, however. Instead, everything is written to |
|---|
| 269 |
my project manager's open NetCDF file. |
|---|
| 270 |
|
|---|
| 271 |
@param N_iter: The number of iterations to produce after burn-in. |
|---|
| 272 |
|
|---|
| 273 |
""" |
|---|
| 274 |
X = None |
|---|
| 275 |
if self.socket: |
|---|
| 276 |
wfd = defer.waitForDeferred(self.mm.setRemoteMode(self.socket)) |
|---|
| 277 |
yield wfd; wfd.getResult() |
|---|
| 278 |
N_members = self.pm.m |
|---|
| 279 |
print "\nRunning %d iterations with %d population members" \ |
|---|
| 280 |
% (N_iter, N_members) |
|---|
| 281 |
# For each iteration... |
|---|
| 282 |
for i in xrange(N_iter): |
|---|
| 283 |
print "\n%03d\n%s" % (i+1, "-"*100) |
|---|
| 284 |
if X is None: |
|---|
| 285 |
# No existing members, generate proposals directly from the |
|---|
| 286 |
# priors and simulate log-volatilities for them |
|---|
| 287 |
print "Generating %d proposals from priors " % N_members +\ |
|---|
| 288 |
"and simulating log-volatilities for them" |
|---|
| 289 |
wfd = defer.waitForDeferred( |
|---|
| 290 |
self.queue.call(self.proposals, N_members)) |
|---|
| 291 |
yield wfd |
|---|
| 292 |
XP = wfd.getResult() |
|---|
| 293 |
wfd = defer.waitForDeferred(self.simulations(XP)) |
|---|
| 294 |
yield wfd |
|---|
| 295 |
wfd.getResult() |
|---|
| 296 |
else: |
|---|
| 297 |
# Regular iteration, simulate log-volatilities for the last |
|---|
| 298 |
# iteration's members and use proposals generated from those |
|---|
| 299 |
# members |
|---|
| 300 |
print "\nSimulating log-volatilities for the previous "+\ |
|---|
| 301 |
"population and generating %d proposals " % N_members +\ |
|---|
| 302 |
"from the population:" |
|---|
| 303 |
resultList = [] |
|---|
| 304 |
dList = [self.simulations(X)] |
|---|
| 305 |
for vIndex, I, d in self.allocator.assembler(resultList): |
|---|
| 306 |
d.addCallback( |
|---|
| 307 |
lambda _: |
|---|
| 308 |
self.queue.call(self.proposals, X[I], self.V[vIndex])) |
|---|
| 309 |
d.addErrback(self._oops, vIndex=vIndex) |
|---|
| 310 |
dList.append(d) |
|---|
| 311 |
# Record the previous iteration's members while the nodes work on |
|---|
| 312 |
# generating the proposals and simulations for the current iteration... |
|---|
| 313 |
if i > 0: |
|---|
| 314 |
self.pm.writeParams(X) |
|---|
| 315 |
wfd = defer.waitForDeferred(defer.DeferredList(dList)) |
|---|
| 316 |
yield wfd |
|---|
| 317 |
wfd.getResult() |
|---|
| 318 |
XP = resultList[0] |
|---|
| 319 |
print "\n" |
|---|
| 320 |
# Evaluate the proposals |
|---|
| 321 |
dList, resultList = [], [] |
|---|
| 322 |
for vIndex, I, d in self.allocator.assembler(resultList): |
|---|
| 323 |
d.addCallback(lambda _: self.weights(XP[I], self.V[vIndex])) |
|---|
| 324 |
d.addErrback(self._oops, vIndex=vIndex) |
|---|
| 325 |
dList.append(d) |
|---|
| 326 |
wfd = defer.waitForDeferred(defer.DeferredList(dList)) |
|---|
| 327 |
yield wfd |
|---|
| 328 |
wfd.getResult() |
|---|
| 329 |
W = resultList[0] |
|---|
| 330 |
I0 = s.isfinite(W).nonzero()[0] |
|---|
| 331 |
if len(I0): |
|---|
| 332 |
# Resample everything together |
|---|
| 333 |
I1 = self.resampler(W[I0], N_members) |
|---|
| 334 |
X = XP[I0][I1] |
|---|
| 335 |
self.allocator.updateAllocations(I0[I1]) |
|---|
| 336 |
else: |
|---|
| 337 |
# No valid parameters, start over |
|---|
| 338 |
X = None |
|---|
| 339 |
self.allocator.updateAllocations() |
|---|
| 340 |
wfd = defer.waitForDeferred(self.pm.done()) |
|---|
| 341 |
yield wfd |
|---|
| 342 |
wfd.getResult() |
|---|
| 343 |
print "\nPMC Run done" |
|---|
| 344 |
|
|---|