OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 /** A library to illustrate pipelining. */ | 5 /** A library to illustrate pipelining. */ |
6 #library("promise"); | 6 #library("promise"); |
7 #import("dart:isolate"); | 7 #import("dart:isolate"); |
8 | 8 |
9 /** A promise to value of type [T] that may be computed asynchronously. */ | 9 /** A promise to value of type [T] that may be computed asynchronously. */ |
10 // TODO(sigmund,benl): remove Promise<T> use Future<T> instead. | 10 // TODO(sigmund,benl): remove Promise<T> use Future<T> instead. |
(...skipping 153 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
164 | 164 |
165 } | 165 } |
166 | 166 |
167 // When a promise is sent across a port, it is converted to a | 167 // When a promise is sent across a port, it is converted to a |
168 // Promise<SendPort> down which we must send a port to receive the | 168 // Promise<SendPort> down which we must send a port to receive the |
169 // completion value. Hand the Promise<SendPort> to this class to deal | 169 // completion value. Hand the Promise<SendPort> to this class to deal |
170 // with it. | 170 // with it. |
171 | 171 |
172 class PromiseProxy<T> extends PromiseImpl<T> { | 172 class PromiseProxy<T> extends PromiseImpl<T> { |
173 PromiseProxy(Promise<SendPort> sendCompleter) { | 173 PromiseProxy(Promise<SendPort> sendCompleter) { |
174 ReceivePort completer = new ReceivePort.singleShot(); | 174 ReceivePort completer = new ReceivePort(); |
175 completer.receive((var msg, SendPort _) { | 175 completer.then((var msg) { |
| 176 completer.close(); |
176 complete(msg[0]); | 177 complete(msg[0]); |
177 }); | 178 }); |
178 sendCompleter.addCompleteHandler((SendPort port) { | 179 sendCompleter.addCompleteHandler((SendPort port) { |
179 port.send([completer.toSendPort()], null); | 180 port.send([completer.toSendPort()], null); |
180 }); | 181 }); |
181 } | 182 } |
182 } | 183 } |
183 | 184 |
184 class PromiseImpl<T> implements Promise<T> { | 185 class PromiseImpl<T> implements Promise<T> { |
185 | 186 |
(...skipping 334 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
520 } | 521 } |
521 // Obviously this will be true if [entry] was a Proxy. | 522 // Obviously this will be true if [entry] was a Proxy. |
522 if (entry is Promise) { | 523 if (entry is Promise) { |
523 // Note that we could optimise this by just sending the value | 524 // Note that we could optimise this by just sending the value |
524 // if the promise is already complete. Let's get this working | 525 // if the promise is already complete. Let's get this working |
525 // first! | 526 // first! |
526 | 527 |
527 // This port will receive a SendPort that can be used to | 528 // This port will receive a SendPort that can be used to |
528 // signal completion of this promise to the corresponding | 529 // signal completion of this promise to the corresponding |
529 // promise that the other end has created. | 530 // promise that the other end has created. |
530 ReceivePort receiveCompleter = new ReceivePort.singleShot(); | 531 ReceivePort receiveCompleter = new ReceivePort(); |
531 marshalled[i] = receiveCompleter.toSendPort(); | 532 marshalled[i] = receiveCompleter.toSendPort(); |
532 Promise<SendPort> completer = new Promise<SendPort>(); | 533 Promise<SendPort> completer = new Promise<SendPort>(); |
533 receiveCompleter.receive((var msg, SendPort replyPort) { | 534 receiveCompleter.receive((var msg, SendPort replyPort) { |
| 535 port.close(); |
534 completer.complete(msg[0]); | 536 completer.complete(msg[0]); |
535 }); | 537 }); |
536 entry.addCompleteHandler((value) { | 538 entry.addCompleteHandler((value) { |
537 completer.addCompleteHandler((SendPort completePort) { | 539 completer.addCompleteHandler((SendPort completePort) { |
538 _marshal([value], (List completeMessage) => completePort.send(comp
leteMessage, null)); | 540 _marshal([value], (List completeMessage) => completePort.send(comp
leteMessage, null)); |
539 }); | 541 }); |
540 }); | 542 }); |
541 } else { | 543 } else { |
542 // FIXME(kasperl, benl): this should probably be a copy? | 544 // FIXME(kasperl, benl): this should probably be a copy? |
543 marshalled[i] = entry; | 545 marshalled[i] = entry; |
544 } | 546 } |
545 if (marshalled[i] is ReceivePort) { | 547 if (marshalled[i] is ReceivePort) { |
546 throw new Exception("Despite the documentation, you cannot send a Rece
ivePort"); | 548 throw new Exception("Despite the documentation, you cannot send a Rece
ivePort"); |
547 } | 549 } |
548 } | 550 } |
549 return process(marshalled); | 551 return process(marshalled); |
550 }).flatten(); | 552 }).flatten(); |
551 } | 553 } |
552 | 554 |
553 Promise<SendPort> _promise; | 555 Promise<SendPort> _promise; |
554 static Map<SendPort, Dispatcher> _dispatchers; | 556 static Map<SendPort, Dispatcher> _dispatchers; |
555 | 557 |
556 } | 558 } |
OLD | NEW |