Changeset 94

Show
Ignore:
Timestamp:
10/18/07 01:19:28 (1 year ago)
Author:
edsuom
Message:

Prototype code for job updating...needs testing

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynCluster/trunk/asyncluster/master/control.py

    r89 r94  
    3434 
    3535 
     36def log(msgProto, nodeID, *args): 
     37    """ 
     38    Logs a message based on the supplied message prototype about some 
     39    particular node and, if specified as a third argument, some particular 
     40    user. Any arguments beyond the first three are applied to string 
     41    substitution of the message prototype. 
     42    """ 
     43    msg = "N%02d" % nodeID 
     44    if args: 
     45        msg = "%s (%s): %s" % (msg, args[0], msgProto) 
     46        if len(args) > 1: 
     47            msg = msg % args[1:] 
     48    else: 
     49        msg = "%s: %s" % (msg, msgProto) 
     50    print msg 
     51 
     52 
    3653class RemoteCallError(Exception): 
    3754    """ 
     
    6683            hoursLeft -= float(self.updateInterval) / 3600.0 
    6784            self.sessionMap[ID][1] = hoursLeft 
    68             self._log("%f hours now left", ID, userID, hoursLeft) 
     85            log("%f hours now left", ID, userID, hoursLeft) 
    6986            if hoursLeft >= 0.0: 
    7087                d = self.ctl.nodeRemote(ID, 'setTimeLeft', hoursLeft) 
     
    7390            d.addCallback( 
    7491                lambda _: 
    75                 self._log("Done updating client", ID, userID)) 
     92                log("Done updating client", ID, userID)) 
    7693            dList.append(d) 
    7794        return defer.DeferredList(dList) 
    78      
    79     def _log(self, msgProto, nodeID, *args): 
    80         msg = "N%02d" % nodeID 
    81         if args: 
    82             msg = "%s (%s): %s" % (msg, args[0], msgProto) 
    83             if len(args) > 1: 
    84                 msg = msg % args[1:] 
    85         else: 
    86             msg = "%s: %s" % (msg, msgProto) 
    87         print msg 
    8895     
    8996    def view_begin(self, node, userID, password): 
     
    98105        def gotAuthorizationResult(authorized): 
    99106            if not authorized: 
    100                 self._log("Unauthorized session attempt", node.ID, userID) 
     107                log("Unauthorized session attempt", node.ID, userID) 
    101108                return False 
    102109            if ID is None: 
    103110                d = defer.succeed(None) 
    104111            else: 
    105                 self._log( 
    106                     "Replacing session with one on N%02d", ID, userID, node.ID) 
     112                log("Replacing session with one on N%02d", ID, userID, node.ID) 
    107113                d = defer.maybeDeferred(self.end, ID) 
    108114            d.addCallback(lambda _: self.t.restricted(userID)) 
     
    112118        def gotRestrictionResult(restricted): 
    113119            if not restricted: 
    114                 self._log("Unrestricted session started", node.ID, userID) 
     120                log("Unrestricted session started", node.ID, userID) 
    115121                self.sessionMap[node.ID] = [userID, UNRESTRICTED_LOGIN_HOURS] 
    116122                return True 
     
    121127        def gotHoursAvailable(hours): 
    122128            if hours <= 0.0: 
    123                 self._log("No usage time available", node.ID, userID) 
     129                log("No usage time available", node.ID, userID) 
    124130                return False 
    125131            self.sessionMap[node.ID] = [userID, hours] 
    126             self._log("Session started, %f hours left", node.ID, userID, hours) 
     132            log("Session started, %f hours left", node.ID, userID, hours) 
    127133            self.t.recordSessionStartTime(userID) 
    128134            return True 
     
    142148        if node.ID in self.sessionMap: 
    143149            userID, hoursLeft = self.sessionMap[node.ID] 
    144             self._log("%f hours left", node.ID, userID, hoursLeft) 
     150            log("%f hours left", node.ID, userID, hoursLeft) 
    145151            return hoursLeft 
    146152        return 0.0 
     
    163169        else: 
    164170            userID, hoursLeft = self.sessionMap.pop(ID) 
    165             self._log( 
    166                 "Ending session with %f hours left", 
    167                 ID,  userID, hoursLeft) 
     171            log("Ending session with %f hours left", ID,  userID, hoursLeft) 
    168172            d = self.t.sessionEnd(userID) 
    169173            if callClient: 
     
    304308        """ 
    305309        return self.ctl.jobber.new(jobCode, niceness) 
     310 
     311    def remote_updateJob(self, jobID, callName, *args, **kw): 
     312        """ 
     313        Updates the job specified by I{jobID} by arranging to dispatch a call 
     314        to the callable object specified in the job's namespace on each node 
     315        before it runs another call for that job. 
     316        """ 
     317        return self.ctl.jobber.update(jobID, callName, *args, **kw) 
    306318     
    307319    def remote_runJob(self, jobID, callName, *args, **kw): 
  • projects/AsynCluster/trunk/asyncluster/master/jobs.py

    r93 r94  
    2424 
    2525import os.path 
     26 
     27from zope.interface import implements 
    2628from twisted.internet import reactor 
    2729from twisted.spread import pb 
     30 
     31from asynqueue import workers, TaskQueue 
    2832 
    2933 
     
    3236    I connect to the master TCP server via a UNIX socket and provide an 
    3337    interface for running jobs defined by a Python code file. 
     38 
     39    @ivar root: A remote reference to the root object provided by the server. 
     40     
    3441    """ 
    3542    root = None 
     
    98105        return d 
    99106 
     107    def update(self, cmd, *args, **kw): 
     108        """ 
     109        Arranges for a job update to be done on all present and future nodes 
     110        before they next run a job. Note that the update will apply to any jobs 
     111        currently queued up, too! 
     112 
     113        The update is done via the specified I{cmd} with any args and keywords 
     114        supplied. 
     115        """ 
     116        if hasattr(self, 'jobID'): 
     117            return self.root.callRemote('updateJob', self.jobID, cmd, *args, **kw) 
     118        raise Exception("No job registered!") 
     119 
    100120    def run(self, cmd, *args, **kw): 
    101121        """ 
     
    105125            return self.root.callRemote('runJob', self.jobID, cmd, *args, **kw) 
    106126        raise Exception("No job registered!") 
     127 
     128 
     129class JobWorker(workers.RemoteCallWorker): 
     130    """ 
     131    Instantiate me with a started instance of L{JobClient} and I'll use its 
     132    root reference and job runner. 
     133    """ 
     134    N = 30 
     135 
     136    def __init__(self, jobClient): 
     137        if not hasattr(jobClient, 'jobID'): 
     138            raise Exception("Supplied job client not started!") 
     139        self.client = jobClient 
     140        self.iQualified = [jobClient.jobID] 
     141        self.startup(jobClient.root) 
     142 
     143    def runNow(self, null, task): 
     144        cmd, args, kw = task.callTuple 
     145        d = self.client.run(cmd, *args, **kw) 
     146        job = (task, d) 
     147        self.jobs.append(job) 
     148        d.addCallback(self.doneTrying, job) 
     149        d.addErrback(self.oops) 
     150        # The task's deferred is NOT returned! 
     151 
     152    def stop(self): 
     153        d = workers.RemoteCallWorker.stop(self) 
     154        d.addCallback(lambda _: self.client.shutdown()) 
     155        return d 
     156