root/projects/WinDictator/trunk/windictator/linux/observer.py

Revision 19, 11.3 kB (checked in by edsuom, 1 year ago)

Importing windictator into the global FOSS repo

Line 
1 # WinDictator:
2 # Dictate in Windows, have the text typed in Linux via X faked keystroke events
3 #
4 # Copyright (C) 2005-2006 by Edwin A. Suominen, http://www.eepatents.com
5 #
6 # This program is free software; you can redistribute it and/or modify it under
7 # the terms of the GNU General Public License as published by the Free Software
8 # Foundation; either version 2 of the License, or (at your option) any later
9 # version.
10 #
11 # This program is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 # FOR A PARTICULAR PURPOSE.  See the file COPYING for more details.
14 #
15 # You should have received a copy of the GNU General Public License along with
16 # this program; if not, write to the Free Software Foundation, Inc., 51
17 # Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
18
19
20 """
21 Keystroke observer
22 """
23
24 import os, re
25 from twisted.internet import defer, reactor
26 import protocol, keycodes
27
28 BINARY = ("/usr/local/bin/cnee",
29           "--record", "--keyboard", "--events-to-record", "-1")
30 HISTORY_LENGTH = 128
31 MAX_WAIT = 1.0
32
33
34 class KeyHistory(object):
35     """
36     I maintain a seekable history of key events.
37     """
38     def __init__(self, N=HISTORY_LENGTH):
39         self.N = N
40         self.position = 0
41         self.keysDown = {}
42         self.realHistory = {}
43         self.fakeHistory = {}
44         self.keyDecoder = keycodes.KeyDecoder()
45
46     def backspace(self, N):
47         """
48         Deletes the most recent I{N} events that are in my combined history
49         buffers.
50         """
51         real, fake = self.realHistory, self.fakeHistory
52         N_limited = min([N, self.N, len(real)+len(fake)])
53         timeDict = dict.fromkeys(real.keys(), True)
54         timeDict.update(dict.fromkeys(fake.keys(), False))
55         timeList = timeDict.keys()
56         timeList.sort()
57         for key in timeList[-N_limited:]:
58             if timeDict[key]:
59                 whichHistory = real
60             else:
61                 whichHistory = fake
62             del whichHistory[key]
63
64     def reportKeyEvent(self, keycode, isKeyPress, isRealTyping, timeReported):
65         """
66         Reports observation of a new keying event with I{keycode}, indicating:
67
68             - if it I{isKeyPress} (C{True}) or, if C{False}, a key release;
69
70             - if I{isRealTyping} (C{True}) or, if C{False}, if it is a
71               combination of faked key events from dictation;
72
73             - the I{timeReported} of the event.
74             
75         """
76         if isKeyPress:
77             self.currentEventIsReal = isRealTyping
78             self.keysDown[keycode] = None
79         elif keycode in self.keysDown:
80             self.keysDown[keycode] = timeReported
81         if len(self.keysDown) and None not in self.keysDown.values():
82             # All pressed keys have now been released
83             timeCompleted = max(self.keysDown.values())
84             keyCombo = self.keysDown.keys()
85             self.put(keyCombo, self.currentEventIsReal, timeCompleted)
86             self.keysDown.clear()
87
88     def put(self, keyCombo, isRealTyping, timeCompleted):
89         """
90         Puts a new key code combination into my history buffer, indicating:
91
92             - if it I{isRealTyping} (C{True}) or, if C{False}, if it is a
93               combination of faked key events from dictation;
94
95             - the I{timeCompleted} of the key events in the combination.
96             
97         """
98         # Make a local reference to the right history buffer for this call
99         if isRealTyping:
100             history = self.realHistory
101         else:
102             history = self.fakeHistory
103         # Make sure we don't overwrite an existing buffer item due to integer
104         # time value being the same for very quick succession of key events
105         while timeCompleted in history:
106             timeCompleted += 1
107         # Write the new key event to the buffer
108         history[timeCompleted] = keyCombo
109         # Keep the total real + fake buffer size at N, doing the rotation on
110         # the current buffer if necessary.
111         if len(self.realHistory) + len(self.fakeHistory) > self.N:
112             oldestKey = min(history.keys())
113             del history[oldestKey]
114         # All done
115
116     def getKeys(self, N, realOnly=False, fakeOnly=False):
117         """
118         Get a list of the I{N} key combinations before the current position in
119         my history buffer. You can specify I{realOnly} or I{fakeOnly} by
120         setting one of those options C{True}.
121
122         The history buffer is of limited size, and values of I{N} in excess of
123         that size will only return the full buffer contents. Also, only as many
124         items as are in the buffer up to the current position will be returned;
125         there is no wrap-around.
126         """
127         history = {}
128         if not realOnly:
129             history.update(self.fakeHistory)
130         if not fakeOnly:
131             history.update(self.realHistory)
132         N_limited = min([N, self.N, len(history)])
133         timeList = history.keys()
134         timeList.sort()
135         return [history[x] for x in timeList[-N_limited:]]
136
137     def getText(self, N, realOnly=False, fakeOnly=False):
138         """
139         Returns a string of the text typed via the past I{N} key combinations
140         from my history buffer. You can specify I{realOnly} or I{fakeOnly} by
141         setting one of those options C{True}.
142
143         The history buffer is of limited size, and increased values of I{N}
144         will not return more text after a certain point. Also, only as many
145         items as are in the buffer up to the current position will be returned;
146         there is no wrap-around.
147         """
148         charList = [self.keyDecoder.keyComboToText(x)
149                     for x in
150                     self.getKeys(N, realOnly=realOnly, fakeOnly=fakeOnly)]
151         return ''.join(charList)
152
153
154 class Observer:
155     """
156     I observe what keystrokes have been generated (real and fake alike) using
157     the I{xnee} process.
158     """
159     def __init__(self, history=None):
160         self.dStop = defer.Deferred()
161         self.re = re.compile(r'^0,[23],(0,){3}[1-9][0-9]*,0,[0-9]+')
162         self.history = history
163         self.confirmationRequestFIFO = []
164
165     def __del__(self):
166         self.shutdown()
167
168     def startup(self, *null):
169         """
170         Opens a pipe to the I{xnee} process, which reports each observed
171         keystroke with a new line sent via stdout.
172         """
173         def ready(transport):
174             self.process = transport
175
176         def error(self, failure):
177             print "Error running xnee process:", failure.printTraceback()
178             return failure
179
180         self.confirmationRequestFIFO = []
181         d = defer.Deferred()
182         d.addCallbacks(ready, error)
183         p = protocol.ProcessProtocol(d, self.dStop, self.observed)
184         reactor.spawnProcess(
185             p, BINARY[0], ("xnee",) + BINARY[1:], env=os.environ)
186         return d
187
188     def shutdown(self, *null):
189         """
190         Shuts down the I{xnee} process in an orderly fashion.
191         """
192         if hasattr(self, 'process'):
193             try:
194                 self.process.signalProcess(2)
195                 self.process.loseConnection()
196             except:
197                 pass
198             del self.process
199             return self.dStop
200         else:
201             return defer.succeed(None)
202
203     def observed(self, line):
204         """
205         Callback for reporting new keystrokes observed.
206         """
207         # Got a report line...
208         if not self.re.match(line):
209             return
210         # ...and it's of a new keying event
211         keycode, isKeyPress, timeReported = self.lineParser(line)
212         if self.confirmationRequestFIFO:
213             # There's a confirmation request outstanding, so it may be a faked
214             # event.
215             firstRequest = self.confirmationRequestFIFO[0]
216             if keycode == firstRequest[0]:
217                 # Yes, it matches the oldest request.
218                 firstRequest[1].callback(True)
219                 self.confirmationRequestFIFO.pop(0)
220                 isRealTyping = False
221             else:
222                 isRealTyping = True
223         else:
224             # No confirmation requests are outstanding, so it must be a real
225             # event.
226             isRealTyping = True
227         # Report the keying event, real or fake, to whatever history object I
228         # am using
229         if self.history is not None:
230             self.history.reportKeyEvent(
231                 keycode, isKeyPress, isRealTyping, timeReported)
232
233     def lineParser(self, line):
234         """
235         Parses a line received from the xnee process, returning a tuple of
236         parsed data if the line reported on a keying event::
237
238             0: *
239             1: Keypress = '2', Release = '3'
240             2: *
241             3: *
242             4: *
243             5: Keycode
244             6: *
245             7: Time
246             -----------------------------------------------------
247             * Seems to be '0' for every keystroke-reporting line
248             
249         """
250         fields = [int(x) for x in line.strip('()').split(',')]
251         if fields[0] == 0 and fields[1] in (2,3):
252             keycode = fields[5]
253             isKeyPress = (fields[1] == 2)
254             timeReported = fields[7]
255             return keycode, isKeyPress, timeReported
256
257     def confirm(self, keycode):
258         """
259         Calling this method constitutes a request to confirm when the supplied
260         I{keycode} is observed as entered. It immediately returns a C{Deferred}
261         in response, which fires with C{True} if the keystroke is confirmed
262         after a reasonable amount of time or with C{False} if not.
263
264         Supplying something other than an integer as the I{keycode} will result
265         in a C{Deferred} that has fired with C{False}, since no keystroke can
266         be confirmed.
267         """
268         def cancelTimeout(result, callID):
269             callID.cancel()
270             return result
271
272         def processError(failure, callID):
273             print "OBSERVER ERROR, RESTARTING"
274             d = restart()
275             d.addCallback(lambda _: False)
276
277         def timeout(d_Confirm):
278             print "OBSERVER TIMEOUT, RESTARTING"
279             if not d_Confirm.called:
280                 self.confirmationRequestFIFO.pop(0)
281                 d_NewProcess = restart()
282                 # Fire the confirmation request's Deferred from the new process
283                 # startup's Deferred
284                 d_NewProcess.chainDeferred(d_Confirm)
285
286         def restart():
287             # The confirmation deferred is timing out, so the confirmation
288             # xnee process must have gone into zombie mode. Start a new one.
289             d = self.shutdown()
290             d.addCallback(self.startup)
291             # When the new process has started, report a failure to confirm
292             # the key event that inspired the process restart
293             d.addCallback(lambda _: False)
294             return d
295        
296         if isinstance(keycode, int):
297             # Let's return a Deferred to the confirmation...
298             d = defer.Deferred()
299             # ...that times out if it waits too long...
300             callID = reactor.callLater(MAX_WAIT, timeout, d)
301             d.addCallback(cancelTimeout, callID)
302             d.addErrback(processError, callID)
303             # ...and put the new confirmation request into the FIFO
304             confirmationRequest = (keycode, d)
305             self.confirmationRequestFIFO.append(confirmationRequest)
306         else:
307             # We can't confirm an invalid keycode
308             d = defer.succeed(False)
309         return d
310
311
312        
Note: See TracBrowser for help on using the browser.