root/projects/WinDictator/trunk/test/observer.py

Revision 19, 9.2 kB (checked in by edsuom, 1 year ago)

Importing windictator into the global FOSS repo

Line 
1 # WinDictator:
2 # Dictate in Windows, have the text typed in Linux via X faked keystroke events
3 #
4 # Copyright (C) 2005-2006 by Edwin A. Suominen, http://www.eepatents.com
5 #
6 # This program is free software; you can redistribute it and/or modify it under
7 # the terms of the GNU General Public License as published by the Free Software
8 # Foundation; either version 2 of the License, or (at your option) any later
9 # version.
10 #
11 # This program is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 # FOR A PARTICULAR PURPOSE.  See the file COPYING for more details.
14 #
15 # You should have received a copy of the GNU General Public License along with
16 # this program; if not, write to the Free Software Foundation, Inc., 51
17 # Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
18
19
20 """
21 Unit tests for windictator.linux.observer
22 """
23
24 import os, time
25 from twisted.internet import defer, task, reactor
26 from twisted.trial.unittest import TestCase
27
28 import windictator.linux.observer as observer
29 import mock
30
31 REAL_BINARY = observer.BINARY
32 DELAY = 0.1
33 # Interval of 0.05 sec between wait loop iterations
34 WAIT_INTERVAL = 0.05
35 # No more than 30 iterations, or 1.5 sec of waiting
36 MAX_INTERVALS = 30
37 # Key events "typed" less than 0.7 sec after starting xnee don't register for
38 # some reason.
39 FIRST_TYPING_WAIT = 0.7
40                        
41
class TestKeyHistory(TestCase):
    """
    Tests for observer.KeyHistory, driven with synthetic, strictly increasing
    time values rather than the wall clock.
    """
    def setUp(self):
        # NOTE(review): the 16 is presumably the history capacity; confirm
        # against observer.KeyHistory
        self.history = observer.KeyHistory(16)
        self.fakeTimeCount = 0
        # Current time scaled by 500 and wrapped into 32 bits
        self.fakeStartTime = int(500*time.time()) % 2**32

    def fakeTime(self):
        """
        Returns the next fake time value. Each call yields a value 300 higher
        than the previous call did.
        """
        self.fakeTimeCount += 1
        offset = 300 * self.fakeTimeCount
        return self.fakeStartTime + offset

    def putStuff(self):
        """
        Puts four key combinations into the history in the order real, fake,
        fake, real, each with its own fake time value.
        """
        entries = [
            ((38,), True),
            ((56, 50), False),
            ((54,), False),
            ((40, 50), True),
            ]
        for combo, isReal in entries:
            timeValue = self.fakeTime()
            self.history.put(combo, isReal, timeValue)

    def testFakeVsRealBuffer(self):
        """
        Real and fake key combinations end up in separate histories.
        """
        self.putStuff()
        # Real buffer
        realValues = sorted(self.history.realHistory.values())
        self.failUnlessEqual(realValues, [(38,), (40,50)])
        # Fake buffer
        fakeValues = sorted(self.history.fakeHistory.values())
        self.failUnlessEqual(fakeValues, [(54,), (56,50)])

    def testGetKeys(self):
        """
        getKeys(N) returns the most recent key combinations, oldest first,
        optionally restricted to real-only or fake-only entries.
        """
        self.putStuff()
        everything = [(38,), (56,50), (54,), (40,50)]
        # Asking for exactly as many as were put, or for far more, yields
        # the same full list
        for N in (4, 100):
            self.failUnlessEqual(self.history.getKeys(N), everything)
        lastTwo = self.history.getKeys(2)
        self.failUnlessEqual(lastTwo, [(54,), (40,50)])
        realOnes = self.history.getKeys(3, realOnly=True)
        self.failUnlessEqual(realOnes, [(38,), (40,50)])
        lastFake = self.history.getKeys(1, fakeOnly=True)
        self.failUnlessEqual(lastFake, [(54,)])

    def testGracefulOverflow(self):
        """
        Putting many more entries than the history was constructed for still
        leaves the most recent ones retrievable in order.
        """
        # Ten rounds of four puts each, 40 items total
        for _ in range(10):
            self.putStuff()
        lastSixExpected = [(54,), (40,50), (38,), (56,50), (54,), (40,50)]
        lastSix = self.history.getKeys(6)
        self.failUnlessEqual(lastSix, lastSixExpected)

    def testSameTimeValues(self):
        """
        Two puts sharing one time value are both retained.
        """
        sharedTime = self.fakeTime()
        for combo in ((38,), (54,)):
            self.history.put(combo, True, sharedTime)
        stored = sorted(self.history.realHistory.values())
        self.failUnlessEqual(stored, [(38,), (54,)])

    def testReportKeyEvent(self):
        """
        A stream of individual key press/release events, all reported as real
        typing, is recorded in the real history as key combinations.
        """
        # Each entry is (keycode, isKeyPress); the real-typing flag and the
        # time value are supplied in the loop below
        keyEvents = (
            (38, True),
            (38, False),
            (50, True),
            (56, True),
            (56, False),
            (50, False),
            (54, True),
            (54, False)
            )
        for keycode, isKeypress in keyEvents:
            timeValue = self.fakeTime()
            self.history.reportKeyEvent(keycode, isKeypress, True, timeValue)
        timeKeys = sorted(self.history.realHistory.keys())
        expectedCombos = [(38,), (50,56), (54,)]
        for k, shouldBe in enumerate(expectedCombos):
            value = self.history.realHistory[timeKeys[k]]
            # NOTE(review): this only requires that at least one expected
            # keycode appears in the stored combination, not an exact match
            overlap = [True for keycode in shouldBe if keycode in value]
            message = \
                "Keycode combo '%s' doesn't match expected combination '%s'" \
                % (value, shouldBe)
            self.failUnless(overlap, message)
