Changeset 123
- Timestamp:
- 02/20/08 01:16:56 (10 months ago)
- Files:
-
- projects/AsynCluster/trunk/asyncluster/master/control.py (modified) (1 diff)
- projects/AsynCluster/trunk/asyncluster/master/test/mock.py (modified) (4 diffs)
- projects/AsynCluster/trunk/asyncluster/master/test/test_nodes.py (modified) (2 diffs)
- projects/AsynCluster/trunk/asyncluster/ndm/client.py (modified) (3 diffs)
- projects/AsynCluster/trunk/asyncluster/ndm/test/test_client.py (modified) (5 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynCluster/trunk/asyncluster/master/control.py
r122 r123 4 4 # any of the cluster node workstations. 5 5 # 6 # Copyright (C) 2006-200 7by Edwin A. Suominen, http://www.eepatents.com6 # Copyright (C) 2006-2008 by Edwin A. Suominen, http://www.eepatents.com 7 7 # 8 8 # This program is free software; you can redistribute it and/or modify it under projects/AsynCluster/trunk/asyncluster/master/test/mock.py
r79 r123 75 75 class Control(object): 76 76 def __init__(self): 77 self.counter = 0 78 self.workers = {} 77 79 self.attached = False 78 80 self.config = { … … 89 91 self.attached = None 90 92 93 def attachWorker(self, nodeRoot): 94 self.counter += 1 95 self.workers[self.counter] = nodeRoot 96 return defer.succeed(self.counter) 97 98 def detachWorker(self, ID): 99 del self.workers[ID] 100 return defer.succeed(None) 101 91 102 def getSessionManager(self): 92 103 return SessionManager() … … 115 126 return self.callRemote(*args[1:], **kw) 116 127 if called == 'reverseLogin': 117 result = (args[0] == self.serverPassword) 128 if args[0] == self.serverPassword: 129 result = True 130 else: 131 result = None 118 132 elif called == 'test': 119 133 result = 10*sum(args) + 100*sum(kw.values()) … … 161 175 def remote_reverseLogin(self, password): 162 176 self.password = password 163 return password == SERVER_PASSWORD 177 if password == SERVER_PASSWORD: 178 return True 164 179 165 180 projects/AsynCluster/trunk/asyncluster/master/test/test_nodes.py
r20 r123 1 # Node Display Manager (NDM): 2 # A simple X display manager for cluster nodes that also serve as 3 # access-restricted workstations. An NDM client runs on each node and 4 # communicates via Twisted's Perspective Broker to a master NDM server, which 5 # regulates when and how much each user can use his account on any of the 6 # workstations. The NDM server also dispatches cluster operations to the nodes 7 # via the NDM clients, unbeknownst to the workstation users. 1 # AsynCluster: Master 2 # A cluster management server based on Twisted's Perspective Broker. Dispatches 3 # cluster jobs and regulates when and how much each user can use his account on 4 # any of the cluster node workstations. 8 5 # 9 # Copyright (C) 2006 by Edwin A. Suominen, http://www.eepatents.com6 # Copyright (C) 2006-2008 by Edwin A. Suominen, http://www.eepatents.com 10 7 # 11 8 # This program is free software; you can redistribute it and/or modify it under … … 63 60 d.addCallback(checkResult) 64 61 return d 65 62 66 63 def testDetached(self): 64 self.p.nodeClient = True 67 65 self.p.ID = 234 68 66 self.p.detached() projects/AsynCluster/trunk/asyncluster/ndm/client.py
r122 r123 39 39 40 40 def checkTrust(f): 41 if f.im_self.trusted: 42 return f 43 raise jobs.TrustError() 41 """ 42 Decorates methods so that they only run if their class instance has a 43 I{trusted} attribute that is set C{True}. 44 """ 45 def wrapper(self, *args, **kw): 46 if self.trusted: 47 return f(self, *args, **kw) 48 raise jobs.TrustError() 49 50 wrapper.__name__ = f.__name__ 51 return wrapper 44 52 45 53 … … 135 143 return values 136 144 137 def _ workersRunning(self):138 """ 139 Returns the number of child worker processes currently running on the140 client node.141 """ 142 count = 0145 def _pids(self): 146 """ 147 Returns a list of PIDs of all child worker processes currently running 148 on the client node. 149 """ 150 pids = [] 143 151 for subdir in os.listdir("/proc/"): 144 152 if not subdir.isdigit(): … … 155 163 continue 156 164 if cmdline[2] == self.workerCmd: 157 count += 1158 return count165 pids.append(int(subdir)) 166 return pids 159 167 160 168 @checkTrust 161 def remote_spawnWorkers(self ):169 def remote_spawnWorkers(self, restart=False): 162 170 """ 163 171 Spawns child processes as needed to keep one child worker client 164 172 running for each CPU core of the node. 165 """ 166 N = len(self._mips()) - self._workersRunning() 173 174 If the I{restart} keyword is set C{True}, kills any child processes 175 currently running and then spawns one new child process per CPU core. 176 """ 177 pids = self._pids() 178 N = len(self._mips()) 179 if restart: 180 for pid in pids: 181 os.kill(pid, signal.SIGHUP) 182 else: 183 N -= len(pids) 167 184 for k in xrange(N): 168 185 os.spawnl(os.P_NOWAIT, PYTHON, PYTHON, "-c", self.workerCmd) 169 170 @checkTrust171 def remote_kill(self, *pids):172 """173 Kills the processes corresponding to the PID(s) supplied as one or more174 integer arguments.175 """176 for pid in pids:177 os.kill(pid, signal.SIGHUP)178 179 186 180 187 projects/AsynCluster/trunk/asyncluster/ndm/test/test_client.py
r119 r123 9 9 # to the workstation users. 10 10 # 11 # Copyright (C) 2006-200 7by Edwin A. Suominen, http://www.eepatents.com11 # Copyright (C) 2006-2008 by Edwin A. Suominen, http://www.eepatents.com 12 12 # 13 13 # This program is free software; you can redistribute it and/or modify it under … … 29 29 30 30 import os, signal 31 from twisted.internet import defer, utils 31 from twisted.internet import defer, utils, reactor 32 32 from twisted.trial.unittest import TestCase 33 33 … … 35 35 36 36 37 class Test_Client(TestCase): 37 def deferToDelay(delay=0.3): 38 d = defer.Deferred() 39 reactor.callLater(delay, d.callback, None) 40 return d 41 42 43 class Test_SessionRoot(TestCase): 38 44 def setUp(self): 39 self.client = client.ClientRoot(None, "foo") 45 self.root = client.SessionRoot("foo", None) 46 self.root.trusted = True 40 47 41 48 def _pids(self, *null): … … 65 72 d = self._pids() 66 73 d.addCallback(lambda x: setattr(self, '_count', len(x))) 67 d.addCallback(lambda _: self. client.remote_bash(script))74 d.addCallback(lambda _: self.root.remote_bash(script)) 68 75 d.addCallback(self.failUnlessEqual, True) 69 76 d.addCallback(self._pids) … … 72 79 73 80 def test_mips(self): 74 def gotResult(mips): 75 print mips 76 self.failUnless(len(mips) >= 1) 81 mips = self.root._mips() 82 self.failUnless(len(mips) >= 1) 83 self.failUnless(isinstance(mips[0], float)) 84 85 def test_pids(self): 86 def next(null): 87 pidsAfter = [x for x in self.root._pids() if x not in pidsBefore] 88 self.failUnlessEqual(pidsAfter, [pid]) 89 os.kill(pid, signal.SIGHUP) 77 90 78 d = defer.maybeDeferred(self.client.remote_mips) 79 d.addCallback(gotResult) 80 return d 91 pidsBefore = self.root._pids() 92 pid = os.spawnl( 93 os.P_NOWAIT, 94 client.PYTHON, client.PYTHON, "-c", self.root.workerCmd) 95 return deferToDelay().addCallback(next) 96 97 def test_spawnWorkers_normal(self): 98 def first(null): 99 # Running spawn should start a new worker per core 100 self.root.remote_spawnWorkers() 101 return deferToDelay().addCallback(second) 102 103 def second(null): 104 pids = self.root._pids() 105 self.failUnlessEqual(len(pids), len(self.root._mips())) 106 # Running spawn again shouldn't do anything 107 self.root.remote_spawnWorkers() 108 self.failUnlessEqual(pids, self.root._pids()) 109 110 for pid in self.root._pids(): 111 os.kill(pid, signal.SIGHUP) 112 return deferToDelay().addCallback(first) 113 114 def test_spawnWorkers_restart(self): 115 def first(null): 116 # Running spawn again with restart should result in all new workers 117 self.root.remote_spawnWorkers(restart=True) 118 return deferToDelay().addCallback(second) 119 120 def second(null): 121 for pid in self.root._pids(): 122 self.failIf(pid in pidsA) 123 124 # The worker(s) after the first spawn 125 self.root.remote_spawnWorkers() 126 pidsA = self.root._pids() 127 return deferToDelay().addCallback(first) 128
