root/projects/AsynCluster/trunk/asyncluster/master/jobs.py

Revision 115, 6.0 kB (checked in by edsuom, 1 year ago)

Misc tweaks

Line 
1 # AsynCluster: Master
2 # A cluster management server based on Twisted's Perspective Broker. Dispatches
3 # cluster jobs and regulates when and how much each user can use his account on
4 # any of the cluster node workstations.
5 #
6 # Copyright (C) 2006-2007 by Edwin A. Suominen, http://www.eepatents.com
7 #
8 # This program is free software; you can redistribute it and/or modify it under
9 # the terms of the GNU General Public License as published by the Free Software
10 # Foundation; either version 2 of the License, or (at your option) any later
11 # version.
12 #
13 # This program is distributed in the hope that it will be useful, but WITHOUT
14 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15 # FOR A PARTICULAR PURPOSE.  See the file COPYING for more details.
16 #
17 # You should have received a copy of the GNU General Public License along with
18 # this program; if not, write to the Free Software Foundation, Inc., 51
19 # Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
20
21 """
22 Running jobs via the Master's UNIX socket PB interface.
23 """
24
25 import os.path, textwrap
26
27 from zope.interface import implements
28 from twisted.internet import defer, reactor
29 from twisted.spread import pb
30
31 from asynqueue import workers, TaskQueue
32
33
class JobClient(object):
    """
    I connect to the master server via its UNIX socket and provide an
    interface for running jobs defined by Python source, given either as a
    code file or as a string.

    @ivar root: A remote reference to the root object provided by the server,
        or C{None} if I am not (or no longer) connected.

    @ivar jobID: The job ID returned by the master's C{newJob} remote method.
        Only present after L{startup} has succeeded and until L{shutdown} has
        cancelled the job.
    """
    root = None

    def __init__(self, socket, codePath=None, codeString=None):
        """
        @param socket: Filesystem path of the master's UNIX socket.
        @param codePath: Path of a file containing the job's Python source.
            Only read if I{codeString} is not supplied.
        @param codeString: The job's Python source as a string.

        @raise RuntimeError: If I{socket} does not exist, if neither
            I{codePath} nor I{codeString} is given, or if I{codePath} has to
            be read but does not exist.
        """
        if not os.path.exists(socket):
            raise RuntimeError("No UNIX socket available at '%s'" % socket)
        self.socket = socket
        if codeString is None and codePath is None:
            raise RuntimeError(
                "You must specify either a file or a string containing "+\
                "Python source for the job.")
        if codeString is None:
            if not os.path.exists(codePath):
                raise RuntimeError("No code file available at '%s'" % codePath)
            fh = open(codePath)
            try:
                codeString = fh.read()
            finally:
                # Close the file even if reading it fails.
                fh.close()
        # Strip any common leading indentation (e.g. from an indented
        # triple-quoted string) before sending the source to the master.
        self.jobCode = textwrap.dedent(codeString)

    def _disconnect(self):
        """
        Drops my remote root reference and disconnects the UNIX socket
        connector, if L{startup} ever created one.
        """
        self.root = None
        connector = getattr(self, 'connector', None)
        if connector is not None:
            connector.disconnect()

    def _callJob(self, remoteMethod, cmd, *args, **kw):
        """
        Calls I{remoteMethod} on the server's root object with my job ID,
        I{cmd}, and any args and keywords supplied.

        @return: The deferred returned by C{callRemote}.
        @raise Exception: If no job is registered (L{startup} has not
            succeeded, or L{shutdown} has cancelled the job).
        """
        if not hasattr(self, 'jobID'):
            raise Exception("No job registered!")
        return self.root.callRemote(remoteMethod, self.jobID, cmd, *args, **kw)

    def startup(self):
        """
        Makes the UNIX socket connection, storing a remote reference to the
        server's root object, and registers my job code with the master via
        its C{newJob} remote method.

        @return: A deferred that fires C{True} if the connection succeeded
            and the master returned a (truthy) job ID, so that I am ready to
            accept commands, or C{False} otherwise. On C{False}, and also if
            the C{newJob} call itself fails, the connection is dropped. A
            C{newJob} failure is still passed on to the caller.
        """
        def gotAnswer(answer):
            # Added with addBoth: a failure to obtain the root object arrives
            # here as a Failure, which is not a remote reference, so it
            # yields False rather than an error.
            if pb.IUnjellyable.providedBy(answer):
                self.root = answer
                d = self.root.callRemote('newJob', self.jobCode)
                d.addCallback(gotID)
                return d
            return False

        def gotID(jobID):
            if jobID:
                self.jobID = jobID
                return True
            return False

        def gotFinalStatus(success):
            if not success:
                self._disconnect()
            return success

        def failed(reason):
            # Don't leak the connection, but let the caller see the error.
            self._disconnect()
            return reason

        factory = pb.PBClientFactory()
        self.connector = reactor.connectUNIX(self.socket, factory)
        d = factory.getRootObject()
        d.addBoth(gotAnswer)
        d.addCallbacks(gotFinalStatus, failed)
        return d

    def shutdown(self):
        """
        Disconnects from the master server.

        If a job is registered, it is first cancelled via the master's
        C{cancelJob} remote method. What that does to runs already in
        progress is up to the master. The connection is dropped whether or
        not the cancellation succeeds.

        @return: A deferred that fires C{None} once the job has been
            cancelled (or immediately if there was no job), or errbacks with
            the C{cancelJob} failure.
        """
        def doneCanceling(null):
            self._disconnect()

        def cancelFailed(reason):
            self._disconnect()
            return reason

        jobID = getattr(self, 'jobID', None)
        if jobID is None:
            self._disconnect()
            return defer.succeed(None)
        # Unregister the job right away so that later update/run calls fail
        # with "No job registered!" instead of hitting a dropped root.
        del self.jobID
        d = self.root.callRemote('cancelJob', jobID)
        d.addCallbacks(doneCanceling, cancelFailed)
        return d

    def update(self, cmd, *args, **kw):
        """
        Arranges for a job update to be done on all present and future nodes
        before they next run a job. Note that the update will apply to any jobs
        currently queued up, too!

        The update is done via the specified I{cmd} with any args and keywords
        supplied.

        @raise Exception: If no job is registered.
        """
        return self._callJob('updateJob', cmd, *args, **kw)

    def registerClasses(self, *args):
        """
        Instructs the controller to register the classes specified by the
        argument(s) as self-unjellyable and allowable past PB security, and to
        arrange an update to have its nodes do the same.

        The classes are specified by their string representations::

            <package(s).module.class>

        Use judiciously! Requires a live connection (see L{startup}).
        """
        return self.root.callRemote('registerClasses', *args)

    def run(self, cmd, *args, **kw):
        """
        Does a job run on the next available node with any args supplied.

        @raise Exception: If no job is registered.
        """
        return self._callJob('runJob', cmd, *args, **kw)
