Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(807)

Unified Diff: lib/isolate/frog/ports.dart

Issue 9422019: isolates refactor: this change introduces 'dart:isolate' as a library. This is a (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: '' Created 8 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « lib/isolate/frog/natives.js ('k') | lib/isolate/isolate_api.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: lib/isolate/frog/ports.dart
diff --git a/lib/isolate/frog/ports.dart b/lib/isolate/frog/ports.dart
new file mode 100644
index 0000000000000000000000000000000000000000..5504b7470b7f444fa3587cd8725dd7fd63781e94
--- /dev/null
+++ b/lib/isolate/frog/ports.dart
@@ -0,0 +1,284 @@
+// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/** Common functionality to all send ports. */
+class _BaseSendPort implements SendPort {
+ /** Id for the destination isolate. */
+ final int _isolateId;
+
+ _BaseSendPort(this._isolateId);
+
+ _ReceivePortSingleShotImpl call(var message) {
+ final result = new _ReceivePortSingleShotImpl();
+ this.send(message, result.toSendPort());
+ return result;
+ }
+
+ static void checkReplyTo(SendPort replyTo) {
+ if (replyTo !== null
+ && replyTo is! _NativeJsSendPort
+ && replyTo is! _WorkerSendPort
+ && replyTo is! _BufferingSendPort) {
+ throw new Exception("SendPort.send: Illegal replyTo port type");
+ }
+ }
+
+ // TODO(sigmund): replace the current SendPort.call with the following:
+ //Future call(var message) {
+ //  final completer = new Completer();
+ //  final port = new _ReceivePort.singleShot();
+ //  send(message, port.toSendPort());
+ //  port.receive((value, ignoreReplyTo) {
+ //    if (value is Exception) {
+ //  completer.completeException(value);
+ // } else {
+ // completer.complete(value);
+ // }
+ // });
+ //  return completer.future;
+ //}
+
+ abstract void send(var message, [SendPort replyTo]);
+ abstract bool operator ==(var other);
+ abstract int hashCode();
+}
+
+/** A send port that delivers messages in-memory via native JavaScript calls. */
+class _NativeJsSendPort extends _BaseSendPort implements SendPort {
+ final _ReceivePortImpl _receivePort;
+
+ const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId);
+
+ void send(var message, [SendPort replyTo = null]) {
+ _waitForPendingPorts([message, replyTo], () {
+ checkReplyTo(replyTo);
+ // Check that the isolate still runs and the port is still open
+ final isolate = globalState.isolates[_isolateId];
+ if (isolate == null) return;
+ if (_receivePort._callback == null) return;
+
+ // We force serialization/deserialization as a simple way to ensure
+ // isolate communication restrictions are respected between isolates that
+ // live in the same worker. [_NativeJsSendPort] delivers both messages
+ // from the same worker and messages from other workers. In particular,
+ // messages sent from a worker via a [_WorkerSendPort] are received at
+ // [_processWorkerMessage] and forwarded to a native port. In such cases,
+ // here we'll see [globalState.currentContext == null].
+ final shouldSerialize = globalState.currentContext != null
+ && globalState.currentContext.id != _isolateId;
+ var msg = message;
+ var reply = replyTo;
+ if (shouldSerialize) {
+ msg = _serializeMessage(msg);
+ reply = _serializeMessage(reply);
+ }
+ globalState.topEventLoop.enqueue(isolate, () {
+ if (_receivePort._callback != null) {
+ if (shouldSerialize) {
+ msg = _deserializeMessage(msg);
+ reply = _deserializeMessage(reply);
+ }
+ _receivePort._callback(msg, reply);
+ }
+ }, 'receive ' + message);
+ });
+ }
+
+ bool operator ==(var other) => (other is _NativeJsSendPort) &&
+ (_receivePort == other._receivePort);
+
+ int hashCode() => _receivePort._id;
+}
+
+/** A send port that delivers messages via worker.postMessage. */
+class _WorkerSendPort extends _BaseSendPort implements SendPort {
+ final int _workerId;
+ final int _receivePortId;
+
+ const _WorkerSendPort(this._workerId, int isolateId, this._receivePortId)
+ : super(isolateId);
+
+ void send(var message, [SendPort replyTo = null]) {
+ _waitForPendingPorts([message, replyTo], () {
+ checkReplyTo(replyTo);
+ final workerMessage = _serializeMessage({
+ 'command': 'message',
+ 'port': this,
+ 'msg': message,
+ 'replyTo': replyTo});
+
+ if (globalState.isWorker) {
+ // communication from one worker to another go through the main worker:
+ globalState.mainWorker.postMessage(workerMessage);
+ } else {
+ globalState.workers[_workerId].postMessage(workerMessage);
+ }
+ });
+ }
+
+ bool operator ==(var other) {
+ return (other is _WorkerSendPort) &&
+ (_workerId == other._workerId) &&
+ (_isolateId == other._isolateId) &&
+ (_receivePortId == other._receivePortId);
+ }
+
+ int hashCode() {
+ // TODO(sigmund): use a standard hash when we get one available in corelib.
+ return (_workerId << 16) ^ (_isolateId << 8) ^ _receivePortId;
+ }
+}
+
+/** A port that buffers messages until an underlying port gets resolved. */
+class _BufferingSendPort extends _BaseSendPort implements SendPort {
+ /** Internal counter to assign unique ids to each port. */
+ static int _idCount = 0;
+
+ /** For implementing equals and hashcode. */
+ final int _id;
+
+ /** Underlying port, when resolved. */
+ SendPort _port;
+
+ /**
+ * Future of the underlying port, so that we can detect when this port can be
+ * sent on messages.
+ */
+ Future<SendPort> _futurePort;
+
+ /** Pending messages (and reply ports). */
+ List pending;
+
+ _BufferingSendPort(isolateId, this._futurePort)
+ : super(isolateId), _id = _idCount, pending = [] {
+ _idCount++;
+ _futurePort.then((p) {
+ _port = p;
+ for (final item in pending) {
+ p.send(item['message'], item['replyTo']);
+ }
+ pending = null;
+ });
+ }
+
+ _BufferingSendPort.fromPort(isolateId, this._port)
+ : super(isolateId), _id = _idCount {
+ _idCount++;
+ }
+
+ void send(var message, [SendPort replyTo]) {
+ if (_port != null) {
+ _port.send(message, replyTo);
+ } else {
+ pending.add({'message': message, 'replyTo': replyTo});
+ }
+ }
+
+ bool operator ==(var other) =>
+ other is _BufferingSendPort && _id == other._id;
+ int hashCode() => _id;
+}
+
+/** Default factory for receive ports. */
+class _ReceivePortFactory {
+
+ factory ReceivePort() {
+ return new _ReceivePortImpl();
+ }
+
+ factory ReceivePort.singleShot() {
+ return new _ReceivePortSingleShotImpl();
+ }
+}
+
+/** Implementation of a multi-use [ReceivePort] on top of JavaScript. */
+class _ReceivePortImpl implements ReceivePort {
+ int _id;
+ Function _callback;
+ static int _nextFreeId = 1;
+
+ _ReceivePortImpl()
+ : _id = _nextFreeId++ {
+ globalState.currentContext.register(_id, this);
+ }
+
+ void receive(void onMessage(var message, SendPort replyTo)) {
+ _callback = onMessage;
+ }
+
+ void close() {
+ _callback = null;
+ globalState.currentContext.unregister(_id);
+ }
+
+ SendPort toSendPort() {
+ return new _NativeJsSendPort(this, globalState.currentContext.id);
+ }
+}
+
+/** Implementation of a single-shot [ReceivePort]. */
+class _ReceivePortSingleShotImpl implements ReceivePort {
+
+ _ReceivePortSingleShotImpl() : _port = new _ReceivePortImpl() { }
+
+ void receive(void callback(var message, SendPort replyTo)) {
+ _port.receive((var message, SendPort replyTo) {
+ _port.close();
+ callback(message, replyTo);
+ });
+ }
+
+ void close() {
+ _port.close();
+ }
+
+ SendPort toSendPort() => _port.toSendPort();
+
+ final _ReceivePortImpl _port;
+}
+
+/** Wait until all ports in a message are resolved. */
+_waitForPendingPorts(var message, void callback()) {
+ final finder = new _PendingSendPortFinder();
+ finder.traverse(message);
+ Futures.wait(finder.ports).then((_) => callback());
+}
+
+
+/** Visitor that finds all unresolved [SendPort]s in a message. */
+class _PendingSendPortFinder extends _MessageTraverser {
+ List<Future<SendPort>> ports;
+ _PendingSendPortFinder() : super(), ports = [];
+
+ visitPrimitive(x) {}
+ visitNativeJsSendPort(_NativeJsSendPort port) {}
+ visitWorkerSendPort(_WorkerSendPort port) {}
+ visitReceivePort(_ReceivePortImpl port) {}
+ visitReceivePortSingleShot(_ReceivePortSingleShotImpl port) {}
+
+ visitList(List list) {
+ final visited = _getInfo(list);
+ if (visited !== null) return;
+ _attachInfo(list, true);
+ // TODO(sigmund): replace with the following: (bug #1660)
+ // list.forEach(_dispatch);
+ list.forEach((e) => _dispatch(e));
+ }
+
+ visitMap(Map map) {
+ final visited = _getInfo(map);
+ if (visited !== null) return;
+
+ _attachInfo(map, true);
+ // TODO(sigmund): replace with the following: (bug #1660)
+ // map.getValues().forEach(_dispatch);
+ map.getValues().forEach((e) => _dispatch(e));
+ }
+
+ visitBufferingSendPort(_BufferingSendPort port) {
+ if (port._port == null) {
+ ports.add(port._futurePort);
+ }
+ }
+}
« no previous file with comments | « lib/isolate/frog/natives.js ('k') | lib/isolate/isolate_api.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698