OLD | NEW |
---|---|
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 // part of dart_async; | 5 // part of dart_async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 246 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
257 onData: (T data) { | 257 onData: (T data) { |
258 result.add(data); | 258 result.add(data); |
259 }, | 259 }, |
260 onError: future._setError, | 260 onError: future._setError, |
261 onDone: () { | 261 onDone: () { |
262 future._setValue(result); | 262 future._setValue(result); |
263 }, | 263 }, |
264 unsubscribeOnError: true); | 264 unsubscribeOnError: true); |
265 return future; | 265 return future; |
266 } | 266 } |
267 | |
268 /** | |
269 * Provide at most the first [n] values of this stream. | |
270 * | |
271 * Forwards the first [n] data events of this stream, and all error | |
272 * events, to the returned stream, and ends with a done event. | |
273 * | |
274 * If this stream produces less than [n] values before it's done, | |
Bob Nystrom
2012/11/26 16:10:19
Nit: "less" -> "fewer". :)
| |
275 * so will the returned stream. | |
276 */ | |
277 Stream<T> take(int n) { | |
278 return new TakeStream(this, n); | |
279 } | |
280 | |
281 /** | |
282 * Forwards data events while [test] is successful. | |
283 * | |
284 * The returned stream provides the same events as this stream as long | |
285 * as [test] returns [:true:] for the event data. The stream is done | |
286 * when either this stream is done, or when this stream first provides | |
287 * a value that [test] doesn't accept. | |
288 */ | |
289 Stream<T> takeWhile(bool test(T value)) { | |
290 return new TakeWhileStream(this, test); | |
291 } | |
292 | |
293 /** | |
294 * Skips the first [n] data events from this stream. | |
295 */ | |
296 Stream<T> skip(int n) { | |
297 return new SkipStream(this, n); | |
298 } | |
299 | |
300 /** | |
301 * Skip data events from this stream while they are matched by [test]. | |
302 * | |
303 * Error and done events are provided by the returned stream unmodified. | |
304 * | |
305 * Starting with the first data event where [test] returns true for the | |
306 * event data, the returned stream will have the same events as this stream. | |
307 */ | |
308 Stream<T> skipWhile(bool test(T value)) { | |
309 return new SkipWhileStream(this, test); | |
310 } | |
311 | |
312 /** | |
313 * Skip data events if they are equal to the previous data event. | |
314 * | |
315 * The returned stream provides the same events as this stream, except | |
316 * that it never provides two consequtive data events that are equal. | |
317 * | |
318 * Equality is determined by the provided [equals] method. If that is | |
319 * omitted, the '==' operator on the last provided data element is used. | |
320 */ | |
321 Stream<T> distinct([bool equals(T a, T b)]) { | |
322 return new DistinctStream(this, compare); | |
323 } | |
324 | |
325 /** | |
326 * Find the first element of this stream matching [test]. | |
327 * | |
328 * Returns a future that is filled with the first element of this stream | |
329 * that [test] returns true for. | |
330 * | |
331 * If no such element is found before this stream is done, and a | |
332 * [defaultValue] function is provided, the result of calling [defaultValue] | |
333 * becomes the value of the future. | |
334 * | |
335 * If an error occurs, or if this stream ends without finding a match and | |
336 * with no [defaultValue] function provided, the future will receive an | |
337 * error. | |
338 */ | |
339 Future<T> firstMatch(bool test(T value), {T defaultValue()}) { | |
340 _FutureImpl<T> future = new _FutureImpl<T>(); | |
341 StreamSubscription subscription; | |
342 subscription = subscribe( | |
343 onData: (T value) { | |
344 bool matches; | |
345 try { | |
346 matches = (true == test(value)); | |
347 } catch (e, s) { | |
348 future._setError(new AsyncError(e, s)); | |
349 subscription.unsubscribe(); | |
350 return; | |
351 } | |
352 if (matches) { | |
353 future._setValue(value); | |
354 subscription.unsubscribe(); | |
355 } | |
356 }, | |
357 onError: future._setError, | |
358 onDone: () { | |
359 if (defaultValue != null) { | |
360 T value; | |
361 try { | |
362 value = defaultValue(); | |
363 } catch (e, s) { | |
364 future._setError(new AsyncError(e, s)); | |
365 return; | |
366 } | |
367 future._setValue(value); | |
368 return; | |
369 } | |
370 future._setError( | |
371 new AsyncError(new StateError("firstMatch ended without match"))); | |
372 }, | |
373 unsubscribeOnError: true); | |
374 return future; | |
375 } | |
376 | |
377 /** | |
378 * Finds the last element in this stream matching [test]. | |
379 * | |
380 * As [firstMatch], except that the last matching element is found. | |
381 * That means that the result cannot be provided before this stream | |
382 * is done. | |
383 */ | |
384 Future<T> lastMatch(bool test(T value), {T defaultValue()}) { | |
385 _FutureImpl<T> future = new _FutureImpl<T>(); | |
386 T result = null; | |
387 bool foundResult = false; | |
388 StreamSubscription subscription; | |
389 subscription = subscribe( | |
390 onData: (T value) { | |
391 bool matches; | |
392 try { | |
393 matches = (true == test(value)); | |
394 } catch (e, s) { | |
395 future._setError(new AsyncError(e, s)); | |
396 subscription.unsubscribe(); | |
397 return; | |
398 } | |
399 if (matches) { | |
400 foundResult = true; | |
401 result = value; | |
402 } | |
403 }, | |
404 onError: future._setError, | |
405 onDone: () { | |
406 if (foundResult) { | |
407 future._setValue(result); | |
408 return; | |
409 } | |
410 if (defaultValue != null) { | |
411 T value; | |
412 try { | |
413 value = defaultValue(); | |
414 } catch (e, s) { | |
415 future._setError(new AsyncError(e, s)); | |
416 return; | |
417 } | |
418 future._setValue(value); | |
419 return; | |
420 } | |
421 future._setError( | |
422 new AsyncError(new StateError("lastMatch ended without match"))); | |
423 }, | |
424 unsubscribeOnError: true); | |
425 return future; | |
426 } | |
427 | |
428 /** | |
429 * Finds the single element in this stream matching [test]. | |
430 * | |
431 * Like [lastMatch], except that it is an error if more than one | |
432 * matching element occurs in the stream. | |
433 */ | |
434 Future<T> single(bool test(T value)) { | |
435 _FutureImpl<T> future = new _FutureImpl<T>(); | |
436 T result = null; | |
437 bool foundResult = false; | |
438 StreamSubscription subscription; | |
439 subscription = subscribe( | |
440 onData: (T value) { | |
441 bool matches; | |
442 try { | |
443 matches = (true == test(value)); | |
444 } catch (e, s) { | |
445 future._setError(new AsyncError(e, s)); | |
446 subscription.unsubscribe(); | |
447 return; | |
448 } | |
449 if (matches) { | |
450 if (foundResult) { | |
451 future._setError(new AsyncError( | |
452 new StateError('Multiple matches for "single"'))); | |
453 subscription.unsubscribe(); | |
454 return; | |
455 } | |
456 foundResult = true; | |
457 result = value; | |
458 } | |
459 }, | |
460 onError: future._setError, | |
461 onDone: () { | |
462 if (foundResult) { | |
463 future._setValue(result); | |
464 return; | |
465 } | |
466 future._setError( | |
467 new AsyncError(new StateError("single ended without match"))); | |
468 }, | |
469 unsubscribeOnError: true); | |
470 return future; | |
471 } | |
472 | |
473 /** | |
474 * Returns the value of the [index]th data event of this stream. | |
475 * | |
476 * If an error event occurs, the future will end with this error. | |
477 * | |
478 * If this stream provides fewer than [index] elements before closing, | |
479 * an error is reported. | |
480 */ | |
481 Future<T> elementAt(int index) { | |
482 _FutureImpl<T> future = new _FutureImpl(); | |
483 StreamSubscription subscription; | |
484 subscription = subscribe( | |
485 onData: (T value) { | |
486 if (index == 0) { | |
487 future._setValue(value); | |
488 subscription.unsubscribe(); | |
489 return; | |
490 } | |
491 index -= 1; | |
492 }, | |
493 onError: future._setError, | |
494 onDone: () { | |
495 _future._setError(new AsyncError( | |
496 new StateError("Not enough elements for elementAt"))); | |
497 }, | |
498 unsubscribeOnError: true); | |
499 return future; | |
500 } | |
267 } | 501 } |
268 | 502 |
269 /** | 503 /** |
270 * A control object for the subscription on a [Stream]. | 504 * A control object for the subscription on a [Stream]. |
271 * | 505 * |
272 * When you subscribe on a [Stream] using [Stream.subscribe], | 506 * When you subscribe on a [Stream] using [Stream.subscribe], |
273 * a [StreamSubscription] object is returned. This object | 507 * a [StreamSubscription] object is returned. This object |
274 * is used to later unsubscribe again, or to temporarily pause | 508 * is used to later unsubscribe again, or to temporarily pause |
275 * the stream's events. | 509 * the stream's events. |
276 */ | 510 */ |
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
374 sink.signalError(error); | 608 sink.signalError(error); |
375 } | 609 } |
376 | 610 |
377 /** | 611 /** |
378 * Handle an incoming done event. | 612 * Handle an incoming done event. |
379 */ | 613 */ |
380 void handleDone(StreamSink<T> sink) { | 614 void handleDone(StreamSink<T> sink) { |
381 sink.close(data); | 615 sink.close(data); |
382 } | 616 } |
383 } | 617 } |
OLD | NEW |