OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library barback.test.stream_pool_test; | 5 library barback.test.stream_pool_test; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 | 8 |
9 import 'package:barback/src/utils.dart'; | 9 import 'package:barback/src/utils.dart'; |
10 import 'package:barback/src/utils/stream_pool.dart'; | 10 import 'package:barback/src/utils/stream_pool.dart'; |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
43 controller1.add("first"); | 43 controller1.add("first"); |
44 | 44 |
45 var controller2 = new StreamController<String>(); | 45 var controller2 = new StreamController<String>(); |
46 pool.add(controller2.stream); | 46 pool.add(controller2.stream); |
47 controller2.add("second"); | 47 controller2.add("second"); |
48 controller1.addError("third"); | 48 controller1.addError("third"); |
49 controller2.addError("fourth"); | 49 controller2.addError("fourth"); |
50 controller1.add("fifth"); | 50 controller1.add("fifth"); |
51 | 51 |
52 expect(newFuture(() { | 52 expect(newFuture(() { |
53 return pool.stream.transform(new StreamTransformer.fromHandlers( | 53 return pool.stream |
54 handleData: (data, sink) => sink.add(["data", data]), | 54 .transform(new StreamTransformer.fromHandlers( |
55 handleError: (error, stackTrace, sink) { | 55 handleData: (data, sink) => sink.add(["data", data]), |
56 sink.add(["error", error]); | 56 handleError: (error, stackTrace, sink) { |
57 })).toList(); | 57 sink.add(["error", error]); |
58 }), completion(equals([ | 58 })) |
59 ["data", "first"], | 59 .toList(); |
60 ["data", "second"], | 60 }), |
61 ["error", "third"], | 61 completion(equals([ |
62 ["error", "fourth"], | 62 ["data", "first"], |
63 ["data", "fifth"] | 63 ["data", "second"], |
64 ]))); | 64 ["error", "third"], |
| 65 ["error", "fourth"], |
| 66 ["data", "fifth"] |
| 67 ]))); |
65 | 68 |
66 pumpEventQueue().then((_) => pool.close()); | 69 pumpEventQueue().then((_) => pool.close()); |
67 }); | 70 }); |
68 | 71 |
69 test("buffers inputs from a broadcast stream", () { | 72 test("buffers inputs from a broadcast stream", () { |
70 var pool = new StreamPool<String>(); | 73 var pool = new StreamPool<String>(); |
71 var controller = new StreamController<String>.broadcast(); | 74 var controller = new StreamController<String>.broadcast(); |
72 pool.add(controller.stream); | 75 pool.add(controller.stream); |
73 controller.add("first"); | 76 controller.add("first"); |
74 controller.add("second"); | 77 controller.add("second"); |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
106 | 109 |
107 var controller1 = new StreamController<String>.broadcast(); | 110 var controller1 = new StreamController<String>.broadcast(); |
108 pool.add(controller1.stream); | 111 pool.add(controller1.stream); |
109 controller1.addError("first"); | 112 controller1.addError("first"); |
110 | 113 |
111 var controller2 = new StreamController<String>.broadcast(); | 114 var controller2 = new StreamController<String>.broadcast(); |
112 pool.add(controller2.stream); | 115 pool.add(controller2.stream); |
113 controller2.addError("second"); | 116 controller2.addError("second"); |
114 | 117 |
115 expect(newFuture(() { | 118 expect(newFuture(() { |
116 return pool.stream.transform(new StreamTransformer.fromHandlers( | 119 return pool.stream |
117 handleData: (data, sink) => sink.add(data), | 120 .transform(new StreamTransformer.fromHandlers( |
118 handleError: (error, stackTrace, sink) { sink.add(error); })) | 121 handleData: (data, sink) => sink.add(data), |
| 122 handleError: (error, stackTrace, sink) { |
| 123 sink.add(error); |
| 124 })) |
119 .toList(); | 125 .toList(); |
120 }), completion(isEmpty)); | 126 }), completion(isEmpty)); |
121 | 127 |
122 pumpEventQueue().then((_) => pool.close()); | 128 pumpEventQueue().then((_) => pool.close()); |
123 }); | 129 }); |
124 | 130 |
125 test("doesn't buffer inputs from a buffered stream", () { | 131 test("doesn't buffer inputs from a buffered stream", () { |
126 var pool = new StreamPool<String>.broadcast(); | 132 var pool = new StreamPool<String>.broadcast(); |
127 var controller = new StreamController<String>(); | 133 var controller = new StreamController<String>(); |
128 pool.add(controller.stream); | 134 pool.add(controller.stream); |
129 controller.add("first"); | 135 controller.add("first"); |
130 controller.add("second"); | 136 controller.add("second"); |
131 | 137 |
132 expect(pumpEventQueue().then((_) => pool.stream.toList()), | 138 expect(pumpEventQueue().then((_) => pool.stream.toList()), |
133 completion(isEmpty)); | 139 completion(isEmpty)); |
134 | 140 |
135 pumpEventQueue().then((_) => pool.close()); | 141 pumpEventQueue().then((_) => pool.close()); |
136 }); | 142 }); |
137 }); | 143 }); |
138 | 144 |
139 for (var type in ["buffered", "broadcast"]) { | 145 for (var type in ["buffered", "broadcast"]) { |
140 group(type, () { | 146 group(type, () { |
141 var pool; | 147 var pool; |
142 var bufferedController; | 148 var bufferedController; |
143 var bufferedStream; | |
144 var bufferedSyncController; | 149 var bufferedSyncController; |
145 var broadcastController; | 150 var broadcastController; |
146 var broadcastStream; | |
147 var broadcastSyncController; | 151 var broadcastSyncController; |
148 | 152 |
149 setUp(() { | 153 setUp(() { |
150 if (type == "buffered") { | 154 if (type == "buffered") { |
151 pool = new StreamPool<String>(); | 155 pool = new StreamPool<String>(); |
152 } else { | 156 } else { |
153 pool = new StreamPool<String>.broadcast(); | 157 pool = new StreamPool<String>.broadcast(); |
154 } | 158 } |
155 | 159 |
156 bufferedController = new StreamController<String>(); | 160 bufferedController = new StreamController<String>(); |
157 pool.add(bufferedController.stream); | 161 pool.add(bufferedController.stream); |
158 | 162 |
159 bufferedSyncController = new StreamController<String>(sync: true); | 163 bufferedSyncController = new StreamController<String>(sync: true); |
160 pool.add(bufferedSyncController.stream); | 164 pool.add(bufferedSyncController.stream); |
161 | 165 |
162 broadcastController = new StreamController<String>.broadcast(); | 166 broadcastController = new StreamController<String>.broadcast(); |
163 pool.add(broadcastController.stream); | 167 pool.add(broadcastController.stream); |
164 | 168 |
165 broadcastSyncController = | 169 broadcastSyncController = |
166 new StreamController<String>.broadcast(sync: true); | 170 new StreamController<String>.broadcast(sync: true); |
167 pool.add(broadcastSyncController.stream); | 171 pool.add(broadcastSyncController.stream); |
168 }); | 172 }); |
169 | 173 |
170 test("emits events to a listener", () { | 174 test("emits events to a listener", () { |
171 expect(pool.stream.toList(), completion(equals(["first", "second"]))); | 175 expect(pool.stream.toList(), completion(equals(["first", "second"]))); |
172 | 176 |
173 bufferedController.add("first"); | 177 bufferedController.add("first"); |
174 broadcastController.add("second"); | 178 broadcastController.add("second"); |
175 pumpEventQueue().then((_) => pool.close()); | 179 pumpEventQueue().then((_) => pool.close()); |
176 }); | 180 }); |
(...skipping 18 matching lines...) Expand all Loading... |
195 expect(events, isEmpty); | 199 expect(events, isEmpty); |
196 | 200 |
197 expect(pumpEventQueue().then((_) => events), | 201 expect(pumpEventQueue().then((_) => events), |
198 completion(equals(["first", "second"]))); | 202 completion(equals(["first", "second"]))); |
199 }); | 203 }); |
200 | 204 |
201 test("doesn't emit events from removed streams", () { | 205 test("doesn't emit events from removed streams", () { |
202 expect(pool.stream.toList(), completion(equals(["first", "third"]))); | 206 expect(pool.stream.toList(), completion(equals(["first", "third"]))); |
203 | 207 |
204 bufferedController.add("first"); | 208 bufferedController.add("first"); |
205 expect(pumpEventQueue().then((_) { | 209 expect( |
206 pool.remove(bufferedController.stream); | 210 pumpEventQueue().then((_) { |
207 bufferedController.add("second"); | 211 pool.remove(bufferedController.stream); |
208 }).then((_) { | 212 bufferedController.add("second"); |
209 broadcastController.add("third"); | 213 }).then((_) { |
210 return pumpEventQueue(); | 214 broadcastController.add("third"); |
211 }).then((_) { | 215 return pumpEventQueue(); |
212 pool.remove(broadcastController.stream); | 216 }).then((_) { |
213 broadcastController.add("fourth"); | 217 pool.remove(broadcastController.stream); |
214 pool.close(); | 218 broadcastController.add("fourth"); |
215 }), completes); | 219 pool.close(); |
| 220 }), |
| 221 completes); |
216 }); | 222 }); |
217 }); | 223 }); |
218 } | 224 } |
219 } | 225 } |
OLD | NEW |