Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(59)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 11421073: Add more methods to Stream. (Closed) Base URL: https://dart.googlecode.com/svn/experimental/lib_v2/dart
Patch Set: Created 8 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 // part of dart_async; 5 // part of dart_async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 246 matching lines...) Expand 10 before | Expand all | Expand 10 after
257 onData: (T data) { 257 onData: (T data) {
258 result.add(data); 258 result.add(data);
259 }, 259 },
260 onError: future._setError, 260 onError: future._setError,
261 onDone: () { 261 onDone: () {
262 future._setValue(result); 262 future._setValue(result);
263 }, 263 },
264 unsubscribeOnError: true); 264 unsubscribeOnError: true);
265 return future; 265 return future;
266 } 266 }
267
268 /**
269 * Provide at most the first [n] values of this stream.
270 *
271 * Forwards the first [n] data events of this stream, and all error
272 * events, to the returned stream, and ends with a done event.
273 *
274 * If this stream produces less than [n] values before it's done,
Bob Nystrom 2012/11/26 16:10:19 Nit: "less" -> "fewer". :)
275 * so will the returned stream.
276 */
277 Stream<T> take(int n) {
278 return new TakeStream(this, n);
279 }
280
281 /**
282 * Forwards data events while [test] is successful.
283 *
284 * The returned stream provides the same events as this stream as long
285 * as [test] returns [:true:] for the event data. The stream is done
286 * when either this stream is done, or when this stream first provides
287 * a value that [test] doesn't accept.
288 */
289 Stream<T> takeWhile(bool test(T value)) {
290 return new TakeWhileStream(this, test);
291 }
292
293 /**
294 * Skips the first [n] data events from this stream.
295 */
296 Stream<T> skip(int n) {
297 return new SkipStream(this, n);
298 }
299
300 /**
301 * Skip data events from this stream while they are matched by [test].
302 *
303 * Error and done events are provided by the returned stream unmodified.
304 *
305 * Starting with the first data event where [test] returns true for the
306 * event data, the returned stream will have the same events as this stream.
307 */
308 Stream<T> skipWhile(bool test(T value)) {
309 return new SkipWhileStream(this, test);
310 }
311
312 /**
313 * Skip data events if they are equal to the previous data event.
314 *
315 * The returned stream provides the same events as this stream, except
316 * that it never provides two consequtive data events that are equal.
317 *
318 * Equality is determined by the provided [equals] method. If that is
319 * omitted, the '==' operator on the last provided data element is used.
320 */
321 Stream<T> distinct([bool equals(T a, T b)]) {
322 return new DistinctStream(this, compare);
323 }
324
325 /**
326 * Find the first element of this stream matching [test].
327 *
328 * Returns a future that is filled with the first element of this stream
329 * that [test] returns true for.
330 *
331 * If no such element is found before this stream is done, and a
332 * [defaultValue] function is provided, the result of calling [defaultValue]
333 * becomes the value of the future.
334 *
335 * If an error occurs, or if this stream ends without finding a match and
336 * with no [defaultValue] function provided, the future will receive an
337 * error.
338 */
339 Future<T> firstMatch(bool test(T value), {T defaultValue()}) {
340 _FutureImpl<T> future = new _FutureImpl<T>();
341 StreamSubscription subscription;
342 subscription = subscribe(
343 onData: (T value) {
344 bool matches;
345 try {
346 matches = (true == test(value));
347 } catch (e, s) {
348 future._setError(new AsyncError(e, s));
349 subscription.unsubscribe();
350 return;
351 }
352 if (matches) {
353 future._setValue(value);
354 subscription.unsubscribe();
355 }
356 },
357 onError: future._setError,
358 onDone: () {
359 if (defaultValue != null) {
360 T value;
361 try {
362 value = defaultValue();
363 } catch (e, s) {
364 future._setError(new AsyncError(e, s));
365 return;
366 }
367 future._setValue(value);
368 return;
369 }
370 future._setError(
371 new AsyncError(new StateError("firstMatch ended without match")));
372 },
373 unsubscribeOnError: true);
374 return future;
375 }
376
377 /**
378 * Finds the last element in this stream matching [test].
379 *
380 * As [firstMatch], except that the last matching element is found.
381 * That means that the result cannot be provided before this stream
382 * is done.
383 */
384 Future<T> lastMatch(bool test(T value), {T defaultValue()}) {
385 _FutureImpl<T> future = new _FutureImpl<T>();
386 T result = null;
387 bool foundResult = false;
388 StreamSubscription subscription;
389 subscription = subscribe(
390 onData: (T value) {
391 bool matches;
392 try {
393 matches = (true == test(value));
394 } catch (e, s) {
395 future._setError(new AsyncError(e, s));
396 subscription.unsubscribe();
397 return;
398 }
399 if (matches) {
400 foundResult = true;
401 result = value;
402 }
403 },
404 onError: future._setError,
405 onDone: () {
406 if (foundResult) {
407 future._setValue(result);
408 return;
409 }
410 if (defaultValue != null) {
411 T value;
412 try {
413 value = defaultValue();
414 } catch (e, s) {
415 future._setError(new AsyncError(e, s));
416 return;
417 }
418 future._setValue(value);
419 return;
420 }
421 future._setError(
422 new AsyncError(new StateError("lastMatch ended without match")));
423 },
424 unsubscribeOnError: true);
425 return future;
426 }
427
428 /**
429 * Finds the single element in this stream matching [test].
430 *
431 * Like [lastMatch], except that it is an error if more than one
432 * matching element occurs in the stream.
433 */
434 Future<T> single(bool test(T value)) {
435 _FutureImpl<T> future = new _FutureImpl<T>();
436 T result = null;
437 bool foundResult = false;
438 StreamSubscription subscription;
439 subscription = subscribe(
440 onData: (T value) {
441 bool matches;
442 try {
443 matches = (true == test(value));
444 } catch (e, s) {
445 future._setError(new AsyncError(e, s));
446 subscription.unsubscribe();
447 return;
448 }
449 if (matches) {
450 if (foundResult) {
451 future._setError(new AsyncError(
452 new StateError('Multiple matches for "single"')));
453 subscription.unsubscribe();
454 return;
455 }
456 foundResult = true;
457 result = value;
458 }
459 },
460 onError: future._setError,
461 onDone: () {
462 if (foundResult) {
463 future._setValue(result);
464 return;
465 }
466 future._setError(
467 new AsyncError(new StateError("single ended without match")));
468 },
469 unsubscribeOnError: true);
470 return future;
471 }
472
473 /**
474 * Returns the value of the [index]th data event of this stream.
475 *
476 * If an error event occurs, the future will end with this error.
477 *
478 * If this stream provides fewer than [index] elements before closing,
479 * an error is reported.
480 */
481 Future<T> elementAt(int index) {
482 _FutureImpl<T> future = new _FutureImpl();
483 StreamSubscription subscription;
484 subscription = subscribe(
485 onData: (T value) {
486 if (index == 0) {
487 future._setValue(value);
488 subscription.unsubscribe();
489 return;
490 }
491 index -= 1;
492 },
493 onError: future._setError,
494 onDone: () {
495 _future._setError(new AsyncError(
496 new StateError("Not enough elements for elementAt")));
497 },
498 unsubscribeOnError: true);
499 return future;
500 }
267 } 501 }
268 502
269 /** 503 /**
270 * A control object for the subscription on a [Stream]. 504 * A control object for the subscription on a [Stream].
271 * 505 *
272 * When you subscribe on a [Stream] using [Stream.subscribe], 506 * When you subscribe on a [Stream] using [Stream.subscribe],
273 * a [StreamSubscription] object is returned. This object 507 * a [StreamSubscription] object is returned. This object
274 * is used to later unsubscribe again, or to temporarily pause 508 * is used to later unsubscribe again, or to temporarily pause
275 * the stream's events. 509 * the stream's events.
276 */ 510 */
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after
374 sink.signalError(error); 608 sink.signalError(error);
375 } 609 }
376 610
377 /** 611 /**
378 * Handle an incoming done event. 612 * Handle an incoming done event.
379 */ 613 */
380 void handleDone(StreamSink<T> sink) { 614 void handleDone(StreamSink<T> sink) {
381 sink.close(data); 615 sink.close(data);
382 } 616 }
383 } 617 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698