| Index: tests/lib/async/stream_controller_async_test.dart
 | 
| diff --git a/tests/lib/async/stream_controller_async_test.dart b/tests/lib/async/stream_controller_async_test.dart
 | 
| index 7c134055bc249134739f8f69a72e0a50b0bb5738..e97ab2c4e4f96e4a2ea4d44547114fdaebda830f 100644
 | 
| --- a/tests/lib/async/stream_controller_async_test.dart
 | 
| +++ b/tests/lib/async/stream_controller_async_test.dart
 | 
| @@ -464,7 +464,6 @@ void testBroadcastController() {
 | 
|  
 | 
|    test("broadcast-controller-individual-pause", () {
 | 
|      StreamProtocolTest test = new StreamProtocolTest.broadcast();
 | 
| -    test.trace = true;
 | 
|      var sub1;
 | 
|      test..expectListen()
 | 
|          ..expectData(42)
 | 
| @@ -498,6 +497,114 @@ void testBroadcastController() {
 | 
|    });
 | 
|  }
 | 
|  
 | 
| +void testSink({bool sync, bool broadcast, bool asBroadcast}) {
 | 
| +  String type = "${sync?"S":"A"}${broadcast?"B":"S"}${asBroadcast?"aB":""}";
 | 
| +  test("$type-controller-sink", () {
 | 
| +    var done = expectAsync0((){});
 | 
| +    var c = broadcast ? new StreamController.broadcast(sync: sync)
 | 
| +                      : new StreamController(sync: sync);
 | 
| +    var expected = new Events()
 | 
| +        ..add(42)..error("error")
 | 
| +        ..add(1)..add(2)..add(3)..add(4)..add(5)
 | 
| +        ..add(43)..close();
 | 
| +    var actual = new Events.capture(asBroadcast ? c.stream.asBroadcastStream()
 | 
| +                                                : c.stream);
 | 
| +    var sink = c.sink;
 | 
| +    sink.add(42);
 | 
| +    sink.addError("error");
 | 
| +    sink.addStream(new Stream.fromIterable([1, 2, 3, 4, 5]))
 | 
| +        .then((_) {
 | 
| +          sink.add(43);
 | 
| +          return sink.close();
 | 
| +        })
 | 
| +        .then((_) {
 | 
| +          Expect.listEquals(expected.events, actual.events);
 | 
| +          done();
 | 
| +        });
 | 
| +  });
 | 
| +
 | 
| +  test("$type-controller-sink-canceled", () {
 | 
| +    var done = expectAsync0((){});
 | 
| +    var c = broadcast ? new StreamController.broadcast(sync: sync)
 | 
| +                      : new StreamController(sync: sync);
 | 
| +    var expected = new Events()
 | 
| +        ..add(42)..error("error")
 | 
| +        ..add(1)..add(2)..add(3);
 | 
| +    var stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream;
 | 
| +    var actual = new Events();
 | 
| +    var sub;
 | 
| +    // Cancel subscription after receiving "3" event.
 | 
| +    sub = stream.listen((v) {
 | 
| +      if (v == 3) sub.cancel();
 | 
| +      actual.add(v);
 | 
| +    }, onError: actual.error);
 | 
| +    var sink = c.sink;
 | 
| +    sink.add(42);
 | 
| +    sink.addError("error");
 | 
| +    sink.addStream(new Stream.fromIterable([1, 2, 3, 4, 5]))
 | 
| +        .then((_) {
 | 
| +          Expect.listEquals(expected.events, actual.events);
 | 
| +          // Close controller as well. It has no listener. If it is a broadcast
 | 
| +          // stream, it will still be open, and we read the "done" future before
 | 
| +          // closing. A normal stream is already done when its listener cancels.
 | 
| +          Future doneFuture = sink.done;
 | 
| +          sink.close();
 | 
| +          return doneFuture;
 | 
| +        })
 | 
| +        .then((_) {
 | 
| +          // No change in events.
 | 
| +          Expect.listEquals(expected.events, actual.events);
 | 
| +          done();
 | 
| +        });
 | 
| +  });
 | 
| +
 | 
| +  test("$type-controller-sink-paused", () {
 | 
| +    var done = expectAsync0((){});
 | 
| +    var c = broadcast ? new StreamController.broadcast(sync: sync)
 | 
| +                      : new StreamController(sync: sync);
 | 
| +    var expected = new Events()
 | 
| +        ..add(42)..error("error")
 | 
| +        ..add(1)..add(2)..add(3)
 | 
| +        ..add(4)..add(5)..add(43)..close();
 | 
| +    var stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream;
 | 
| +    var actual = new Events();
 | 
| +    var sub;
 | 
| +    sub = stream.listen(
 | 
| +        (v) {
 | 
| +          if (v == 3) {
 | 
| +            sub.pause(new Future.delayed(const Duration(milliseconds: 15),
 | 
| +                                         () => null));
 | 
| +          }
 | 
| +          actual.add(v);
 | 
| +        },
 | 
| +        onError: actual.error,
 | 
| +        onDone: actual.close);
 | 
| +    var sink = c.sink;
 | 
| +    sink.add(42);
 | 
| +    sink.addError("error");
 | 
| +    sink.addStream(new Stream.fromIterable([1, 2, 3, 4, 5]))
 | 
| +        .then((_) {
 | 
| +          sink.add(43);
 | 
| +          return sink.close();
 | 
| +        })
 | 
| +        .then((_) {
 | 
| +          if (asBroadcast) {
 | 
| +            // The done-future of the sink completes when it passes
 | 
| +            // the done event to the asBroadcastStream controller, which is
 | 
| +            // before the final listener gets the event.
 | 
| +            // Wait for the pause to end before testing the events.
 | 
| +            return new Future.delayed(const Duration(milliseconds: 50), () {
 | 
| +              Expect.listEquals(expected.events, actual.events);
 | 
| +              done();
 | 
| +            });
 | 
| +          } else {
 | 
| +            Expect.listEquals(expected.events, actual.events);
 | 
| +            done();
 | 
| +          }
 | 
| +        });
 | 
| +  });
 | 
| +}
 | 
| +
 | 
|  main() {
 | 
|    testController();
 | 
|    testSingleController();
 | 
| @@ -505,4 +612,10 @@ main() {
 | 
|    testPause();
 | 
|    testRethrow();
 | 
|    testBroadcastController();
 | 
| +  testSink(sync: true, broadcast: false, asBroadcast: false);
 | 
| +  testSink(sync: true, broadcast: false, asBroadcast: true);
 | 
| +  testSink(sync: true, broadcast: true, asBroadcast: false);
 | 
| +  testSink(sync: false, broadcast: false, asBroadcast: false);
 | 
| +  testSink(sync: false, broadcast: false, asBroadcast: true);
 | 
| +  testSink(sync: false, broadcast: true, asBroadcast: false);
 | 
|  }
 | 
| 
 |