Changeset 122
- Timestamp:
- 02/14/08 23:18:31 (1 year ago)
- Files:
-
- projects/AsynCluster/trunk/asyncluster/master/control.py (modified) (5 diffs)
- projects/AsynCluster/trunk/asyncluster/master/nodes.py (modified) (3 diffs)
- projects/AsynCluster/trunk/asyncluster/master/test/test_control.py (modified) (3 diffs)
- projects/AsynCluster/trunk/asyncluster/ndm/client.py (modified) (4 diffs)
- projects/AsynCluster/trunk/asyncluster/ndm/main.py (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynCluster/trunk/asyncluster/master/control.py
r121 r122 178 178 179 179 180 class WorkerRoster(object):181 """182 I manage a roster of child worker clients, generating new unique IDs as183 requested and keeping track of which workers are running on which node.184 185 @ivar pids: A dict of pids of child worker processes on the various nodes,186 keyed by the unique ID I've assigned to that child worker.187 188 """189 def __init__(self):190 self.counter = 0191 self.pids = {}192 193 def all(self):194 """195 Returns a list of all PIDs of all child worker processes on all196 nodes. (There may be duplicates, due to the same PID being used on197 different nodes.)198 """199 return self.pids.values()200 201 202 203 204 180 class Controller(object): 205 181 """ … … 215 191 self.counter = 0 216 192 self.nodes = {} 217 self.roster = WorkerRoster()218 193 self.jobber = jobs.JobManager() 219 194 … … 227 202 return self.sessionManager 228 203 229 @defer.deferredGenerator230 204 def attachNode(self, nodePerspective, nodeRoot): 231 205 """ 232 206 Call when another mutually authenticated node client has 233 207 connected. Attaches its root referenceable to my map of node roots 234 under a new ID in the form 'N001', then has the node spawn as many235 c hild worker clients as needed to keep all of its cores occupied.208 under a new integer ID, then has the node spawn as many child worker 209 clients as needed to keep all of its cores occupied. 236 210 237 211 Returns a deferred that fires with the node ID when all the spawning is … … 239 213 """ 240 214 self.counter += 1 241 ID = "N%03d" % self.counter 242 self.nodes[ID] = nodePerspective, nodeRoot 243 wfd = defer.waitForDeferred(nodeRoot.callRemote('mips')) 244 yield wfd 245 N_cores = len(wfd.getResult()) 246 wfd = defer.waitForDeferred( 247 nodeRoot.callRemote('checkWorkers', self,roster.all())) 248 yield wfd 249 215 self.nodes[self.counter] = nodePerspective, nodeRoot 216 d = nodeRoot.callRemote('spawnWorkers') 217 return d.addCallback(lambda _: self.counter) 250 218 251 219 def detachNode(self, ID): … … 258 226 return self.sessionManager.end(ID, callClient=False) 259 227 260 def attachWorker(self, nodeRoot , ID):228 def attachWorker(self, nodeRoot): 261 229 """ 262 230 Call when another mutually authenticated worker client has 263 231 connected. Attaches its root referenceable to my jobber and returns a 264 new ID in the form 'W001' based on the integer ID assigned by the 265 jobber. 266 """ 267 # TODO 268 d = self.jobber.attachChild(nodeRoot) 269 d.addCallback(lambda ID: "W%03d" % ID) 270 return d 232 deferred that fires with a new integer ID as assigned by the jobber. 233 """ 234 return self.jobber.attachChild(nodeRoot) 271 235 272 236 def detachWorker(self, ID): 273 237 """ 274 A worker client has disconnected, so detach it from the jobber and 275 remote its PID from the I{workers} map. 276 """ 277 # TODO 278 if hasattr(self, 'sessionManager'): 279 return self.sessionManager.end(ID, callClient=False) 280 238 A worker client has disconnected, so detach it from the jobber. 239 """ 240 return self.jobber.detachChild(ID) 241 281 242 def _remoteError(self, failure, ID): 282 243 if failure.check(pb.DeadReferenceError, pb.PBConnectionLost): projects/AsynCluster/trunk/asyncluster/master/nodes.py
r121 r122 34 34 class Perspective(pb.Avatar): 35 35 """ 36 Each node's PB client receives a reference to its very own instance of me37 as its perspective upon making an authenticated TCP connection to the node38 master server.36 Each PB node and worker client receives a reference to its very own 37 instance of me as its perspective upon making an authenticated TCP 38 connection to the node master server. 39 39 40 40 @ivar ID: A unique ID for this client, established during a mutually 41 authenticated client-server connection, in the form 'N001' for node 42 clients and 'W001' for worker clients. 41 authenticated client-server connection. 43 42 44 43 @ivar userID: The ID of any user having a session underway on the client if 45 44 it is a node rather than a worker. 45 46 @ivar nodeClient: A Boolean that is set C{True} if my client is a node 47 client, as opposed to a child worker client. 46 48 47 49 """ … … 55 57 """ 56 58 return self.ctl.getSessionManager() 59 60 def _printableID(self): 61 return "%s%4d" % ("WN"[self.nodeClient], self.ID) 57 62 58 63 def attached(self, clientRoot): … … 68 73 def responded(acceptanceCode): 69 74 if acceptanceCode is None: 70 d = defer.succeed(None) 75 return (pb.IPerspective, self, lambda _: None) 76 if acceptanceCode is True: 77 self.nodeClient = True 78 d = self.ctl.attachNode(self, clientRoot) 71 79 else: 72 clientRoot.notifyOnDisconnect(self.detached) 73 if acceptanceCode is True: 74 d = self.ctl.attachNode(self, clientRoot) 75 else: 76 d = self.ctl.attachWorker(clientRoot, acceptanceCode) 77 d.addCallback(done) 78 d.addCallback(lambda _: (pb.IPerspective, self, self.detached)) 79 return d 80 self.nodeClient = False 81 d = self.ctl.attachWorker(clientRoot) 82 clientRoot.notifyOnDisconnect(self.detached) 83 return d.addCallback(doneAttaching) 80 84 81 def done(ID): 82 print "%4s: Attached" % ID 85 def doneAttaching(ID): 83 86 self.ID = ID 87 print "%s: Attached" % self._printableID() 88 return pb.IPerspective, self, self.detached 84 89 85 90 serverPassword = self.ctl.config['common']['server password'] 86 91 d = clientRoot.callRemote('reverseLogin', serverPassword) 87 d.addCallback(responded) 88 return d 92 return d.addCallback(responded) 89 93 90 94 def detached(self, *null): 91 95 """ 92 Called when the nodeclient disconnects.96 Called when the client disconnects. 93 97 """ 94 98 if hasattr(self, 'ID'): 95 print "%4s: Detached" % self.ID 96 self.ctl.detachNode(self.ID) 99 print "%s: Detached" % self._printableID() 100 if self.nodeClient: 101 d = self.ctl.detachNode(self.ID) 102 else: 103 d = self.ctl.detachWorker(self.ID) 97 104 del self.ID 105 return d 98 106 99 107 projects/AsynCluster/trunk/asyncluster/master/test/test_control.py
r2 r122 98 98 def setUp(self): 99 99 def testableInit(self): 100 self.counter = 0 100 101 self.nodes = {} 101 102 self.jobber = mock.JobManager() … … 112 113 def checkAttached(null): 113 114 self.failUnless(expectedID in self.ctl.nodes) 114 self.failUnlessEqual(115 self.ctl.jobber.attached.get(expectedID, None), root)116 115 117 116 d = self.ctl.attachNode(node, root) … … 191 190 d.addCallback(self.ctl.nodeRemote, 'message', president) 192 191 d.addCallback(checkCalled, president) 193 self.ctl.detachNode(self._nodeID)192 d.addCallback(lambda _: self.ctl.detachNode(self._nodeID)) 194 193 d.addCallback( 195 194 lambda _: self.ctl.nodeRemote(self._nodeID, 'message', president)) projects/AsynCluster/trunk/asyncluster/ndm/client.py
r121 r122 187 187 raise ConnectionError("Couldn't connect to server") 188 188 189 def connectionLost(self): 190 """ 191 Called to terminate my process upon loss of connection to the PB server. 192 """ 193 reactor.stop() 194 189 195 190 196 class Client(object): … … 193 199 root resource object. 194 200 195 If I am constructed with the I{ID} keyword set to an integer, any 196 connection to the server will be as a child worker client. Otherwise, any 197 such connection will be as a node client, in which case, I will obtain and 201 If I am constructed with the I{session} keyword set C{True}, any connection 202 to the server will be as a node client. In that case, I will obtain and 198 203 return a remote reference to the server's global session manager upon 199 connecting. 200 """ 201 def __init__(self, main, ID=None):204 connecting. Otherwise, I'm just a lowly worker client. 205 """ 206 def __init__(self, main, session=False): 202 207 self.main = main 203 self. ID = ID208 self.session = session 204 209 205 210 def connect(self): … … 223 228 credential = credentials.UsernamePassword(cc['user'], cc['password']) 224 229 serverPassword = self.main.config['common']['server password'] 225 if self. ID is None:230 if self.session: 226 231 self.root = SessionRoot(serverPassword, self.main) 227 232 else: 228 self.root = ChildRoot(serverPassword , ID)233 self.root = ChildRoot(serverPassword) 229 234 # Do the login 230 235 return factory.login(credential, self.root).addBoth(gotAnswer) … … 243 248 # When that happens, we will want to: 244 249 # (1) end any active session, and 245 if self. ID is None:250 if self.session: 246 251 d.addCallback(lambda _: self.main.sessionEnd()) 247 252 # (2) disconnect from the server projects/AsynCluster/trunk/asyncluster/ndm/main.py
r121 r122 47 47 config = configobj.ConfigObj(CONFIG_PATH) 48 48 49 def __init__(self , ID):49 def __init__(self): 50 50 # The Twisted reactor, with no GUI integration needed 51 51 from twisted.internet import reactor; self.reactor = reactor 52 52 # The session-less client 53 self.client = client.Client(self , ID)53 self.client = client.Client(self) 54 54 # Go! 55 55 self.reactor.callWhenRunning(self.client.connect) … … 88 88 self.loginWindow = self.gui.LoginWindow(self) 89 89 90 self.client = client.Client(self )90 self.client = client.Client(self, session=True) 91 91 d = self.client.connect() 92 92 d.addCallback(lambda p: p.callRemote('getSessionManager'))
