OLD | NEW |
(Empty) | |
| 1 # -*- coding: utf-8 -*- |
| 2 # |
| 3 |
| 4 import base64 |
| 5 import uuid |
| 6 import unittest |
| 7 |
| 8 # websocket-client |
| 9 import websocket as ws |
| 10 |
| 11 TRACABLE=False |
| 12 |
| 13 def create_mask_key(n): |
| 14 return "abcd" |
| 15 |
| 16 class StringSockMock: |
| 17 def __init__(self): |
| 18 self.set_data("") |
| 19 self.sent = [] |
| 20 |
| 21 def set_data(self, data): |
| 22 self.data = data |
| 23 self.pos = 0 |
| 24 self.len = len(data) |
| 25 |
| 26 def recv(self, bufsize): |
| 27 if self.len < self.pos: |
| 28 return |
| 29 buf = self.data[self.pos: self.pos + bufsize] |
| 30 self.pos += bufsize |
| 31 return buf |
| 32 |
| 33 def send(self, data): |
| 34 self.sent.append(data) |
| 35 |
| 36 |
| 37 class HeaderSockMock(StringSockMock): |
| 38 def __init__(self, fname): |
| 39 self.set_data(open(fname).read()) |
| 40 self.sent = [] |
| 41 |
| 42 |
| 43 class WebSocketTest(unittest.TestCase): |
| 44 def setUp(self): |
| 45 ws.enableTrace(TRACABLE) |
| 46 |
| 47 def tearDown(self): |
| 48 pass |
| 49 |
| 50 def testDefaultTimeout(self): |
| 51 self.assertEquals(ws.getdefaulttimeout(), None) |
| 52 ws.setdefaulttimeout(10) |
| 53 self.assertEquals(ws.getdefaulttimeout(), 10) |
| 54 ws.setdefaulttimeout(None) |
| 55 |
| 56 def testParseUrl(self): |
| 57 p = ws._parse_url("ws://www.example.com/r") |
| 58 self.assertEquals(p[0], "www.example.com") |
| 59 self.assertEquals(p[1], 80) |
| 60 self.assertEquals(p[2], "/r") |
| 61 self.assertEquals(p[3], False) |
| 62 |
| 63 p = ws._parse_url("ws://www.example.com/r/") |
| 64 self.assertEquals(p[0], "www.example.com") |
| 65 self.assertEquals(p[1], 80) |
| 66 self.assertEquals(p[2], "/r/") |
| 67 self.assertEquals(p[3], False) |
| 68 |
| 69 p = ws._parse_url("ws://www.example.com/") |
| 70 self.assertEquals(p[0], "www.example.com") |
| 71 self.assertEquals(p[1], 80) |
| 72 self.assertEquals(p[2], "/") |
| 73 self.assertEquals(p[3], False) |
| 74 |
| 75 p = ws._parse_url("ws://www.example.com") |
| 76 self.assertEquals(p[0], "www.example.com") |
| 77 self.assertEquals(p[1], 80) |
| 78 self.assertEquals(p[2], "/") |
| 79 self.assertEquals(p[3], False) |
| 80 |
| 81 p = ws._parse_url("ws://www.example.com:8080/r") |
| 82 self.assertEquals(p[0], "www.example.com") |
| 83 self.assertEquals(p[1], 8080) |
| 84 self.assertEquals(p[2], "/r") |
| 85 self.assertEquals(p[3], False) |
| 86 |
| 87 p = ws._parse_url("ws://www.example.com:8080/") |
| 88 self.assertEquals(p[0], "www.example.com") |
| 89 self.assertEquals(p[1], 8080) |
| 90 self.assertEquals(p[2], "/") |
| 91 self.assertEquals(p[3], False) |
| 92 |
| 93 p = ws._parse_url("ws://www.example.com:8080") |
| 94 self.assertEquals(p[0], "www.example.com") |
| 95 self.assertEquals(p[1], 8080) |
| 96 self.assertEquals(p[2], "/") |
| 97 self.assertEquals(p[3], False) |
| 98 |
| 99 p = ws._parse_url("wss://www.example.com:8080/r") |
| 100 self.assertEquals(p[0], "www.example.com") |
| 101 self.assertEquals(p[1], 8080) |
| 102 self.assertEquals(p[2], "/r") |
| 103 self.assertEquals(p[3], True) |
| 104 |
| 105 p = ws._parse_url("wss://www.example.com:8080/r?key=value") |
| 106 self.assertEquals(p[0], "www.example.com") |
| 107 self.assertEquals(p[1], 8080) |
| 108 self.assertEquals(p[2], "/r?key=value") |
| 109 self.assertEquals(p[3], True) |
| 110 |
| 111 self.assertRaises(ValueError, ws._parse_url, "http://www.example.com/r") |
| 112 |
| 113 def testWSKey(self): |
| 114 key = ws._create_sec_websocket_key() |
| 115 self.assert_(key != 24) |
| 116 self.assert_("¥n" not in key) |
| 117 |
| 118 def testWsUtils(self): |
| 119 sock = ws.WebSocket() |
| 120 |
| 121 key = "c6b8hTg4EeGb2gQMztV1/g==" |
| 122 required_header = { |
| 123 "upgrade": "websocket", |
| 124 "connection": "upgrade", |
| 125 "sec-websocket-accept": "Kxep+hNu9n51529fGidYu7a3wO0=", |
| 126 } |
| 127 self.assertEquals(sock._validate_header(required_header, key), True) |
| 128 |
| 129 header = required_header.copy() |
| 130 header["upgrade"] = "http" |
| 131 self.assertEquals(sock._validate_header(header, key), False) |
| 132 del header["upgrade"] |
| 133 self.assertEquals(sock._validate_header(header, key), False) |
| 134 |
| 135 header = required_header.copy() |
| 136 header["connection"] = "something" |
| 137 self.assertEquals(sock._validate_header(header, key), False) |
| 138 del header["connection"] |
| 139 self.assertEquals(sock._validate_header(header, key), False) |
| 140 |
| 141 |
| 142 header = required_header.copy() |
| 143 header["sec-websocket-accept"] = "something" |
| 144 self.assertEquals(sock._validate_header(header, key), False) |
| 145 del header["sec-websocket-accept"] |
| 146 self.assertEquals(sock._validate_header(header, key), False) |
| 147 |
| 148 def testReadHeader(self): |
| 149 sock = ws.WebSocket() |
| 150 sock.io_sock = sock.sock = HeaderSockMock("data/header01.txt") |
| 151 status, header = sock._read_headers() |
| 152 self.assertEquals(status, 101) |
| 153 self.assertEquals(header["connection"], "upgrade") |
| 154 |
| 155 sock.io_sock = sock.sock = HeaderSockMock("data/header02.txt") |
| 156 self.assertRaises(ws.WebSocketException, sock._read_headers) |
| 157 |
| 158 def testSend(self): |
| 159 # TODO: add longer frame data |
| 160 sock = ws.WebSocket() |
| 161 sock.set_mask_key(create_mask_key) |
| 162 s = sock.io_sock = sock.sock = HeaderSockMock("data/header01.txt") |
| 163 sock.send("Hello") |
| 164 self.assertEquals(s.sent[0], "\x81\x85abcd)\x07\x0f\x08\x0e") |
| 165 |
| 166 sock.send("こんにちは") |
| 167 self.assertEquals(s.sent[1], "\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\x
e5\xca\x81\xe2\xc5\x82\xe3\xcc") |
| 168 |
| 169 sock.send(u"こんにちは") |
| 170 self.assertEquals(s.sent[1], "\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\x
e5\xca\x81\xe2\xc5\x82\xe3\xcc") |
| 171 |
| 172 def testRecv(self): |
| 173 # TODO: add longer frame data |
| 174 sock = ws.WebSocket() |
| 175 s = sock.io_sock = sock.sock = StringSockMock() |
| 176 s.set_data("\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5
\x82\xe3\xcc") |
| 177 data = sock.recv() |
| 178 self.assertEquals(data, "こんにちは") |
| 179 |
| 180 s.set_data("\x81\x85abcd)\x07\x0f\x08\x0e") |
| 181 data = sock.recv() |
| 182 self.assertEquals(data, "Hello") |
| 183 |
| 184 def testWebSocket(self): |
| 185 s = ws.create_connection("ws://echo.websocket.org/") #ws://localhost:80
80/echo") |
| 186 self.assertNotEquals(s, None) |
| 187 s.send("Hello, World") |
| 188 result = s.recv() |
| 189 self.assertEquals(result, "Hello, World") |
| 190 |
| 191 s.send("こにゃにゃちは、世界") |
| 192 result = s.recv() |
| 193 self.assertEquals(result, "こにゃにゃちは、世界") |
| 194 s.close() |
| 195 |
| 196 def testPingPong(self): |
| 197 s = ws.create_connection("ws://echo.websocket.org/") |
| 198 self.assertNotEquals(s, None) |
| 199 s.ping("Hello") |
| 200 s.pong("Hi") |
| 201 s.close() |
| 202 |
| 203 def testSecureWebSocket(self): |
| 204 s = ws.create_connection("wss://echo.websocket.org/") |
| 205 self.assertNotEquals(s, None) |
| 206 self.assert_(isinstance(s.io_sock, ws._SSLSocketWrapper)) |
| 207 s.send("Hello, World") |
| 208 result = s.recv() |
| 209 self.assertEquals(result, "Hello, World") |
| 210 s.send("こにゃにゃちは、世界") |
| 211 result = s.recv() |
| 212 self.assertEquals(result, "こにゃにゃちは、世界") |
| 213 s.close() |
| 214 |
| 215 def testWebSocketWihtCustomHeader(self): |
| 216 s = ws.create_connection("ws://echo.websocket.org/", |
| 217 headers={"User-Agent": "PythonWebsocketClient"
}) |
| 218 self.assertNotEquals(s, None) |
| 219 s.send("Hello, World") |
| 220 result = s.recv() |
| 221 self.assertEquals(result, "Hello, World") |
| 222 s.close() |
| 223 |
| 224 def testAfterClose(self): |
| 225 from socket import error |
| 226 s = ws.create_connection("ws://echo.websocket.org/") |
| 227 self.assertNotEquals(s, None) |
| 228 s.close() |
| 229 self.assertRaises(error, s.send, "Hello") |
| 230 self.assertRaises(error, s.recv) |
| 231 |
| 232 def testUUID4(self): |
| 233 """ WebSocket key should be a UUID4. |
| 234 """ |
| 235 key = ws._create_sec_websocket_key() |
| 236 u = uuid.UUID(bytes=base64.b64decode(key)) |
| 237 self.assertEquals(4, u.version) |
| 238 |
| 239 class WebSocketAppTest(unittest.TestCase): |
| 240 |
| 241 class NotSetYet(object): |
| 242 """ A marker class for signalling that a value hasn't been set yet. |
| 243 """ |
| 244 |
| 245 def setUp(self): |
| 246 ws.enableTrace(TRACABLE) |
| 247 |
| 248 WebSocketAppTest.keep_running_open = WebSocketAppTest.NotSetYet() |
| 249 WebSocketAppTest.keep_running_close = WebSocketAppTest.NotSetYet() |
| 250 WebSocketAppTest.get_mask_key_id = WebSocketAppTest.NotSetYet() |
| 251 |
| 252 def tearDown(self): |
| 253 |
| 254 WebSocketAppTest.keep_running_open = WebSocketAppTest.NotSetYet() |
| 255 WebSocketAppTest.keep_running_close = WebSocketAppTest.NotSetYet() |
| 256 WebSocketAppTest.get_mask_key_id = WebSocketAppTest.NotSetYet() |
| 257 |
| 258 def testKeepRunning(self): |
| 259 """ A WebSocketApp should keep running as long as its self.keep_running |
| 260 is not False (in the boolean context). |
| 261 """ |
| 262 |
| 263 def on_open(self, *args, **kwargs): |
| 264 """ Set the keep_running flag for later inspection and immediately |
| 265 close the connection. |
| 266 """ |
| 267 WebSocketAppTest.keep_running_open = self.keep_running |
| 268 self.close() |
| 269 |
| 270 def on_close(self, *args, **kwargs): |
| 271 """ Set the keep_running flag for the test to use. |
| 272 """ |
| 273 WebSocketAppTest.keep_running_close = self.keep_running |
| 274 |
| 275 app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, on_cl
ose=on_close) |
| 276 app.run_forever() |
| 277 |
| 278 self.assertFalse(isinstance(WebSocketAppTest.keep_running_open, |
| 279 WebSocketAppTest.NotSetYet)) |
| 280 |
| 281 self.assertFalse(isinstance(WebSocketAppTest.keep_running_close, |
| 282 WebSocketAppTest.NotSetYet)) |
| 283 |
| 284 self.assertEquals(True, WebSocketAppTest.keep_running_open) |
| 285 self.assertEquals(False, WebSocketAppTest.keep_running_close) |
| 286 |
| 287 def testSockMaskKey(self): |
| 288 """ A WebSocketApp should forward the received mask_key function down |
| 289 to the actual socket. |
| 290 """ |
| 291 |
| 292 def my_mask_key_func(): |
| 293 pass |
| 294 |
| 295 def on_open(self, *args, **kwargs): |
| 296 """ Set the value so the test can use it later on and immediately |
| 297 close the connection. |
| 298 """ |
| 299 WebSocketAppTest.get_mask_key_id = id(self.get_mask_key) |
| 300 self.close() |
| 301 |
| 302 app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, get_m
ask_key=my_mask_key_func) |
| 303 app.run_forever() |
| 304 |
| 305 # Note: We can't use 'is' for comparing the functions directly, need to
use 'id'. |
| 306 self.assertEquals(WebSocketAppTest.get_mask_key_id, id(my_mask_key_func)
) |
| 307 |
| 308 |
| 309 if __name__ == "__main__": |
| 310 unittest.main() |
| 311 |
OLD | NEW |