OLD | NEW |
| (Empty) |
1 # -*- coding: utf-8 -*- | |
2 # | |
3 | |
4 import base64 | |
5 import uuid | |
6 import unittest | |
7 | |
8 # websocket-client | |
9 import websocket as ws | |
10 | |
11 TRACABLE=False | |
12 | |
13 def create_mask_key(n): | |
14 return "abcd" | |
15 | |
16 class StringSockMock: | |
17 def __init__(self): | |
18 self.set_data("") | |
19 self.sent = [] | |
20 | |
21 def set_data(self, data): | |
22 self.data = data | |
23 self.pos = 0 | |
24 self.len = len(data) | |
25 | |
26 def recv(self, bufsize): | |
27 if self.len < self.pos: | |
28 return | |
29 buf = self.data[self.pos: self.pos + bufsize] | |
30 self.pos += bufsize | |
31 return buf | |
32 | |
33 def send(self, data): | |
34 self.sent.append(data) | |
35 | |
36 | |
37 class HeaderSockMock(StringSockMock): | |
38 def __init__(self, fname): | |
39 self.set_data(open(fname).read()) | |
40 self.sent = [] | |
41 | |
42 | |
43 class WebSocketTest(unittest.TestCase): | |
44 def setUp(self): | |
45 ws.enableTrace(TRACABLE) | |
46 | |
47 def tearDown(self): | |
48 pass | |
49 | |
50 def testDefaultTimeout(self): | |
51 self.assertEquals(ws.getdefaulttimeout(), None) | |
52 ws.setdefaulttimeout(10) | |
53 self.assertEquals(ws.getdefaulttimeout(), 10) | |
54 ws.setdefaulttimeout(None) | |
55 | |
56 def testParseUrl(self): | |
57 p = ws._parse_url("ws://www.example.com/r") | |
58 self.assertEquals(p[0], "www.example.com") | |
59 self.assertEquals(p[1], 80) | |
60 self.assertEquals(p[2], "/r") | |
61 self.assertEquals(p[3], False) | |
62 | |
63 p = ws._parse_url("ws://www.example.com/r/") | |
64 self.assertEquals(p[0], "www.example.com") | |
65 self.assertEquals(p[1], 80) | |
66 self.assertEquals(p[2], "/r/") | |
67 self.assertEquals(p[3], False) | |
68 | |
69 p = ws._parse_url("ws://www.example.com/") | |
70 self.assertEquals(p[0], "www.example.com") | |
71 self.assertEquals(p[1], 80) | |
72 self.assertEquals(p[2], "/") | |
73 self.assertEquals(p[3], False) | |
74 | |
75 p = ws._parse_url("ws://www.example.com") | |
76 self.assertEquals(p[0], "www.example.com") | |
77 self.assertEquals(p[1], 80) | |
78 self.assertEquals(p[2], "/") | |
79 self.assertEquals(p[3], False) | |
80 | |
81 p = ws._parse_url("ws://www.example.com:8080/r") | |
82 self.assertEquals(p[0], "www.example.com") | |
83 self.assertEquals(p[1], 8080) | |
84 self.assertEquals(p[2], "/r") | |
85 self.assertEquals(p[3], False) | |
86 | |
87 p = ws._parse_url("ws://www.example.com:8080/") | |
88 self.assertEquals(p[0], "www.example.com") | |
89 self.assertEquals(p[1], 8080) | |
90 self.assertEquals(p[2], "/") | |
91 self.assertEquals(p[3], False) | |
92 | |
93 p = ws._parse_url("ws://www.example.com:8080") | |
94 self.assertEquals(p[0], "www.example.com") | |
95 self.assertEquals(p[1], 8080) | |
96 self.assertEquals(p[2], "/") | |
97 self.assertEquals(p[3], False) | |
98 | |
99 p = ws._parse_url("wss://www.example.com:8080/r") | |
100 self.assertEquals(p[0], "www.example.com") | |
101 self.assertEquals(p[1], 8080) | |
102 self.assertEquals(p[2], "/r") | |
103 self.assertEquals(p[3], True) | |
104 | |
105 p = ws._parse_url("wss://www.example.com:8080/r?key=value") | |
106 self.assertEquals(p[0], "www.example.com") | |
107 self.assertEquals(p[1], 8080) | |
108 self.assertEquals(p[2], "/r?key=value") | |
109 self.assertEquals(p[3], True) | |
110 | |
111 self.assertRaises(ValueError, ws._parse_url, "http://www.example.com/r") | |
112 | |
113 def testWSKey(self): | |
114 key = ws._create_sec_websocket_key() | |
115 self.assert_(key != 24) | |
116 self.assert_("¥n" not in key) | |
117 | |
118 def testWsUtils(self): | |
119 sock = ws.WebSocket() | |
120 | |
121 key = "c6b8hTg4EeGb2gQMztV1/g==" | |
122 required_header = { | |
123 "upgrade": "websocket", | |
124 "connection": "upgrade", | |
125 "sec-websocket-accept": "Kxep+hNu9n51529fGidYu7a3wO0=", | |
126 } | |
127 self.assertEquals(sock._validate_header(required_header, key), True) | |
128 | |
129 header = required_header.copy() | |
130 header["upgrade"] = "http" | |
131 self.assertEquals(sock._validate_header(header, key), False) | |
132 del header["upgrade"] | |
133 self.assertEquals(sock._validate_header(header, key), False) | |
134 | |
135 header = required_header.copy() | |
136 header["connection"] = "something" | |
137 self.assertEquals(sock._validate_header(header, key), False) | |
138 del header["connection"] | |
139 self.assertEquals(sock._validate_header(header, key), False) | |
140 | |
141 | |
142 header = required_header.copy() | |
143 header["sec-websocket-accept"] = "something" | |
144 self.assertEquals(sock._validate_header(header, key), False) | |
145 del header["sec-websocket-accept"] | |
146 self.assertEquals(sock._validate_header(header, key), False) | |
147 | |
148 def testReadHeader(self): | |
149 sock = ws.WebSocket() | |
150 sock.io_sock = sock.sock = HeaderSockMock("data/header01.txt") | |
151 status, header = sock._read_headers() | |
152 self.assertEquals(status, 101) | |
153 self.assertEquals(header["connection"], "upgrade") | |
154 | |
155 sock.io_sock = sock.sock = HeaderSockMock("data/header02.txt") | |
156 self.assertRaises(ws.WebSocketException, sock._read_headers) | |
157 | |
158 def testSend(self): | |
159 # TODO: add longer frame data | |
160 sock = ws.WebSocket() | |
161 sock.set_mask_key(create_mask_key) | |
162 s = sock.io_sock = sock.sock = HeaderSockMock("data/header01.txt") | |
163 sock.send("Hello") | |
164 self.assertEquals(s.sent[0], "\x81\x85abcd)\x07\x0f\x08\x0e") | |
165 | |
166 sock.send("こんにちは") | |
167 self.assertEquals(s.sent[1], "\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\x
e5\xca\x81\xe2\xc5\x82\xe3\xcc") | |
168 | |
169 sock.send(u"こんにちは") | |
170 self.assertEquals(s.sent[1], "\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\x
e5\xca\x81\xe2\xc5\x82\xe3\xcc") | |
171 | |
172 def testRecv(self): | |
173 # TODO: add longer frame data | |
174 sock = ws.WebSocket() | |
175 s = sock.io_sock = sock.sock = StringSockMock() | |
176 s.set_data("\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5
\x82\xe3\xcc") | |
177 data = sock.recv() | |
178 self.assertEquals(data, "こんにちは") | |
179 | |
180 s.set_data("\x81\x85abcd)\x07\x0f\x08\x0e") | |
181 data = sock.recv() | |
182 self.assertEquals(data, "Hello") | |
183 | |
184 def testWebSocket(self): | |
185 s = ws.create_connection("ws://echo.websocket.org/") #ws://localhost:80
80/echo") | |
186 self.assertNotEquals(s, None) | |
187 s.send("Hello, World") | |
188 result = s.recv() | |
189 self.assertEquals(result, "Hello, World") | |
190 | |
191 s.send("こにゃにゃちは、世界") | |
192 result = s.recv() | |
193 self.assertEquals(result, "こにゃにゃちは、世界") | |
194 s.close() | |
195 | |
196 def testPingPong(self): | |
197 s = ws.create_connection("ws://echo.websocket.org/") | |
198 self.assertNotEquals(s, None) | |
199 s.ping("Hello") | |
200 s.pong("Hi") | |
201 s.close() | |
202 | |
203 def testSecureWebSocket(self): | |
204 s = ws.create_connection("wss://echo.websocket.org/") | |
205 self.assertNotEquals(s, None) | |
206 self.assert_(isinstance(s.io_sock, ws._SSLSocketWrapper)) | |
207 s.send("Hello, World") | |
208 result = s.recv() | |
209 self.assertEquals(result, "Hello, World") | |
210 s.send("こにゃにゃちは、世界") | |
211 result = s.recv() | |
212 self.assertEquals(result, "こにゃにゃちは、世界") | |
213 s.close() | |
214 | |
215 def testWebSocketWihtCustomHeader(self): | |
216 s = ws.create_connection("ws://echo.websocket.org/", | |
217 headers={"User-Agent": "PythonWebsocketClient"
}) | |
218 self.assertNotEquals(s, None) | |
219 s.send("Hello, World") | |
220 result = s.recv() | |
221 self.assertEquals(result, "Hello, World") | |
222 s.close() | |
223 | |
224 def testAfterClose(self): | |
225 from socket import error | |
226 s = ws.create_connection("ws://echo.websocket.org/") | |
227 self.assertNotEquals(s, None) | |
228 s.close() | |
229 self.assertRaises(error, s.send, "Hello") | |
230 self.assertRaises(error, s.recv) | |
231 | |
232 def testUUID4(self): | |
233 """ WebSocket key should be a UUID4. | |
234 """ | |
235 key = ws._create_sec_websocket_key() | |
236 u = uuid.UUID(bytes=base64.b64decode(key)) | |
237 self.assertEquals(4, u.version) | |
238 | |
239 class WebSocketAppTest(unittest.TestCase): | |
240 | |
241 class NotSetYet(object): | |
242 """ A marker class for signalling that a value hasn't been set yet. | |
243 """ | |
244 | |
245 def setUp(self): | |
246 ws.enableTrace(TRACABLE) | |
247 | |
248 WebSocketAppTest.keep_running_open = WebSocketAppTest.NotSetYet() | |
249 WebSocketAppTest.keep_running_close = WebSocketAppTest.NotSetYet() | |
250 WebSocketAppTest.get_mask_key_id = WebSocketAppTest.NotSetYet() | |
251 | |
252 def tearDown(self): | |
253 | |
254 WebSocketAppTest.keep_running_open = WebSocketAppTest.NotSetYet() | |
255 WebSocketAppTest.keep_running_close = WebSocketAppTest.NotSetYet() | |
256 WebSocketAppTest.get_mask_key_id = WebSocketAppTest.NotSetYet() | |
257 | |
258 def testKeepRunning(self): | |
259 """ A WebSocketApp should keep running as long as its self.keep_running | |
260 is not False (in the boolean context). | |
261 """ | |
262 | |
263 def on_open(self, *args, **kwargs): | |
264 """ Set the keep_running flag for later inspection and immediately | |
265 close the connection. | |
266 """ | |
267 WebSocketAppTest.keep_running_open = self.keep_running | |
268 self.close() | |
269 | |
270 def on_close(self, *args, **kwargs): | |
271 """ Set the keep_running flag for the test to use. | |
272 """ | |
273 WebSocketAppTest.keep_running_close = self.keep_running | |
274 | |
275 app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, on_cl
ose=on_close) | |
276 app.run_forever() | |
277 | |
278 self.assertFalse(isinstance(WebSocketAppTest.keep_running_open, | |
279 WebSocketAppTest.NotSetYet)) | |
280 | |
281 self.assertFalse(isinstance(WebSocketAppTest.keep_running_close, | |
282 WebSocketAppTest.NotSetYet)) | |
283 | |
284 self.assertEquals(True, WebSocketAppTest.keep_running_open) | |
285 self.assertEquals(False, WebSocketAppTest.keep_running_close) | |
286 | |
287 def testSockMaskKey(self): | |
288 """ A WebSocketApp should forward the received mask_key function down | |
289 to the actual socket. | |
290 """ | |
291 | |
292 def my_mask_key_func(): | |
293 pass | |
294 | |
295 def on_open(self, *args, **kwargs): | |
296 """ Set the value so the test can use it later on and immediately | |
297 close the connection. | |
298 """ | |
299 WebSocketAppTest.get_mask_key_id = id(self.get_mask_key) | |
300 self.close() | |
301 | |
302 app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, get_m
ask_key=my_mask_key_func) | |
303 app.run_forever() | |
304 | |
305 # Note: We can't use 'is' for comparing the functions directly, need to
use 'id'. | |
306 self.assertEquals(WebSocketAppTest.get_mask_key_id, id(my_mask_key_func)
) | |
307 | |
308 | |
309 if __name__ == "__main__": | |
310 unittest.main() | |
311 | |
OLD | NEW |