root/projects/AsynCluster/trunk/doc/example.py

Revision 119, 13.4 kB (checked in by edsuom, 10 months ago)

Working on having computing jobs done by node worker clients separate from the GUI client

Line 
1 #!/usr/bin/env python
2 #
3 # AsynCluster:
4 # A cluster management server based on Twisted's Perspective Broker. Dispatches
5 # cluster jobs and regulates when and how much each user can use his account on
6 # any of the cluster node workstations.
7 #
8 # Copyright (C) 2006-2007 by Edwin A. Suominen, http://www.eepatents.com
9 #
10 # This program is free software; you can redistribute it and/or modify it under
11 # the terms of the GNU General Public License as published by the Free Software
12 # Foundation; either version 2 of the License, or (at your option) any later
13 # version.
14 #
15 # This program is distributed in the hope that it will be useful, but WITHOUT
16 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17 # FOR A PARTICULAR PURPOSE.  See the file COPYING for more details.
18 #
19 # You should have received a copy of the GNU General Public License along with
20 # this program; if not, write to the Free Software Foundation, Inc., 51
21 # Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
22
23 """
24 This script shows how Bayesian inference can be run on a cluster of nodes to
25 identify the location (median) and scale (half-width at half-maximum) of a
26 Cauchy-distributed random process.
27 """
28
29 import time, os.path
30 from random import sample as sampleWOR
31
32 import scipy as s
33 from scipy import stats
34
35 from twisted.internet import defer, reactor, threads
36
37 from asynqueue import ThreadQueue
38 from asyncluster.master import jobs
39 from twisted_goodies.pybywire import pack
40
41
42 def sample(W, N=None, logWeights=False):
43     """
44     Returns an index array that samples I{N} values from some external
45     array. Each element of the array will have a probability of being
46     sampled (with replacement) that is proportional to its corresponding
47     weight in the 1-D array I{W}.
48
49     Uses Walker's alias algorithm as set forth in L. Devroye, Non-Uniform
50     Random Variate Generation, p. 109,
51     http://cg.scs.carleton.ca/~luc/rnbookindex.html.
52
53     If N is not specified, it defaults to the length of W, i.e., importance
54     resampling.
55
56     If I{logWeights} is set C{True}, the weights are transformed from log
57     to linear.
58     """
59     if logWeights:
60         W = s.exp(W - W.max())
61     W /= sum(W)
62     I0 = (s.isfinite(W) * s.greater(W, 1E-7)).nonzero()[0]
63     K = len(W[I0])
64     if N is None:
65         N = K
66     if K == 0:
67         return []
68     if K == 1:
69         return [0]*N           
70     greater, smaller = [], []
71     T = s.zeros((K,2))
72     for m, w in enumerate(W[I0]):
73         q = T[m,0] = K*w
74         if q < 1:
75             smaller.append(m)
76         else:
77             greater.append(m)
78     while smaller and greater:
79         k = greater[-1]
80         m = smaller.pop()
81         T[m,1] = k
82         T[k,0] -= (1 - T[m,0])
83         if T[k,0] < 1:
84             greater.pop()
85             smaller.append(k)
86     V = s.rand(N)
87     RI = s.random.randint(0, K-1, N)
88     I1 = s.greater(V, T[RI,0]).choose(RI, T[RI,1].astype(int))
89     return I0[I1]
90
91
92 class Proposer(object):
93     """
94     I provide random variates and probabilities for jumps to be made in
95     proposing each new parameter vector. The jumps have a normal (Gaussian)
96     distribution, as is typical. It doesn't matter that we're trying to model a
97     somewhat different distribution.
98     """
99     def __init__(self):
100         self._jumpDist = stats.distributions.norm()
101    
102     def r(self, N, wiggle):
103         """
104         Returns an array of I{N} rows of parameter offsets drawn from the prior
105         distributions scaled by a I{wiggle} value between 0 and 1.
106         """
107         return wiggle * self._jumpDist.rvs((N, 2))
108
109     def p(self, X, wiggle):
110         """
111         Returns a vector of probability densities for each parameter offset in
112         X, or rows of X, under the assumption that they were generated from my
113         L{rProposal} method with the specified I{wiggle}.
114         """
115         if len(X.shape) == 1:
116             X = X.reshape(1, X.shape[0])
117         N = X.shape[0]
118         return self._jumpDist.pdf(X/wiggle) / wiggle
119
120
121 class NodeCaller(object):
122     """
123     I call on nodes to compute likelihoods of data given vectors of parameters.
124     """
125     nodecode = """
126     ###########################################################################
127     import scipy as s
128     from scipy import stats
129     from twisted_goodies.pybywire import pack
130
131     G = {}
132
133     def newData(data):
134         G['data'] = pack.Unpacker(data)()
135
136     def likelihood(paramVector):
137         X = pack.Unpacker(paramVector)()
138         # The next two lines are where all the real computing work gets done
139         distObj = stats.distributions.cauchy(loc=X[0], scale=X[1])
140         L = s.log(distObj.pdf(G['data'])).sum()
141         return pack.Packer(L)()
142     
143     ###########################################################################
144     """
145     def __init__(self, socket, data):
146         self.socket, self.data = socket, data
147    
148     def _oops(self, failure):
149         print "FAILURE:\n%s" % failure.getTraceback()
150
151     def startup(self):
152         """
153         Starts me up to run the named model remotely on the asyncluster nodes
154         via the supplied UNIX domain I{socket}. Returns a deferred that fires
155         when remotely-run operations can commence.
156         """
157         def maybeStarted(status):
158             if not status:
159                 raise Exception("Client couldn't connect!")
160             reactor.addSystemEventTrigger(
161                 'before', 'shutdown',  self.client.shutdown)
162             d = self.client.update('newData', pack.Packer(self.data)())
163             d.addErrback(self._oops)
164             return d
165        
166         self.client = jobs.JobClient(self.socket, codeString=self.nodecode)
167         return self.client.startup().addCallback(maybeStarted)
168    
169     def likelihood(self, X, runLocally=False):
170         """
171         Returns the log-likelihood of my data given the Cauchy distribution of
172         my model, after application of the location and scale parameters from
173         the supplied 2-element parameter vector I{X}.
174
175         This method returns the likelihood of the data given the parameters. It
176         does not consider any prior probability of the parameters; in this
177         example a 'non-informative uniform prior' is used.
178         """
179         def localLikelihood():
180             distObj = stats.distributions.cauchy(loc=X[0], scale=X[1])
181             return s.log(distObj.pdf(self.data)).sum()
182        
183         def gotResult(L_packed):
184             return pack.Unpacker(L_packed)()
185
186         if runLocally:
187             d = threads.deferToThread(localLikelihood)
188         else:
189             X_packed = pack.Packer(X)()
190             d = self.client.run('likelihood', X_packed, **{'timeout':20})
191             d.addCallbacks(gotResult, self._oops)
192         return d
193
194
195 class Population_Monte_Carlo(object):
196     """
197     Population MCMC with per Cappe et al.
198
199     I fit the location (median and mode) and scale (half-width at half-maximum)
200     of a Cauchy distribution to a set of observations from a Cauchy random
201     process. Basically, the Cauchy distribution accounts better for rare,
202     significant events that would be vanishingly unlikely under a Gaussian
203     model.
204
205     This example could fit a normal distribution to Gaussian-distributed data
206     instead, but that would be boring. You'd just be computing the mean and
207     variance of the data set.
208     """
209     runLocally = False
210     socket="/tmp/.ndm"
211     V = [0.10, 0.02, 0.005, 0.001]
212
213     def __init__(self, data):
214         self.proposer = Proposer()
215         self.caller = NodeCaller(self.socket, data)
216         self.fh = open('params.csv', 'w')
217         self.fh.write("loc, scale\n")
218
219     def subsetIndex(self, k):
220         """
221         Returns a subset index for the samples in my I{X} attribute that
222         correspond to the jump variance for the supplied index I{k}.
223         """
224         I = sampleWOR(self.Is, self.R[k])
225         self.Is = s.setdiff1d(self.Is, I)
226         return I
227
228     def weightedProposals(self, X, v):
229         """
230         Returns a deferred that fires with a 1-D array of valid proposals on
231         the supplied parameter array I{X}, given the specified proposal
232         variance I{v}, along with log-importance weights for those proposals.
233
234         Valid proposals are those with non-zero likelihood. The censoring to
235         only valid proposals means that fewer proposals may be returned than
236         supplied parameters, possibly no proposals at all.
237         
238         The importance weights are to be combined with those from other calls
239         to this method with different variance settings. The weights will be
240         used to resample the returned proposals so that the probability of any
241         given proposal being included in the final result for this iteration is
242         proportional to its likelihood, and inversely proportional to the
243         probability density of the proposal offest.
244         """
245         def gotLikelihoods(results):
246             XP = X + XD
247             L = s.array(results)
248             if len(L):
249                 I = s.isfinite(L).nonzero()[0]
250                 logProbJumps = s.log(self.proposer.p(XD[I], v)).sum(1)
251                 return XP[I], L[I] + logProbJumps
252             return XP, L
253
254         # Have the nodes evaluate the likelihood of each proposal, which is
255         # the most time-consuming step in real-life usage.
256         XD = self.proposer.r(len(X), v)
257         dList = [self.caller.likelihood(Xp, self.runLocally) for Xp in X+XD]
258         d = defer.gatherResults(dList)
259         d.addCallback(gotLikelihoods)
260         return d
261    
262     @defer.deferredGenerator
263     def run(self, N_iter, N_chains):
264         """
265         Does a PMC run with I{N_chains} population members and jump variances
266         in the supplied 1-D array I{V}. The number of population members for
267         each jump variance will go up or down, depending on the performance for
268         that setting.
269
270         B{NOTE}: What I've been calling variances are actually computed as
271         standard deviations, i.e., not squared. Should fix this either by
272         relabeling or adjusting the code.
273         
274         Returns a deferred that fires when the run is done. No output value is
275         provided via the deferred, however.
276
277         @param N_iter: The number of iterations to produce after burn-in.
278
279         @param N_chains: The number of Monte Carlo chains run at each
280           iteration.
281         
282         """
283         def gotWeightedProposals(results, k):
284             XP[k] = results[0]
285             W[k] = results[1]
286
287         def normalize(R):
288             # Rescale so that the proportions are global rather than
289             # individual. Highly successful settings will have a large portion
290             # of the next sample even if they last operated on only a few of
291             # the samples.
292             R = R.astype(float) / R.sum()
293             # Turn the scaled array into an array of subsample sizes, with a
294             # minimum size of two apiece.
295             R = s.clip(
296                 s.round_(N_chains*R), rMin,
297                 N_chains-rMin*(P-1)).astype(int)
298             # Twiddle the biggest one as needed to keep sum = N_chains
299             R[s.argmax(R)] += N_chains - sum(R)
300             # Replace the old list and subset index
301             self.R = R
302             self.Is = s.arange(N_chains)
303
304         # Connect the node caller to the AsynCluster master server
305         wfd = defer.waitForDeferred(self.caller.startup())
306         yield wfd; wfd.getResult()
307         # Some constants
308         P = len(self.V)
309         rMin = max([2, int(round(0.01*N_chains))])
310         # Initialize some arrays
311         normalize(s.ones(P))
312         X = self.proposer.r(N_chains, self.V[0])
313         # The iteration loop
314         for i in xrange(N_iter):
315             t0 = time.time()
316             # Generate and weight some proposals
317             XP, W, II = [[None]*P for k in (1,2,3)]
318             dList = []
319             for k, v in enumerate(self.V):
320                 I = II[k] = self.subsetIndex(k)
321                 d = self.weightedProposals(X[I], v)
322                 d.addCallback(gotWeightedProposals, k)
323                 dList.append(d)
324             wfd = defer.waitForDeferred(defer.DeferredList(dList))
325             yield wfd; wfd.getResult()
326             # Resample everything together
327             I = sample(s.concatenate(W), N_chains, logWeights=True)
328             if len(I):
329                 X = s.row_stack(XP)[I]
330             R = s.array([sum([x in II[k] for x in I]) for k in xrange(P)])
331             # Write the parameters for this iteration to the output file
332             for Xk in X:
333                 rowString = ", ".join(["%f" % x for x in Xk])
334                 self.fh.write(rowString + "\n")
335             # Normalize R to maintain a total of N_chains population members
336             normalize(R)
337             # Provide some info about this iteration
338             vInfo = ", ".join(["%3d" % r for r in self.R])
339             msg = "%04d, %5.2f sec, VR={%s} :  %s" % (
340                 i, time.time()-t0, vInfo,
341                 ", ".join(["%9.3g" % x for x in X.mean(0)]))
342             print msg
343         # All done
344         self.fh.close()
345         reactor.stop()
346
347
348 if __name__ == '__main__':
349     # Parameters for the example
350     N_obs = 50000
351     N_chains = 1000
352     N_iter = 1000
353     loc, scale = 0.1, 0.4
354
355     # Create some fake observations of a Cauchy random process
356     distObj = stats.distributions.cauchy(loc=loc, scale=scale)
357     data = distObj.rvs(N_obs)
358     fh = open('data.csv', 'w')
359     for x in data:
360         fh.write("%f\n" % x)
361     fh.close()
362    
363     # Create the Monte Carlo runner and set up the run
364     pmc = Population_Monte_Carlo(data)
365     reactor.callWhenRunning(pmc.run, N_iter, N_chains)
366
367     # Run!
368     reactor.run()
Note: See TracBrowser for help on using the browser.