| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 /** A library to illustrate pipelining. */ | 5 /** A library to illustrate pipelining. */ |
| 6 #library("promise"); | 6 #library("promise"); |
| 7 #import("dart:isolate"); | 7 #import("dart:isolate"); |
| 8 | 8 |
| 9 /** A promise to value of type [T] that may be computed asynchronously. */ | 9 /** A promise to value of type [T] that may be computed asynchronously. */ |
| 10 // TODO(sigmund,benl): remove Promise<T> use Future<T> instead. | 10 // TODO(sigmund,benl): remove Promise<T> use Future<T> instead. |
| (...skipping 153 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 164 | 164 |
| 165 } | 165 } |
| 166 | 166 |
| 167 // When a promise is sent across a port, it is converted to a | 167 // When a promise is sent across a port, it is converted to a |
| 168 // Promise<SendPort> down which we must send a port to receive the | 168 // Promise<SendPort> down which we must send a port to receive the |
| 169 // completion value. Hand the Promise<SendPort> to this class to deal | 169 // completion value. Hand the Promise<SendPort> to this class to deal |
| 170 // with it. | 170 // with it. |
| 171 | 171 |
| 172 class PromiseProxy<T> extends PromiseImpl<T> { | 172 class PromiseProxy<T> extends PromiseImpl<T> { |
| 173 PromiseProxy(Promise<SendPort> sendCompleter) { | 173 PromiseProxy(Promise<SendPort> sendCompleter) { |
| 174 ReceivePort completer = new ReceivePort.singleShot(); | 174 ReceivePort completer = new ReceivePort(); |
| 175 completer.receive((var msg, SendPort _) { | 175 completer.receive((var msg, _) { |
| 176 completer.close(); |
| 176 complete(msg[0]); | 177 complete(msg[0]); |
| 177 }); | 178 }); |
| 178 sendCompleter.addCompleteHandler((SendPort port) { | 179 sendCompleter.addCompleteHandler((SendPort port) { |
| 179 port.send([completer.toSendPort()], null); | 180 port.send([completer.toSendPort()], null); |
| 180 }); | 181 }); |
| 181 } | 182 } |
| 182 } | 183 } |
| 183 | 184 |
| 184 class PromiseImpl<T> implements Promise<T> { | 185 class PromiseImpl<T> implements Promise<T> { |
| 185 | 186 |
| (...skipping 304 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 490 } | 491 } |
| 491 | 492 |
| 492 Promise call(List message) { | 493 Promise call(List message) { |
| 493 return _marshal(message, (List marshalled) { | 494 return _marshal(message, (List marshalled) { |
| 494 // TODO(kasperl): For now, the [Promise.then] implementation allows | 495 // TODO(kasperl): For now, the [Promise.then] implementation allows |
| 495 // me to return a promise and it will do the promise chaining. | 496 // me to return a promise and it will do the promise chaining. |
| 496 final result = new Promise(); | 497 final result = new Promise(); |
| 497 // The promise queue implementation guarantees that promise is | 498 // The promise queue implementation guarantees that promise is |
| 498 // resolved at this point. | 499 // resolved at this point. |
| 499 SendPort outgoing = _promise.value; | 500 SendPort outgoing = _promise.value; |
| 500 ReceivePort incoming = outgoing.call(marshalled); | 501 outgoing.call(marshalled).then((List receiveMessage) { |
| 501 incoming.receive((List receiveMessage, replyTo) { | |
| 502 result.complete(receiveMessage[0]); | 502 result.complete(receiveMessage[0]); |
| 503 }); | 503 }); |
| 504 return result; | 504 return result; |
| 505 }); | 505 }); |
| 506 } | 506 } |
| 507 | 507 |
| 508 // Marshal the [message] and pass it to the [process] callback | 508 // Marshal the [message] and pass it to the [process] callback |
| 509 // function. Any promises are converted to a port which expects to | 509 // function. Any promises are converted to a port which expects to |
| 510 // receive a port from the other side down which the remote promise | 510 // receive a port from the other side down which the remote promise |
| 511 // can be completed by sending the promise's completion value. | 511 // can be completed by sending the promise's completion value. |
| 512 Promise _marshal(List message, process(List marshalled)) { | 512 Promise _marshal(List message, process(List marshalled)) { |
| 513 return _promise.then((SendPort port) { | 513 return _promise.then((SendPort port) { |
| 514 List marshalled = new List(message.length); | 514 List marshalled = new List(message.length); |
| 515 | 515 |
| 516 for (int i = 0; i < marshalled.length; i++) { | 516 for (int i = 0; i < marshalled.length; i++) { |
| 517 var entry = message[i]; | 517 var entry = message[i]; |
| 518 if (entry is Proxy) { | 518 if (entry is Proxy) { |
| 519 entry = entry._promise; | 519 entry = entry._promise; |
| 520 } | 520 } |
| 521 // Obviously this will be true if [entry] was a Proxy. | 521 // Obviously this will be true if [entry] was a Proxy. |
| 522 if (entry is Promise) { | 522 if (entry is Promise) { |
| 523 // Note that we could optimise this by just sending the value | 523 // Note that we could optimise this by just sending the value |
| 524 // if the promise is already complete. Let's get this working | 524 // if the promise is already complete. Let's get this working |
| 525 // first! | 525 // first! |
| 526 | 526 |
| 527 // This port will receive a SendPort that can be used to | 527 // This port will receive a SendPort that can be used to |
| 528 // signal completion of this promise to the corresponding | 528 // signal completion of this promise to the corresponding |
| 529 // promise that the other end has created. | 529 // promise that the other end has created. |
| 530 ReceivePort receiveCompleter = new ReceivePort.singleShot(); | 530 ReceivePort receiveCompleter = new ReceivePort(); |
| 531 marshalled[i] = receiveCompleter.toSendPort(); | 531 marshalled[i] = receiveCompleter.toSendPort(); |
| 532 Promise<SendPort> completer = new Promise<SendPort>(); | 532 Promise<SendPort> completer = new Promise<SendPort>(); |
| 533 receiveCompleter.receive((var msg, SendPort replyPort) { | 533 receiveCompleter.receive((var msg, SendPort replyPort) { |
| 534 receiveCompleter.close(); |
| 534 completer.complete(msg[0]); | 535 completer.complete(msg[0]); |
| 535 }); | 536 }); |
| 536 entry.addCompleteHandler((value) { | 537 entry.addCompleteHandler((value) { |
| 537 completer.addCompleteHandler((SendPort completePort) { | 538 completer.addCompleteHandler((SendPort completePort) { |
| 538 _marshal([value], (List completeMessage) => completePort.send(comp
leteMessage, null)); | 539 _marshal([value], (List completeMessage) => completePort.send(comp
leteMessage, null)); |
| 539 }); | 540 }); |
| 540 }); | 541 }); |
| 541 } else { | 542 } else { |
| 542 // FIXME(kasperl, benl): this should probably be a copy? | 543 // FIXME(kasperl, benl): this should probably be a copy? |
| 543 marshalled[i] = entry; | 544 marshalled[i] = entry; |
| 544 } | 545 } |
| 545 if (marshalled[i] is ReceivePort) { | 546 if (marshalled[i] is ReceivePort) { |
| 546 throw new Exception("Despite the documentation, you cannot send a Rece
ivePort"); | 547 throw new Exception("Despite the documentation, you cannot send a Rece
ivePort"); |
| 547 } | 548 } |
| 548 } | 549 } |
| 549 return process(marshalled); | 550 return process(marshalled); |
| 550 }).flatten(); | 551 }).flatten(); |
| 551 } | 552 } |
| 552 | 553 |
| 553 Promise<SendPort> _promise; | 554 Promise<SendPort> _promise; |
| 554 static Map<SendPort, Dispatcher> _dispatchers; | 555 static Map<SendPort, Dispatcher> _dispatchers; |
| 555 | 556 |
| 556 } | 557 } |
| OLD | NEW |