root/projects/AsynCluster/trunk/asyncluster/ndm/client.py

Revision 175, 9.5 kB (checked in by edsuom, 7 months ago)

Got D-Kernel PMC running with what I hope is Rao-Blackwellization

Line 
1 # AsynCluster: Node Display Manager (NDM)
2 # A simple X display manager for cluster nodes that also serve as
3 # access-restricted workstations.
4 #
5 # An NDM client runs on each node and communicates via Twisted's Perspective
6 # Broker to the Aysncluster server, which regulates when and how much each user
7 # can use his account on any of the workstations. The NDM server also
8 # dispatches cluster operations to the nodes via the NDM clients, unbeknownst
9 # to the workstation users.
10 #
11 # Copyright (C) 2006-2008 by Edwin A. Suominen, http://www.eepatents.com
12 #
13 # This program is free software; you can redistribute it and/or modify it under
14 # the terms of the GNU General Public License as published by the Free Software
15 # Foundation; either version 2 of the License, or (at your option) any later
16 # version.
17 #
18 # This program is distributed in the hope that it will be useful, but WITHOUT
19 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
20 # FOR A PARTICULAR PURPOSE.  See the file COPYING for more details.
21 #
22 # You should have received a copy of the GNU General Public License along with
23 # this program; if not, write to the Free Software Foundation, Inc., 51
24 # Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
25
26 """
27 The PB-based network client for the cluster node.
28 """
29
30 import os, signal, sys
31 from twisted.internet import defer, threads, reactor
32 from twisted.cred import credentials
33 from twisted.spread import pb
34
35 from asynqueue import jobs
36
# Path of the interpreter used both to spawn child worker processes and to
# recognize them again when scanning /proc for running workers.
PYTHON = "/usr/bin/python"
# Delay before a failed connection attempt to the server is retried.
RETRY_DELAY = 10.0  # sec
39
40
def checkTrust(f):
    """
    Decorates methods so that they only run if their class instance has a
    I{trusted} attribute that is set C{True}.

    An instance that lacks the attribute altogether is treated as untrusted,
    so a call made before any login attempt is rejected with
    L{jobs.TrustError} rather than failing with C{AttributeError}.

    @param f: The method to guard. It is called with the instance as its
        first argument, followed by whatever the caller supplied.

    @return: A wrapper carrying the name and docstring of I{f}.

    @raise jobs.TrustError: From the wrapper, when the instance is not
        trusted.
    """
    from functools import wraps

    @wraps(f)
    def wrapper(self, *args, **kw):
        # A missing attribute means nobody has authenticated yet.
        if getattr(self, 'trusted', False):
            return f(self, *args, **kw)
        raise jobs.TrustError()

    return wrapper
53
54
class ConnectionError(Exception):
    """
    Raised when an attempt to connect to the AsynCluster server runs into
    an error.
    """
60
61
class ChildRoot(jobs.ChildRoot):
    """
    Root resource offered to the server by one child worker client, i.e., a
    client that is capable of running cluster jobs.

    @ivar trusted: Whether the most recent L{remote_reverseLogin} call
        supplied the expected server password. C{False} until then.
    """
    trusted = False

    def __init__(self, serverPassword):
        """
        @param serverPassword: The password the server must present in order
            to be trusted by this client.
        """
        self.serverPassword = serverPassword

    def remote_reverseLogin(self, password):
        """
        Lets the server authenticate itself to this child worker client by
        presenting its own I{password}.

        The outcome of the comparison is recorded in my I{trusted}
        attribute. Returns the string 'child', identifying me to the server
        as a child root, if the password matched and C{None} otherwise.
        """
        authenticated = (password == self.serverPassword)
        self.trusted = authenticated
        if not authenticated:
            return None
        return 'child'