123
124
class LoopForReport:
    """
    Polls a history's event list at WAIT_INTERVAL until it holds at least a
    requested number of events, giving up after MAX_INTERVALS polls.
    """
    def __init__(self, history):
        # Keep a reference to the history's own event list so that events
        # appended later are visible to checkIfDone
        self.events = history.events
        self.count = 0
        self.loop = task.LoopingCall(self.checkIfDone)

    def start(self, N):
        """
        Starts polling for at least I{N} events and returns whatever the
        looping call's start() returns; the tests treat it as a deferred that
        fires once the loop has been stopped.
        """
        self.N = N
        d = self.loop.start(WAIT_INTERVAL)
        return d

    def checkIfDone(self):
        """
        Called on every poll. Stops the loop once the poll limit has been
        reached or enough events have been collected.
        """
        self.count += 1
        if self.count < MAX_INTERVALS and len(self.events) < self.N:
            # Still within the limit and still short of events; keep polling
            return
        self.loop.stop()
139
140
class TestObserverInVitro(TestCase):
    """
    Tests the observer with observer.BINARY pointed at the mock xnee script,
    so key events are produced by writing commands to the observer's process
    rather than by real typing.
    """
    def setUp(self):
        # Remember the configured binary so tearDown can restore it; the
        # override below would otherwise leak into tests that run later
        self.savedBinary = observer.BINARY
        observer.BINARY = ("../test/mockxnee.py",)
        self.mockHistory = mock.MockHistory()
        self.observer = observer.Observer(self.mockHistory)
        self.loop = LoopForReport(self.mockHistory)

    def tearDown(self):
        try:
            self.observer.shutdown()
        finally:
            # Restore the module global even if the shutdown raised
            observer.BINARY = self.savedBinary

    def _runAndCheck(self, N, started, done):
        """
        Starts polling for I{N} events, starts the observer, and returns a
        deferred that fires after I{done} has run.

        I{started} is called once the observer's startup deferred fires and
        I{done} once both the polling and the startup chain have finished.
        """
        d1 = self.loop.start(N)
        d2 = self.observer.startup()
        d2.addCallback(started)
        return defer.DeferredList([d1, d2]).addCallback(done)

    def _checkEvents(self, cmdList, isRealFor):
        """
        Verifies that the reported events correspond one-to-one with the
        commands of I{cmdList}.

        I{isRealFor} is called with each command's keycode and returns
        whether that event is expected to be flagged as real typing.
        """
        events = self.mockHistory.events
        # Without this check, a wait loop that timed out with too few events
        # would let the comparison below pass vacuously
        self.failUnlessEqual(
            len(events), len(cmdList),
            "Expected %d key events but %d were reported" \
            % (len(cmdList), len(events)))
        # Event tuple:
        # (keycode, isKeyPress, isRealTyping, timeReported)
        for event, cmd in zip(events, cmdList):
            action, keycode = cmd
            self.failUnlessEqual(
                event[0:3], (keycode, (action == "d"), isRealFor(keycode)))

    def testRealSingle(self):
        """
        A single unconfirmed key press is reported as real typing.
        """
        def started(null):
            self.observer.process.write("d 38\n")

        def done(null):
            events = self.mockHistory.events
            self.failUnless(events, "No key events were reported")
            # Event tuple:
            # (keycode, isKeyPress, isRealTyping, timeReported)
            self.failUnlessEqual(events[0][0:3], (38, True, True))

        return self._runAndCheck(1, started, done)

    def testRealMultiple(self):
        """
        A sequence of unconfirmed presses and releases is reported in order,
        each flagged as real typing.
        """
        cmdList = (
            ("d", 38), ("d", 50), ("u", 50), ("u", 38),
            ("d", 56), ("u", 56)
            )
        def started(null):
            for cmd in cmdList:
                self.observer.process.write("%s %d\n" % cmd)

        def done(null):
            self._checkEvents(cmdList, lambda keycode: True)

        return self._runAndCheck(len(cmdList), started, done)

    def testFakeSingle(self):
        """
        A key press whose keycode was passed to confirm() beforehand is
        reported as not being real typing.
        """
        def started(null):
            d = self.observer.confirm(38)
            self.observer.process.write("d 38\n")
            return d

        def done(null):
            events = self.mockHistory.events
            self.failUnless(events, "No key events were reported")
            # Event tuple:
            # (keycode, isKeyPress, isRealTyping, timeReported)
            self.failUnlessEqual(events[0][0:3], (38, True, False))

        return self._runAndCheck(1, started, done)

    def testMixedMultiple(self):
        """
        With keycode 54 confirmed twice (once each for its press and its
        release), only the two events for keycode 54 are reported as not
        being real typing.
        """
        cmdList = (
            ("d", 38), ("u", 38),                               # a
            ("d", 56), ("d", 50), ("u", 50), ("u", 56),         # B
            ("d", 54), ("u", 54),                               # c
            ("d", 40), ("u", 40)                                # d
            )
        def started(null):
            dList = [self.observer.confirm(54), self.observer.confirm(54)]
            for cmd in cmdList:
                self.observer.process.write("%s %d\n" % cmd)
            return defer.DeferredList(dList)

        def done(null):
            self._checkEvents(cmdList, lambda keycode: keycode != 54)

        return self._runAndCheck(len(cmdList), started, done)
