Changeset 121
- Timestamp:
- 02/14/08 22:27:24 (10 months ago)
- Files:
-
- projects/AsynCluster/trunk/asyncluster/master/control.py (modified) (4 diffs)
- projects/AsynCluster/trunk/asyncluster/master/nodes.py (modified) (5 diffs)
- projects/AsynCluster/trunk/asyncluster/ndm/client.py (modified) (10 diffs)
- projects/AsynCluster/trunk/asyncluster/ndm/main.py (modified) (7 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynCluster/trunk/asyncluster/master/control.py
r119 r121 42 42 substitution of the message prototype. 43 43 """ 44 msg = " N%02d" % nodeID44 msg = "%4s" % nodeID 45 45 if args: 46 46 msg = "%s (%s): %s" % (msg, args[0], msgProto) … … 178 178 179 179 180 class WorkerRoster(object): 181 """ 182 I manage a roster of child worker clients, generating new unique IDs as 183 requested and keeping track of which workers are running on which node. 184 185 @ivar pids: A dict of pids of child worker processes on the various nodes, 186 keyed by the unique ID I've assigned to that child worker. 187 188 """ 189 def __init__(self): 190 self.counter = 0 191 self.pids = {} 192 193 def all(self): 194 """ 195 Returns a list of all PIDs of all child worker processes on all 196 nodes. (There may be duplicates, due to the same PID being used on 197 different nodes.) 198 """ 199 return self.pids.values() 200 201 202 203 180 204 class Controller(object): 181 205 """ 182 206 I control everything, I{heh heh heh...} 207 208 @ivar nodes: A dict of tuples, keyed by node client ID, where each tuple 209 contains the local perspective instance and remote root reference for one 210 node client. 211 183 212 """ 184 213 def __init__(self, config): 185 214 self.config = config 215 self.counter = 0 186 216 self.nodes = {} 217 self.roster = WorkerRoster() 187 218 self.jobber = jobs.JobManager() 188 219 … … 196 227 return self.sessionManager 197 228 229 @defer.deferredGenerator 198 230 def attachNode(self, nodePerspective, nodeRoot): 199 231 """ 200 Another mutually authenticated node client has connected, so give it a 201 unique ID, attach its root referenceable to my map of node roots under 202 that ID, and return a deferred that fires with the ID when the jobber 203 has attached the node as well. 204 205 TODO: Don't attach the node to the jobber if the perspective is for a 206 GUI client. 207 """ 208 def gotID(ID): 209 self.nodes[ID] = nodePerspective, nodeRoot 210 return ID 211 212 return self.jobber.attachChild(nodeRoot).addCallback(gotID) 213 232 Call when another mutually authenticated node client has 233 connected. Attaches its root referenceable to my map of node roots 234 under a new ID in the form 'N001', then has the node spawn as many 235 child worker clients as needed to keep all of its cores occupied. 236 237 Returns a deferred that fires with the node ID when all the spawning is 238 done. 239 """ 240 self.counter += 1 241 ID = "N%03d" % self.counter 242 self.nodes[ID] = nodePerspective, nodeRoot 243 wfd = defer.waitForDeferred(nodeRoot.callRemote('mips')) 244 yield wfd 245 N_cores = len(wfd.getResult()) 246 wfd = defer.waitForDeferred( 247 nodeRoot.callRemote('checkWorkers', self,roster.all())) 248 yield wfd 249 250 214 251 def detachNode(self, ID): 215 252 """ 216 253 A node client has disconnected, so detach its root referenceable from 217 254 my map of node roots. 218 219 AsynQueue's job manager takes care of detaching the node's worker from220 the queue, so we don't get involved with that.221 255 """ 222 256 self.nodes.pop(ID, None) 257 if hasattr(self, 'sessionManager'): 258 return self.sessionManager.end(ID, callClient=False) 259 260 def attachWorker(self, nodeRoot, ID): 261 """ 262 Call when another mutually authenticated worker client has 263 connected. Attaches its root referenceable to my jobber and returns a 264 new ID in the form 'W001' based on the integer ID assigned by the 265 jobber. 266 """ 267 # TODO 268 d = self.jobber.attachChild(nodeRoot) 269 d.addCallback(lambda ID: "W%03d" % ID) 270 return d 271 272 def detachWorker(self, ID): 273 """ 274 A worker client has disconnected, so detach it from the jobber and 275 remote its PID from the I{workers} map. 276 """ 277 # TODO 223 278 if hasattr(self, 'sessionManager'): 224 279 return self.sessionManager.end(ID, callClient=False) … … 231 286 def nodeRemote(self, nodeID, called, *args, **kw): 232 287 """ 233 Immediately runs a non-queued call to the object specified by the234 string I{called} on the node identified by the integer I{nodeID},235 supplying any providedarguments or keywords.288 Runs a remote call to the object specified by the string I{called} on 289 the node identified by the integer I{nodeID}, supplying any provided 290 arguments or keywords. 236 291 """ 237 292 if nodeID not in self.nodes: projects/AsynCluster/trunk/asyncluster/master/nodes.py
r2 r121 4 4 # any of the cluster node workstations. 5 5 # 6 # Copyright (C) 2006-200 7by Edwin A. Suominen, http://www.eepatents.com6 # Copyright (C) 2006-2008 by Edwin A. Suominen, http://www.eepatents.com 7 7 # 8 8 # This program is free software; you can redistribute it and/or modify it under … … 38 38 master server. 39 39 40 @ivar ID: A unique ID for this node during a mutually authenticated 41 client-server connection. 40 @ivar ID: A unique ID for this client, established during a mutually 41 authenticated client-server connection, in the form 'N001' for node 42 clients and 'W001' for worker clients. 42 43 43 @ivar userID: The ID of any user having a session underway on the node. 44 @ivar userID: The ID of any user having a session underway on the client if 45 it is a node rather than a worker. 44 46 45 47 """ … … 64 66 this server to execute for computing jobs will not do bad things to it. 65 67 """ 66 def responded(accepted): 67 if accepted: 68 def responded(acceptanceCode): 69 if acceptanceCode is None: 70 d = defer.succeed(None) 71 else: 68 72 clientRoot.notifyOnDisconnect(self.detached) 69 d = self.ctl.attachNode(self, clientRoot) 73 if acceptanceCode is True: 74 d = self.ctl.attachNode(self, clientRoot) 75 else: 76 d = self.ctl.attachWorker(clientRoot, acceptanceCode) 70 77 d.addCallback(done) 71 else:72 d = defer.succeed(None)73 78 d.addCallback(lambda _: (pb.IPerspective, self, self.detached)) 74 79 return d 75 80 76 81 def done(ID): 77 print " N%02d: Attached" % ID82 print "%4s: Attached" % ID 78 83 self.ID = ID 79 84 … … 88 93 """ 89 94 if hasattr(self, 'ID'): 90 print " N%02d: Detached" % self.ID95 print "%4s: Detached" % self.ID 91 96 self.ctl.detachNode(self.ID) 92 97 del self.ID … … 135 140 def requestAvatar(self, avatarID, mind, *interfaces): 136 141 """ 137 Returns a deferred of fires with the required138 I{interface, perspective, logout} tuple after the perspective attempts139 a reverse log into the client.142 Returns a deferred that fires with the required I{interface, 143 perspective, logout} tuple after the perspective attempts a reverse 144 login to the client. 140 145 """ 141 146 if pb.IPerspective not in interfaces: projects/AsynCluster/trunk/asyncluster/ndm/client.py
r119 r121 9 9 # to the workstation users. 10 10 # 11 # Copyright (C) 2006-200 7by Edwin A. Suominen, http://www.eepatents.com11 # Copyright (C) 2006-2008 by Edwin A. Suominen, http://www.eepatents.com 12 12 # 13 13 # This program is free software; you can redistribute it and/or modify it under … … 28 28 """ 29 29 30 import os 30 import os, signal 31 31 from twisted.internet import defer, reactor, threads 32 32 from twisted.cred import credentials … … 35 35 from asynqueue import jobs 36 36 37 38 WINDOW_LINGER_TIME = 0.5 37 PYTHON="/usr/bin/python" 38 39 40 def checkTrust(f): 41 if f.im_self.trusted: 42 return f 43 raise jobs.TrustError() 39 44 40 45 … … 46 51 47 52 48 class ClientRoot(jobs.ChildRoot): 49 """ 50 I am the root resource for one cluster node, with the remote-callable 51 methods of a child worker plus a few extra for NDM purposes. 52 """ 53 def __init__(self, main, serverPassword): 53 class ChildRoot(jobs.ChildRoot): 54 """ 55 I am the root resource for one child worker client capable of running 56 cluster jobs. 57 """ 58 trusted = False 59 60 def __init__(self, serverPassword, ID): 61 self.serverPassword, self.ID = serverPassword, ID 62 63 def remote_reverseLogin(self, password): 64 """ 65 The server calls this method with its own password to authenticate 66 itself to the client, in this case a child worker client. 67 68 If the server is authenticated, returns my unique integer 69 ID. Otherwise, returns C{None}. 70 """ 71 self.trusted = (password == self.serverPassword) 72 if self.trusted: 73 return self.ID 74 75 76 class SessionRoot(pb.Root): 77 """ 78 I am the root resource for one NDM client capable of spawning worker 79 clients and managing user sessions. 80 """ 81 workerCmd = "from asyncluster.ndm import main; main.BaseManager()" 82 83 def __init__(self, serverPassword, main): 84 self.serverPassword = serverPassword 54 85 self.main = main 55 self.serverPassword = serverPassword 56 86 57 87 def remote_reverseLogin(self, password): 58 88 """ 59 89 The server calls this method with its own password to authenticate 60 itself to the client. 61 62 @return: C{True} if the server was successfully authenticated. 90 itself to the client, in this case a node client. 91 92 If the server is authenticated, returns C{True}. Otherwise, returns 93 C{None}. 63 94 """ 64 95 self.trusted = (password == self.serverPassword) 65 return self.trusted 96 if self.trusted: 97 return True 66 98 67 99 def remote_setTimeLeft(self, hoursLeft): … … 77 109 pass 78 110 111 @checkTrust 79 112 def remote_bash(self, script): 80 113 """ … … 89 122 return threads.deferToThread(os.waitpid, pid, 0).addCallback(done) 90 123 91 def remote_mips(self):124 def _mips(self): 92 125 """ 93 126 Returns a list of bogomips float values for each core in the client's … … 102 135 return values 103 136 137 def _workersRunning(self): 138 """ 139 Returns the number of child worker processes currently running on the 140 client node. 141 """ 142 count = 0 143 for subdir in os.listdir("/proc/"): 144 if not subdir.isdigit(): 145 continue 146 procPath = "/proc/%s/cmdline" % subdir 147 if not os.access(procPath, os.R_OK): 148 continue 149 fh = open(procPath, 'rb') 150 cmdline = fh.read().split('\x00') 151 fh.close() 152 if cmdline[0] != PYTHON: 153 continue 154 if cmdline[1] != '-c': 155 continue 156 if cmdline[2] == self.workerCmd: 157 count += 1 158 return count 159 160 @checkTrust 161 def remote_spawnWorkers(self): 162 """ 163 Spawns child processes as needed to keep one child worker client 164 running for each CPU core of the node. 165 """ 166 N = len(self._mips()) - self._workersRunning() 167 for k in xrange(N): 168 os.spawnl(os.P_NOWAIT, PYTHON, PYTHON, "-c", self.workerCmd) 169 170 @checkTrust 171 def remote_kill(self, *pids): 172 """ 173 Kills the processes corresponding to the PID(s) supplied as one or more 174 integer arguments. 175 """ 176 for pid in pids: 177 os.kill(pid, signal.SIGHUP) 178 179 104 180 105 181 class ClientFactory(pb.PBClientFactory): … … 116 192 I connect to the master TCP server via PB and offer it L{ClientRoot} as my 117 193 root resource object. 118 """ 119 def __init__(self, main): 194 195 If I am constructed with the I{ID} keyword set to an integer, any 196 connection to the server will be as a child worker client. Otherwise, any 197 such connection will be as a node client, in which case, I will obtain and 198 return a remote reference to the server's global session manager upon 199 connecting. 200 """ 201 def __init__(self, main, ID=None): 120 202 self.main = main 203 self.ID = ID 121 204 122 205 def connect(self): 123 206 """ 124 Connects to the master TCP server, returning a deferred that fires with 125 a remote reference to the server's global session manager. 207 Connects to the master TCP server. Returns a deferred that fires with 208 the perspective provided by the server if and when the connection 209 succeeds. 126 210 """ 127 211 def gotAnswer(answer): 128 212 if pb.IUnjellyable.providedBy(answer): 129 213 self.perspective = answer 130 return answer .callRemote('getSessionManager')214 return answer 131 215 raise ConnectionError("Couldn't authorize connection to server") 132 216 … … 139 223 credential = credentials.UsernamePassword(cc['user'], cc['password']) 140 224 serverPassword = self.main.config['common']['server password'] 141 self.root = ClientRoot(self.main, serverPassword) 225 if self.ID is None: 226 self.root = SessionRoot(serverPassword, self.main) 227 else: 228 self.root = ChildRoot(serverPassword, ID) 142 229 # Do the login 143 230 return factory.login(credential, self.root).addBoth(gotAnswer) … … 156 243 # When that happens, we will want to: 157 244 # (1) end any active session, and 158 d.addCallback(lambda _: self.main.sessionEnd()) 245 if self.ID is None: 246 d.addCallback(lambda _: self.main.sessionEnd()) 159 247 # (2) disconnect from the server 160 248 d.addCallback(lambda _: self.connector.disconnect()) projects/AsynCluster/trunk/asyncluster/ndm/main.py
r115 r121 9 9 # to the workstation users. 10 10 # 11 # Copyright (C) 2006-200 7by Edwin A. Suominen, http://www.eepatents.com11 # Copyright (C) 2006-2008 by Edwin A. Suominen, http://www.eepatents.com 12 12 # 13 13 # This program is free software; you can redistribute it and/or modify it under … … 32 32 from twisted.internet import defer 33 33 34 import configobj, client 35 34 36 35 37 CONFIG_PATH = "/etc/asyncluster.conf" … … 38 40 class BaseManager(object): 39 41 """ 40 I am a base class for the node client, GUI or console.42 I am a base class for node and worker clients alike. 41 43 42 @ivar d: A deferred that fires when the client connects to the 43 AsynCluster server. 44 45 @ivar config: A L{configobj} config object loaded from the config file. 44 @cvar config: A L{configobj} config object loaded from the config file. 46 45 47 46 """ 48 def __init__(self): 47 config = configobj.ConfigObj(CONFIG_PATH) 48 49 def __init__(self, ID): 49 50 # The Twisted reactor, with no GUI integration needed 50 51 from twisted.internet import reactor; self.reactor = reactor 52 # The session-less client 53 self.client = client.Client(self, ID) 51 54 # Go! 52 self.reactor.callWhenRunning(self. startup)55 self.reactor.callWhenRunning(self.client.connect) 53 56 self.reactor.run() 54 57 55 def gotConnected(self, sessionMgr):56 """57 Connected callback for console clients.58 """59 self.sessionMgr = sessionMgr60 61 def startup(self):62 import configobj, client63 self.config = configobj.ConfigObj(CONFIG_PATH)64 self.client = client.Client(self)65 self.activeUser = None66 self.client.connect().addCallback(self.gotConnected)67 68 58 def shutdown(self): 69 59 d = self.client.disconnect() … … 71 61 return d 72 62 63 64 class NodeManager(BaseManager): 65 """ 66 I am a manager for a node clients. 67 """ 68 def __init__(self): 69 # Start PyQt4 with Twisted integration 70 from twisted_goodies.qtwisted import qt4reactor 71 from PyQt4.QtGui import QApplication 72 self.app = QApplication([]) 73 qt4reactor.install(self.app) 74 # Now get the regular reactor 75 from twisted.internet import reactor; self.reactor = reactor 76 # The gui module... 77 import gui; self.gui = gui 78 # Go! 79 self.reactor.callWhenRunning(self.startup) 80 self.reactor.run() 81 82 def startup(self): 83 """ 84 Instantiates a session-capable client and connects it to the server. 85 """ 86 def gotSessionMgr(sessionMgr): 87 self.sessionMgr = sessionMgr 88 self.loginWindow = self.gui.LoginWindow(self) 89 90 self.client = client.Client(self) 91 d = self.client.connect() 92 d.addCallback(lambda p: p.callRemote('getSessionManager')) 93 d.addCallback(gotSessionMgr) 94 return d 95 73 96 def sessionBegin(self, user, password): 74 97 """ 75 For all clients, requests a session for the specified I{user},76 authenticated with thesupplied I{password}.98 Requests a session for the specified I{user}, authenticated with the 99 supplied I{password}. 77 100 """ 78 101 def gotSessionAnswer(approved): … … 93 116 def sessionUpdate(self, hoursLeft): 94 117 """ 95 For all clients, updates the session.118 Updates the session. 96 119 """ 97 120 if self.activeUser is None: … … 105 128 def sessionEnd(self, callServer=True): 106 129 """ 107 For all clients, ends the session. 108 """ 109 self.activeUser = None 110 if callServer: 111 return self.sessionMgr.callRemote('end') 112 return defer.succeed(None) 113 114 115 class GuiManager(BaseManager): 116 """ 117 """ 118 def __init__(self): 119 # Start PyQt4 with Twisted integration 120 from twisted_goodies.qtwisted import qt4reactor 121 from PyQt4.QtGui import QApplication 122 self.app = QApplication([]) 123 qt4reactor.install(self.app) 124 # Now get the regular reactor 125 from twisted.internet import reactor; self.reactor = reactor 126 # The gui module... 127 import gui; self.gui = gui 128 # Go! 129 self.reactor.callWhenRunning(self.startup) 130 self.reactor.run() 131 132 def gotConnected(self, sessionMgr): 133 """ 134 Connected callback for GUI clients. 135 """ 136 self.sessionMgr = sessionMgr 137 self.loginWindow = self.gui.LoginWindow(self) 138 139 def sessionEnd(self, callServer=True): 140 """ 141 Ends the session for GUI clients. 130 Ends the session, returning a deferred that fires when I'm ready for a 131 new session. 142 132 """ 143 133 self.loginWindow.show() … … 147 137 self.sessionWindow.close() 148 138 del self.sessionWindow 149 return BaseManager.sessionEnd(self, callServer) 139 self.activeUser = None 140 if callServer: 141 return self.sessionMgr.callRemote('end') 142 return defer.succeed(None) 150 143 151 144 152 if __name__ == '__main__':153 from optparse import OptionParser154 parser = OptionParser()155 parser.add_option(156 "-g", "--gui",157 action="store_true", dest="gui",158 help="Run the NDM application in a fixed-sized, unmanaged window")159 opts, args = parser.parse_args()160 161 if opts.gui:162 # Run the NDM application in a fixed-sized, unmanaged window with the163 # overall event loop under Twisted control.164 GuiManager()165 else:166 # Run a console-only client with no user session.167 BaseManager()168 169
