Index: lib/isolate/frog/ports.dart |
diff --git a/lib/isolate/frog/ports.dart b/lib/isolate/frog/ports.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..5504b7470b7f444fa3587cd8725dd7fd63781e94 |
--- /dev/null |
+++ b/lib/isolate/frog/ports.dart |
@@ -0,0 +1,284 @@ |
+// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+/** Common functionality to all send ports. */ |
+class _BaseSendPort implements SendPort { |
+ /** Id for the destination isolate. */ |
+ final int _isolateId; |
+ |
+ _BaseSendPort(this._isolateId); |
+ |
+ _ReceivePortSingleShotImpl call(var message) { |
+ final result = new _ReceivePortSingleShotImpl(); |
+ this.send(message, result.toSendPort()); |
+ return result; |
+ } |
+ |
+ static void checkReplyTo(SendPort replyTo) { |
+ if (replyTo !== null |
+ && replyTo is! _NativeJsSendPort |
+ && replyTo is! _WorkerSendPort |
+ && replyTo is! _BufferingSendPort) { |
+ throw new Exception("SendPort.send: Illegal replyTo port type"); |
+ } |
+ } |
+ |
+ // TODO(sigmund): replace the current SendPort.call with the following: |
+ //Future call(var message) { |
+ // final completer = new Completer(); |
+ // final port = new _ReceivePort.singleShot(); |
+ // send(message, port.toSendPort()); |
+ // port.receive((value, ignoreReplyTo) { |
+ // if (value is Exception) { |
+ // completer.completeException(value); |
+ // } else { |
+ // completer.complete(value); |
+ // } |
+ // }); |
+ // return completer.future; |
+ //} |
+ |
+ abstract void send(var message, [SendPort replyTo]); |
+ abstract bool operator ==(var other); |
+ abstract int hashCode(); |
+} |
+ |
+/** A send port that delivers messages in-memory via native JavaScript calls. */ |
+class _NativeJsSendPort extends _BaseSendPort implements SendPort { |
+ final _ReceivePortImpl _receivePort; |
+ |
+ const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId); |
+ |
+ void send(var message, [SendPort replyTo = null]) { |
+ _waitForPendingPorts([message, replyTo], () { |
+ checkReplyTo(replyTo); |
+ // Check that the isolate still runs and the port is still open |
+ final isolate = globalState.isolates[_isolateId]; |
+ if (isolate == null) return; |
+ if (_receivePort._callback == null) return; |
+ |
+ // We force serialization/deserialization as a simple way to ensure |
+ // isolate communication restrictions are respected between isolates that |
+ // live in the same worker. [_NativeJsSendPort] delivers both messages |
+ // from the same worker and messages from other workers. In particular, |
+ // messages sent from a worker via a [_WorkerSendPort] are received at |
+ // [_processWorkerMessage] and forwarded to a native port. In such cases, |
+ // here we'll see [globalState.currentContext == null]. |
+ final shouldSerialize = globalState.currentContext != null |
+ && globalState.currentContext.id != _isolateId; |
+ var msg = message; |
+ var reply = replyTo; |
+ if (shouldSerialize) { |
+ msg = _serializeMessage(msg); |
+ reply = _serializeMessage(reply); |
+ } |
+ globalState.topEventLoop.enqueue(isolate, () { |
+ if (_receivePort._callback != null) { |
+ if (shouldSerialize) { |
+ msg = _deserializeMessage(msg); |
+ reply = _deserializeMessage(reply); |
+ } |
+ _receivePort._callback(msg, reply); |
+ } |
+ }, 'receive ' + message); |
+ }); |
+ } |
+ |
+ bool operator ==(var other) => (other is _NativeJsSendPort) && |
+ (_receivePort == other._receivePort); |
+ |
+ int hashCode() => _receivePort._id; |
+} |
+ |
+/** A send port that delivers messages via worker.postMessage. */ |
+class _WorkerSendPort extends _BaseSendPort implements SendPort { |
+ final int _workerId; |
+ final int _receivePortId; |
+ |
+ const _WorkerSendPort(this._workerId, int isolateId, this._receivePortId) |
+ : super(isolateId); |
+ |
+ void send(var message, [SendPort replyTo = null]) { |
+ _waitForPendingPorts([message, replyTo], () { |
+ checkReplyTo(replyTo); |
+ final workerMessage = _serializeMessage({ |
+ 'command': 'message', |
+ 'port': this, |
+ 'msg': message, |
+ 'replyTo': replyTo}); |
+ |
+ if (globalState.isWorker) { |
+ // communication from one worker to another go through the main worker: |
+ globalState.mainWorker.postMessage(workerMessage); |
+ } else { |
+ globalState.workers[_workerId].postMessage(workerMessage); |
+ } |
+ }); |
+ } |
+ |
+ bool operator ==(var other) { |
+ return (other is _WorkerSendPort) && |
+ (_workerId == other._workerId) && |
+ (_isolateId == other._isolateId) && |
+ (_receivePortId == other._receivePortId); |
+ } |
+ |
+ int hashCode() { |
+ // TODO(sigmund): use a standard hash when we get one available in corelib. |
+ return (_workerId << 16) ^ (_isolateId << 8) ^ _receivePortId; |
+ } |
+} |
+ |
+/** A port that buffers messages until an underlying port gets resolved. */ |
+class _BufferingSendPort extends _BaseSendPort implements SendPort { |
+ /** Internal counter to assign unique ids to each port. */ |
+ static int _idCount = 0; |
+ |
+ /** For implementing equals and hashcode. */ |
+ final int _id; |
+ |
+ /** Underlying port, when resolved. */ |
+ SendPort _port; |
+ |
+ /** |
+ * Future of the underlying port, so that we can detect when this port can be |
+ * sent on messages. |
+ */ |
+ Future<SendPort> _futurePort; |
+ |
+ /** Pending messages (and reply ports). */ |
+ List pending; |
+ |
+ _BufferingSendPort(isolateId, this._futurePort) |
+ : super(isolateId), _id = _idCount, pending = [] { |
+ _idCount++; |
+ _futurePort.then((p) { |
+ _port = p; |
+ for (final item in pending) { |
+ p.send(item['message'], item['replyTo']); |
+ } |
+ pending = null; |
+ }); |
+ } |
+ |
+ _BufferingSendPort.fromPort(isolateId, this._port) |
+ : super(isolateId), _id = _idCount { |
+ _idCount++; |
+ } |
+ |
+ void send(var message, [SendPort replyTo]) { |
+ if (_port != null) { |
+ _port.send(message, replyTo); |
+ } else { |
+ pending.add({'message': message, 'replyTo': replyTo}); |
+ } |
+ } |
+ |
+ bool operator ==(var other) => |
+ other is _BufferingSendPort && _id == other._id; |
+ int hashCode() => _id; |
+} |
+ |
+/** Default factory for receive ports. */ |
+class _ReceivePortFactory { |
+ |
+ factory ReceivePort() { |
+ return new _ReceivePortImpl(); |
+ } |
+ |
+ factory ReceivePort.singleShot() { |
+ return new _ReceivePortSingleShotImpl(); |
+ } |
+} |
+ |
+/** Implementation of a multi-use [ReceivePort] on top of JavaScript. */ |
+class _ReceivePortImpl implements ReceivePort { |
+ int _id; |
+ Function _callback; |
+ static int _nextFreeId = 1; |
+ |
+ _ReceivePortImpl() |
+ : _id = _nextFreeId++ { |
+ globalState.currentContext.register(_id, this); |
+ } |
+ |
+ void receive(void onMessage(var message, SendPort replyTo)) { |
+ _callback = onMessage; |
+ } |
+ |
+ void close() { |
+ _callback = null; |
+ globalState.currentContext.unregister(_id); |
+ } |
+ |
+ SendPort toSendPort() { |
+ return new _NativeJsSendPort(this, globalState.currentContext.id); |
+ } |
+} |
+ |
+/** Implementation of a single-shot [ReceivePort]. */ |
+class _ReceivePortSingleShotImpl implements ReceivePort { |
+ |
+ _ReceivePortSingleShotImpl() : _port = new _ReceivePortImpl() { } |
+ |
+ void receive(void callback(var message, SendPort replyTo)) { |
+ _port.receive((var message, SendPort replyTo) { |
+ _port.close(); |
+ callback(message, replyTo); |
+ }); |
+ } |
+ |
+ void close() { |
+ _port.close(); |
+ } |
+ |
+ SendPort toSendPort() => _port.toSendPort(); |
+ |
+ final _ReceivePortImpl _port; |
+} |
+ |
+/** Wait until all ports in a message are resolved. */ |
+_waitForPendingPorts(var message, void callback()) { |
+ final finder = new _PendingSendPortFinder(); |
+ finder.traverse(message); |
+ Futures.wait(finder.ports).then((_) => callback()); |
+} |
+ |
+ |
+/** Visitor that finds all unresolved [SendPort]s in a message. */ |
+class _PendingSendPortFinder extends _MessageTraverser { |
+ List<Future<SendPort>> ports; |
+ _PendingSendPortFinder() : super(), ports = []; |
+ |
+ visitPrimitive(x) {} |
+ visitNativeJsSendPort(_NativeJsSendPort port) {} |
+ visitWorkerSendPort(_WorkerSendPort port) {} |
+ visitReceivePort(_ReceivePortImpl port) {} |
+ visitReceivePortSingleShot(_ReceivePortSingleShotImpl port) {} |
+ |
+ visitList(List list) { |
+ final visited = _getInfo(list); |
+ if (visited !== null) return; |
+ _attachInfo(list, true); |
+ // TODO(sigmund): replace with the following: (bug #1660) |
+ // list.forEach(_dispatch); |
+ list.forEach((e) => _dispatch(e)); |
+ } |
+ |
+ visitMap(Map map) { |
+ final visited = _getInfo(map); |
+ if (visited !== null) return; |
+ |
+ _attachInfo(map, true); |
+ // TODO(sigmund): replace with the following: (bug #1660) |
+ // map.getValues().forEach(_dispatch); |
+ map.getValues().forEach((e) => _dispatch(e)); |
+ } |
+ |
+ visitBufferingSendPort(_BufferingSendPort port) { |
+ if (port._port == null) { |
+ ports.add(port._futurePort); |
+ } |
+ } |
+} |