root/projects/AsynCluster/trunk/asyncluster/master/control.py

Revision 206, 15.5 kB (checked in by edsuom, 6 months ago)

Got the Rao-Blackwellized (on z only) version running locally; not yet run remotely or checked for performance

Line 
1 # AsynCluster: Master
2 # A cluster management server based on Twisted's Perspective Broker. Dispatches
3 # cluster jobs and regulates when and how much each user can use his account on
4 # any of the cluster node workstations.
5 #
6 # Copyright (C) 2006-2008 by Edwin A. Suominen, http://www.eepatents.com
7 #
8 # This program is free software; you can redistribute it and/or modify it under
9 # the terms of the GNU General Public License as published by the Free Software
10 # Foundation; either version 2 of the License, or (at your option) any later
11 # version.
12 #
13 # This program is distributed in the hope that it will be useful, but WITHOUT
14 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15 # FOR A PARTICULAR PURPOSE.  See the file COPYING for more details.
16 #
17 # You should have received a copy of the GNU General Public License along with
18 # this program; if not, write to the Free Software Foundation, Inc., 51
19 # Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
20
21 """
22 An all-powerful Controller object and a UNIX socket PB interface to it.
23 """
24
25 from twisted.python.failure import Failure
26 from twisted.python.reflect import namedObject
27 from twisted.internet import defer, reactor, task
28 from twisted.spread import pb
29
30 from asynqueue import jobs
31 import database
32
33
# Hours left assigned to a session of a user who is not restricted; such
# sessions skip the database's sessionStart time-availability lookup.
UNRESTRICTED_LOGIN_HOURS = 10.0
35
36
def log(msgProto, nodeID, *args):
    """
    Prints a message about some particular node, built from the message
    prototype I{msgProto}.

    The message is prefixed with C{N} followed by the four-digit, zero-padded
    I{nodeID}. If a third argument is given, it identifies some particular
    user and is shown in parentheses after the node. Any arguments beyond the
    first three are %-substituted into I{msgProto} only. The prefix and user
    ID are never subjected to substitution, so a user ID containing C{%}
    cannot break the formatting.
    """
    prefix = "N%04d" % nodeID
    if not args:
        print("%s: %s" % (prefix, msgProto))
        return
    userID = args[0]
    body = msgProto
    if len(args) > 1:
        # Substitute into the prototype alone, not into the composed message.
        body = msgProto % tuple(args[1:])
    print("%s (%s): %s" % (prefix, userID, body))
52
53
class RemoteCallError(Exception):
    """
    Error concerning a remote call to a node client.

    The controller raises it (wrapped in a Failure) when no connected node
    matches the requested node ID or user.
    """
    pass
