OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 // Test the event/callback protocol of the stream implementations. | 5 // Test the event/callback protocol of the stream implementations. |
| 6 library stream_state_test; |
6 | 7 |
7 import "../../../pkg/unittest/lib/unittest.dart"; | 8 import "../../../pkg/unittest/lib/unittest.dart"; |
8 import "dart:collection"; | 9 import "stream_state_helper.dart"; |
9 import "dart:async"; | |
10 | 10 |
11 const ms5 = const Duration(milliseconds: 5); | 11 const ms5 = const Duration(milliseconds: 5); |
12 | 12 |
13 main() { | 13 main() { |
14 mainTest(false); | 14 mainTest(false); |
15 mainTest(true); | 15 mainTest(true); |
16 } | 16 } |
17 | 17 |
18 mainTest(bool broadcast) { | 18 mainTest(bool broadcast) { |
19 var p = broadcast ? "BC" : "SC"; | 19 var p = broadcast ? "BC" : "SC"; |
(...skipping 12 matching lines...) Expand all Loading... |
32 t..expectDone(); | 32 t..expectDone(); |
33 } else { | 33 } else { |
34 t..expectSubscription(true, false) | 34 t..expectSubscription(true, false) |
35 ..expectData(42) | 35 ..expectData(42) |
36 ..expectDone() | 36 ..expectDone() |
37 ..expectSubscription(false, false); | 37 ..expectSubscription(false, false); |
38 } | 38 } |
39 t..add(42)..close()..subscribe(); | 39 t..add(42)..close()..subscribe(); |
40 }); | 40 }); |
41 | 41 |
42 test("$p-sub-data/pause-done", () { | |
43 var t = new StreamProtocolTest(broadcast); | |
44 t..expectSubscription(true, false) | |
45 ..expectData(42, () { | |
46 t.pause(new Future.delayed(ms5, () => null)); | |
47 }) | |
48 ..expectPause(true) | |
49 ..expectDone() | |
50 ..expectSubscription(false, false); | |
51 // We are calling "close" while the controller is actually paused, | |
52 // and it will stay paused until the pending events are sent. | |
53 t..subscribe()..add(42)..close(); | |
54 }); | |
55 | |
56 test("$p-sub-data/pause-resume/done", () { | |
57 var t = new StreamProtocolTest(broadcast); | |
58 t..expectSubscription(true, false) | |
59 ..expectData(42, () { | |
60 t.pause(new Future.delayed(ms5, () => null)); | |
61 }) | |
62 ..expectPause(true) | |
63 ..expectPause(false, () { t.close(); }) | |
64 ..expectDone() | |
65 ..expectSubscription(false, false); | |
66 t..subscribe()..add(42); | |
67 }); | |
68 | |
69 test("$p-sub-data/pause/resume/pause/resume-done", () { | |
70 var t = new StreamProtocolTest(broadcast); | |
71 t..expectSubscription(true, false) | |
72 ..expectData(42, () { | |
73 t.pause(); | |
74 }) | |
75 ..expectPause(true, () { t.resume(); }) | |
76 ..expectPause(false, () { t.pause(); }) | |
77 ..expectPause(true, () { t.resume(); }) | |
78 ..expectPause(false, () { t.close(); }) | |
79 ..expectDone() | |
80 ..expectSubscription(false, false); | |
81 t..subscribe()..add(42); | |
82 }); | |
83 | |
84 test("$p-sub-data/pause+resume-done", () { | 42 test("$p-sub-data/pause+resume-done", () { |
85 var t = new StreamProtocolTest(broadcast); | 43 var t = new StreamProtocolTest(broadcast); |
86 t..expectSubscription(true, false) | 44 t..expectSubscription(true, false) |
87 ..expectData(42, () { | 45 ..expectData(42, () { |
88 t.pause(); | 46 t.pause(); |
89 t.resume(); | 47 t.resume(); |
90 t.close(); | 48 t.close(); |
91 }) | 49 }) |
92 ..expectDone() | 50 ..expectDone() |
93 ..expectSubscription(false, false); | 51 ..expectSubscription(false, false); |
94 t..subscribe()..add(42); | 52 t..subscribe()..add(42); |
95 }); | 53 }); |
96 | 54 |
97 test("$p-sub-data/data+pause-data-resume-done", () { | |
98 var t = new StreamProtocolTest(broadcast); | |
99 t..expectSubscription(true, false) | |
100 ..expectData(42, () { | |
101 t.add(43); | |
102 t.pause(new Future.delayed(ms5, () => null)); | |
103 // Should now be paused until the future finishes. | |
104 // After that, the controller stays paused until the pending queue | |
105 // is empty. | |
106 }) | |
107 ..expectPause(true) | |
108 ..expectData(43) | |
109 ..expectPause(false, () { t.close(); }) | |
110 ..expectDone() | |
111 ..expectSubscription(false, false); | |
112 t..subscribe()..add(42); | |
113 }); | |
114 | |
115 test("$p-sub-data-unsubonerror", () { | 55 test("$p-sub-data-unsubonerror", () { |
116 var t = new StreamProtocolTest(broadcast); | 56 var t = new StreamProtocolTest(broadcast); |
117 t..expectSubscription(true, false) | 57 t..expectSubscription(true, false) |
118 ..expectData(42) | 58 ..expectData(42) |
119 ..expectError("bad") | 59 ..expectError("bad") |
120 ..expectSubscription(false, !broadcast); | 60 ..expectSubscription(false, !broadcast); |
121 t..subscribe(unsubscribeOnError: true) | 61 t..subscribe(unsubscribeOnError: true) |
122 ..add(42) | 62 ..add(42) |
123 ..error("bad") | 63 ..error("bad") |
124 ..add(43) | 64 ..add(43) |
(...skipping 89 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
214 var t = new StreamProtocolTest(broadcast); | 154 var t = new StreamProtocolTest(broadcast); |
215 t..expectSubscription(true, false) | 155 t..expectSubscription(true, false) |
216 ..expectDone() | 156 ..expectDone() |
217 ..expectSubscription(false, false) | 157 ..expectSubscription(false, false) |
218 ..expectDone(); | 158 ..expectDone(); |
219 t..subscribe() | 159 t..subscribe() |
220 ..close() | 160 ..close() |
221 ..subscribe(); // Subscribe after done does not cause callbacks at all. | 161 ..subscribe(); // Subscribe after done does not cause callbacks at all. |
222 }); | 162 }); |
223 } | 163 } |
224 | |
225 // -------------------------------------------------------------------- | |
226 // Utility classes. | |
227 | |
228 class StreamProtocolTest { | |
229 StreamController _controller; | |
230 StreamSubscription _subscription; | |
231 List<Event> _expectations = new List<Event>(); | |
232 int _nextExpectationIndex = 0; | |
233 Function _onComplete; | |
234 | |
235 StreamProtocolTest([bool broadcast = false]) { | |
236 if (broadcast) { | |
237 _controller = new StreamController.broadcast( | |
238 onPauseStateChange: _onPause, | |
239 onSubscriptionStateChange: _onSubcription); | |
240 // TODO(lrn): Make it work with multiple subscribers too. | |
241 } else { | |
242 _controller = new StreamController( | |
243 onPauseStateChange: _onPause, | |
244 onSubscriptionStateChange: _onSubcription); | |
245 } | |
246 _onComplete = expectAsync0((){ | |
247 _onComplete = null; // Being null marks the test to be complete. | |
248 }); | |
249 } | |
250 | |
251 // Actions on the stream and controller. | |
252 void add(var data) { _controller.add(data); } | |
253 void error(var error) { _controller.signalError(error); } | |
254 void close() { _controller.close(); } | |
255 | |
256 void subscribe({bool unsubscribeOnError : false}) { | |
257 // TODO(lrn): Handle more subscriptions (e.g., a subscription-id | |
258 // per subscription, and an id on event _expectations). | |
259 if (_subscription != null) throw new StateError("Already subscribed"); | |
260 _subscription = _controller.stream.listen(_onData, | |
261 onError: _onError, | |
262 onDone: _onDone, | |
263 unsubscribeOnError: | |
264 unsubscribeOnError); | |
265 } | |
266 | |
267 void pause([Future resumeSignal]) { | |
268 if (_subscription == null) throw new StateError("Not subscribed"); | |
269 _subscription.pause(resumeSignal); | |
270 } | |
271 | |
272 void resume([Future resumeSignal]) { | |
273 if (_subscription == null) throw new StateError("Not subscribed"); | |
274 _subscription.resume(); | |
275 } | |
276 | |
277 void cancel() { | |
278 if (_subscription == null) throw new StateError("Not subscribed"); | |
279 _subscription.cancel(); | |
280 _subscription = null; | |
281 } | |
282 | |
283 // Handling of stream events. | |
284 void _onData(var data) { | |
285 _withNextExpectation((Event expect) { | |
286 if (!expect.matchData(data)) { | |
287 _fail("Expected: $expect\n" | |
288 "Found : [Data: $data]"); | |
289 } | |
290 }); | |
291 } | |
292 | |
293 void _onError(AsyncError error) { | |
294 _withNextExpectation((Event expect) { | |
295 if (!expect.matchError(error)) { | |
296 _fail("Expected: $expect\n" | |
297 "Found : [Data: ${error.error}]"); | |
298 } | |
299 }); | |
300 } | |
301 | |
302 void _onDone() { | |
303 _subscription = null; | |
304 _withNextExpectation((Event expect) { | |
305 if (!expect.matchDone()) { | |
306 _fail("Expected: $expect\n" | |
307 "Found : [Done]"); | |
308 } | |
309 }); | |
310 } | |
311 | |
312 void _onPause() { | |
313 _withNextExpectation((Event expect) { | |
314 if (!expect.matchPauseChange(_controller)) { | |
315 _fail("Expected: $expect\n" | |
316 "Found : [Paused:${_controller.isPaused}]"); | |
317 } | |
318 }); | |
319 } | |
320 | |
321 void _onSubcription() { | |
322 _withNextExpectation((Event expect) { | |
323 if (!expect.matchSubscriptionChange(_controller)) { | |
324 _fail("Expected: $expect\n" | |
325 "Found: [Subscribed:${_controller.hasSubscribers}, " | |
326 "Paused:${_controller.isPaused}]"); | |
327 } | |
328 }); | |
329 } | |
330 | |
331 void _withNextExpectation(void action(Event expect)) { | |
332 if (_nextExpectationIndex == _expectations.length) { | |
333 action(new MismatchEvent()); | |
334 } else { | |
335 Event next = _expectations[_nextExpectationIndex++]; | |
336 action(next); | |
337 } | |
338 _checkDone(); | |
339 } | |
340 | |
341 void _checkDone() { | |
342 if (_nextExpectationIndex == _expectations.length) { | |
343 _onComplete(); | |
344 } | |
345 } | |
346 | |
347 | |
348 // Adds _expectations. | |
349 void expectAny([void action()]) { | |
350 if (_onComplete == null) { | |
351 _fail("Adding expectation after completing"); | |
352 } | |
353 _expectations.add(new LogAnyEvent(action)); | |
354 } | |
355 void expectData(var data, [void action()]) { | |
356 if (_onComplete == null) { | |
357 _fail("Adding expectation after completing"); | |
358 } | |
359 _expectations.add(new DataEvent(data, action)); | |
360 } | |
361 void expectError(var error, [void action()]) { | |
362 if (_onComplete == null) { | |
363 _fail("Adding expectation after completing"); | |
364 } | |
365 _expectations.add(new ErrorEvent(error, action)); | |
366 } | |
367 void expectDone([void action()]) { | |
368 if (_onComplete == null) { | |
369 _fail("Adding expectation after completing"); | |
370 } | |
371 _expectations.add(new DoneEvent(action)); | |
372 } | |
373 void expectPause(bool isPaused, [void action()]) { | |
374 if (_onComplete == null) { | |
375 _fail("Adding expectation after completing"); | |
376 } | |
377 _expectations.add(new PauseCallbackEvent(isPaused, action)); | |
378 } | |
379 void expectSubscription(bool hasSubscribers, bool isPaused, [void action()]) { | |
380 if (_onComplete == null) { | |
381 _fail("Adding expectation after completing"); | |
382 } | |
383 _expectations.add( | |
384 new SubscriptionCallbackEvent(hasSubscribers, isPaused, action)); | |
385 } | |
386 | |
387 void _fail(String message) { | |
388 if (_nextExpectationIndex == 0) { | |
389 throw "Unexpected event:\n$message\nNo earlier events matched."; | |
390 } | |
391 throw "Unexpected event:\n$message\nMatched so far:\n" | |
392 " ${_expectations.take(_nextExpectationIndex).join("\n ")}"; | |
393 } | |
394 } | |
395 | |
396 class EventCollector { | |
397 final Queue<Event> events = new Queue<Event>(); | |
398 | |
399 } | |
400 | |
401 class Event { | |
402 Function _action; | |
403 Event(void this._action()); | |
404 | |
405 bool matchData(var data) { | |
406 if (!_testData(data)) return false; | |
407 if (_action != null) _action(); | |
408 return true; | |
409 } | |
410 bool matchError(AsyncError e) { | |
411 if (!_testError(e)) return false; | |
412 if (_action != null) _action(); | |
413 return true; | |
414 } | |
415 bool matchDone() { | |
416 if (!_testDone()) return false; | |
417 if (_action != null) _action(); | |
418 return true; | |
419 } | |
420 bool matchPauseChange(StreamController c) { | |
421 if (!_testPause(c)) return false; | |
422 if (_action != null) _action(); | |
423 return true; | |
424 } | |
425 bool matchSubscriptionChange(StreamController c) { | |
426 if (!_testSubscribe(c)) return false; | |
427 if (_action != null) _action(); | |
428 return true; | |
429 } | |
430 | |
431 bool _testData(_) => false; | |
432 bool _testError(_) => false; | |
433 bool _testDone() => false; | |
434 bool _testPause(_) => false; | |
435 bool _testSubscribe(_) => false; | |
436 } | |
437 | |
438 class MismatchEvent extends Event { | |
439 MismatchEvent() : super(null); | |
440 toString() => "[No event expected]"; | |
441 } | |
442 | |
443 class DataEvent extends Event { | |
444 final data; | |
445 DataEvent(this.data, void action()) : super(action); | |
446 bool _testData(var data) => this.data == data; | |
447 String toString() => "[Data: $data]"; | |
448 } | |
449 | |
450 class ErrorEvent extends Event { | |
451 final error; | |
452 ErrorEvent(this.error, void action()) : super(action); | |
453 bool _testError(AsyncError error) => this.error == error.error; | |
454 String toString() => "[Error: $error]"; | |
455 } | |
456 | |
457 class DoneEvent extends Event { | |
458 DoneEvent(void action()) : super(action); | |
459 bool _testDone() => true; | |
460 String toString() => "[Done]"; | |
461 } | |
462 | |
463 class PauseCallbackEvent extends Event { | |
464 final bool isPaused; | |
465 PauseCallbackEvent(this.isPaused, void action()) | |
466 : super(action); | |
467 bool _testPause(StreamController c) => isPaused == c.isPaused; | |
468 String toString() => "[Paused:$isPaused]"; | |
469 } | |
470 | |
471 class SubscriptionCallbackEvent extends Event { | |
472 final bool hasSubscribers; | |
473 final bool isPaused; | |
474 SubscriptionCallbackEvent(this.hasSubscribers, this.isPaused, void action()) | |
475 : super(action); | |
476 bool _testSubscribe(StreamController c) { | |
477 return hasSubscribers == c.hasSubscribers && isPaused == c.isPaused; | |
478 } | |
479 String toString() => "[Subscribers:$hasSubscribers, Paused:$isPaused]"; | |
480 } | |
481 | |
482 | |
483 class LogAnyEvent extends Event { | |
484 String _actual = "*Not matched yet*"; | |
485 LogAnyEvent(void action()) : super(action); | |
486 bool _testData(var data) { | |
487 _actual = "*[Data $data]"; | |
488 return true; | |
489 } | |
490 bool _testError(AsyncError error) { | |
491 _actual = "*[Error ${error.error}]"; | |
492 return true; | |
493 } | |
494 bool _testDone() { | |
495 _actual = "*[Done]"; | |
496 return true; | |
497 } | |
498 bool _testPause(StreamController c) { | |
499 _actual = "*[Paused:${c.isPaused}]"; | |
500 return true; | |
501 } | |
502 bool _testSubcribe(StreamController c) { | |
503 _actual = "*[Subscribers:${c.hasSubscribers}, Paused:${c.isPaused}]"; | |
504 return true; | |
505 } | |
506 | |
507 String toString() => _actual; | |
508 } | |
OLD | NEW |