Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(205)

Unified Diff: sdk/lib/_internal/lib/isolate_helper.dart

Issue 43663003: dart2js. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: sdk/lib/_internal/lib/isolate_helper.dart
diff --git a/sdk/lib/_internal/lib/isolate_helper.dart b/sdk/lib/_internal/lib/isolate_helper.dart
index 8359a30042ffec4faec87402a1da0477f0c11e03..34bdfbe4606e6ae158c5d350dbe75bdd85517b71 100644
--- a/sdk/lib/_internal/lib/isolate_helper.dart
+++ b/sdk/lib/_internal/lib/isolate_helper.dart
@@ -58,7 +58,13 @@ void startRootIsolate(entry) {
// by having a "default" isolate (the first one created).
_globalState.currentContext = rootContext;
- rootContext.eval(entry);
+ if (entry is _MainFunctionArgs) {
+ rootContext.eval(() { entry([]); });
+ } else if (entry is _MainFunctionArgsMessage) {
+ rootContext.eval(() { entry([], null); });
+ } else {
+ rootContext.eval(entry);
Lasse Reichstein Nielsen 2013/10/25 14:28:24 Ack, it works. If it can accept only one argument,
+ }
_globalState.topEventLoop.run();
}
@@ -385,6 +391,10 @@ var globalWorker = JS('', "#.Worker", globalThis);
bool globalPostMessageDefined =
JS('', "#.postMessage !== (void 0)", globalThis);
+typedef _MainFunction();
+typedef _MainFunctionArgs(args);
+typedef _MainFunctionArgsMessage(args, message);
+
class IsolateNatives {
static String thisScript = computeThisScript();
@@ -468,10 +478,13 @@ class IsolateNatives {
Function entryPoint = (functionName == null)
? _globalState.entry
: _getJSFunctionFromName(functionName);
+ var args = msg['args'];
+ var message = _deserializeMessage(msg['msg']);
+ var isSpawnUri = msg['isSpawnUri'];
var replyTo = _deserializeMessage(msg['replyTo']);
var context = new _IsolateContext();
_globalState.topEventLoop.enqueue(context, () {
- _startIsolate(entryPoint, replyTo);
+ _startIsolate(entryPoint, args, message, isSpawnUri, replyTo);
}, 'worker-start');
// Make sure we always have a current context in this worker.
// TODO(7907): This is currently needed because we're using
@@ -483,13 +496,15 @@ class IsolateNatives {
_globalState.topEventLoop.run();
break;
case 'spawn-worker':
- _spawnWorker(msg['functionName'], msg['uri'], msg['replyPort']);
+ _spawnWorker(msg['functionName'], msg['uri'],
+ msg['args'], msg['msg'],
+ msg['isSpawnUri'], msg['replyPort']);
break;
case 'message':
SendPort port = msg['port'];
// If the port has been closed, we ignore the message.
if (port != null) {
- msg['port'].send(msg['msg'], msg['replyTo']);
+ msg['port'].send(msg['msg']);
}
_globalState.topEventLoop.run();
break;
@@ -550,19 +565,25 @@ class IsolateNatives {
return JS("", "new #()", ctor);
}
- static SendPort spawnFunction(void topLevelFunction()) {
+ static SendPort spawnFunction(void topLevelFunction(message), message) {
final name = _getJSFunctionName(topLevelFunction);
if (name == null) {
throw new UnsupportedError(
"only top-level functions can be spawned.");
}
- return spawn(name, null, false);
+ return spawn(name, null, null, message, false, false);
+ }
+
+ static SendPort spawnUri(Uri uri, List<String> args, message) {
+ return spawn(null, uri.path, args, message, false, true);
}
// TODO(sigmund): clean up above, after we make the new API the default:
/// If [uri] is `null` it is replaced with the current script.
- static spawn(String functionName, String uri, bool isLight) {
+ static SendPort spawn(String functionName, String uri,
+ List<String> args, message,
+ bool isLight, bool isSpawnUri) {
// Assume that the compiled version of the Dart file lives just next to the
// dart file.
// TODO(floitsch): support precompiled version of dart2js output.
@@ -570,60 +591,86 @@ class IsolateNatives {
Completer<SendPort> completer = new Completer<SendPort>.sync();
ReceivePort port = new ReceivePort();
- port.receive((msg, SendPort replyPort) {
+ port.listen((msg) {
port.close();
- assert(msg == _SPAWNED_SIGNAL);
- completer.complete(replyPort);
+ assert(msg[0] == _SPAWNED_SIGNAL);
+ completer.complete(msg[1]);
});
- SendPort signalReply = port.toSendPort();
+ SendPort signalReply = port.sendPort;
if (_globalState.useWorkers && !isLight) {
- _startWorker(functionName, uri, signalReply);
+ _startWorker(functionName, uri, args, message, isSpawnUri, signalReply);
} else {
- _startNonWorker(functionName, uri, signalReply);
+ _startNonWorker(
+ functionName, uri, args, message, isSpawnUri, signalReply);
}
return new _BufferingSendPort(
_globalState.currentContext.id, completer.future);
}
- static SendPort _startWorker(
- String functionName, String uri, SendPort replyPort) {
+ static void _startWorker(
+ String functionName, String uri,
+ List<String> args, message,
+ bool isSpawnUri,
+ SendPort replyPort) {
if (_globalState.isWorker) {
_globalState.mainManager.postMessage(_serializeMessage({
'command': 'spawn-worker',
'functionName': functionName,
+ 'args': args,
+ 'msg': message,
'uri': uri,
+ 'isSpawnUri': isSpawnUri,
'replyPort': replyPort}));
} else {
- _spawnWorker(functionName, uri, replyPort);
+ _spawnWorker(functionName, uri, args, message, isSpawnUri, replyPort);
}
}
- static SendPort _startNonWorker(
- String functionName, String uri, SendPort replyPort) {
+ static void _startNonWorker(
+ String functionName, String uri,
+ List<String> args, message,
+ bool isSpawnUri,
+ SendPort replyPort) {
// TODO(eub): support IE9 using an iframe -- Dart issue 1702.
- if (uri != null) throw new UnsupportedError(
- "Currently spawnUri is not supported without web workers.");
+ if (uri != null) {
+ throw new UnsupportedError(
+ "Currently spawnUri is not supported without web workers.");
+ }
_globalState.topEventLoop.enqueue(new _IsolateContext(), () {
final func = _getJSFunctionFromName(functionName);
- _startIsolate(func, replyPort);
+ _startIsolate(func, args, message, isSpawnUri, replyPort);
}, 'nonworker start');
}
- static void _startIsolate(Function topLevel, SendPort replyTo) {
+ static void _startIsolate(Function topLevel,
+ List<String> args, message,
+ bool isSpawnUri,
+ SendPort replyTo) {
_IsolateContext context = JS_CURRENT_ISOLATE_CONTEXT();
Primitives.initializeStatics(context.id);
lazyPort = new ReceivePort();
- replyTo.send(_SPAWNED_SIGNAL, port.toSendPort());
- topLevel();
+ replyTo.send([_SPAWNED_SIGNAL, lazyPort.sendPort]);
+ if (!isSpawnUri) {
+ topLevel(message);
+ } else if (topLevel is _MainFunctionArgsMessage) {
+ topLevel(args, message);
+ } else if (topLevel is _MainFunctionArgs) {
+ topLevel(args);
+ } else {
+ topLevel();
+ }
}
/**
* Spawns an isolate in a worker. [factoryName] is the Javascript constructor
* name for the isolate entry point class.
*/
- static void _spawnWorker(functionName, uri, replyPort) {
+ static void _spawnWorker(functionName, String uri,
+ List<String> args, message,
+ bool isSpawnUri,
+ SendPort replyPort) {
if (uri == null) uri = thisScript;
final worker = JS('var', 'new Worker(#)', uri);
@@ -644,6 +691,9 @@ class IsolateNatives {
// the port (port deserialization is sensitive to what is the current
// workerId).
'replyTo': _serializeMessage(replyPort),
+ 'args': args,
+ 'msg': _serializeMessage(message),
+ 'isSpawnUri': isSpawnUri,
'functionName': functionName }));
}
}
@@ -668,22 +718,7 @@ class _BaseSendPort implements SendPort {
}
}
- Future call(var message) {
- final completer = new Completer();
- final port = new ReceivePortImpl();
- send(message, port.toSendPort());
- port.receive((value, ignoreReplyTo) {
- port.close();
- if (value is Exception) {
- completer.completeError(value);
- } else {
- completer.complete(value);
- }
- });
- return completer.future;
- }
-
- void send(var message, [SendPort replyTo]);
+ void send(var message);
bool operator ==(var other);
int get hashCode;
}
@@ -694,13 +729,12 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort {
const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId);
- void send(var message, [SendPort replyTo = null]) {
- _waitForPendingPorts([message, replyTo], () {
- _checkReplyTo(replyTo);
+ void send(var message, [SendPort replyTo]) {
+ _waitForPendingPorts(message, () {
// Check that the isolate still runs and the port is still open
final isolate = _globalState.isolates[_isolateId];
if (isolate == null) return;
- if (_receivePort._callback == null) return;
+ if (_receivePort._controller.isClosed) return;
// We force serialization/deserialization as a simple way to ensure
// isolate communication restrictions are respected between isolates that
@@ -712,18 +746,15 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort {
final shouldSerialize = _globalState.currentContext != null
&& _globalState.currentContext.id != _isolateId;
var msg = message;
- var reply = replyTo;
if (shouldSerialize) {
msg = _serializeMessage(msg);
- reply = _serializeMessage(reply);
}
_globalState.topEventLoop.enqueue(isolate, () {
- if (_receivePort._callback != null) {
+ if (!_receivePort._controller.isClosed) {
if (shouldSerialize) {
msg = _deserializeMessage(msg);
- reply = _deserializeMessage(reply);
}
- _receivePort._callback(msg, reply);
+ _receivePort._controller.add(msg);
}
}, 'receive $message');
});
@@ -744,14 +775,12 @@ class _WorkerSendPort extends _BaseSendPort implements SendPort {
const _WorkerSendPort(this._workerId, int isolateId, this._receivePortId)
: super(isolateId);
- void send(var message, [SendPort replyTo = null]) {
- _waitForPendingPorts([message, replyTo], () {
- _checkReplyTo(replyTo);
+ void send(var message, [SendPort replyTo]) {
+ _waitForPendingPorts(message, () {
final workerMessage = _serializeMessage({
'command': 'message',
'port': this,
- 'msg': message,
- 'replyTo': replyTo});
+ 'msg': message});
if (_globalState.isWorker) {
// Communication from one worker to another go through the
@@ -806,7 +835,7 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort {
_futurePort.then((p) {
_port = p;
for (final item in pending) {
- p.send(item['message'], item['replyTo']);
+ p.send(item);
}
pending = null;
});
@@ -821,7 +850,7 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort {
if (_port != null) {
_port.send(message, replyTo);
} else {
- pending.add({'message': message, 'replyTo': replyTo});
+ pending.add(message);
}
}
@@ -831,26 +860,32 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort {
}
/** Implementation of a multi-use [ReceivePort] on top of JavaScript. */
-class ReceivePortImpl implements ReceivePort {
- int _id;
- Function _callback;
+class ReceivePortImpl extends Stream implements ReceivePort {
static int _nextFreeId = 1;
+ final int _id;
+ StreamController _controller;
ReceivePortImpl()
: _id = _nextFreeId++ {
+ _controller = new StreamController(onCancel: close, sync: true);
_globalState.currentContext.register(_id, this);
}
- void receive(void onMessage(var message, SendPort replyTo)) {
- _callback = onMessage;
+ StreamSubscription listen(void onData(var event),
+ {Function onError,
+ void onDone(),
+ bool cancelOnError}) {
+ return _controller.stream.listen(onData, onError: onError, onDone: onDone,
+ cancelOnError: cancelOnError);
}
void close() {
- _callback = null;
+ if (_controller.isClosed) return;
+ _controller.close();
_globalState.currentContext.unregister(_id);
}
- SendPort toSendPort() {
+ SendPort get sendPort {
return new _NativeJsSendPort(this, _globalState.currentContext.id);
}
}
@@ -876,9 +911,7 @@ class _PendingSendPortFinder extends _MessageTraverser {
final seen = _visited[list];
if (seen != null) return;
_visited[list] = true;
- // TODO(sigmund): replace with the following: (bug #1660)
- // list.forEach(_dispatch);
- list.forEach((e) => _dispatch(e));
+ list.forEach(_dispatch);
}
visitMap(Map map) {
@@ -886,9 +919,7 @@ class _PendingSendPortFinder extends _MessageTraverser {
if (seen != null) return;
_visited[map] = true;
- // TODO(sigmund): replace with the following: (bug #1660)
- // map.values.forEach(_dispatch);
- map.values.forEach((e) => _dispatch(e));
+ map.values.forEach(_dispatch);
}
visitSendPort(var port) {

Powered by Google App Engine
This is Rietveld 408576698