146
147
class JobWorker(workers.RemoteCallWorker):
    """
    A remote-call worker that dispatches tasks as runs of a single cluster
    job.

    Construct me with a L{JobClient} whose startup has already succeeded
    (i.e., one that has a C{jobID}). I reuse that client's remote root
    reference and submit each task through its C{run} method.
    """
    # NOTE(review): presumably a capacity/concurrency setting read by the
    # asynqueue base worker -- confirm against workers.RemoteCallWorker.
    N = 30

    def __init__(self, jobClient):
        """
        @param jobClient: A started L{JobClient}.
        @raise Exception: If I{jobClient} has no registered job ID.
        """
        isStarted = hasattr(jobClient, 'jobID')
        if not isStarted:
            raise Exception("Supplied job client not started!")
        # Qualifications are just the client's job ID.
        self.iQualified = [jobClient.jobID]
        self.client = jobClient
        self.startup(jobClient.root)

    def runNow(self, null, task):
        """
        Submits I{task}'s call tuple C{(cmd, args, kw)} as a job run via my
        client, records the pending run in C{self.jobs}, and hooks the base
        class's C{doneTrying} and C{oops} handlers onto its deferred.
        """
        command, positional, keywords = task.callTuple
        pending = self.client.run(command, *positional, **keywords)
        record = (task, pending)
        # Record the run before attaching callbacks, in case the deferred
        # has already fired by the time doneTrying is added.
        self.jobs.append(record)
        pending.addCallback(self.doneTrying, record).addErrback(self.oops)
        # Deliberately returns nothing: the task's deferred is NOT returned!

    def stop(self):
        """
        Stops me via the base class, then shuts down my job client once that
        has completed.

        @return: The base class's stop deferred, chained to the client
            shutdown.
        """
        def shutDownClient(ignored):
            return self.client.shutdown()

        stopped = workers.RemoteCallWorker.stop(self)
        stopped.addCallback(shutDownClient)
        return stopped
Note: See TracBrowser for help on using the browser.