83
84
class NodeRoot(pb.Root):
    """
    I am the root resource for one NDM client capable of spawning worker
    clients and managing user sessions.

    @cvar workerCmd: The Python statement handed via C{-c} to the interpreter
        of each spawned child worker. It is also used to recognize running
        workers from their command lines.

    @ivar trusted: Whether the most recent L{remote_reverseLogin} call
        supplied the expected server password. C{False} until then.
    """
    workerCmd = "from asyncluster.ndm import worker; worker.run()"
    # Untrusted by default, so that trust-checked methods called before any
    # login attempt are refused instead of failing on a missing attribute.
    trusted = False

    def __init__(self, serverPassword, main):
        """
        @param serverPassword: The password the server must present in order
            to be trusted by this client.

        @param main: The object that receives session updates, messages and
            session-related calls from me.
        """
        self.serverPassword = serverPassword
        self.main = main

    def remote_reverseLogin(self, password):
        """
        The server calls this method with its own password to authenticate
        itself to the client, in this case a node client.

        If the server is authenticated, returns the string 'node' to identify
        me to the server as a node root. Otherwise, returns C{None}.
        """
        self.trusted = (password == self.serverPassword)
        if self.trusted:
            return 'node'

    def remote_setTimeLeft(self, hoursLeft):
        """
        Sets the number of hours left (a float) to the user.
        """
        self.main.sessionUpdate(hoursLeft)

    def remote_message(self, message):
        """
        Displays a pop-up message on the session window, if one is present.
        """
        self.main.message(message)

    @checkTrust
    def remote_bash(self, script):
        """
        Runs the supplied I{script} via C{/bin/sh -c}, returning a deferred
        that fires with C{True} if the shell finishes without error, i.e.,
        with a zero wait status, or C{False} otherwise.

        The wait for the shell happens in a thread so that the reactor is not
        blocked while the script runs.
        """
        def done(result):
            # os.waitpid returns a (pid, status) tuple.
            return (result[1] == 0)

        pid = os.spawnl(os.P_NOWAIT, "/bin/sh", "/bin/sh", "-c", script)
        return threads.deferToThread(os.waitpid, pid, 0).addCallback(done)

    def _mips(self):
        """
        Returns a list of bogomips float values for each core in the client's
        CPU, in order of CPU number.

        The values are parsed from C{/proc/cpuinfo}, pairing each "bogomips"
        line with the "processor" line that precedes it.
        """
        values = {}
        # The context manager closes the file even if parsing a line fails.
        with open("/proc/cpuinfo", 'r') as fh:
            for line in fh:
                if line.startswith("processor"):
                    cpuNumber = int(line.split()[-1])
                elif line.startswith("bogomips"):
                    values[cpuNumber] = float(line.split()[-1])
        return [values[key] for key in sorted(values)]

    def _pids(self):
        """
        Returns a list of PIDs of all child worker processes currently running
        on the client node.

        A process counts as a child worker if its command line starts with
        the interpreter path, C{-c} and my I{workerCmd}.
        """
        expected = [PYTHON, '-c', self.workerCmd]
        pids = []
        for subdir in os.listdir("/proc/"):
            if not subdir.isdigit():
                continue
            procPath = "/proc/%s/cmdline" % subdir
            try:
                with open(procPath, 'rb') as fh:
                    cmdline = fh.read().split('\x00')
            except (IOError, OSError):
                # Unreadable, or the process exited while we were scanning.
                continue
            # Slicing tolerates command lines with fewer than three items.
            if cmdline[:3] == expected:
                pids.append(int(subdir))
        return pids

    @checkTrust
    def remote_spawnWorkers(self, restart=False):
        """
        Spawns child processes as needed to keep one child worker client
        running for each CPU core of the node.

        If the I{restart} keyword is set C{True}, sends SIGHUP to any child
        processes currently running and then spawns one new child process per
        CPU core. A worker that has already exited by the time it is signaled
        is simply skipped.
        """
        import errno

        pids = self._pids()
        N = len(self._mips())
        if restart:
            for pid in pids:
                try:
                    os.kill(pid, signal.SIGHUP)
                except OSError as e:
                    # The worker may have exited since the /proc scan; that
                    # must not prevent the replacements from being spawned.
                    if e.errno != errno.ESRCH:
                        raise
        else:
            N -= len(pids)
        # A non-positive N (enough workers already running) spawns nothing.
        for k in range(N):
            os.spawnl(os.P_NOWAIT, PYTHON, PYTHON, "-c", self.workerCmd)
