Changeset 94
- Timestamp:
- 10/18/07 01:19:28 (1 year ago)
- Files:
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
projects/AsynCluster/trunk/asyncluster/master/control.py
r89 r94 34 34 35 35 36 def log(msgProto, nodeID, *args): 37 """ 38 Logs a message based on the supplied message prototype about some 39 particular node and, if specified as a third argument, some particular 40 user. Any arguments beyond the first three are applied to string 41 substitution of the message prototype. 42 """ 43 msg = "N%02d" % nodeID 44 if args: 45 msg = "%s (%s): %s" % (msg, args[0], msgProto) 46 if len(args) > 1: 47 msg = msg % args[1:] 48 else: 49 msg = "%s: %s" % (msg, msgProto) 50 print msg 51 52 36 53 class RemoteCallError(Exception): 37 54 """ … … 66 83 hoursLeft -= float(self.updateInterval) / 3600.0 67 84 self.sessionMap[ID][1] = hoursLeft 68 self._log("%f hours now left", ID, userID, hoursLeft)85 log("%f hours now left", ID, userID, hoursLeft) 69 86 if hoursLeft >= 0.0: 70 87 d = self.ctl.nodeRemote(ID, 'setTimeLeft', hoursLeft) … … 73 90 d.addCallback( 74 91 lambda _: 75 self._log("Done updating client", ID, userID))92 log("Done updating client", ID, userID)) 76 93 dList.append(d) 77 94 return defer.DeferredList(dList) 78 79 def _log(self, msgProto, nodeID, *args):80 msg = "N%02d" % nodeID81 if args:82 msg = "%s (%s): %s" % (msg, args[0], msgProto)83 if len(args) > 1:84 msg = msg % args[1:]85 else:86 msg = "%s: %s" % (msg, msgProto)87 print msg88 95 89 96 def view_begin(self, node, userID, password): … … 98 105 def gotAuthorizationResult(authorized): 99 106 if not authorized: 100 self._log("Unauthorized session attempt", node.ID, userID)107 log("Unauthorized session attempt", node.ID, userID) 101 108 return False 102 109 if ID is None: 103 110 d = defer.succeed(None) 104 111 else: 105 self._log( 106 "Replacing session with one on N%02d", ID, userID, node.ID) 112 log("Replacing session with one on N%02d", ID, userID, node.ID) 107 113 d = defer.maybeDeferred(self.end, ID) 108 114 d.addCallback(lambda _: self.t.restricted(userID)) … … 112 118 def gotRestrictionResult(restricted): 113 119 if not restricted: 114 self._log("Unrestricted session started", node.ID, userID)120 log("Unrestricted session started", node.ID, userID) 115 121 self.sessionMap[node.ID] = [userID, UNRESTRICTED_LOGIN_HOURS] 116 122 return True … … 121 127 def gotHoursAvailable(hours): 122 128 if hours <= 0.0: 123 self._log("No usage time available", node.ID, userID)129 log("No usage time available", node.ID, userID) 124 130 return False 125 131 self.sessionMap[node.ID] = [userID, hours] 126 self._log("Session started, %f hours left", node.ID, userID, hours)132 log("Session started, %f hours left", node.ID, userID, hours) 127 133 self.t.recordSessionStartTime(userID) 128 134 return True … … 142 148 if node.ID in self.sessionMap: 143 149 userID, hoursLeft = self.sessionMap[node.ID] 144 self._log("%f hours left", node.ID, userID, hoursLeft)150 log("%f hours left", node.ID, userID, hoursLeft) 145 151 return hoursLeft 146 152 return 0.0 … … 163 169 else: 164 170 userID, hoursLeft = self.sessionMap.pop(ID) 165 self._log( 166 "Ending session with %f hours left", 167 ID, userID, hoursLeft) 171 log("Ending session with %f hours left", ID, userID, hoursLeft) 168 172 d = self.t.sessionEnd(userID) 169 173 if callClient: … … 304 308 """ 305 309 return self.ctl.jobber.new(jobCode, niceness) 310 311 def remote_updateJob(self, jobID, callName, *args, **kw): 312 """ 313 Updates the job specified by I{jobID} by arranging to dispatch a call 314 to the callable object specified in the job's namespace on each node 315 before it runs another call for that job. 316 """ 317 return self.ctl.jobber.update(jobID, callName, *args, **kw) 306 318 307 319 def remote_runJob(self, jobID, callName, *args, **kw): projects/AsynCluster/trunk/asyncluster/master/jobs.py
r93 r94 24 24 25 25 import os.path 26 27 from zope.interface import implements 26 28 from twisted.internet import reactor 27 29 from twisted.spread import pb 30 31 from asynqueue import workers, TaskQueue 28 32 29 33 … … 32 36 I connect to the master TCP server via a UNIX socket and provide an 33 37 interface for running jobs defined by a Python code file. 38 39 @ivar root: A remote reference to the root object provided by the server. 40 34 41 """ 35 42 root = None … … 98 105 return d 99 106 107 def update(self, cmd, *args, **kw): 108 """ 109 Arranges for a job update to be done on all present and future nodes 110 before they next run a job. Note that the update will apply to any jobs 111 currently queued up, too! 112 113 The update is done via the specified I{cmd} with any args and keywords 114 supplied. 115 """ 116 if hasattr(self, 'jobID'): 117 return self.root.callRemote('updateJob', self.jobID, cmd, *args, **kw) 118 raise Exception("No job registered!") 119 100 120 def run(self, cmd, *args, **kw): 101 121 """ … … 105 125 return self.root.callRemote('runJob', self.jobID, cmd, *args, **kw) 106 126 raise Exception("No job registered!") 127 128 129 class JobWorker(workers.RemoteCallWorker): 130 """ 131 Instantiate me with a started instance of L{JobClient} and I'll use its 132 root reference and job runner. 133 """ 134 N = 30 135 136 def __init__(self, jobClient): 137 if not hasattr(jobClient, 'jobID'): 138 raise Exception("Supplied job client not started!") 139 self.client = jobClient 140 self.iQualified = [jobClient.jobID] 141 self.startup(jobClient.root) 142 143 def runNow(self, null, task): 144 cmd, args, kw = task.callTuple 145 d = self.client.run(cmd, *args, **kw) 146 job = (task, d) 147 self.jobs.append(job) 148 d.addCallback(self.doneTrying, job) 149 d.addErrback(self.oops) 150 # The task's deferred is NOT returned! 151 152 def stop(self): 153 d = workers.RemoteCallWorker.stop(self) 154 d.addCallback(lambda _: self.client.shutdown()) 155 return d 156