233
234
class TestObserverInVivo(TestCase):
    """
    Tests the observer with the real binary, using a Keyer to generate the
    key events.
    """
    def setUp(self):
        # Imported here rather than at module level, as in the original
        import windictator.linux.keyer as keyer

        observer.BINARY = REAL_BINARY
        self.keyer = keyer.Keyer()
        self.mockHistory = mock.MockHistory()
        self.observer = observer.Observer(self.mockHistory)
        self.loop = LoopForReport(self.mockHistory)

    def tearDown(self):
        self.keyer.shutdown()
        self.observer.shutdown()

    def testRealSingle(self):
        """
        Pressing and then releasing 'a' through the keyer, without any
        confirmation, is reported as a real press and a real release of
        keycode 38.
        """
        def typeAfterWait(null):
            # Hold off for FIRST_TYPING_WAIT after startup, then press the
            # key and release it once the press has completed
            waited = defer.Deferred()
            waited.addCallback(lambda _: self.keyer.pressKey('a'))
            waited.addCallback(lambda _: self.keyer.releaseKey('a'))
            reactor.callLater(FIRST_TYPING_WAIT, waited.callback, None)
            return waited

        def check(null):
            # Event tuple:
            # (keycode, isKeyPress, isRealTyping, timeReported)
            expected = [(38, True, True), (38, False, True)]
            for k, shouldBe in enumerate(expected):
                self.failUnlessEqual(
                    self.mockHistory.events[k][0:3], shouldBe)

        polling = self.loop.start(2)
        starting = self.observer.startup()
        starting.addCallback(typeAfterWait)
        both = defer.DeferredList([polling, starting])
        both.addCallback(check)
        return both
Note: See TracBrowser for help on using the browser.