Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(540)

Unified Diff: lib/isolate/frog/isolateimpl.dart

Issue 9422019: isolates refactor: this change introduces 'dart:isolate' as a library. This is a (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: '' Created 8 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: lib/isolate/frog/isolateimpl.dart
diff --git a/frog/lib/isolate.dart b/lib/isolate/frog/isolateimpl.dart
similarity index 58%
rename from frog/lib/isolate.dart
rename to lib/isolate/frog/isolateimpl.dart
index 8b68c1338bfeaed3c511c6423015af2562920dd4..00adae66ea1593042e03e2781a6566872252da88 100644
--- a/frog/lib/isolate.dart
+++ b/lib/isolate/frog/isolateimpl.dart
@@ -2,49 +2,38 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
+#library("isolateimpl");
+
+#import("dart:isolate");
+#import("messages.dart", prefix: 'messages');
+#import("ports.dart", prefix: 'ports');
+#native("natives.js");
+
+/** Implementation of [Isolate2]. */
+class Isolate2Impl implements Isolate2 {
+ SendPort sendPort;
+
+ Isolate2Impl(this.sendPort);
+}
+
/**
* A native object that is shared across isolates. This object is visible to all
* isolates running on the same worker (either UI or background web worker).
*
* This is code that is intended to 'escape' the isolate boundaries in order to
- * implement the semantics of friendly isolates in JavaScript. Without this we
- * would have been forced to implement more code (including the top-level event
- * loop) in JavaScript itself.
- */
-GlobalState get _globalState() native "return \$globalState;";
-set _globalState(GlobalState val) native "\$globalState = val;";
-
-/**
- * Wrapper that takes the dart entry point and runs it within an isolate. The
- * frog compiler will inject a call of the form [: startRootIsolate(main); :]
- * when it determines that this wrapping is needed. For single-isolate
- * applications (e.g. hello world), this call is not emitted.
+ * implement the semantics of isolates in JavaScript. Without this we would have
+ * been forced to implement more code (including the top-level event loop) in
+ * JavaScript itself.
*/
-void startRootIsolate(entry) {
- _globalState = new GlobalState();
-
- // Don't start the main loop again, if we are in a worker.
- if (_globalState.isWorker) return;
- final rootContext = new IsolateContext();
- _globalState.rootContext = rootContext;
- _fillStatics(rootContext);
-
- // BUG(5151491): Setting currentContext should not be necessary, but
- // because closures passed to the DOM as event handlers do not bind their
- // isolate automatically we try to give them a reasonable context to live in
- // by having a "default" isolate (the first one created).
- _globalState.currentContext = rootContext;
-
- rootContext.eval(entry);
- _globalState.topEventLoop.run();
-}
+GlobalState get globalState() native "return \$globalState;";
+set globalState(GlobalState val) native "\$globalState = val;";
-void _fillStatics(context) native @"""
+void fillStatics(context) native @"""
$globals = context.isolateStatics;
$static_init();
""";
-/** Global state associated with the current worker. See [_globalState]. */
+/** Global state associated with the current worker. See [globalState]. */
// TODO(sigmund): split in multiple classes: global, thread, main-worker states?
class GlobalState {
@@ -102,7 +91,7 @@ class GlobalState {
MainWorker mainWorker;
/** Registry of active workers. Only used in the main worker. */
- Map<int, var> workers;
+ Map<int, Dynamic> workers;
GlobalState() {
topEventLoop = new EventLoop();
@@ -137,7 +126,7 @@ class GlobalState {
if (isWorker) {
if (!isolates.isEmpty()) return;
mainWorker.postMessage(
- _serializeMessage({'command': 'close'}));
+ messages.serialize({'command': 'close'}));
} else if (isolates.containsKey(rootContext.id) && workers.isEmpty() &&
!supportsWorkers && !inWindow) {
// This should only trigger when running on the command-line.
@@ -148,51 +137,6 @@ class GlobalState {
}
}
-_serializeMessage(message) {
- if (_globalState.needSerialization) {
- return new Serializer().traverse(message);
- } else {
- return new Copier().traverse(message);
- }
-}
-
-_deserializeMessage(message) {
- if (_globalState.needSerialization) {
- return new Deserializer().deserialize(message);
- } else {
- // Nothing more to do.
- return message;
- }
-}
-
-/** Wait until all ports in a message are resolved. */
-_waitForPendingPorts(var message, void callback()) {
- final finder = new PendingSendPortFinder();
- finder.traverse(message);
- Futures.wait(finder.ports).then((_) => callback());
-}
-
-/** Default worker. */
-class MainWorker {
- int id = 0;
- void postMessage(msg) native "return \$globalThis.postMessage(msg);";
- void set onmessage(f) native "\$globalThis.onmessage = f;";
- void terminate() {}
-}
-
-/**
- * A web worker. This type is also defined in 'dart:dom', but we define it here
- * to avoid introducing a dependency from corelib to dom. This definition uses a
- * 'hidden' type (* prefix on the native name) to enforce that the type is
- * defined dynamically only when web workers are actually available.
- */
-class _Worker native "*Worker" {
- get id() native "return this.id;";
- void set id(i) native "this.id = i;";
- void set onmessage(f) native "this.onmessage = f;";
- void postMessage(msg) native "return this.postMessage(msg);";
-}
-
/** Context information tracked for each isolate. */
class IsolateContext {
/** Current isolate id. */
@@ -205,7 +149,7 @@ class IsolateContext {
var isolateStatics; // native object containing all globals of an isolate.
IsolateContext() {
- id = _globalState.nextIsolateId++;
+ id = globalState.nextIsolateId++;
ports = {};
initGlobals();
}
@@ -218,14 +162,14 @@ class IsolateContext {
* is called from JavaScript (see $wrap_call in corejs.dart).
*/
void eval(Function code) {
- var old = _globalState.currentContext;
- _globalState.currentContext = this;
+ var old = globalState.currentContext;
+ globalState.currentContext = this;
this._setGlobals();
var result = null;
try {
result = code();
} finally {
- _globalState.currentContext = old;
+ globalState.currentContext = old;
if (old != null) old._setGlobals();
}
return result;
@@ -242,18 +186,19 @@ class IsolateContext {
throw new Exception("Registry: ports must be registered only once.");
}
ports[portId] = port;
- _globalState.isolates[id] = this; // indicate this isolate is active
+ globalState.isolates[id] = this; // indicate this isolate is active
}
/** Unregister a port on this isolate. */
void unregister(int portId) {
ports.remove(portId);
if (ports.isEmpty()) {
- _globalState.isolates.remove(id); // indicate this isolate is not active
+ globalState.isolates.remove(id); // indicate this isolate is not active
}
}
}
+
/** Represent the event loop on a javascript thread (DOM or worker). */
class EventLoop {
Queue<IsolateEvent> events;
@@ -273,7 +218,7 @@ class EventLoop {
bool runIteration() {
final event = dequeue();
if (event == null) {
- _globalState.closeWorker();
+ globalState.closeWorker();
return false;
}
event.process();
@@ -310,13 +255,13 @@ class EventLoop {
* this is called from JavaScript (see $wrap_call in corejs.dart).
*/
void run() {
- if (!_globalState.isWorker) {
+ if (!globalState.isWorker) {
_runHelper();
} else {
try {
_runHelper();
} catch(var e, var trace) {
- _globalState.mainWorker.postMessage(_serializeMessage(
+ globalState.mainWorker.postMessage(messages.serialize(
{'command': 'error', 'msg': '$e\n$trace' }));
}
}
@@ -336,239 +281,26 @@ class IsolateEvent {
}
}
-/** Common functionality to all send ports. */
-class BaseSendPort implements SendPort {
- /** Id for the destination isolate. */
- final int _isolateId;
-
- BaseSendPort(this._isolateId);
-
- ReceivePortSingleShotImpl call(var message) {
- final result = new ReceivePortSingleShotImpl();
- this.send(message, result.toSendPort());
- return result;
- }
-
- static void checkReplyTo(SendPort replyTo) {
- if (replyTo !== null
- && replyTo is! NativeJsSendPort
- && replyTo is! WorkerSendPort
- && replyTo is! BufferingSendPort) {
- throw new Exception("SendPort.send: Illegal replyTo port type");
- }
- }
-
- // TODO(sigmund): replace the current SendPort.call with the following:
- //Future call(var message) {
- //  final completer = new Completer();
- //  final port = new ReceivePort.singleShot();
- //  send(message, port.toSendPort());
- //  port.receive((value, ignoreReplyTo) {
- //    if (value is Exception) {
- //  completer.completeException(value);
- // } else {
- // completer.complete(value);
- // }
- // });
- //  return completer.future;
- //}
-
- abstract void send(var message, [SendPort replyTo]);
- abstract bool operator ==(var other);
- abstract int hashCode();
-}
-
-/** A send port that delivers messages in-memory via native JavaScript calls. */
-class NativeJsSendPort extends BaseSendPort implements SendPort {
- final ReceivePortImpl _receivePort;
-
- const NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId);
-
- void send(var message, [SendPort replyTo = null]) {
- _waitForPendingPorts([message, replyTo], () {
- checkReplyTo(replyTo);
- // Check that the isolate still runs and the port is still open
- final isolate = _globalState.isolates[_isolateId];
- if (isolate == null) return;
- if (_receivePort._callback == null) return;
-
- // We force serialization/deserialization as a simple way to ensure
- // isolate communication restrictions are respected between isolates that
- // live in the same worker. NativeJsSendPort delivers both messages from
- // the same worker and messages from other workers. In particular,
- // messages sent from a worker via a WorkerSendPort are received at
- // [_processWorkerMessage] and forwarded to a native port. In such cases,
- // here we'll see [_globalState.currentContext == null].
- final shouldSerialize = _globalState.currentContext != null
- && _globalState.currentContext.id != _isolateId;
- var msg = message;
- var reply = replyTo;
- if (shouldSerialize) {
- msg = _serializeMessage(msg);
- reply = _serializeMessage(reply);
- }
- _globalState.topEventLoop.enqueue(isolate, () {
- if (_receivePort._callback != null) {
- if (shouldSerialize) {
- msg = _deserializeMessage(msg);
- reply = _deserializeMessage(reply);
- }
- _receivePort._callback(msg, reply);
- }
- }, 'receive ' + message);
- });
- }
-
- bool operator ==(var other) => (other is NativeJsSendPort) &&
- (_receivePort == other._receivePort);
-
- int hashCode() => _receivePort._id;
-}
-
-/** A send port that delivers messages via worker.postMessage. */
-class WorkerSendPort extends BaseSendPort implements SendPort {
- final int _workerId;
- final int _receivePortId;
-
- const WorkerSendPort(this._workerId, int isolateId, this._receivePortId)
- : super(isolateId);
-
- void send(var message, [SendPort replyTo = null]) {
- _waitForPendingPorts([message, replyTo], () {
- checkReplyTo(replyTo);
- final workerMessage = _serializeMessage({
- 'command': 'message',
- 'port': this,
- 'msg': message,
- 'replyTo': replyTo});
-
- if (_globalState.isWorker) {
- // communication from one worker to another go through the main worker:
- _globalState.mainWorker.postMessage(workerMessage);
- } else {
- _globalState.workers[_workerId].postMessage(workerMessage);
- }
- });
- }
-
- bool operator ==(var other) {
- return (other is WorkerSendPort) &&
- (_workerId == other._workerId) &&
- (_isolateId == other._isolateId) &&
- (_receivePortId == other._receivePortId);
- }
-
- int hashCode() {
- // TODO(sigmund): use a standard hash when we get one available in corelib.
- return (_workerId << 16) ^ (_isolateId << 8) ^ _receivePortId;
- }
-}
-
-/** A port that buffers messages until an underlying port gets resolved. */
-class BufferingSendPort extends BaseSendPort implements SendPort {
- /** Internal counter to assign unique ids to each port. */
- static int _idCount = 0;
- /** For implementing equals and hashcode. */
- final int _id;
-
- /** Underlying port, when resolved. */
- SendPort _port;
-
- /**
- * Future of the underlying port, so that we can detect when this port can be
- * sent on messages.
- */
- Future<SendPort> _futurePort;
-
- /** Pending messages (and reply ports). */
- List pending;
-
- BufferingSendPort(isolateId, this._futurePort)
- : super(isolateId), _id = _idCount, pending = [] {
- _idCount++;
- _futurePort.then((p) {
- _port = p;
- for (final item in pending) {
- p.send(item['message'], item['replyTo']);
- }
- pending = null;
- });
- }
-
- BufferingSendPort.fromPort(isolateId, this._port)
- : super(isolateId), _id = _idCount {
- _idCount++;
- }
-
- void send(var message, [SendPort replyTo]) {
- if (_port != null) {
- _port.send(message, replyTo);
- } else {
- pending.add({'message': message, 'replyTo': replyTo});
- }
- }
-
- bool operator ==(var other) => other is BufferingSendPort && _id == other._id;
- int hashCode() => _id;
-}
-
-/** Default factory for receive ports. */
-class ReceivePortFactory {
-
- factory ReceivePort() {
- return new ReceivePortImpl();
- }
-
- factory ReceivePort.singleShot() {
- return new ReceivePortSingleShotImpl();
- }
-}
-
-/** Implementation of a multi-use [ReceivePort] on top of JavaScript. */
-class ReceivePortImpl implements ReceivePort {
- int _id;
- Function _callback;
- static int _nextFreeId = 1;
-
- ReceivePortImpl()
- : _id = _nextFreeId++ {
- _globalState.currentContext.register(_id, this);
- }
-
- void receive(void onMessage(var message, SendPort replyTo)) {
- _callback = onMessage;
- }
-
- void close() {
- _callback = null;
- _globalState.currentContext.unregister(_id);
- }
-
- SendPort toSendPort() {
- return new NativeJsSendPort(this, _globalState.currentContext.id);
- }
+/** Default worker. */
+class MainWorker {
+ int id = 0;
+ void postMessage(msg) native "return \$globalThis.postMessage(msg);";
+ void set onmessage(f) native "\$globalThis.onmessage = f;";
+ void terminate() {}
}
-/** Implementation of a single-shot [ReceivePort]. */
-class ReceivePortSingleShotImpl implements ReceivePort {
-
- ReceivePortSingleShotImpl() : _port = new ReceivePortImpl() { }
-
- void receive(void callback(var message, SendPort replyTo)) {
- _port.receive((var message, SendPort replyTo) {
- _port.close();
- callback(message, replyTo);
- });
- }
-
- void close() {
- _port.close();
- }
-
- SendPort toSendPort() => _port.toSendPort();
-
- final ReceivePortImpl _port;
+/**
+ * A web worker. This type is also defined in 'dart:dom', but we define it here
+ * to avoid introducing a dependency from corelib to dom. This definition uses a
+ * 'hidden' type (* prefix on the native name) to enforce that the type is
+ * defined dynamically only when web workers are actually available.
+ */
+class _Worker native "*Worker" {
+ get id() native "return this.id;";
+ void set id(i) native "this.id = i;";
+ void set onmessage(f) native "this.onmessage = f;";
+ void postMessage(msg) native "return this.postMessage(msg);";
}
final String _SPAWNED_SIGNAL = "spawned";
@@ -586,7 +318,7 @@ class IsolateNatives {
// TODO(floitsch): throw exception if isolate's class doesn't have a
// default constructor.
- if (_globalState.useWorkers && !isLight) {
+ if (globalState.useWorkers && !isLight) {
_startWorker(isolate, port.toSendPort());
} else {
_startNonWorker(isolate, port.toSendPort());
@@ -597,13 +329,13 @@ class IsolateNatives {
static SendPort _startWorker(Isolate runnable, SendPort replyPort) {
var factoryName = _getJSConstructorName(runnable);
- if (_globalState.isWorker) {
- _globalState.mainWorker.postMessage(_serializeMessage({
+ if (globalState.isWorker) {
+ globalState.mainWorker.postMessage(messages.serialize({
'command': 'spawn-worker',
'factoryName': factoryName,
- 'replyPort': _serializeMessage(replyPort)}));
+ 'replyPort': messages.serialize(replyPort)}));
} else {
- _spawnWorker(factoryName, _serializeMessage(replyPort));
+ _spawnWorker(factoryName, messages.serialize(replyPort));
}
}
@@ -647,11 +379,11 @@ class IsolateNatives {
static void _spawnWorker(factoryName, serializedReplyPort) {
final worker = _newWorker(_thisScript);
worker.onmessage = (e) { _processWorkerMessage(worker, e); };
- var workerId = _globalState.nextWorkerId++;
+ var workerId = globalState.nextWorkerId++;
// We also store the id on the worker itself so that we can unregister it.
worker.id = workerId;
- _globalState.workers[workerId] = worker;
- worker.postMessage(_serializeMessage({
+ globalState.workers[workerId] = worker;
+ worker.postMessage(messages.serialize({
'command': 'start',
'id': workerId,
'replyTo': serializedReplyPort,
@@ -670,28 +402,28 @@ class IsolateNatives {
* pass messages along to the isolate running in the worker.
*/
static void _processWorkerMessage(sender, e) {
- var msg = _deserializeMessage(_getEventData(e));
+ var msg = messages.deserialize(_getEventData(e));
switch (msg['command']) {
// TODO(sigmund): delete after we migrate to Isolate2
case 'start':
- _globalState.currentWorkerId = msg['id'];
+ globalState.currentWorkerId = msg['id'];
var runnerObject =
_allocate(_getJSConstructorFromName(msg['factoryName']));
var serializedReplyTo = msg['replyTo'];
- _globalState.topEventLoop.enqueue(new IsolateContext(), function() {
- var replyTo = _deserializeMessage(serializedReplyTo);
+ globalState.topEventLoop.enqueue(new IsolateContext(), function() {
+ var replyTo = messages.deserialize(serializedReplyTo);
_startIsolate(runnerObject, replyTo);
}, 'worker-start');
- _globalState.topEventLoop.run();
+ globalState.topEventLoop.run();
break;
case 'start2':
- _globalState.currentWorkerId = msg['id'];
+ globalState.currentWorkerId = msg['id'];
Function entryPoint = _getJSFunctionFromName(msg['functionName']);
- var replyTo = _deserializeMessage(msg['replyTo']);
- _globalState.topEventLoop.enqueue(new IsolateContext(), function() {
+ var replyTo = messages.deserialize(msg['replyTo']);
+ globalState.topEventLoop.enqueue(new IsolateContext(), function() {
_startIsolate2(entryPoint, replyTo);
}, 'worker-start');
- _globalState.topEventLoop.run();
+ globalState.topEventLoop.run();
break;
// TODO(sigmund): delete after we migrate to Isolate2
case 'spawn-worker':
@@ -702,21 +434,21 @@ class IsolateNatives {
break;
case 'message':
msg['port'].send(msg['msg'], msg['replyTo']);
- _globalState.topEventLoop.run();
+ globalState.topEventLoop.run();
break;
case 'close':
_log("Closing Worker");
- _globalState.workers.remove(sender.id);
+ globalState.workers.remove(sender.id);
sender.terminate();
- _globalState.topEventLoop.run();
+ globalState.topEventLoop.run();
break;
case 'log':
_log(msg['msg']);
break;
case 'print':
- if (_globalState.isWorker) {
- _globalState.mainWorker.postMessage(
- _serializeMessage({'command': 'print', 'msg': msg}));
+ if (globalState.isWorker) {
+ globalState.mainWorker.postMessage(
+ messages.serialize({'command': 'print', 'msg': msg}));
} else {
print(msg['msg']);
}
@@ -728,9 +460,9 @@ class IsolateNatives {
/** Log a message, forwarding to the main worker if appropriate. */
static _log(msg) {
- if (_globalState.isWorker) {
- _globalState.mainWorker.postMessage(
- _serializeMessage({'command': 'log', 'msg': msg }));
+ if (globalState.isWorker) {
+ globalState.mainWorker.postMessage(
+ messages.serialize({'command': 'log', 'msg': msg }));
} else {
try {
_consoleLog(msg);
@@ -781,14 +513,14 @@ class IsolateNatives {
// isolate. This way, we do not get cross-isolate references
// through the runnable.
final ctor = _getJSConstructor(runnable);
- _globalState.topEventLoop.enqueue(spawned, function() {
+ globalState.topEventLoop.enqueue(spawned, function() {
_startIsolate(_allocate(ctor), replyTo);
}, 'nonworker start');
}
/** Given a ready-to-start runnable, start running it. */
static void _startIsolate(Isolate isolate, SendPort replyTo) {
- _fillStatics(_globalState.currentContext);
+ fillStatics(globalState.currentContext);
ReceivePort port = new ReceivePort();
replyTo.send(_SPAWNED_SIGNAL, port.toSendPort());
isolate._run(port);
@@ -806,19 +538,19 @@ class IsolateNatives {
SendPort signalReply = port.toSendPort();
- if (_globalState.useWorkers && !isLight) {
+ if (globalState.useWorkers && !isLight) {
_startWorker2(functionName, uri, signalReply);
} else {
_startNonWorker2(functionName, uri, signalReply);
}
- return new BufferingSendPort(
- _globalState.currentContext.id, completer.future);
+ return new ports.BufferingSendPort(
+ globalState.currentContext.id, completer.future);
}
static SendPort _startWorker2(
String functionName, String uri, SendPort replyPort) {
- if (_globalState.isWorker) {
- _globalState.mainWorker.postMessage(_serializeMessage({
+ if (globalState.isWorker) {
+ globalState.mainWorker.postMessage(messages.serialize({
'command': 'spawn-worker2',
'functionName': functionName,
'uri': uri,
@@ -833,14 +565,14 @@ class IsolateNatives {
// TODO(eub): support IE9 using an iframe -- Dart issue 1702.
if (uri != null) throw new UnsupportedOperationException(
"Currently Isolate2.fromUri is not supported without web workers.");
- _globalState.topEventLoop.enqueue(new IsolateContext(), function() {
+ globalState.topEventLoop.enqueue(new IsolateContext(), function() {
final func = _getJSFunctionFromName(functionName);
_startIsolate2(func, replyPort);
}, 'nonworker start');
}
static void _startIsolate2(Function topLevel, SendPort replyTo) {
- _fillStatics(_globalState.currentContext);
+ fillStatics(globalState.currentContext);
final port = new ReceivePort();
replyTo.send(_SPAWNED_SIGNAL, port.toSendPort());
topLevel(port);
@@ -856,42 +588,18 @@ class IsolateNatives {
if (uri == null) uri = _thisScript;
final worker = _newWorker(uri);
worker.onmessage = (e) { _processWorkerMessage(worker, e); };
- var workerId = _globalState.nextWorkerId++;
+ var workerId = globalState.nextWorkerId++;
// We also store the id on the worker itself so that we can unregister it.
worker.id = workerId;
- _globalState.workers[workerId] = worker;
- worker.postMessage(_serializeMessage({
+ globalState.workers[workerId] = worker;
+ worker.postMessage(messages.serialize({
'command': 'start2',
'id': workerId,
// Note: we serialize replyPort twice because the child worker needs to
// first deserialize the worker id, before it can correctly deserialize
// the port (port deserialization is sensitive to what is the current
// workerId).
- 'replyTo': _serializeMessage(replyPort),
+ 'replyTo': messages.serialize(replyPort),
'functionName': functionName }));
}
}
-
-class Isolate2Impl implements Isolate2 {
- SendPort sendPort;
-
- Isolate2Impl(this.sendPort);
-
- void stop() {}
-}
-
-class IsolateFactory implements Isolate2 {
-
- factory Isolate2.fromCode(Function topLevelFunction) {
- final name = IsolateNatives._getJSFunctionName(topLevelFunction);
- if (name == null) {
- throw new UnsupportedOperationException(
- "only top-level functions can be spawned.");
- }
- return new Isolate2Impl(IsolateNatives._spawn2(name, null, false));
- }
-
- factory Isolate2.fromUri(String uri) {
- return new Isolate2Impl(IsolateNatives._spawn2(null, uri, false));
- }
-}

Powered by Google App Engine
This is Rietveld 408576698