58
59
class SessionManager(pb.Viewable):
    """
    This viewable permits a node client to begin and end user access sessions.

    Every C{updateInterval} seconds (starting immediately), the hours left in
    each active session are reduced by that interval. The new value is pushed
    to the session's node client, and sessions that have run out of time are
    ended.

    @ivar ctl: The L{Controller} supplied to my constructor.

    @ivar sessionMap: A dict, keyed by node ID, of two-element lists
      C{[userID, hoursLeft]} describing the active session on each node.

    @ivar t: A C{database.UserDataTransactor}, built from the configured
      database URL, that manages user session data.

    @ivar looper: The C{task.LoopingCall} that runs L{_update}.

    @ivar d: The deferred returned when I{looper} was started.
    """
    updateInterval = 30.0
    # NOTE(review): updateFirstDelay is not referenced anywhere in this class.
    updateFirstDelay = 1.0

    def __init__(self, ctl):
        self.ctl = ctl
        self.sessionMap = {}
        url = self.ctl.config['server']['database']
        self.t = database.UserDataTransactor(url)
        self.looper = task.LoopingCall(self._update)
        # now=True runs the first update right away instead of one interval
        # from now.
        self.d = self.looper.start(self.updateInterval, now=True)

    def _update(self):
        """
        Deducts one update interval from every active session. Pushes the
        new time left to each node client and ends any session whose time
        has run out.

        @return: A C{DeferredList} that fires when every node has been dealt
          with.
        """
        hoursPerUpdate = float(self.updateInterval) / 3600.0
        dList = []
        # Iterate over a snapshot of the keys because ending a session removes
        # its entry from sessionMap while we loop.
        for ID in list(self.sessionMap.keys()):
            info = self.sessionMap.get(ID)
            if info is None:
                continue
            userID, hoursLeft = info
            hoursLeft -= hoursPerUpdate
            info[1] = hoursLeft
            log("%f hours now left", ID, userID, hoursLeft)
            if hoursLeft >= 0.0:
                d = self.ctl.nodeRemote(ID, 'setTimeLeft', hoursLeft)
            else:
                d = self.end(ID)
            # Bind ID and userID now. The callback may fire after the loop has
            # rebound them for a later session, which would log the wrong
            # node and user.
            d.addCallback(
                lambda _, ID=ID, userID=userID:
                log("Done updating client", ID, userID))
            dList.append(d)
        return defer.DeferredList(dList)

    def view_begin(self, node, userID, password):
        """
        Begins a user access session for the specified I{node} perspective and
        the specified I{userID} and I{password}.

        If the user already has a session on some node, that session is ended
        first. A user who is not restricted gets L{UNRESTRICTED_LOGIN_HOURS}.
        A restricted user gets the hours reported by the transactor's
        C{sessionStart}, and is refused if none are left.

        @return: A (possibly deferred) boolean indicating whether the
          session is authorized and was started.
        """
        def gotAuthorizationResult(authorized):
            if not authorized:
                log("Unauthorized session attempt", node.ID, userID)
                return False
            if oldID is None:
                d = defer.succeed(None)
            else:
                log("Replacing session with one on N%04d",
                    oldID, userID, node.ID)
                d = defer.maybeDeferred(self.end, oldID)
            d.addCallback(lambda _: self.t.restricted(userID))
            d.addCallback(gotRestrictionResult)
            return d

        def gotRestrictionResult(restricted):
            if not restricted:
                log("Unrestricted session started", node.ID, userID)
                self.sessionMap[node.ID] = [userID, UNRESTRICTED_LOGIN_HOURS]
                return True
            d = self.t.sessionStart(userID)
            d.addCallback(gotHoursAvailable)
            return d

        def gotHoursAvailable(hours):
            if hours <= 0.0:
                log("No usage time available", node.ID, userID)
                return False
            self.sessionMap[node.ID] = [userID, hours]
            log("Session started, %f hours left", node.ID, userID, hours)
            # NOTE(review): any deferred returned here is not waited on --
            # confirm that is intended.
            self.t.recordSessionStartTime(userID)
            return True

        # Find any node on which this user already has a session. The lookup
        # is done now, before the (asynchronous) authorization check.
        oldID = None
        for ID, info in self.sessionMap.iteritems():
            if info[0] == userID:
                oldID = ID
                break
        d = self.t.sessionAuthorized(userID, password)
        d.addCallback(gotAuthorizationResult)
        return d

    def view_timeLeft(self, node):
        """
        Returns the hours left in the session on the specified I{node}
        perspective, or C{0.0} if that node has no active session.
        """
        if node.ID in self.sessionMap:
            userID, hoursLeft = self.sessionMap[node.ID]
            log("%f hours left", node.ID, userID, hoursLeft)
            return hoursLeft
        return 0.0

    def view_end(self, node):
        """
        Ends the current user access session for the specified I{node}
        perspective. The node client itself asked for this, so it is not
        called back.

        See L{end}.
        """
        return self.end(node.ID, callClient=False)

    def end(self, ID, callClient=True):
        """
        Ends the current user access session, if any, for the specified node
        I{ID}, recording the end with the transactor's C{sessionEnd}.

        @param callClient: If set (the default), the node client is told it
          has C{0.0} hours left once the session end has been recorded.

        @return: A deferred that fires when the session has been ended, or at
          once if that node had no session.
        """
        if ID not in self.sessionMap:
            return defer.succeed(None)
        userID, hoursLeft = self.sessionMap.pop(ID)
        log("Ending session with %f hours left", ID, userID, hoursLeft)
        d = self.t.sessionEnd(userID)
        if callClient:
            d.addCallback(
                lambda _: self.ctl.nodeRemote(ID, 'setTimeLeft', 0.0))
        return d
