Changeset 127
- Timestamp:
- 02/22/08 22:42:38 (10 months ago)
- Files:
-
- projects/AsynCluster/trunk/asyncluster/master/control.py (modified) (3 diffs)
- projects/AsynCluster/trunk/asyncluster/ndm/client.py (modified) (3 diffs)
- projects/AsynCluster/trunk/asyncluster/ndm/gui.py (modified) (3 diffs)
- projects/AsynCluster/trunk/asyncluster/ndm/node.py (modified) (3 diffs)
- projects/AsynCluster/trunk/console (modified) (2 diffs)
- projects/AsynCluster/trunk/misc/etc_asyncluster.conf (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynCluster/trunk/asyncluster/master/control.py
r125 r127 213 213 """ 214 214 self.counter += 1 215 self.nodes[self.counter] = nodePerspective, nodeRoot 215 # Use a local copy of counter so it is unchanged when the callback 216 # fires 217 counter = self.counter 218 self.nodes[counter] = nodePerspective, nodeRoot 216 219 d = nodeRoot.callRemote('spawnWorkers') 217 return d.addCallback(lambda _: self.counter)220 return d.addCallback(lambda _: counter) 218 221 219 222 def detachNode(self, ID): … … 232 235 deferred that fires with a new integer ID as assigned by the jobber. 233 236 """ 234 return self.jobber.attachChild(nodeRoot) 237 N = int(self.config['server']['jobs']) 238 return self.jobber.attachChild(nodeRoot, N) 235 239 236 240 def detachWorker(self, ID): … … 287 291 """ 288 292 for ID, nodeStuff in self.nodes.iteritems(): 289 d = node Root.callRemote(called, *args, **kw)293 d = nodeStuff[1].callRemote(called, *args, **kw) 290 294 d.addErrback(self._remoteError, ID) 291 295 dList.append(d) projects/AsynCluster/trunk/asyncluster/ndm/client.py
r125 r127 35 35 from asynqueue import jobs 36 36 37 PYTHON="/usr/bin/python" 37 PYTHON = "/usr/bin/python" 38 RETRY_DELAY = 10.0 # sec 38 39 39 40 … … 133 134 """ 134 135 Returns a list of bogomips float values for each core in the client's 135 CPU .136 """ 137 values = []136 CPU, in order of CPU number. 137 """ 138 values = {} 138 139 fh = open("/proc/cpuinfo", 'r') 139 140 for line in fh: 140 if line.startswith("bogomips"): 141 values.append(float(line.split()[-1])) 141 if line.startswith("processor"): 142 cpuNumber = int(line.split()[-1]) 143 elif line.startswith("bogomips"): 144 values[cpuNumber] = float(line.split()[-1]) 142 145 fh.close() 143 return values 146 keys = values.keys() 147 keys.sort() 148 return [values[key] for key in keys] 144 149 145 150 def _pids(self): … … 191 196 from the AsynCluster server. 192 197 """ 198 def clientConnectionFailed(self, connector, reason): 199 """ 200 Called to indicate that I couldn't connect to the PB server 201 (yet). Retry after a while. 202 """ 203 pb.PBClientFactory.clientConnectionFailed(self, connector, reason) 204 reactor.callLater(RETRY_DELAY, connector.connect) 205 193 206 def clientConnectionLost(self, *args, **kw): 194 207 """ 195 208 Called to terminate my process upon loss of connection to the PB server. 196 209 """ 197 print "Connection lost to server!"198 210 pb.PBClientFactory.clientConnectionLost(self, *args, **kw) 199 211 try: projects/AsynCluster/trunk/asyncluster/ndm/gui.py
r125 r127 37 37 38 38 # Now the regular imports 39 import os 39 import os, pwd 40 40 from twisted.internet import defer, reactor, protocol 41 41 from PyQt4 import QtCore, QtGui … … 219 219 p, windowManager, (windowManager,), 220 220 env=env, path=homeDir, uid=uid) 221 os.system("renice +%d - -user%s" % (niceness, self.user))221 os.system("renice +%d -u %s" % (niceness, self.user)) 222 222 p.d.addCallback(lambda _: self.sessionEnd()) 223 223 224 224 def sessionEnd(self): 225 225 def ended(null): … … 237 237 os.system("killall --user %s" % self.user) 238 238 util.log("Killed all user processes") 239 239 240 240 def closeEvent(self, event): 241 241 """ projects/AsynCluster/trunk/asyncluster/ndm/node.py
r125 r127 41 41 42 42 """ 43 def __init__(self, headless=False ):43 def __init__(self, headless=False, duration=None): 44 44 import configobj 45 45 self.config = configobj.ConfigObj(CONFIG_PATH) … … 53 53 from twisted.internet import reactor 54 54 reactor.callWhenRunning(self.startup) 55 if isinstance(duration, (float, int)): 56 reactor.callLater(float(duration), reactor.stop) 55 57 reactor.run() 56 58 … … 125 127 Manager() 126 128 127 def runHeadless( ):128 Manager(headless=True )129 def runHeadless(duration=None): 130 Manager(headless=True, duration=duration) projects/AsynCluster/trunk/console
r89 r127 108 108 if cmd == 'user': 109 109 cmd = 'userAction' 110 elif cmd == 'new': 111 cmd = 'newJob' 112 filePath = args[0] 113 if not os.path.exists(filePath): 114 return self.oops("File '%s' not found" % filePath) 115 args = [self._jobCode(filePath)] 116 elif cmd == 'run': 117 cmd = 'runJob' 110 elif cmd == 'resetup': 111 srcDir = args[0] 112 if not os.path.isdir(srcDir): 113 return self.oops("Source directory '%s' not found" % srcDir) 118 114 return caller(cmd, *args) 119 115 … … 434 430 self.terminal.resetPrivateModes([privateModes.CURSOR_MODE]) 435 431 436 self.rootWidget = TopWindow(self._painter) #, lambda f: self.reactor.callLater(0, f)) 432 self.rootWidget = TopWindow( 433 self._painter, lambda f: self.reactor.callLater(0, f)) 437 434 self.rootWidget.reactor = self.reactor 438 435 vbox = VBox() projects/AsynCluster/trunk/misc/etc_asyncluster.conf
r2 r127 10 10 # Example: "subnets = 127.0.0.1, 192.168.1.0/24" 11 11 subnets = 127.0.0.1, 192.168.135.0/24 12 13 # Number of jobs to queue up at a time for an individual worker 14 jobs = 2 12 15 13 16