191
192
class ClientFactory(pb.PBClientFactory):
    """
    I am a client factory that keeps retrying failed connection attempts and
    stops the reactor, thereby terminating my Python process, upon
    disconnection from the AsynCluster server.
    """
    def clientConnectionFailed(self, connector, reason):
        """
        Called to indicate that I couldn't connect to the PB server
        (yet). Retry after a while.

        The retry is scheduled L{RETRY_DELAY} seconds from now, using the
        same I{connector}.
        """
        pb.PBClientFactory.clientConnectionFailed(self, connector, reason)
        reactor.callLater(RETRY_DELAY, connector.connect)

    def clientConnectionLost(self, *args, **kw):
        """
        Called to terminate my process upon loss of connection to the PB server.

        Stopping a reactor that is not running is tolerated, since the goal
        of having the reactor stopped is already met in that case. Any other
        error is allowed to propagate.
        """
        pb.PBClientFactory.clientConnectionLost(self, *args, **kw)
        try:
            reactor.stop()
        except RuntimeError:
            # Twisted signals "reactor isn't running" with ReactorNotRunning,
            # a RuntimeError subclass; nothing is left to stop then.
            pass
215
216
class Client(object):
    """
    I connect to the master TCP server via PB and offer it either a
    L{NodeRoot} or a L{ChildRoot} as my root resource object.

    If I am constructed with the I{session} keyword set C{True}, any connection
    to the server will be as a node client, with a L{NodeRoot}. Otherwise, I'm
    just a lowly worker client offering a L{ChildRoot}.

    @ivar root: My root resource object, set by L{connect}.
    @ivar connector: The TCP connector, set by L{connect}.
    @ivar perspective: The perspective provided by the server, set once the
        login has succeeded.
    """
    def __init__(self, main, session=False):
        """
        @param main: The object providing my I{config} and, for a node
            client, the session handling.

        @param session: Set C{True} to connect as a node client.
        """
        self.main = main
        self.session = session

    def connect(self):
        """
        Connects to the master TCP server. Returns a deferred that fires with
        the perspective provided by the server if and when the connection
        succeeds.

        If the login yields anything else, including a failure, the deferred
        fires with a L{ConnectionError} failure instead.
        """
        def gotAnswer(answer):
            if pb.IUnjellyable.providedBy(answer):
                self.perspective = answer
                return answer
            raise ConnectionError("Couldn't authorize connection to server")

        cc = self.main.config['client']
        # TCP Connection
        factory = ClientFactory()
        port = int(self.main.config['common']['tcp port'])
        self.connector = reactor.connectTCP(cc['host'], port, factory)
        # Login parameters
        credential = credentials.UsernamePassword(cc['user'], cc['password'])
        serverPassword = self.main.config['common']['server password']
        if self.session:
            self.root = NodeRoot(serverPassword, self.main)
        else:
            self.root = ChildRoot(serverPassword)
        # Do the login. The callback is added for both outcomes so that a
        # failed login is reported as a ConnectionError, too.
        return factory.login(credential, self.root).addBoth(gotAnswer)

    def disconnect(self):
        """
        Disconnects from the master TCP server, returning a deferred that fires
        once the disconnection has been requested. Before the TCP
        disconnection occurs, any jobs that are running are allowed to finish
        and any active session is ended.

        Calling me before L{connect} is safe: there is no running job to wait
        for and no connector to disconnect in that case.
        """
        # These are only set by connect(), which may not have been called.
        root = getattr(self, 'root', None)
        connector = getattr(self, 'connector', None)
        # Get a deferred that will have fired when any running jobs are done.
        d = getattr(root, 'd_runningJob', None)
        if d is None or d.called:
            d = defer.succeed(None)
        # When that happens, we will want to:
        # (1) end any active session, and
        if self.session:
            d.addCallback(lambda _: self.main.sessionEnd())
        # (2) disconnect from the server
        if connector is not None:
            d.addCallback(lambda _: self.connector.disconnect())
        # The returned deferred only fires once all this is done.
        return d
277        
278            
279        
280        
281
282        
283
284    
Note: See TracBrowser for help on using the browser.