178
179
class Controller(object):
    """
    I control everything, I{heh heh heh...}

    @ivar config: The configuration mapping supplied to my constructor. I read
      C{config['server']['jobs']}, and my session manager reads
      C{config['server']['database']}.

    @ivar counter: The most recently assigned node ID.

    @ivar nodes: A dict of tuples, keyed by integer node client ID, where each
      tuple contains the local perspective instance and remote root reference
      for one node client.

    @ivar jobber: The C{jobs.JobManager} to which worker clients are attached.
    """
    def __init__(self, config):
        self.config = config
        self.counter = 0
        self.nodes = {}
        self.jobber = jobs.JobManager()

    def getSessionManager(self):
        """
        Returns the single instance of the L{SessionManager} viewable,
        creating it on first use.
        """
        if not hasattr(self, 'sessionManager'):
            self.sessionManager = SessionManager(self)
        return self.sessionManager

    def attachNode(self, nodePerspective, nodeRoot):
        """
        Call when another mutually authenticated node client has
        connected. Attaches its root referenceable to my map of node roots
        under a new integer ID, then calls the node's remote C{spawnWorkers}.

        Returns a deferred that fires with the node ID once that remote call
        has completed.
        """
        self.counter += 1
        # Use a local copy of counter so the callback still sees this node's
        # ID even if more nodes attach before it fires.
        counter = self.counter
        self.nodes[counter] = nodePerspective, nodeRoot
        d = nodeRoot.callRemote('spawnWorkers')
        return d.addCallback(lambda _: counter)

    def detachNode(self, ID):
        """
        A node client has disconnected, so detach its root referenceable from
        my map of node roots and end any user session on it.

        @return: The deferred from ending the session, or C{None} if no
          session manager has been created yet.
        """
        self.nodes.pop(ID, None)
        if hasattr(self, 'sessionManager'):
            return self.sessionManager.end(ID, callClient=False)

    def attachWorker(self, nodeRoot):
        """
        Call when another mutually authenticated worker client has
        connected. Attaches its root referenceable to my jobber and returns a
        deferred that fires with a new integer ID as assigned by the jobber.
        """
        N = int(self.config['server']['jobs'])
        return self.jobber.attachChild(nodeRoot, N)

    def detachWorker(self, ID):
        """
        A worker client has disconnected, so detach it from the jobber.
        """
        return self.jobber.detachChild(ID)

    def _remoteError(self, failure, ID):
        """
        Errback for remote calls to node I{ID}.

        A dead reference or lost connection ends any user session on that
        node and absorbs the failure. Any other failure is passed on.
        """
        if failure.check(pb.DeadReferenceError, pb.PBConnectionLost):
            # The session manager only exists once getSessionManager has been
            # called; without it there is no session to end.
            if hasattr(self, 'sessionManager'):
                return self.sessionManager.end(ID, callClient=False)
            return None
        return failure

    def nodeRemote(self, nodeID, called, *args, **kw):
        """
        Runs a remote call to the object specified by the string I{called} on
        the node identified by the integer I{nodeID}, supplying any provided
        arguments or keywords.

        Returns a deferred that fires with the result of the remote call, or
        fails with L{RemoteCallError} if there is no such node.
        """
        if nodeID not in self.nodes:
            return defer.fail(Failure(
                RemoteCallError("Invalid node '%s'" % nodeID)))
        nodeRoot = self.nodes[nodeID][1]
        d = nodeRoot.callRemote(called, *args, **kw)
        d.addErrback(self._remoteError, nodeID)
        return d

    def userRemote(self, userID, called, *args, **kw):
        """
        Calls the object specified by the string I{called} on the node whose
        perspective has a C{userID} attribute equal to I{userID}, supplying
        any provided arguments or keywords.

        Returns a deferred that fires with the result of the remote call, or
        fails with L{RemoteCallError} if no node matches.
        """
        # NOTE(review): this matches on the node perspective's userID
        # attribute, not on the session manager's sessionMap -- confirm the
        # perspective keeps that attribute in sync.
        for ID, nodeStuff in self.nodes.iteritems():
            nodePerspective, nodeRoot = nodeStuff
            if getattr(nodePerspective, 'userID', None) == userID:
                d = nodeRoot.callRemote(called, *args, **kw)
                d.addErrback(self._remoteError, ID)
                return d
        return defer.fail(Failure(
            RemoteCallError("Invalid user '%s'" % userID)))

    def allRemote(self, called, *args, **kw):
        """
        Runs a remote call to the object specified by the string I{called} on
        all connected nodes, supplying any provided arguments or keywords.

        Returns a deferred that fires with a list of results of the remote
        calls, in no particular node order.
        """
        dList = []
        for ID, nodeStuff in self.nodes.iteritems():
            d = nodeStuff[1].callRemote(called, *args, **kw)
            d.addErrback(self._remoteError, ID)
            dList.append(d)
        return defer.gatherResults(dList)
