Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(479)

Side by Side Diff: lib/isolate/frog/isolateimpl.dart

Issue 9422019: isolates refactor: this change introduces 'dart:isolate' as a library. This is a (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: '' Created 8 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « lib/isolate/frog/compiler_hooks.dart ('k') | lib/isolate/frog/messages.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 /** Implementation of [Isolate2]. */
6 class _Isolate2Impl implements Isolate2 {
7 SendPort sendPort;
8
9 _Isolate2Impl(this.sendPort);
10 }
11
5 /** 12 /**
6 * A native object that is shared across isolates. This object is visible to all 13 * A native object that is shared across isolates. This object is visible to all
7 * isolates running on the same worker (either UI or background web worker). 14 * isolates running on the same worker (either UI or background web worker).
8 * 15 *
9 * This is code that is intended to 'escape' the isolate boundaries in order to 16 * This is code that is intended to 'escape' the isolate boundaries in order to
10 * implement the semantics of friendly isolates in JavaScript. Without this we 17 * implement the semantics of isolates in JavaScript. Without this we would have
11 * would have been forced to implement more code (including the top-level event 18 * been forced to implement more code (including the top-level event loop) in
12 * loop) in JavaScript itself. 19 * JavaScript itself.
13 */ 20 */
14 GlobalState get _globalState() native "return \$globalState;"; 21 GlobalState get globalState() native "return \$globalState;";
15 set _globalState(GlobalState val) native "\$globalState = val;"; 22 set globalState(GlobalState val) native "\$globalState = val;";
16 23
17 /** 24 void fillStatics(context) native @"""
18 * Wrapper that takes the dart entry point and runs it within an isolate. The
19 * frog compiler will inject a call of the form [: startRootIsolate(main); :]
20 * when it determines that this wrapping is needed. For single-isolate
21 * applications (e.g. hello world), this call is not emitted.
22 */
23 void startRootIsolate(entry) {
24 _globalState = new GlobalState();
25
26 // Don't start the main loop again, if we are in a worker.
27 if (_globalState.isWorker) return;
28 final rootContext = new IsolateContext();
29 _globalState.rootContext = rootContext;
30 _fillStatics(rootContext);
31
32 // BUG(5151491): Setting currentContext should not be necessary, but
33 // because closures passed to the DOM as event handlers do not bind their
34 // isolate automatically we try to give them a reasonable context to live in
35 // by having a "default" isolate (the first one created).
36 _globalState.currentContext = rootContext;
37
38 rootContext.eval(entry);
39 _globalState.topEventLoop.run();
40 }
41
42 void _fillStatics(context) native @"""
43 $globals = context.isolateStatics; 25 $globals = context.isolateStatics;
44 $static_init(); 26 $static_init();
45 """; 27 """;
46 28
47 /** Global state associated with the current worker. See [_globalState]. */ 29 /** Global state associated with the current worker. See [globalState]. */
48 // TODO(sigmund): split in multiple classes: global, thread, main-worker states? 30 // TODO(sigmund): split in multiple classes: global, thread, main-worker states?
49 class GlobalState { 31 class GlobalState {
50 32
51 /** Next available isolate id. */ 33 /** Next available isolate id. */
52 int nextIsolateId = 0; 34 int nextIsolateId = 0;
53 35
54 /** Worker id associated with this worker. */ 36 /** Worker id associated with this worker. */
55 int currentWorkerId = 0; 37 int currentWorkerId = 0;
56 38
57 /** 39 /**
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
95 * Registry of isolates. Isolates must be registered if, and only if, receive 77 * Registry of isolates. Isolates must be registered if, and only if, receive
96 * ports are alive. Normally no open receive-ports means that the isolate is 78 * ports are alive. Normally no open receive-ports means that the isolate is
97 * dead, but DOM callbacks could resurrect it. 79 * dead, but DOM callbacks could resurrect it.
98 */ 80 */
99 Map<int, IsolateContext> isolates; 81 Map<int, IsolateContext> isolates;
100 82
101 /** Reference to the main worker. */ 83 /** Reference to the main worker. */
102 MainWorker mainWorker; 84 MainWorker mainWorker;
103 85
104 /** Registry of active workers. Only used in the main worker. */ 86 /** Registry of active workers. Only used in the main worker. */
105 Map<int, var> workers; 87 Map<int, Dynamic> workers;
106 88
107 GlobalState() { 89 GlobalState() {
108 topEventLoop = new EventLoop(); 90 topEventLoop = new EventLoop();
109 isolates = {}; 91 isolates = {};
110 workers = {}; 92 workers = {};
111 mainWorker = new MainWorker(); 93 mainWorker = new MainWorker();
112 _nativeInit(); 94 _nativeInit();
113 } 95 }
114 96
115 void _nativeInit() native @""" 97 void _nativeInit() native @"""
116 this.isWorker = typeof ($globalThis['importScripts']) != 'undefined'; 98 this.isWorker = typeof ($globalThis['importScripts']) != 'undefined';
117 this.inWindow = typeof(window) !== 'undefined'; 99 this.inWindow = typeof(window) !== 'undefined';
118 this.supportsWorkers = this.isWorker || 100 this.supportsWorkers = this.isWorker ||
119 ((typeof $globalThis['Worker']) != 'undefined'); 101 ((typeof $globalThis['Worker']) != 'undefined');
120 102
121 // if workers are supported, treat this as a main worker: 103 // if workers are supported, treat this as a main worker:
122 if (this.supportsWorkers) { 104 if (this.supportsWorkers) {
123 $globalThis.onmessage = function(e) { 105 $globalThis.onmessage = function(e) {
124 IsolateNatives._processWorkerMessage(this.mainWorker, e); 106 _IsolateNatives._processWorkerMessage(this.mainWorker, e);
125 }; 107 };
126 } 108 }
127 """ { 109 """ {
128 // Declare that the native code has a dependency on this fn. 110 // Declare that the native code has a dependency on this fn.
129 IsolateNatives._processWorkerMessage(null, null); 111 _IsolateNatives._processWorkerMessage(null, null);
130 } 112 }
131 113
132 /** 114 /**
133 * Close the worker running this code, called when there is nothing else to 115 * Close the worker running this code, called when there is nothing else to
134 * run. 116 * run.
135 */ 117 */
136 void closeWorker() { 118 void closeWorker() {
137 if (isWorker) { 119 if (isWorker) {
138 if (!isolates.isEmpty()) return; 120 if (!isolates.isEmpty()) return;
139 mainWorker.postMessage( 121 mainWorker.postMessage(
140 _serializeMessage({'command': 'close'})); 122 _serializeMessage({'command': 'close'}));
141 } else if (isolates.containsKey(rootContext.id) && workers.isEmpty() && 123 } else if (isolates.containsKey(rootContext.id) && workers.isEmpty() &&
142 !supportsWorkers && !inWindow) { 124 !supportsWorkers && !inWindow) {
143 // This should only trigger when running on the command-line. 125 // This should only trigger when running on the command-line.
144 // We don't want this check to execute in the browser where the isolate 126 // We don't want this check to execute in the browser where the isolate
145 // might still be alive due to DOM callbacks. 127 // might still be alive due to DOM callbacks.
146 throw new Exception("Program exited with open ReceivePorts."); 128 throw new Exception("Program exited with open ReceivePorts.");
147 } 129 }
148 } 130 }
149 } 131 }
150 132
151 _serializeMessage(message) {
152 if (_globalState.needSerialization) {
153 return new Serializer().traverse(message);
154 } else {
155 return new Copier().traverse(message);
156 }
157 }
158
159 _deserializeMessage(message) {
160 if (_globalState.needSerialization) {
161 return new Deserializer().deserialize(message);
162 } else {
163 // Nothing more to do.
164 return message;
165 }
166 }
167
168 /** Wait until all ports in a message are resolved. */
169 _waitForPendingPorts(var message, void callback()) {
170 final finder = new PendingSendPortFinder();
171 finder.traverse(message);
172 Futures.wait(finder.ports).then((_) => callback());
173 }
174
175 /** Default worker. */
176 class MainWorker {
177 int id = 0;
178 void postMessage(msg) native "return \$globalThis.postMessage(msg);";
179 void set onmessage(f) native "\$globalThis.onmessage = f;";
180 void terminate() {}
181 }
182
183 /**
184 * A web worker. This type is also defined in 'dart:dom', but we define it here
185 * to avoid introducing a dependency from corelib to dom. This definition uses a
186 * 'hidden' type (* prefix on the native name) to enforce that the type is
187 * defined dynamically only when web workers are actually available.
188 */
189 class _Worker native "*Worker" {
190 get id() native "return this.id;";
191 void set id(i) native "this.id = i;";
192 void set onmessage(f) native "this.onmessage = f;";
193 void postMessage(msg) native "return this.postMessage(msg);";
194 }
195
196 /** Context information tracked for each isolate. */ 133 /** Context information tracked for each isolate. */
197 class IsolateContext { 134 class IsolateContext {
198 /** Current isolate id. */ 135 /** Current isolate id. */
199 int id; 136 int id;
200 137
201 /** Registry of receive ports currently active on this isolate. */ 138 /** Registry of receive ports currently active on this isolate. */
202 Map<int, ReceivePort> ports; 139 Map<int, ReceivePort> ports;
203 140
204 /** Holds isolate globals (statics and top-level properties). */ 141 /** Holds isolate globals (statics and top-level properties). */
205 var isolateStatics; // native object containing all globals of an isolate. 142 var isolateStatics; // native object containing all globals of an isolate.
206 143
207 IsolateContext() { 144 IsolateContext() {
208 id = _globalState.nextIsolateId++; 145 id = globalState.nextIsolateId++;
209 ports = {}; 146 ports = {};
210 initGlobals(); 147 initGlobals();
211 } 148 }
212 149
213 // these are filled lazily the first time the isolate starts running. 150 // these are filled lazily the first time the isolate starts running.
214 void initGlobals() native 'this.isolateStatics = {};'; 151 void initGlobals() native 'this.isolateStatics = {};';
215 152
216 /** 153 /**
217 * Run [code] in the context of the isolate represented by [this]. Note this 154 * Run [code] in the context of the isolate represented by [this]. Note this
218 * is called from JavaScript (see $wrap_call in corejs.dart). 155 * is called from JavaScript (see $wrap_call in corejs.dart).
219 */ 156 */
220 void eval(Function code) { 157 void eval(Function code) {
221 var old = _globalState.currentContext; 158 var old = globalState.currentContext;
222 _globalState.currentContext = this; 159 globalState.currentContext = this;
223 this._setGlobals(); 160 this._setGlobals();
224 var result = null; 161 var result = null;
225 try { 162 try {
226 result = code(); 163 result = code();
227 } finally { 164 } finally {
228 _globalState.currentContext = old; 165 globalState.currentContext = old;
229 if (old != null) old._setGlobals(); 166 if (old != null) old._setGlobals();
230 } 167 }
231 return result; 168 return result;
232 } 169 }
233 170
234 void _setGlobals() native @'$globals = this.isolateStatics;'; 171 void _setGlobals() native @'$globals = this.isolateStatics;';
235 172
236 /** Lookup a port registered for this isolate. */ 173 /** Lookup a port registered for this isolate. */
237 ReceivePort lookup(int id) => ports[id]; 174 ReceivePort lookup(int id) => ports[id];
238 175
239 /** Register a port on this isolate. */ 176 /** Register a port on this isolate. */
240 void register(int portId, ReceivePort port) { 177 void register(int portId, ReceivePort port) {
241 if (ports.containsKey(portId)) { 178 if (ports.containsKey(portId)) {
242 throw new Exception("Registry: ports must be registered only once."); 179 throw new Exception("Registry: ports must be registered only once.");
243 } 180 }
244 ports[portId] = port; 181 ports[portId] = port;
245 _globalState.isolates[id] = this; // indicate this isolate is active 182 globalState.isolates[id] = this; // indicate this isolate is active
246 } 183 }
247 184
248 /** Unregister a port on this isolate. */ 185 /** Unregister a port on this isolate. */
249 void unregister(int portId) { 186 void unregister(int portId) {
250 ports.remove(portId); 187 ports.remove(portId);
251 if (ports.isEmpty()) { 188 if (ports.isEmpty()) {
252 _globalState.isolates.remove(id); // indicate this isolate is not active 189 globalState.isolates.remove(id); // indicate this isolate is not active
253 } 190 }
254 } 191 }
255 } 192 }
256 193
194
257 /** Represent the event loop on a javascript thread (DOM or worker). */ 195 /** Represent the event loop on a javascript thread (DOM or worker). */
258 class EventLoop { 196 class EventLoop {
259 Queue<IsolateEvent> events; 197 Queue<IsolateEvent> events;
260 198
261 EventLoop() : events = new Queue<IsolateEvent>(); 199 EventLoop() : events = new Queue<IsolateEvent>();
262 200
263 void enqueue(isolate, fn, msg) { 201 void enqueue(isolate, fn, msg) {
264 events.addLast(new IsolateEvent(isolate, fn, msg)); 202 events.addLast(new IsolateEvent(isolate, fn, msg));
265 } 203 }
266 204
267 IsolateEvent dequeue() { 205 IsolateEvent dequeue() {
268 if (events.isEmpty()) return null; 206 if (events.isEmpty()) return null;
269 return events.removeFirst(); 207 return events.removeFirst();
270 } 208 }
271 209
272 /** Process a single event, if any. */ 210 /** Process a single event, if any. */
273 bool runIteration() { 211 bool runIteration() {
274 final event = dequeue(); 212 final event = dequeue();
275 if (event == null) { 213 if (event == null) {
276 _globalState.closeWorker(); 214 globalState.closeWorker();
277 return false; 215 return false;
278 } 216 }
279 event.process(); 217 event.process();
280 return true; 218 return true;
281 } 219 }
282 220
283 /** Function equivalent to [:window.setTimeout:] when available, or null. */ 221 /** Function equivalent to [:window.setTimeout:] when available, or null. */
284 static Function _wrapSetTimeout() native """ 222 static Function _wrapSetTimeout() native """
285 return typeof window != 'undefined' ? 223 return typeof window != 'undefined' ?
286 function(a, b) { window.setTimeout(a, b); } : undefined; 224 function(a, b) { window.setTimeout(a, b); } : undefined;
(...skipping 16 matching lines...) Expand all
303 // Run synchronously until no more iterations are available. 241 // Run synchronously until no more iterations are available.
304 while (runIteration()) {} 242 while (runIteration()) {}
305 } 243 }
306 } 244 }
307 245
308 /** 246 /**
309 * Call [_runHelper] but ensure that worker exceptions are propragated. Note 247 * Call [_runHelper] but ensure that worker exceptions are propragated. Note
310 * this is called from JavaScript (see $wrap_call in corejs.dart). 248 * this is called from JavaScript (see $wrap_call in corejs.dart).
311 */ 249 */
312 void run() { 250 void run() {
313 if (!_globalState.isWorker) { 251 if (!globalState.isWorker) {
314 _runHelper(); 252 _runHelper();
315 } else { 253 } else {
316 try { 254 try {
317 _runHelper(); 255 _runHelper();
318 } catch(var e, var trace) { 256 } catch(var e, var trace) {
319 _globalState.mainWorker.postMessage(_serializeMessage( 257 globalState.mainWorker.postMessage(_serializeMessage(
320 {'command': 'error', 'msg': '$e\n$trace' })); 258 {'command': 'error', 'msg': '$e\n$trace' }));
321 } 259 }
322 } 260 }
323 } 261 }
324 } 262 }
325 263
326 /** An event in the top-level event queue. */ 264 /** An event in the top-level event queue. */
327 class IsolateEvent { 265 class IsolateEvent {
328 IsolateContext isolate; 266 IsolateContext isolate;
329 Function fn; 267 Function fn;
330 String message; 268 String message;
331 269
332 IsolateEvent(this.isolate, this.fn, this.message); 270 IsolateEvent(this.isolate, this.fn, this.message);
333 271
334 void process() { 272 void process() {
335 isolate.eval(fn); 273 isolate.eval(fn);
336 } 274 }
337 } 275 }
338 276
339 /** Common functionality to all send ports. */
340 class BaseSendPort implements SendPort {
341 /** Id for the destination isolate. */
342 final int _isolateId;
343 277
344 BaseSendPort(this._isolateId); 278 /** Default worker. */
345 279 class MainWorker {
346 ReceivePortSingleShotImpl call(var message) { 280 int id = 0;
347 final result = new ReceivePortSingleShotImpl(); 281 void postMessage(msg) native "return \$globalThis.postMessage(msg);";
348 this.send(message, result.toSendPort()); 282 void set onmessage(f) native "\$globalThis.onmessage = f;";
349 return result; 283 void terminate() {}
350 }
351
352 static void checkReplyTo(SendPort replyTo) {
353 if (replyTo !== null
354 && replyTo is! NativeJsSendPort
355 && replyTo is! WorkerSendPort
356 && replyTo is! BufferingSendPort) {
357 throw new Exception("SendPort.send: Illegal replyTo port type");
358 }
359 }
360
361 // TODO(sigmund): replace the current SendPort.call with the following:
362 //Future call(var message) {
363 //  final completer = new Completer();
364 //  final port = new ReceivePort.singleShot();
365 //  send(message, port.toSendPort());
366 //  port.receive((value, ignoreReplyTo) {
367 //    if (value is Exception) {
368 //  completer.completeException(value);
369 // } else {
370 // completer.complete(value);
371 // }
372 // });
373 //  return completer.future;
374 //}
375
376 abstract void send(var message, [SendPort replyTo]);
377 abstract bool operator ==(var other);
378 abstract int hashCode();
379 } 284 }
380 285
381 /** A send port that delivers messages in-memory via native JavaScript calls. */ 286 /**
382 class NativeJsSendPort extends BaseSendPort implements SendPort { 287 * A web worker. This type is also defined in 'dart:dom', but we define it here
383 final ReceivePortImpl _receivePort; 288 * to avoid introducing a dependency from corelib to dom. This definition uses a
384 289 * 'hidden' type (* prefix on the native name) to enforce that the type is
385 const NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId); 290 * defined dynamically only when web workers are actually available.
386 291 */
387 void send(var message, [SendPort replyTo = null]) { 292 class _Worker native "*Worker" {
388 _waitForPendingPorts([message, replyTo], () { 293 get id() native "return this.id;";
389 checkReplyTo(replyTo); 294 void set id(i) native "this.id = i;";
390 // Check that the isolate still runs and the port is still open 295 void set onmessage(f) native "this.onmessage = f;";
391 final isolate = _globalState.isolates[_isolateId]; 296 void postMessage(msg) native "return this.postMessage(msg);";
392 if (isolate == null) return;
393 if (_receivePort._callback == null) return;
394
395 // We force serialization/deserialization as a simple way to ensure
396 // isolate communication restrictions are respected between isolates that
397 // live in the same worker. NativeJsSendPort delivers both messages from
398 // the same worker and messages from other workers. In particular,
399 // messages sent from a worker via a WorkerSendPort are received at
400 // [_processWorkerMessage] and forwarded to a native port. In such cases,
401 // here we'll see [_globalState.currentContext == null].
402 final shouldSerialize = _globalState.currentContext != null
403 && _globalState.currentContext.id != _isolateId;
404 var msg = message;
405 var reply = replyTo;
406 if (shouldSerialize) {
407 msg = _serializeMessage(msg);
408 reply = _serializeMessage(reply);
409 }
410 _globalState.topEventLoop.enqueue(isolate, () {
411 if (_receivePort._callback != null) {
412 if (shouldSerialize) {
413 msg = _deserializeMessage(msg);
414 reply = _deserializeMessage(reply);
415 }
416 _receivePort._callback(msg, reply);
417 }
418 }, 'receive ' + message);
419 });
420 }
421
422 bool operator ==(var other) => (other is NativeJsSendPort) &&
423 (_receivePort == other._receivePort);
424
425 int hashCode() => _receivePort._id;
426 }
427
428 /** A send port that delivers messages via worker.postMessage. */
429 class WorkerSendPort extends BaseSendPort implements SendPort {
430 final int _workerId;
431 final int _receivePortId;
432
433 const WorkerSendPort(this._workerId, int isolateId, this._receivePortId)
434 : super(isolateId);
435
436 void send(var message, [SendPort replyTo = null]) {
437 _waitForPendingPorts([message, replyTo], () {
438 checkReplyTo(replyTo);
439 final workerMessage = _serializeMessage({
440 'command': 'message',
441 'port': this,
442 'msg': message,
443 'replyTo': replyTo});
444
445 if (_globalState.isWorker) {
446 // communication from one worker to another go through the main worker:
447 _globalState.mainWorker.postMessage(workerMessage);
448 } else {
449 _globalState.workers[_workerId].postMessage(workerMessage);
450 }
451 });
452 }
453
454 bool operator ==(var other) {
455 return (other is WorkerSendPort) &&
456 (_workerId == other._workerId) &&
457 (_isolateId == other._isolateId) &&
458 (_receivePortId == other._receivePortId);
459 }
460
461 int hashCode() {
462 // TODO(sigmund): use a standard hash when we get one available in corelib.
463 return (_workerId << 16) ^ (_isolateId << 8) ^ _receivePortId;
464 }
465 }
466
467 /** A port that buffers messages until an underlying port gets resolved. */
468 class BufferingSendPort extends BaseSendPort implements SendPort {
469 /** Internal counter to assign unique ids to each port. */
470 static int _idCount = 0;
471
472 /** For implementing equals and hashcode. */
473 final int _id;
474
475 /** Underlying port, when resolved. */
476 SendPort _port;
477
478 /**
479 * Future of the underlying port, so that we can detect when this port can be
480 * sent on messages.
481 */
482 Future<SendPort> _futurePort;
483
484 /** Pending messages (and reply ports). */
485 List pending;
486
487 BufferingSendPort(isolateId, this._futurePort)
488 : super(isolateId), _id = _idCount, pending = [] {
489 _idCount++;
490 _futurePort.then((p) {
491 _port = p;
492 for (final item in pending) {
493 p.send(item['message'], item['replyTo']);
494 }
495 pending = null;
496 });
497 }
498
499 BufferingSendPort.fromPort(isolateId, this._port)
500 : super(isolateId), _id = _idCount {
501 _idCount++;
502 }
503
504 void send(var message, [SendPort replyTo]) {
505 if (_port != null) {
506 _port.send(message, replyTo);
507 } else {
508 pending.add({'message': message, 'replyTo': replyTo});
509 }
510 }
511
512 bool operator ==(var other) => other is BufferingSendPort && _id == other._id;
513 int hashCode() => _id;
514 }
515
516 /** Default factory for receive ports. */
517 class ReceivePortFactory {
518
519 factory ReceivePort() {
520 return new ReceivePortImpl();
521 }
522
523 factory ReceivePort.singleShot() {
524 return new ReceivePortSingleShotImpl();
525 }
526 }
527
528 /** Implementation of a multi-use [ReceivePort] on top of JavaScript. */
529 class ReceivePortImpl implements ReceivePort {
530 int _id;
531 Function _callback;
532 static int _nextFreeId = 1;
533
534 ReceivePortImpl()
535 : _id = _nextFreeId++ {
536 _globalState.currentContext.register(_id, this);
537 }
538
539 void receive(void onMessage(var message, SendPort replyTo)) {
540 _callback = onMessage;
541 }
542
543 void close() {
544 _callback = null;
545 _globalState.currentContext.unregister(_id);
546 }
547
548 SendPort toSendPort() {
549 return new NativeJsSendPort(this, _globalState.currentContext.id);
550 }
551 }
552
553 /** Implementation of a single-shot [ReceivePort]. */
554 class ReceivePortSingleShotImpl implements ReceivePort {
555
556 ReceivePortSingleShotImpl() : _port = new ReceivePortImpl() { }
557
558 void receive(void callback(var message, SendPort replyTo)) {
559 _port.receive((var message, SendPort replyTo) {
560 _port.close();
561 callback(message, replyTo);
562 });
563 }
564
565 void close() {
566 _port.close();
567 }
568
569 SendPort toSendPort() => _port.toSendPort();
570
571 final ReceivePortImpl _port;
572 } 297 }
573 298
574 final String _SPAWNED_SIGNAL = "spawned"; 299 final String _SPAWNED_SIGNAL = "spawned";
575 300
576 class IsolateNatives { 301 class _IsolateNatives {
577 302
578 /** JavaScript-specific implementation to spawn an isolate. */ 303 /** JavaScript-specific implementation to spawn an isolate. */
579 static Future<SendPort> spawn(Isolate isolate, bool isLight) { 304 static Future<SendPort> spawn(Isolate isolate, bool isLight) {
580 Completer<SendPort> completer = new Completer<SendPort>(); 305 Completer<SendPort> completer = new Completer<SendPort>();
581 ReceivePort port = new ReceivePort.singleShot(); 306 ReceivePort port = new ReceivePort.singleShot();
582 port.receive((msg, SendPort replyPort) { 307 port.receive((msg, SendPort replyPort) {
583 assert(msg == _SPAWNED_SIGNAL); 308 assert(msg == _SPAWNED_SIGNAL);
584 completer.complete(replyPort); 309 completer.complete(replyPort);
585 }); 310 });
586 311
587 // TODO(floitsch): throw exception if isolate's class doesn't have a 312 // TODO(floitsch): throw exception if isolate's class doesn't have a
588 // default constructor. 313 // default constructor.
589 if (_globalState.useWorkers && !isLight) { 314 if (globalState.useWorkers && !isLight) {
590 _startWorker(isolate, port.toSendPort()); 315 _startWorker(isolate, port.toSendPort());
591 } else { 316 } else {
592 _startNonWorker(isolate, port.toSendPort()); 317 _startNonWorker(isolate, port.toSendPort());
593 } 318 }
594 319
595 return completer.future; 320 return completer.future;
596 } 321 }
597 322
598 static SendPort _startWorker(Isolate runnable, SendPort replyPort) { 323 static SendPort _startWorker(Isolate runnable, SendPort replyPort) {
599 var factoryName = _getJSConstructorName(runnable); 324 var factoryName = _getJSConstructorName(runnable);
600 if (_globalState.isWorker) { 325 if (globalState.isWorker) {
601 _globalState.mainWorker.postMessage(_serializeMessage({ 326 globalState.mainWorker.postMessage(_serializeMessage({
602 'command': 'spawn-worker', 327 'command': 'spawn-worker',
603 'factoryName': factoryName, 328 'factoryName': factoryName,
604 'replyPort': _serializeMessage(replyPort)})); 329 'replyPort': _serializeMessage(replyPort)}));
605 } else { 330 } else {
606 _spawnWorker(factoryName, _serializeMessage(replyPort)); 331 _spawnWorker(factoryName, _serializeMessage(replyPort));
607 } 332 }
608 } 333 }
609 334
610 /** 335 /**
611 * The src url for the script tag that loaded this code. Used to create 336 * The src url for the script tag that loaded this code. Used to create
(...skipping 14 matching lines...) Expand all
626 var scripts = document.getElementsByTagName('script'); 351 var scripts = document.getElementsByTagName('script');
627 // The scripts variable only contains the scripts that have already been 352 // The scripts variable only contains the scripts that have already been
628 // executed. The last one is the currently running script. 353 // executed. The last one is the currently running script.
629 var script = scripts[scripts.length - 1]; 354 var script = scripts[scripts.length - 1];
630 var src = script && script.src; 355 var src = script && script.src;
631 if (!src) { 356 if (!src) {
632 // TODO() 357 // TODO()
633 src = "FIXME:5407062" + "_" + Math.random().toString(); 358 src = "FIXME:5407062" + "_" + Math.random().toString();
634 if (script) script.src = src; 359 if (script) script.src = src;
635 } 360 }
636 IsolateNatives._thisScriptCache = src; 361 _IsolateNatives._thisScriptCache = src;
637 return src; 362 return src;
638 """; 363 """;
639 364
640 /** Starts a new worker with the given URL. */ 365 /** Starts a new worker with the given URL. */
641 static _Worker _newWorker(url) native "return new Worker(url);"; 366 static _Worker _newWorker(url) native "return new Worker(url);";
642 367
643 /** 368 /**
644 * Spawns an isolate in a worker. [factoryName] is the Javascript constructor 369 * Spawns an isolate in a worker. [factoryName] is the Javascript constructor
645 * name for the isolate entry point class. 370 * name for the isolate entry point class.
646 */ 371 */
647 static void _spawnWorker(factoryName, serializedReplyPort) { 372 static void _spawnWorker(factoryName, serializedReplyPort) {
648 final worker = _newWorker(_thisScript); 373 final worker = _newWorker(_thisScript);
649 worker.onmessage = (e) { _processWorkerMessage(worker, e); }; 374 worker.onmessage = (e) { _processWorkerMessage(worker, e); };
650 var workerId = _globalState.nextWorkerId++; 375 var workerId = globalState.nextWorkerId++;
651 // We also store the id on the worker itself so that we can unregister it. 376 // We also store the id on the worker itself so that we can unregister it.
652 worker.id = workerId; 377 worker.id = workerId;
653 _globalState.workers[workerId] = worker; 378 globalState.workers[workerId] = worker;
654 worker.postMessage(_serializeMessage({ 379 worker.postMessage(_serializeMessage({
655 'command': 'start', 380 'command': 'start',
656 'id': workerId, 381 'id': workerId,
657 'replyTo': serializedReplyPort, 382 'replyTo': serializedReplyPort,
658 'factoryName': factoryName })); 383 'factoryName': factoryName }));
659 } 384 }
660 385
661 /** 386 /**
662 * Assume that [e] is a browser message event and extract its message data. 387 * Assume that [e] is a browser message event and extract its message data.
663 * We don't import the dom explicitly so, when workers are disabled, this 388 * We don't import the dom explicitly so, when workers are disabled, this
664 * library can also run on top of nodejs. 389 * library can also run on top of nodejs.
665 */ 390 */
666 static _getEventData(e) native "return e.data"; 391 static _getEventData(e) native "return e.data";
667 392
668 /** 393 /**
669 * Process messages on a worker, either to control the worker instance or to 394 * Process messages on a worker, either to control the worker instance or to
670 * pass messages along to the isolate running in the worker. 395 * pass messages along to the isolate running in the worker.
671 */ 396 */
672 static void _processWorkerMessage(sender, e) { 397 static void _processWorkerMessage(sender, e) {
673 var msg = _deserializeMessage(_getEventData(e)); 398 var msg = _deserializeMessage(_getEventData(e));
674 switch (msg['command']) { 399 switch (msg['command']) {
675 // TODO(sigmund): delete after we migrate to Isolate2 400 // TODO(sigmund): delete after we migrate to Isolate2
676 case 'start': 401 case 'start':
677 _globalState.currentWorkerId = msg['id']; 402 globalState.currentWorkerId = msg['id'];
678 var runnerObject = 403 var runnerObject =
679 _allocate(_getJSConstructorFromName(msg['factoryName'])); 404 _allocate(_getJSConstructorFromName(msg['factoryName']));
680 var serializedReplyTo = msg['replyTo']; 405 var serializedReplyTo = msg['replyTo'];
681 _globalState.topEventLoop.enqueue(new IsolateContext(), function() { 406 globalState.topEventLoop.enqueue(new IsolateContext(), function() {
682 var replyTo = _deserializeMessage(serializedReplyTo); 407 var replyTo = _deserializeMessage(serializedReplyTo);
683 _startIsolate(runnerObject, replyTo); 408 _startIsolate(runnerObject, replyTo);
684 }, 'worker-start'); 409 }, 'worker-start');
685 _globalState.topEventLoop.run(); 410 globalState.topEventLoop.run();
686 break; 411 break;
687 case 'start2': 412 case 'start2':
688 _globalState.currentWorkerId = msg['id']; 413 globalState.currentWorkerId = msg['id'];
689 Function entryPoint = _getJSFunctionFromName(msg['functionName']); 414 Function entryPoint = _getJSFunctionFromName(msg['functionName']);
690 var replyTo = _deserializeMessage(msg['replyTo']); 415 var replyTo = _deserializeMessage(msg['replyTo']);
691 _globalState.topEventLoop.enqueue(new IsolateContext(), function() { 416 globalState.topEventLoop.enqueue(new IsolateContext(), function() {
692 _startIsolate2(entryPoint, replyTo); 417 _startIsolate2(entryPoint, replyTo);
693 }, 'worker-start'); 418 }, 'worker-start');
694 _globalState.topEventLoop.run(); 419 globalState.topEventLoop.run();
695 break; 420 break;
696 // TODO(sigmund): delete after we migrate to Isolate2 421 // TODO(sigmund): delete after we migrate to Isolate2
697 case 'spawn-worker': 422 case 'spawn-worker':
698 _spawnWorker(msg['factoryName'], msg['replyPort']); 423 _spawnWorker(msg['factoryName'], msg['replyPort']);
699 break; 424 break;
700 case 'spawn-worker2': 425 case 'spawn-worker2':
701 _spawnWorker2(msg['functionName'], msg['uri'], msg['replyPort']); 426 _spawnWorker2(msg['functionName'], msg['uri'], msg['replyPort']);
702 break; 427 break;
703 case 'message': 428 case 'message':
704 msg['port'].send(msg['msg'], msg['replyTo']); 429 msg['port'].send(msg['msg'], msg['replyTo']);
705 _globalState.topEventLoop.run(); 430 globalState.topEventLoop.run();
706 break; 431 break;
707 case 'close': 432 case 'close':
708 _log("Closing Worker"); 433 _log("Closing Worker");
709 _globalState.workers.remove(sender.id); 434 globalState.workers.remove(sender.id);
710 sender.terminate(); 435 sender.terminate();
711 _globalState.topEventLoop.run(); 436 globalState.topEventLoop.run();
712 break; 437 break;
713 case 'log': 438 case 'log':
714 _log(msg['msg']); 439 _log(msg['msg']);
715 break; 440 break;
716 case 'print': 441 case 'print':
717 if (_globalState.isWorker) { 442 if (globalState.isWorker) {
718 _globalState.mainWorker.postMessage( 443 globalState.mainWorker.postMessage(
719 _serializeMessage({'command': 'print', 'msg': msg})); 444 _serializeMessage({'command': 'print', 'msg': msg}));
720 } else { 445 } else {
721 print(msg['msg']); 446 print(msg['msg']);
722 } 447 }
723 break; 448 break;
724 case 'error': 449 case 'error':
725 throw msg['msg']; 450 throw msg['msg'];
726 } 451 }
727 } 452 }
728 453
729 /** Log a message, forwarding to the main worker if appropriate. */ 454 /** Log a message, forwarding to the main worker if appropriate. */
730 static _log(msg) { 455 static _log(msg) {
731 if (_globalState.isWorker) { 456 if (globalState.isWorker) {
732 _globalState.mainWorker.postMessage( 457 globalState.mainWorker.postMessage(
733 _serializeMessage({'command': 'log', 'msg': msg })); 458 _serializeMessage({'command': 'log', 'msg': msg }));
734 } else { 459 } else {
735 try { 460 try {
736 _consoleLog(msg); 461 _consoleLog(msg);
737 } catch(e, trace) { 462 } catch(e, trace) {
738 throw new Exception(trace); 463 throw new Exception(trace);
739 } 464 }
740 } 465 }
741 } 466 }
742 467
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
774 /** Starts a non-worker isolate. */ 499 /** Starts a non-worker isolate. */
775 static SendPort _startNonWorker(Isolate runnable, SendPort replyTo) { 500 static SendPort _startNonWorker(Isolate runnable, SendPort replyTo) {
776 // Spawn a new isolate and create the receive port in it. 501 // Spawn a new isolate and create the receive port in it.
777 final spawned = new IsolateContext(); 502 final spawned = new IsolateContext();
778 503
779 // Instead of just running the provided runnable, we create a 504 // Instead of just running the provided runnable, we create a
780 // new cloned instance of it with a fresh state in the spawned 505 // new cloned instance of it with a fresh state in the spawned
781 // isolate. This way, we do not get cross-isolate references 506 // isolate. This way, we do not get cross-isolate references
782 // through the runnable. 507 // through the runnable.
783 final ctor = _getJSConstructor(runnable); 508 final ctor = _getJSConstructor(runnable);
784 _globalState.topEventLoop.enqueue(spawned, function() { 509 globalState.topEventLoop.enqueue(spawned, function() {
785 _startIsolate(_allocate(ctor), replyTo); 510 _startIsolate(_allocate(ctor), replyTo);
786 }, 'nonworker start'); 511 }, 'nonworker start');
787 } 512 }
788 513
789 /** Given a ready-to-start runnable, start running it. */ 514 /** Given a ready-to-start runnable, start running it. */
790 static void _startIsolate(Isolate isolate, SendPort replyTo) { 515 static void _startIsolate(Isolate isolate, SendPort replyTo) {
791 _fillStatics(_globalState.currentContext); 516 fillStatics(globalState.currentContext);
792 ReceivePort port = new ReceivePort(); 517 ReceivePort port = new ReceivePort();
793 replyTo.send(_SPAWNED_SIGNAL, port.toSendPort()); 518 replyTo.send(_SPAWNED_SIGNAL, port.toSendPort());
794 isolate._run(port); 519 isolate._run(port);
795 } 520 }
796 521
797 // TODO(sigmund): clean up above, after we make the new API the default: 522 // TODO(sigmund): clean up above, after we make the new API the default:
798 523
799 static _spawn2(String functionName, String uri, bool isLight) { 524 static _spawn2(String functionName, String uri, bool isLight) {
800 Completer<SendPort> completer = new Completer<SendPort>(); 525 Completer<SendPort> completer = new Completer<SendPort>();
801 ReceivePort port = new ReceivePort.singleShot(); 526 ReceivePort port = new ReceivePort.singleShot();
802 port.receive((msg, SendPort replyPort) { 527 port.receive((msg, SendPort replyPort) {
803 assert(msg == _SPAWNED_SIGNAL); 528 assert(msg == _SPAWNED_SIGNAL);
804 completer.complete(replyPort); 529 completer.complete(replyPort);
805 }); 530 });
806 531
807 SendPort signalReply = port.toSendPort(); 532 SendPort signalReply = port.toSendPort();
808 533
809 if (_globalState.useWorkers && !isLight) { 534 if (globalState.useWorkers && !isLight) {
810 _startWorker2(functionName, uri, signalReply); 535 _startWorker2(functionName, uri, signalReply);
811 } else { 536 } else {
812 _startNonWorker2(functionName, uri, signalReply); 537 _startNonWorker2(functionName, uri, signalReply);
813 } 538 }
814 return new BufferingSendPort( 539 return new _BufferingSendPort(
815 _globalState.currentContext.id, completer.future); 540 globalState.currentContext.id, completer.future);
816 } 541 }
817 542
818 static SendPort _startWorker2( 543 static SendPort _startWorker2(
819 String functionName, String uri, SendPort replyPort) { 544 String functionName, String uri, SendPort replyPort) {
820 if (_globalState.isWorker) { 545 if (globalState.isWorker) {
821 _globalState.mainWorker.postMessage(_serializeMessage({ 546 globalState.mainWorker.postMessage(_serializeMessage({
822 'command': 'spawn-worker2', 547 'command': 'spawn-worker2',
823 'functionName': functionName, 548 'functionName': functionName,
824 'uri': uri, 549 'uri': uri,
825 'replyPort': replyPort})); 550 'replyPort': replyPort}));
826 } else { 551 } else {
827 _spawnWorker2(functionName, uri, replyPort); 552 _spawnWorker2(functionName, uri, replyPort);
828 } 553 }
829 } 554 }
830 555
831 static SendPort _startNonWorker2( 556 static SendPort _startNonWorker2(
832 String functionName, String uri, SendPort replyPort) { 557 String functionName, String uri, SendPort replyPort) {
833 // TODO(eub): support IE9 using an iframe -- Dart issue 1702. 558 // TODO(eub): support IE9 using an iframe -- Dart issue 1702.
834 if (uri != null) throw new UnsupportedOperationException( 559 if (uri != null) throw new UnsupportedOperationException(
835 "Currently Isolate2.fromUri is not supported without web workers."); 560 "Currently Isolate2.fromUri is not supported without web workers.");
836 _globalState.topEventLoop.enqueue(new IsolateContext(), function() { 561 globalState.topEventLoop.enqueue(new IsolateContext(), function() {
837 final func = _getJSFunctionFromName(functionName); 562 final func = _getJSFunctionFromName(functionName);
838 _startIsolate2(func, replyPort); 563 _startIsolate2(func, replyPort);
839 }, 'nonworker start'); 564 }, 'nonworker start');
840 } 565 }
841 566
842 static void _startIsolate2(Function topLevel, SendPort replyTo) { 567 static void _startIsolate2(Function topLevel, SendPort replyTo) {
843 _fillStatics(_globalState.currentContext); 568 fillStatics(globalState.currentContext);
844 final port = new ReceivePort(); 569 final port = new ReceivePort();
845 replyTo.send(_SPAWNED_SIGNAL, port.toSendPort()); 570 replyTo.send(_SPAWNED_SIGNAL, port.toSendPort());
846 topLevel(port); 571 topLevel(port);
847 } 572 }
848 573
849 /** 574 /**
850 * Spawns an isolate in a worker. [factoryName] is the Javascript constructor 575 * Spawns an isolate in a worker. [factoryName] is the Javascript constructor
851 * name for the isolate entry point class. 576 * name for the isolate entry point class.
852 */ 577 */
853 static void _spawnWorker2(functionName, uri, replyPort) { 578 static void _spawnWorker2(functionName, uri, replyPort) {
854 // TODO(eub): convert to 'main' once we switch back to port at top-level. 579 // TODO(eub): convert to 'main' once we switch back to port at top-level.
855 if (functionName == null) functionName = 'isolateMain'; 580 if (functionName == null) functionName = 'isolateMain';
856 if (uri == null) uri = _thisScript; 581 if (uri == null) uri = _thisScript;
857 final worker = _newWorker(uri); 582 final worker = _newWorker(uri);
858 worker.onmessage = (e) { _processWorkerMessage(worker, e); }; 583 worker.onmessage = (e) { _processWorkerMessage(worker, e); };
859 var workerId = _globalState.nextWorkerId++; 584 var workerId = globalState.nextWorkerId++;
860 // We also store the id on the worker itself so that we can unregister it. 585 // We also store the id on the worker itself so that we can unregister it.
861 worker.id = workerId; 586 worker.id = workerId;
862 _globalState.workers[workerId] = worker; 587 globalState.workers[workerId] = worker;
863 worker.postMessage(_serializeMessage({ 588 worker.postMessage(_serializeMessage({
864 'command': 'start2', 589 'command': 'start2',
865 'id': workerId, 590 'id': workerId,
866 // Note: we serialize replyPort twice because the child worker needs to 591 // Note: we serialize replyPort twice because the child worker needs to
867 // first deserialize the worker id, before it can correctly deserialize 592 // first deserialize the worker id, before it can correctly deserialize
868 // the port (port deserialization is sensitive to what is the current 593 // the port (port deserialization is sensitive to what is the current
869 // workerId). 594 // workerId).
870 'replyTo': _serializeMessage(replyPort), 595 'replyTo': _serializeMessage(replyPort),
871 'functionName': functionName })); 596 'functionName': functionName }));
872 } 597 }
873 } 598 }
874
875 class Isolate2Impl implements Isolate2 {
876 SendPort sendPort;
877
878 Isolate2Impl(this.sendPort);
879
880 void stop() {}
881 }
882
883 class IsolateFactory implements Isolate2 {
884
885 factory Isolate2.fromCode(Function topLevelFunction) {
886 final name = IsolateNatives._getJSFunctionName(topLevelFunction);
887 if (name == null) {
888 throw new UnsupportedOperationException(
889 "only top-level functions can be spawned.");
890 }
891 return new Isolate2Impl(IsolateNatives._spawn2(name, null, false));
892 }
893
894 factory Isolate2.fromUri(String uri) {
895 return new Isolate2Impl(IsolateNatives._spawn2(null, uri, false));
896 }
897 }
OLDNEW
« no previous file with comments | « lib/isolate/frog/compiler_hooks.dart ('k') | lib/isolate/frog/messages.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698