Changeset 123

Show
Ignore:
Timestamp:
02/20/08 01:16:56 (10 months ago)
Author:
edsuom
Message:

Got refactored node/worker client version done, partially unit tested

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • projects/AsynCluster/trunk/asyncluster/master/control.py

    r122 r123  
    44# any of the cluster node workstations. 
    55# 
    6 # Copyright (C) 2006-2007 by Edwin A. Suominen, http://www.eepatents.com 
     6# Copyright (C) 2006-2008 by Edwin A. Suominen, http://www.eepatents.com 
    77# 
    88# This program is free software; you can redistribute it and/or modify it under 
  • projects/AsynCluster/trunk/asyncluster/master/test/mock.py

    r79 r123  
    7575class Control(object): 
    7676    def __init__(self): 
     77        self.counter = 0 
     78        self.workers = {} 
    7779        self.attached = False 
    7880        self.config = { 
     
    8991        self.attached = None 
    9092 
     93    def attachWorker(self, nodeRoot): 
     94        self.counter += 1 
     95        self.workers[self.counter] = nodeRoot 
     96        return defer.succeed(self.counter) 
     97 
     98    def detachWorker(self, ID): 
     99        del self.workers[ID] 
     100        return defer.succeed(None) 
     101         
    91102    def getSessionManager(self): 
    92103        return SessionManager() 
     
    115126            return self.callRemote(*args[1:], **kw) 
    116127        if called == 'reverseLogin': 
    117             result = (args[0] == self.serverPassword) 
     128            if args[0] == self.serverPassword: 
     129                result = True 
     130            else: 
     131                result = None 
    118132        elif called == 'test': 
    119133            result = 10*sum(args) + 100*sum(kw.values()) 
     
    161175    def remote_reverseLogin(self, password): 
    162176        self.password = password 
    163         return password == SERVER_PASSWORD 
     177        if password == SERVER_PASSWORD: 
     178            return True 
    164179 
    165180 
  • projects/AsynCluster/trunk/asyncluster/master/test/test_nodes.py

    r20 r123  
    1 # Node Display Manager (NDM): 
    2 # A simple X display manager for cluster nodes that also serve as 
    3 # access-restricted workstations. An NDM client runs on each node and 
    4 # communicates via Twisted's Perspective Broker to a master NDM server, which 
    5 # regulates when and how much each user can use his account on any of the 
    6 # workstations. The NDM server also dispatches cluster operations to the nodes 
    7 # via the NDM clients, unbeknownst to the workstation users. 
     1# AsynCluster: Master 
     2# A cluster management server based on Twisted's Perspective Broker. Dispatches 
     3# cluster jobs and regulates when and how much each user can use his account on 
     4# any of the cluster node workstations. 
    85# 
    9 # Copyright (C) 2006 by Edwin A. Suominen, http://www.eepatents.com 
     6# Copyright (C) 2006-2008 by Edwin A. Suominen, http://www.eepatents.com 
    107# 
    118# This program is free software; you can redistribute it and/or modify it under 
     
    6360        d.addCallback(checkResult) 
    6461        return d 
    65  
     62     
    6663    def testDetached(self): 
     64        self.p.nodeClient = True 
    6765        self.p.ID = 234 
    6866        self.p.detached() 
  • projects/AsynCluster/trunk/asyncluster/ndm/client.py

    r122 r123  
    3939 
    4040def checkTrust(f): 
    41     if f.im_self.trusted: 
    42         return f 
    43     raise jobs.TrustError() 
     41    """ 
     42    Decorates methods so that they only run if their class instance has a 
     43    I{trusted} attribute that is set C{True}. 
     44    """ 
     45    def wrapper(self, *args, **kw): 
     46        if self.trusted: 
     47            return f(self, *args, **kw) 
     48        raise jobs.TrustError() 
     49 
     50    wrapper.__name__ = f.__name__ 
     51    return wrapper 
    4452 
    4553 
     
    135143        return values 
    136144 
    137     def _workersRunning(self): 
    138         """ 
    139         Returns the number of child worker processes currently running on the 
    140         client node. 
    141         """ 
    142         count = 0 
     145    def _pids(self): 
     146        """ 
     147        Returns a list of PIDs of all child worker processes currently running 
     148        on the client node. 
     149        """ 
     150        pids = [] 
    143151        for subdir in os.listdir("/proc/"): 
    144152            if not subdir.isdigit(): 
     
    155163                continue 
    156164            if cmdline[2] == self.workerCmd: 
    157                 count += 1 
    158         return count 
     165                pids.append(int(subdir)) 
     166        return pids 
    159167 
    160168    @checkTrust 
    161     def remote_spawnWorkers(self): 
     169    def remote_spawnWorkers(self, restart=False): 
    162170        """ 
    163171        Spawns child processes as needed to keep one child worker client 
    164172        running for each CPU core of the node. 
    165         """ 
    166         N = len(self._mips()) - self._workersRunning() 
     173 
     174        If the I{restart} keyword is set C{True}, kills any child processes 
     175        currently running and then spawns one new child process per CPU core. 
     176        """ 
     177        pids = self._pids() 
     178        N = len(self._mips()) 
     179        if restart: 
     180            for pid in pids: 
     181                os.kill(pid, signal.SIGHUP) 
     182        else: 
     183            N -= len(pids) 
    167184        for k in xrange(N): 
    168185            os.spawnl(os.P_NOWAIT, PYTHON, PYTHON, "-c", self.workerCmd) 
    169      
    170     @checkTrust 
    171     def remote_kill(self, *pids): 
    172         """ 
    173         Kills the processes corresponding to the PID(s) supplied as one or more 
    174         integer arguments. 
    175         """ 
    176         for pid in pids: 
    177             os.kill(pid, signal.SIGHUP) 
    178  
    179186     
    180187 
  • projects/AsynCluster/trunk/asyncluster/ndm/test/test_client.py

    r119 r123  
    99# to the workstation users. 
    1010# 
    11 # Copyright (C) 2006-2007 by Edwin A. Suominen, http://www.eepatents.com 
     11# Copyright (C) 2006-2008 by Edwin A. Suominen, http://www.eepatents.com 
    1212# 
    1313# This program is free software; you can redistribute it and/or modify it under 
     
    2929 
    3030import os, signal 
    31 from twisted.internet import defer, utils 
     31from twisted.internet import defer, utils, reactor 
    3232from twisted.trial.unittest import TestCase 
    3333 
     
    3535 
    3636 
    37 class Test_Client(TestCase): 
     37def deferToDelay(delay=0.3): 
     38    d = defer.Deferred() 
     39    reactor.callLater(delay, d.callback, None) 
     40    return d 
     41 
     42 
     43class Test_SessionRoot(TestCase): 
    3844    def setUp(self): 
    39         self.client = client.ClientRoot(None, "foo") 
     45        self.root = client.SessionRoot("foo", None) 
     46        self.root.trusted = True 
    4047 
    4148    def _pids(self, *null): 
     
    6572        d = self._pids() 
    6673        d.addCallback(lambda x: setattr(self, '_count', len(x))) 
    67         d.addCallback(lambda _: self.client.remote_bash(script)) 
     74        d.addCallback(lambda _: self.root.remote_bash(script)) 
    6875        d.addCallback(self.failUnlessEqual, True) 
    6976        d.addCallback(self._pids) 
     
    7279 
    7380    def test_mips(self): 
    74         def gotResult(mips): 
    75             print mips 
    76             self.failUnless(len(mips) >= 1) 
     81        mips = self.root._mips() 
     82        self.failUnless(len(mips) >= 1) 
     83        self.failUnless(isinstance(mips[0], float)) 
     84 
     85    def test_pids(self): 
     86        def next(null): 
     87            pidsAfter = [x for x in self.root._pids() if x not in pidsBefore] 
     88            self.failUnlessEqual(pidsAfter, [pid]) 
     89            os.kill(pid, signal.SIGHUP) 
    7790         
    78         d = defer.maybeDeferred(self.client.remote_mips) 
    79         d.addCallback(gotResult) 
    80         return d 
     91        pidsBefore = self.root._pids() 
     92        pid = os.spawnl( 
     93            os.P_NOWAIT, 
     94            client.PYTHON, client.PYTHON, "-c", self.root.workerCmd) 
     95        return deferToDelay().addCallback(next) 
     96 
     97    def test_spawnWorkers_normal(self): 
     98        def first(null): 
     99            # Running spawn should start a new worker per core 
     100            self.root.remote_spawnWorkers() 
     101            return deferToDelay().addCallback(second) 
     102 
     103        def second(null): 
     104            pids = self.root._pids() 
     105            self.failUnlessEqual(len(pids), len(self.root._mips())) 
     106            # Running spawn again shouldn't do anything 
     107            self.root.remote_spawnWorkers() 
     108            self.failUnlessEqual(pids, self.root._pids()) 
     109         
     110        for pid in self.root._pids(): 
     111            os.kill(pid, signal.SIGHUP) 
     112        return deferToDelay().addCallback(first) 
     113     
     114    def test_spawnWorkers_restart(self): 
     115        def first(null): 
     116            # Running spawn again with restart should result in all new workers 
     117            self.root.remote_spawnWorkers(restart=True) 
     118            return deferToDelay().addCallback(second) 
     119 
     120        def second(null): 
     121            for pid in self.root._pids(): 
     122                self.failIf(pid in pidsA) 
     123 
     124        # The worker(s) after the first spawn 
     125        self.root.remote_spawnWorkers() 
     126        pidsA = self.root._pids() 
     127        return deferToDelay().addCallback(first) 
     128