298
299
class Root(pb.Root):
    """
    I am the root object that each control client receives upon making its UNIX
    socket connection to the master control server.

    All of the heavy lifting is done by an instance of L{Controller}, a
    reference to which is supplied to my constructor.
    """
    def __init__(self, ctl):
        self.ctl = ctl

    def remote_userAction(self, userID, action, actionArg=None):
        """
        Carries out the specified I{action} concerning the user account
        I{userID}. If the action requires an argument, it is supplied as the
        I{actionArg} option.

        The actions are as follows:
            - B{password}: use the supplied string as the password
            - B{disable}: disable the account
            - B{enable}: enable the account
            - B{restrict}: mark the account as restricted, so its sessions
              are limited to its available usage time
            - B{unrestrict}: clear the restriction
            - B{msg}: send the supplied string to the user via the remote
              C{message} method of the node he is on
            - B{kick}: call the remote C{kick} method of the node he is on

        @return: C{"OK"} or C{"FAIL"} (deferred) depending on the outcome, or
          an C{"INVALID COMMAND ..."} string for an unknown action.
        """
        t = self.ctl.getSessionManager().t
        if action == 'password':
            d = t.password(userID, actionArg)
        elif action == 'disable':
            d = t.enabled(userID, False)
        elif action == 'enable':
            d = t.enabled(userID, True)
        elif action == 'restrict':
            d = t.restricted(userID, True)
        elif action == 'unrestrict':
            d = t.restricted(userID, False)
        elif action == 'msg':
            d = self.ctl.userRemote(userID, 'message', actionArg)
        elif action == 'kick':
            d = self.ctl.userRemote(userID, 'kick')
        else:
            return "INVALID COMMAND '%s'" % action
        d.addCallbacks(lambda _: "OK", lambda _: "FAIL")
        return d

    def remote_wall(self, message):
        """
        Sends I{message} to every connected node via its remote C{message}
        method.

        @return: A deferred that fires with the list of results.
        """
        return self.ctl.allRemote('message', message)

    def remote_registerClasses(self, *args):
        """
        Instructs my broker to register the classes specified by the
        argument(s) as self-unjellyable and allowable past PB security, and
        instructs the jobber to have all current and future nodes do the same
        with their brokers.

        The classes are specified by their string representations::

            <package(s).module.class>

        Use judiciously!
        """
        for stringRep in args:
            cls = namedObject(stringRep)
            pb.setUnjellyableForClass(stringRep, cls)
        return self.ctl.jobber.registerClasses(*args)

    def remote_newJob(self, jobCode, niceness=0):
        """
        Registers a new computing job with the supplied I{jobCode}, returning a
        deferred to an integer jobID identifying the job.

        See L{jobs.JobManager.new}.
        """
        return self.ctl.jobber.new(jobCode, niceness)

    def remote_updateJob(self, jobID, callName, *args, **kw):
        """
        Updates the job specified by I{jobID} by arranging to dispatch a call
        to the callable object specified in the job's namespace on each node
        before it runs another call for that job.

        If you don't want the task saved for future workers, but only run on
        the workers currently attached, set the I{ephemeral} keyword C{True}.
        """
        return self.ctl.jobber.update(jobID, callName, *args, **kw)

    def remote_runJob(self, jobID, callName, *args, **kw):
        """
        Queues up a call to the callable object specified in the namespace of
        the specified I{jobID}. The call will be dispatched to the next node
        that is qualified and becomes available to run it.

        See L{jobs.JobManager.run}.

        @param jobID: An integer identifying a job previously registered
            with L{remote_newJob}.

        @param callName: A string identifying a callable object in the job
            namespace.

        @param args: Any arguments to pass to the callable object.

        @param kw: Any keywords to pass to the callable object.

        @return: A deferred to the eventual result of the dispatch when it
            eventually runs on a qualified node.
        """
        return self.ctl.jobber.run(jobID, callName, *args, **kw)

    def remote_cancelJob(self, jobID):
        """
        Cancels the job specified by I{jobID}.
        """
        return self.ctl.jobber.cancel(jobID)

    def remote_resetup(self, sourcePath):
        """
        Instructs each connected node to do a new python setup in the specified
        I{sourcePath} and then respawn its coterie of child worker processes.

        Stops without respawning if any setup run reports a problem.

        Returns a deferred that fires with C{True} if all went well, C{False}
        if there were any problems.
        """
        try:
            from shlex import quote
        except ImportError:
            # Python 2 keeps quote() in the pipes module.
            from pipes import quote

        def gotSetupResults(successCodes):
            if not all(successCodes):
                return False
            d = self.ctl.allRemote('spawnWorkers', True)
            d.addCallbacks(lambda _: True, lambda _: False)
            return d

        # The path must actually be substituted (the original sent a literal
        # "%s"), quoted for the shell. Use && so setup.py is not run from the
        # wrong directory if the cd fails.
        command = "cd %s && python setup.py install" % quote(sourcePath)
        d = self.ctl.allRemote('bash', command)
        return d.addCallbacks(gotSetupResults, lambda _: False)
438        
439        
Note: See TracBrowser for help on using the browser.