OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 /** A library to illustrate pipelining. */ | 5 /** A library to illustrate pipelining. */ |
6 #library("promise"); | 6 #library("promise"); |
7 #import("dart:isolate"); | 7 #import("dart:isolate"); |
8 | 8 |
9 /** A promise to value of type [T] that may be computed asynchronously. */ | 9 /** A promise to value of type [T] that may be computed asynchronously. */ |
10 // TODO(sigmund,benl): remove Promise<T> use Future<T> instead. | 10 // TODO(sigmund,benl): remove Promise<T> use Future<T> instead. |
(...skipping 153 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
164 | 164 |
165 } | 165 } |
166 | 166 |
167 // When a promise is sent across a port, it is converted to a | 167 // When a promise is sent across a port, it is converted to a |
168 // Promise<SendPort> down which we must send a port to receive the | 168 // Promise<SendPort> down which we must send a port to receive the |
169 // completion value. Hand the Promise<SendPort> to this class to deal | 169 // completion value. Hand the Promise<SendPort> to this class to deal |
170 // with it. | 170 // with it. |
171 | 171 |
172 class PromiseProxy<T> extends PromiseImpl<T> { | 172 class PromiseProxy<T> extends PromiseImpl<T> { |
173 PromiseProxy(Promise<SendPort> sendCompleter) { | 173 PromiseProxy(Promise<SendPort> sendCompleter) { |
174 ReceivePort completer = new ReceivePort.singleShot(); | 174 ReceivePort completer = new ReceivePort(); |
175 completer.receive((var msg, SendPort _) { | 175 completer.receive((var msg, _) { |
| 176 completer.close(); |
176 complete(msg[0]); | 177 complete(msg[0]); |
177 }); | 178 }); |
178 sendCompleter.addCompleteHandler((SendPort port) { | 179 sendCompleter.addCompleteHandler((SendPort port) { |
179 port.send([completer.toSendPort()], null); | 180 port.send([completer.toSendPort()], null); |
180 }); | 181 }); |
181 } | 182 } |
182 } | 183 } |
183 | 184 |
184 class PromiseImpl<T> implements Promise<T> { | 185 class PromiseImpl<T> implements Promise<T> { |
185 | 186 |
(...skipping 304 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
490 } | 491 } |
491 | 492 |
492 Promise call(List message) { | 493 Promise call(List message) { |
493 return _marshal(message, (List marshalled) { | 494 return _marshal(message, (List marshalled) { |
494 // TODO(kasperl): For now, the [Promise.then] implementation allows | 495 // TODO(kasperl): For now, the [Promise.then] implementation allows |
495 // me to return a promise and it will do the promise chaining. | 496 // me to return a promise and it will do the promise chaining. |
496 final result = new Promise(); | 497 final result = new Promise(); |
497 // The promise queue implementation guarantees that promise is | 498 // The promise queue implementation guarantees that promise is |
498 // resolved at this point. | 499 // resolved at this point. |
499 SendPort outgoing = _promise.value; | 500 SendPort outgoing = _promise.value; |
500 ReceivePort incoming = outgoing.call(marshalled); | 501 outgoing.call(marshalled).then((List receiveMessage) { |
501 incoming.receive((List receiveMessage, replyTo) { | |
502 result.complete(receiveMessage[0]); | 502 result.complete(receiveMessage[0]); |
503 }); | 503 }); |
504 return result; | 504 return result; |
505 }); | 505 }); |
506 } | 506 } |
507 | 507 |
508 // Marshal the [message] and pass it to the [process] callback | 508 // Marshal the [message] and pass it to the [process] callback |
509 // function. Any promises are converted to a port which expects to | 509 // function. Any promises are converted to a port which expects to |
510 // receive a port from the other side down which the remote promise | 510 // receive a port from the other side down which the remote promise |
511 // can be completed by sending the promise's completion value. | 511 // can be completed by sending the promise's completion value. |
512 Promise _marshal(List message, process(List marshalled)) { | 512 Promise _marshal(List message, process(List marshalled)) { |
513 return _promise.then((SendPort port) { | 513 return _promise.then((SendPort port) { |
514 List marshalled = new List(message.length); | 514 List marshalled = new List(message.length); |
515 | 515 |
516 for (int i = 0; i < marshalled.length; i++) { | 516 for (int i = 0; i < marshalled.length; i++) { |
517 var entry = message[i]; | 517 var entry = message[i]; |
518 if (entry is Proxy) { | 518 if (entry is Proxy) { |
519 entry = entry._promise; | 519 entry = entry._promise; |
520 } | 520 } |
521 // Obviously this will be true if [entry] was a Proxy. | 521 // Obviously this will be true if [entry] was a Proxy. |
522 if (entry is Promise) { | 522 if (entry is Promise) { |
523 // Note that we could optimise this by just sending the value | 523 // Note that we could optimise this by just sending the value |
524 // if the promise is already complete. Let's get this working | 524 // if the promise is already complete. Let's get this working |
525 // first! | 525 // first! |
526 | 526 |
527 // This port will receive a SendPort that can be used to | 527 // This port will receive a SendPort that can be used to |
528 // signal completion of this promise to the corresponding | 528 // signal completion of this promise to the corresponding |
529 // promise that the other end has created. | 529 // promise that the other end has created. |
530 ReceivePort receiveCompleter = new ReceivePort.singleShot(); | 530 ReceivePort receiveCompleter = new ReceivePort(); |
531 marshalled[i] = receiveCompleter.toSendPort(); | 531 marshalled[i] = receiveCompleter.toSendPort(); |
532 Promise<SendPort> completer = new Promise<SendPort>(); | 532 Promise<SendPort> completer = new Promise<SendPort>(); |
533 receiveCompleter.receive((var msg, SendPort replyPort) { | 533 receiveCompleter.receive((var msg, SendPort replyPort) { |
| 534 receiveCompleter.close(); |
534 completer.complete(msg[0]); | 535 completer.complete(msg[0]); |
535 }); | 536 }); |
536 entry.addCompleteHandler((value) { | 537 entry.addCompleteHandler((value) { |
537 completer.addCompleteHandler((SendPort completePort) { | 538 completer.addCompleteHandler((SendPort completePort) { |
538 _marshal([value], (List completeMessage) => completePort.send(comp
leteMessage, null)); | 539 _marshal([value], (List completeMessage) => completePort.send(comp
leteMessage, null)); |
539 }); | 540 }); |
540 }); | 541 }); |
541 } else { | 542 } else { |
542 // FIXME(kasperl, benl): this should probably be a copy? | 543 // FIXME(kasperl, benl): this should probably be a copy? |
543 marshalled[i] = entry; | 544 marshalled[i] = entry; |
544 } | 545 } |
545 if (marshalled[i] is ReceivePort) { | 546 if (marshalled[i] is ReceivePort) { |
546 throw new Exception("Despite the documentation, you cannot send a Rece
ivePort"); | 547 throw new Exception("Despite the documentation, you cannot send a Rece
ivePort"); |
547 } | 548 } |
548 } | 549 } |
549 return process(marshalled); | 550 return process(marshalled); |
550 }).flatten(); | 551 }).flatten(); |
551 } | 552 } |
552 | 553 |
553 Promise<SendPort> _promise; | 554 Promise<SendPort> _promise; |
554 static Map<SendPort, Dispatcher> _dispatchers; | 555 static Map<SendPort, Dispatcher> _dispatchers; |
555 | 556 |
556 } | 557 } |
OLD | NEW |