OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
| 5 /** Implementation of [Isolate2]. */ |
| 6 class _Isolate2Impl implements Isolate2 { |
| 7 SendPort sendPort; |
| 8 |
| 9 _Isolate2Impl(this.sendPort); |
| 10 } |
| 11 |
5 /** | 12 /** |
6 * A native object that is shared across isolates. This object is visible to all | 13 * A native object that is shared across isolates. This object is visible to all |
7 * isolates running on the same worker (either UI or background web worker). | 14 * isolates running on the same worker (either UI or background web worker). |
8 * | 15 * |
9 * This is code that is intended to 'escape' the isolate boundaries in order to | 16 * This is code that is intended to 'escape' the isolate boundaries in order to |
10 * implement the semantics of friendly isolates in JavaScript. Without this we | 17 * implement the semantics of isolates in JavaScript. Without this we would have |
11 * would have been forced to implement more code (including the top-level event | 18 * been forced to implement more code (including the top-level event loop) in |
12 * loop) in JavaScript itself. | 19 * JavaScript itself. |
13 */ | 20 */ |
14 GlobalState get _globalState() native "return \$globalState;"; | 21 GlobalState get globalState() native "return \$globalState;"; |
15 set _globalState(GlobalState val) native "\$globalState = val;"; | 22 set globalState(GlobalState val) native "\$globalState = val;"; |
16 | 23 |
17 /** | 24 void fillStatics(context) native @""" |
18 * Wrapper that takes the dart entry point and runs it within an isolate. The | |
19 * frog compiler will inject a call of the form [: startRootIsolate(main); :] | |
20 * when it determines that this wrapping is needed. For single-isolate | |
21 * applications (e.g. hello world), this call is not emitted. | |
22 */ | |
23 void startRootIsolate(entry) { | |
24 _globalState = new GlobalState(); | |
25 | |
26 // Don't start the main loop again, if we are in a worker. | |
27 if (_globalState.isWorker) return; | |
28 final rootContext = new IsolateContext(); | |
29 _globalState.rootContext = rootContext; | |
30 _fillStatics(rootContext); | |
31 | |
32 // BUG(5151491): Setting currentContext should not be necessary, but | |
33 // because closures passed to the DOM as event handlers do not bind their | |
34 // isolate automatically we try to give them a reasonable context to live in | |
35 // by having a "default" isolate (the first one created). | |
36 _globalState.currentContext = rootContext; | |
37 | |
38 rootContext.eval(entry); | |
39 _globalState.topEventLoop.run(); | |
40 } | |
41 | |
42 void _fillStatics(context) native @""" | |
43 $globals = context.isolateStatics; | 25 $globals = context.isolateStatics; |
44 $static_init(); | 26 $static_init(); |
45 """; | 27 """; |
46 | 28 |
47 /** Global state associated with the current worker. See [_globalState]. */ | 29 /** Global state associated with the current worker. See [globalState]. */ |
48 // TODO(sigmund): split in multiple classes: global, thread, main-worker states? | 30 // TODO(sigmund): split in multiple classes: global, thread, main-worker states? |
49 class GlobalState { | 31 class GlobalState { |
50 | 32 |
51 /** Next available isolate id. */ | 33 /** Next available isolate id. */ |
52 int nextIsolateId = 0; | 34 int nextIsolateId = 0; |
53 | 35 |
54 /** Worker id associated with this worker. */ | 36 /** Worker id associated with this worker. */ |
55 int currentWorkerId = 0; | 37 int currentWorkerId = 0; |
56 | 38 |
57 /** | 39 /** |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
95 * Registry of isolates. Isolates must be registered if, and only if, receive | 77 * Registry of isolates. Isolates must be registered if, and only if, receive |
96 * ports are alive. Normally no open receive-ports means that the isolate is | 78 * ports are alive. Normally no open receive-ports means that the isolate is |
97 * dead, but DOM callbacks could resurrect it. | 79 * dead, but DOM callbacks could resurrect it. |
98 */ | 80 */ |
99 Map<int, IsolateContext> isolates; | 81 Map<int, IsolateContext> isolates; |
100 | 82 |
101 /** Reference to the main worker. */ | 83 /** Reference to the main worker. */ |
102 MainWorker mainWorker; | 84 MainWorker mainWorker; |
103 | 85 |
104 /** Registry of active workers. Only used in the main worker. */ | 86 /** Registry of active workers. Only used in the main worker. */ |
105 Map<int, var> workers; | 87 Map<int, Dynamic> workers; |
106 | 88 |
107 GlobalState() { | 89 GlobalState() { |
108 topEventLoop = new EventLoop(); | 90 topEventLoop = new EventLoop(); |
109 isolates = {}; | 91 isolates = {}; |
110 workers = {}; | 92 workers = {}; |
111 mainWorker = new MainWorker(); | 93 mainWorker = new MainWorker(); |
112 _nativeInit(); | 94 _nativeInit(); |
113 } | 95 } |
114 | 96 |
115 void _nativeInit() native @""" | 97 void _nativeInit() native @""" |
116 this.isWorker = typeof ($globalThis['importScripts']) != 'undefined'; | 98 this.isWorker = typeof ($globalThis['importScripts']) != 'undefined'; |
117 this.inWindow = typeof(window) !== 'undefined'; | 99 this.inWindow = typeof(window) !== 'undefined'; |
118 this.supportsWorkers = this.isWorker || | 100 this.supportsWorkers = this.isWorker || |
119 ((typeof $globalThis['Worker']) != 'undefined'); | 101 ((typeof $globalThis['Worker']) != 'undefined'); |
120 | 102 |
121 // if workers are supported, treat this as a main worker: | 103 // if workers are supported, treat this as a main worker: |
122 if (this.supportsWorkers) { | 104 if (this.supportsWorkers) { |
123 $globalThis.onmessage = function(e) { | 105 $globalThis.onmessage = function(e) { |
124 IsolateNatives._processWorkerMessage(this.mainWorker, e); | 106 _IsolateNatives._processWorkerMessage(this.mainWorker, e); |
125 }; | 107 }; |
126 } | 108 } |
127 """ { | 109 """ { |
128 // Declare that the native code has a dependency on this fn. | 110 // Declare that the native code has a dependency on this fn. |
129 IsolateNatives._processWorkerMessage(null, null); | 111 _IsolateNatives._processWorkerMessage(null, null); |
130 } | 112 } |
131 | 113 |
132 /** | 114 /** |
133 * Close the worker running this code, called when there is nothing else to | 115 * Close the worker running this code, called when there is nothing else to |
134 * run. | 116 * run. |
135 */ | 117 */ |
136 void closeWorker() { | 118 void closeWorker() { |
137 if (isWorker) { | 119 if (isWorker) { |
138 if (!isolates.isEmpty()) return; | 120 if (!isolates.isEmpty()) return; |
139 mainWorker.postMessage( | 121 mainWorker.postMessage( |
140 _serializeMessage({'command': 'close'})); | 122 _serializeMessage({'command': 'close'})); |
141 } else if (isolates.containsKey(rootContext.id) && workers.isEmpty() && | 123 } else if (isolates.containsKey(rootContext.id) && workers.isEmpty() && |
142 !supportsWorkers && !inWindow) { | 124 !supportsWorkers && !inWindow) { |
143 // This should only trigger when running on the command-line. | 125 // This should only trigger when running on the command-line. |
144 // We don't want this check to execute in the browser where the isolate | 126 // We don't want this check to execute in the browser where the isolate |
145 // might still be alive due to DOM callbacks. | 127 // might still be alive due to DOM callbacks. |
146 throw new Exception("Program exited with open ReceivePorts."); | 128 throw new Exception("Program exited with open ReceivePorts."); |
147 } | 129 } |
148 } | 130 } |
149 } | 131 } |
150 | 132 |
151 _serializeMessage(message) { | |
152 if (_globalState.needSerialization) { | |
153 return new Serializer().traverse(message); | |
154 } else { | |
155 return new Copier().traverse(message); | |
156 } | |
157 } | |
158 | |
159 _deserializeMessage(message) { | |
160 if (_globalState.needSerialization) { | |
161 return new Deserializer().deserialize(message); | |
162 } else { | |
163 // Nothing more to do. | |
164 return message; | |
165 } | |
166 } | |
167 | |
168 /** Wait until all ports in a message are resolved. */ | |
169 _waitForPendingPorts(var message, void callback()) { | |
170 final finder = new PendingSendPortFinder(); | |
171 finder.traverse(message); | |
172 Futures.wait(finder.ports).then((_) => callback()); | |
173 } | |
174 | |
175 /** Default worker. */ | |
176 class MainWorker { | |
177 int id = 0; | |
178 void postMessage(msg) native "return \$globalThis.postMessage(msg);"; | |
179 void set onmessage(f) native "\$globalThis.onmessage = f;"; | |
180 void terminate() {} | |
181 } | |
182 | |
183 /** | |
184 * A web worker. This type is also defined in 'dart:dom', but we define it here | |
185 * to avoid introducing a dependency from corelib to dom. This definition uses a | |
186 * 'hidden' type (* prefix on the native name) to enforce that the type is | |
187 * defined dynamically only when web workers are actually available. | |
188 */ | |
189 class _Worker native "*Worker" { | |
190 get id() native "return this.id;"; | |
191 void set id(i) native "this.id = i;"; | |
192 void set onmessage(f) native "this.onmessage = f;"; | |
193 void postMessage(msg) native "return this.postMessage(msg);"; | |
194 } | |
195 | |
196 /** Context information tracked for each isolate. */ | 133 /** Context information tracked for each isolate. */ |
197 class IsolateContext { | 134 class IsolateContext { |
198 /** Current isolate id. */ | 135 /** Current isolate id. */ |
199 int id; | 136 int id; |
200 | 137 |
201 /** Registry of receive ports currently active on this isolate. */ | 138 /** Registry of receive ports currently active on this isolate. */ |
202 Map<int, ReceivePort> ports; | 139 Map<int, ReceivePort> ports; |
203 | 140 |
204 /** Holds isolate globals (statics and top-level properties). */ | 141 /** Holds isolate globals (statics and top-level properties). */ |
205 var isolateStatics; // native object containing all globals of an isolate. | 142 var isolateStatics; // native object containing all globals of an isolate. |
206 | 143 |
207 IsolateContext() { | 144 IsolateContext() { |
208 id = _globalState.nextIsolateId++; | 145 id = globalState.nextIsolateId++; |
209 ports = {}; | 146 ports = {}; |
210 initGlobals(); | 147 initGlobals(); |
211 } | 148 } |
212 | 149 |
213 // these are filled lazily the first time the isolate starts running. | 150 // these are filled lazily the first time the isolate starts running. |
214 void initGlobals() native 'this.isolateStatics = {};'; | 151 void initGlobals() native 'this.isolateStatics = {};'; |
215 | 152 |
216 /** | 153 /** |
217 * Run [code] in the context of the isolate represented by [this]. Note this | 154 * Run [code] in the context of the isolate represented by [this]. Note this |
218 * is called from JavaScript (see $wrap_call in corejs.dart). | 155 * is called from JavaScript (see $wrap_call in corejs.dart). |
219 */ | 156 */ |
220 void eval(Function code) { | 157 void eval(Function code) { |
221 var old = _globalState.currentContext; | 158 var old = globalState.currentContext; |
222 _globalState.currentContext = this; | 159 globalState.currentContext = this; |
223 this._setGlobals(); | 160 this._setGlobals(); |
224 var result = null; | 161 var result = null; |
225 try { | 162 try { |
226 result = code(); | 163 result = code(); |
227 } finally { | 164 } finally { |
228 _globalState.currentContext = old; | 165 globalState.currentContext = old; |
229 if (old != null) old._setGlobals(); | 166 if (old != null) old._setGlobals(); |
230 } | 167 } |
231 return result; | 168 return result; |
232 } | 169 } |
233 | 170 |
234 void _setGlobals() native @'$globals = this.isolateStatics;'; | 171 void _setGlobals() native @'$globals = this.isolateStatics;'; |
235 | 172 |
236 /** Lookup a port registered for this isolate. */ | 173 /** Lookup a port registered for this isolate. */ |
237 ReceivePort lookup(int id) => ports[id]; | 174 ReceivePort lookup(int id) => ports[id]; |
238 | 175 |
239 /** Register a port on this isolate. */ | 176 /** Register a port on this isolate. */ |
240 void register(int portId, ReceivePort port) { | 177 void register(int portId, ReceivePort port) { |
241 if (ports.containsKey(portId)) { | 178 if (ports.containsKey(portId)) { |
242 throw new Exception("Registry: ports must be registered only once."); | 179 throw new Exception("Registry: ports must be registered only once."); |
243 } | 180 } |
244 ports[portId] = port; | 181 ports[portId] = port; |
245 _globalState.isolates[id] = this; // indicate this isolate is active | 182 globalState.isolates[id] = this; // indicate this isolate is active |
246 } | 183 } |
247 | 184 |
248 /** Unregister a port on this isolate. */ | 185 /** Unregister a port on this isolate. */ |
249 void unregister(int portId) { | 186 void unregister(int portId) { |
250 ports.remove(portId); | 187 ports.remove(portId); |
251 if (ports.isEmpty()) { | 188 if (ports.isEmpty()) { |
252 _globalState.isolates.remove(id); // indicate this isolate is not active | 189 globalState.isolates.remove(id); // indicate this isolate is not active |
253 } | 190 } |
254 } | 191 } |
255 } | 192 } |
256 | 193 |
| 194 |
257 /** Represent the event loop on a javascript thread (DOM or worker). */ | 195 /** Represent the event loop on a javascript thread (DOM or worker). */ |
258 class EventLoop { | 196 class EventLoop { |
259 Queue<IsolateEvent> events; | 197 Queue<IsolateEvent> events; |
260 | 198 |
261 EventLoop() : events = new Queue<IsolateEvent>(); | 199 EventLoop() : events = new Queue<IsolateEvent>(); |
262 | 200 |
263 void enqueue(isolate, fn, msg) { | 201 void enqueue(isolate, fn, msg) { |
264 events.addLast(new IsolateEvent(isolate, fn, msg)); | 202 events.addLast(new IsolateEvent(isolate, fn, msg)); |
265 } | 203 } |
266 | 204 |
267 IsolateEvent dequeue() { | 205 IsolateEvent dequeue() { |
268 if (events.isEmpty()) return null; | 206 if (events.isEmpty()) return null; |
269 return events.removeFirst(); | 207 return events.removeFirst(); |
270 } | 208 } |
271 | 209 |
272 /** Process a single event, if any. */ | 210 /** Process a single event, if any. */ |
273 bool runIteration() { | 211 bool runIteration() { |
274 final event = dequeue(); | 212 final event = dequeue(); |
275 if (event == null) { | 213 if (event == null) { |
276 _globalState.closeWorker(); | 214 globalState.closeWorker(); |
277 return false; | 215 return false; |
278 } | 216 } |
279 event.process(); | 217 event.process(); |
280 return true; | 218 return true; |
281 } | 219 } |
282 | 220 |
283 /** Function equivalent to [:window.setTimeout:] when available, or null. */ | 221 /** Function equivalent to [:window.setTimeout:] when available, or null. */ |
284 static Function _wrapSetTimeout() native """ | 222 static Function _wrapSetTimeout() native """ |
285 return typeof window != 'undefined' ? | 223 return typeof window != 'undefined' ? |
286 function(a, b) { window.setTimeout(a, b); } : undefined; | 224 function(a, b) { window.setTimeout(a, b); } : undefined; |
(...skipping 16 matching lines...) Expand all Loading... |
303 // Run synchronously until no more iterations are available. | 241 // Run synchronously until no more iterations are available. |
304 while (runIteration()) {} | 242 while (runIteration()) {} |
305 } | 243 } |
306 } | 244 } |
307 | 245 |
308 /** | 246 /** |
309 * Call [_runHelper] but ensure that worker exceptions are propragated. Note | 247 * Call [_runHelper] but ensure that worker exceptions are propragated. Note |
310 * this is called from JavaScript (see $wrap_call in corejs.dart). | 248 * this is called from JavaScript (see $wrap_call in corejs.dart). |
311 */ | 249 */ |
312 void run() { | 250 void run() { |
313 if (!_globalState.isWorker) { | 251 if (!globalState.isWorker) { |
314 _runHelper(); | 252 _runHelper(); |
315 } else { | 253 } else { |
316 try { | 254 try { |
317 _runHelper(); | 255 _runHelper(); |
318 } catch(var e, var trace) { | 256 } catch(var e, var trace) { |
319 _globalState.mainWorker.postMessage(_serializeMessage( | 257 globalState.mainWorker.postMessage(_serializeMessage( |
320 {'command': 'error', 'msg': '$e\n$trace' })); | 258 {'command': 'error', 'msg': '$e\n$trace' })); |
321 } | 259 } |
322 } | 260 } |
323 } | 261 } |
324 } | 262 } |
325 | 263 |
326 /** An event in the top-level event queue. */ | 264 /** An event in the top-level event queue. */ |
327 class IsolateEvent { | 265 class IsolateEvent { |
328 IsolateContext isolate; | 266 IsolateContext isolate; |
329 Function fn; | 267 Function fn; |
330 String message; | 268 String message; |
331 | 269 |
332 IsolateEvent(this.isolate, this.fn, this.message); | 270 IsolateEvent(this.isolate, this.fn, this.message); |
333 | 271 |
334 void process() { | 272 void process() { |
335 isolate.eval(fn); | 273 isolate.eval(fn); |
336 } | 274 } |
337 } | 275 } |
338 | 276 |
339 /** Common functionality to all send ports. */ | |
340 class BaseSendPort implements SendPort { | |
341 /** Id for the destination isolate. */ | |
342 final int _isolateId; | |
343 | 277 |
344 BaseSendPort(this._isolateId); | 278 /** Default worker. */ |
345 | 279 class MainWorker { |
346 ReceivePortSingleShotImpl call(var message) { | 280 int id = 0; |
347 final result = new ReceivePortSingleShotImpl(); | 281 void postMessage(msg) native "return \$globalThis.postMessage(msg);"; |
348 this.send(message, result.toSendPort()); | 282 void set onmessage(f) native "\$globalThis.onmessage = f;"; |
349 return result; | 283 void terminate() {} |
350 } | |
351 | |
352 static void checkReplyTo(SendPort replyTo) { | |
353 if (replyTo !== null | |
354 && replyTo is! NativeJsSendPort | |
355 && replyTo is! WorkerSendPort | |
356 && replyTo is! BufferingSendPort) { | |
357 throw new Exception("SendPort.send: Illegal replyTo port type"); | |
358 } | |
359 } | |
360 | |
361 // TODO(sigmund): replace the current SendPort.call with the following: | |
362 //Future call(var message) { | |
363 // Â final completer = new Completer(); | |
364 // Â final port = new ReceivePort.singleShot(); | |
365 // Â send(message, port.toSendPort()); | |
366 // Â port.receive((value, ignoreReplyTo) { | |
367 // Â Â Â if (value is Exception) { | |
368 // Â completer.completeException(value); | |
369 // } else { | |
370 // completer.complete(value); | |
371 // } | |
372 // }); | |
373 // Â return completer.future; | |
374 //} | |
375 | |
376 abstract void send(var message, [SendPort replyTo]); | |
377 abstract bool operator ==(var other); | |
378 abstract int hashCode(); | |
379 } | 284 } |
380 | 285 |
381 /** A send port that delivers messages in-memory via native JavaScript calls. */ | 286 /** |
382 class NativeJsSendPort extends BaseSendPort implements SendPort { | 287 * A web worker. This type is also defined in 'dart:dom', but we define it here |
383 final ReceivePortImpl _receivePort; | 288 * to avoid introducing a dependency from corelib to dom. This definition uses a |
384 | 289 * 'hidden' type (* prefix on the native name) to enforce that the type is |
385 const NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId); | 290 * defined dynamically only when web workers are actually available. |
386 | 291 */ |
387 void send(var message, [SendPort replyTo = null]) { | 292 class _Worker native "*Worker" { |
388 _waitForPendingPorts([message, replyTo], () { | 293 get id() native "return this.id;"; |
389 checkReplyTo(replyTo); | 294 void set id(i) native "this.id = i;"; |
390 // Check that the isolate still runs and the port is still open | 295 void set onmessage(f) native "this.onmessage = f;"; |
391 final isolate = _globalState.isolates[_isolateId]; | 296 void postMessage(msg) native "return this.postMessage(msg);"; |
392 if (isolate == null) return; | |
393 if (_receivePort._callback == null) return; | |
394 | |
395 // We force serialization/deserialization as a simple way to ensure | |
396 // isolate communication restrictions are respected between isolates that | |
397 // live in the same worker. NativeJsSendPort delivers both messages from | |
398 // the same worker and messages from other workers. In particular, | |
399 // messages sent from a worker via a WorkerSendPort are received at | |
400 // [_processWorkerMessage] and forwarded to a native port. In such cases, | |
401 // here we'll see [_globalState.currentContext == null]. | |
402 final shouldSerialize = _globalState.currentContext != null | |
403 && _globalState.currentContext.id != _isolateId; | |
404 var msg = message; | |
405 var reply = replyTo; | |
406 if (shouldSerialize) { | |
407 msg = _serializeMessage(msg); | |
408 reply = _serializeMessage(reply); | |
409 } | |
410 _globalState.topEventLoop.enqueue(isolate, () { | |
411 if (_receivePort._callback != null) { | |
412 if (shouldSerialize) { | |
413 msg = _deserializeMessage(msg); | |
414 reply = _deserializeMessage(reply); | |
415 } | |
416 _receivePort._callback(msg, reply); | |
417 } | |
418 }, 'receive ' + message); | |
419 }); | |
420 } | |
421 | |
422 bool operator ==(var other) => (other is NativeJsSendPort) && | |
423 (_receivePort == other._receivePort); | |
424 | |
425 int hashCode() => _receivePort._id; | |
426 } | |
427 | |
428 /** A send port that delivers messages via worker.postMessage. */ | |
429 class WorkerSendPort extends BaseSendPort implements SendPort { | |
430 final int _workerId; | |
431 final int _receivePortId; | |
432 | |
433 const WorkerSendPort(this._workerId, int isolateId, this._receivePortId) | |
434 : super(isolateId); | |
435 | |
436 void send(var message, [SendPort replyTo = null]) { | |
437 _waitForPendingPorts([message, replyTo], () { | |
438 checkReplyTo(replyTo); | |
439 final workerMessage = _serializeMessage({ | |
440 'command': 'message', | |
441 'port': this, | |
442 'msg': message, | |
443 'replyTo': replyTo}); | |
444 | |
445 if (_globalState.isWorker) { | |
446 // communication from one worker to another go through the main worker: | |
447 _globalState.mainWorker.postMessage(workerMessage); | |
448 } else { | |
449 _globalState.workers[_workerId].postMessage(workerMessage); | |
450 } | |
451 }); | |
452 } | |
453 | |
454 bool operator ==(var other) { | |
455 return (other is WorkerSendPort) && | |
456 (_workerId == other._workerId) && | |
457 (_isolateId == other._isolateId) && | |
458 (_receivePortId == other._receivePortId); | |
459 } | |
460 | |
461 int hashCode() { | |
462 // TODO(sigmund): use a standard hash when we get one available in corelib. | |
463 return (_workerId << 16) ^ (_isolateId << 8) ^ _receivePortId; | |
464 } | |
465 } | |
466 | |
467 /** A port that buffers messages until an underlying port gets resolved. */ | |
468 class BufferingSendPort extends BaseSendPort implements SendPort { | |
469 /** Internal counter to assign unique ids to each port. */ | |
470 static int _idCount = 0; | |
471 | |
472 /** For implementing equals and hashcode. */ | |
473 final int _id; | |
474 | |
475 /** Underlying port, when resolved. */ | |
476 SendPort _port; | |
477 | |
478 /** | |
479 * Future of the underlying port, so that we can detect when this port can be | |
480 * sent on messages. | |
481 */ | |
482 Future<SendPort> _futurePort; | |
483 | |
484 /** Pending messages (and reply ports). */ | |
485 List pending; | |
486 | |
487 BufferingSendPort(isolateId, this._futurePort) | |
488 : super(isolateId), _id = _idCount, pending = [] { | |
489 _idCount++; | |
490 _futurePort.then((p) { | |
491 _port = p; | |
492 for (final item in pending) { | |
493 p.send(item['message'], item['replyTo']); | |
494 } | |
495 pending = null; | |
496 }); | |
497 } | |
498 | |
499 BufferingSendPort.fromPort(isolateId, this._port) | |
500 : super(isolateId), _id = _idCount { | |
501 _idCount++; | |
502 } | |
503 | |
504 void send(var message, [SendPort replyTo]) { | |
505 if (_port != null) { | |
506 _port.send(message, replyTo); | |
507 } else { | |
508 pending.add({'message': message, 'replyTo': replyTo}); | |
509 } | |
510 } | |
511 | |
512 bool operator ==(var other) => other is BufferingSendPort && _id == other._id; | |
513 int hashCode() => _id; | |
514 } | |
515 | |
516 /** Default factory for receive ports. */ | |
517 class ReceivePortFactory { | |
518 | |
519 factory ReceivePort() { | |
520 return new ReceivePortImpl(); | |
521 } | |
522 | |
523 factory ReceivePort.singleShot() { | |
524 return new ReceivePortSingleShotImpl(); | |
525 } | |
526 } | |
527 | |
528 /** Implementation of a multi-use [ReceivePort] on top of JavaScript. */ | |
529 class ReceivePortImpl implements ReceivePort { | |
530 int _id; | |
531 Function _callback; | |
532 static int _nextFreeId = 1; | |
533 | |
534 ReceivePortImpl() | |
535 : _id = _nextFreeId++ { | |
536 _globalState.currentContext.register(_id, this); | |
537 } | |
538 | |
539 void receive(void onMessage(var message, SendPort replyTo)) { | |
540 _callback = onMessage; | |
541 } | |
542 | |
543 void close() { | |
544 _callback = null; | |
545 _globalState.currentContext.unregister(_id); | |
546 } | |
547 | |
548 SendPort toSendPort() { | |
549 return new NativeJsSendPort(this, _globalState.currentContext.id); | |
550 } | |
551 } | |
552 | |
553 /** Implementation of a single-shot [ReceivePort]. */ | |
554 class ReceivePortSingleShotImpl implements ReceivePort { | |
555 | |
556 ReceivePortSingleShotImpl() : _port = new ReceivePortImpl() { } | |
557 | |
558 void receive(void callback(var message, SendPort replyTo)) { | |
559 _port.receive((var message, SendPort replyTo) { | |
560 _port.close(); | |
561 callback(message, replyTo); | |
562 }); | |
563 } | |
564 | |
565 void close() { | |
566 _port.close(); | |
567 } | |
568 | |
569 SendPort toSendPort() => _port.toSendPort(); | |
570 | |
571 final ReceivePortImpl _port; | |
572 } | 297 } |
573 | 298 |
574 final String _SPAWNED_SIGNAL = "spawned"; | 299 final String _SPAWNED_SIGNAL = "spawned"; |
575 | 300 |
576 class IsolateNatives { | 301 class _IsolateNatives { |
577 | 302 |
578 /** JavaScript-specific implementation to spawn an isolate. */ | 303 /** JavaScript-specific implementation to spawn an isolate. */ |
579 static Future<SendPort> spawn(Isolate isolate, bool isLight) { | 304 static Future<SendPort> spawn(Isolate isolate, bool isLight) { |
580 Completer<SendPort> completer = new Completer<SendPort>(); | 305 Completer<SendPort> completer = new Completer<SendPort>(); |
581 ReceivePort port = new ReceivePort.singleShot(); | 306 ReceivePort port = new ReceivePort.singleShot(); |
582 port.receive((msg, SendPort replyPort) { | 307 port.receive((msg, SendPort replyPort) { |
583 assert(msg == _SPAWNED_SIGNAL); | 308 assert(msg == _SPAWNED_SIGNAL); |
584 completer.complete(replyPort); | 309 completer.complete(replyPort); |
585 }); | 310 }); |
586 | 311 |
587 // TODO(floitsch): throw exception if isolate's class doesn't have a | 312 // TODO(floitsch): throw exception if isolate's class doesn't have a |
588 // default constructor. | 313 // default constructor. |
589 if (_globalState.useWorkers && !isLight) { | 314 if (globalState.useWorkers && !isLight) { |
590 _startWorker(isolate, port.toSendPort()); | 315 _startWorker(isolate, port.toSendPort()); |
591 } else { | 316 } else { |
592 _startNonWorker(isolate, port.toSendPort()); | 317 _startNonWorker(isolate, port.toSendPort()); |
593 } | 318 } |
594 | 319 |
595 return completer.future; | 320 return completer.future; |
596 } | 321 } |
597 | 322 |
598 static SendPort _startWorker(Isolate runnable, SendPort replyPort) { | 323 static SendPort _startWorker(Isolate runnable, SendPort replyPort) { |
599 var factoryName = _getJSConstructorName(runnable); | 324 var factoryName = _getJSConstructorName(runnable); |
600 if (_globalState.isWorker) { | 325 if (globalState.isWorker) { |
601 _globalState.mainWorker.postMessage(_serializeMessage({ | 326 globalState.mainWorker.postMessage(_serializeMessage({ |
602 'command': 'spawn-worker', | 327 'command': 'spawn-worker', |
603 'factoryName': factoryName, | 328 'factoryName': factoryName, |
604 'replyPort': _serializeMessage(replyPort)})); | 329 'replyPort': _serializeMessage(replyPort)})); |
605 } else { | 330 } else { |
606 _spawnWorker(factoryName, _serializeMessage(replyPort)); | 331 _spawnWorker(factoryName, _serializeMessage(replyPort)); |
607 } | 332 } |
608 } | 333 } |
609 | 334 |
610 /** | 335 /** |
611 * The src url for the script tag that loaded this code. Used to create | 336 * The src url for the script tag that loaded this code. Used to create |
(...skipping 14 matching lines...) Expand all Loading... |
626 var scripts = document.getElementsByTagName('script'); | 351 var scripts = document.getElementsByTagName('script'); |
627 // The scripts variable only contains the scripts that have already been | 352 // The scripts variable only contains the scripts that have already been |
628 // executed. The last one is the currently running script. | 353 // executed. The last one is the currently running script. |
629 var script = scripts[scripts.length - 1]; | 354 var script = scripts[scripts.length - 1]; |
630 var src = script && script.src; | 355 var src = script && script.src; |
631 if (!src) { | 356 if (!src) { |
632 // TODO() | 357 // TODO() |
633 src = "FIXME:5407062" + "_" + Math.random().toString(); | 358 src = "FIXME:5407062" + "_" + Math.random().toString(); |
634 if (script) script.src = src; | 359 if (script) script.src = src; |
635 } | 360 } |
636 IsolateNatives._thisScriptCache = src; | 361 _IsolateNatives._thisScriptCache = src; |
637 return src; | 362 return src; |
638 """; | 363 """; |
639 | 364 |
640 /** Starts a new worker with the given URL. */ | 365 /** Starts a new worker with the given URL. */ |
641 static _Worker _newWorker(url) native "return new Worker(url);"; | 366 static _Worker _newWorker(url) native "return new Worker(url);"; |
642 | 367 |
643 /** | 368 /** |
644 * Spawns an isolate in a worker. [factoryName] is the Javascript constructor | 369 * Spawns an isolate in a worker. [factoryName] is the Javascript constructor |
645 * name for the isolate entry point class. | 370 * name for the isolate entry point class. |
646 */ | 371 */ |
647 static void _spawnWorker(factoryName, serializedReplyPort) { | 372 static void _spawnWorker(factoryName, serializedReplyPort) { |
648 final worker = _newWorker(_thisScript); | 373 final worker = _newWorker(_thisScript); |
649 worker.onmessage = (e) { _processWorkerMessage(worker, e); }; | 374 worker.onmessage = (e) { _processWorkerMessage(worker, e); }; |
650 var workerId = _globalState.nextWorkerId++; | 375 var workerId = globalState.nextWorkerId++; |
651 // We also store the id on the worker itself so that we can unregister it. | 376 // We also store the id on the worker itself so that we can unregister it. |
652 worker.id = workerId; | 377 worker.id = workerId; |
653 _globalState.workers[workerId] = worker; | 378 globalState.workers[workerId] = worker; |
654 worker.postMessage(_serializeMessage({ | 379 worker.postMessage(_serializeMessage({ |
655 'command': 'start', | 380 'command': 'start', |
656 'id': workerId, | 381 'id': workerId, |
657 'replyTo': serializedReplyPort, | 382 'replyTo': serializedReplyPort, |
658 'factoryName': factoryName })); | 383 'factoryName': factoryName })); |
659 } | 384 } |
660 | 385 |
661 /** | 386 /** |
662 * Assume that [e] is a browser message event and extract its message data. | 387 * Assume that [e] is a browser message event and extract its message data. |
663 * We don't import the dom explicitly so, when workers are disabled, this | 388 * We don't import the dom explicitly so, when workers are disabled, this |
664 * library can also run on top of nodejs. | 389 * library can also run on top of nodejs. |
665 */ | 390 */ |
666 static _getEventData(e) native "return e.data"; | 391 static _getEventData(e) native "return e.data"; |
667 | 392 |
668 /** | 393 /** |
669 * Process messages on a worker, either to control the worker instance or to | 394 * Process messages on a worker, either to control the worker instance or to |
670 * pass messages along to the isolate running in the worker. | 395 * pass messages along to the isolate running in the worker. |
671 */ | 396 */ |
672 static void _processWorkerMessage(sender, e) { | 397 static void _processWorkerMessage(sender, e) { |
673 var msg = _deserializeMessage(_getEventData(e)); | 398 var msg = _deserializeMessage(_getEventData(e)); |
674 switch (msg['command']) { | 399 switch (msg['command']) { |
675 // TODO(sigmund): delete after we migrate to Isolate2 | 400 // TODO(sigmund): delete after we migrate to Isolate2 |
676 case 'start': | 401 case 'start': |
677 _globalState.currentWorkerId = msg['id']; | 402 globalState.currentWorkerId = msg['id']; |
678 var runnerObject = | 403 var runnerObject = |
679 _allocate(_getJSConstructorFromName(msg['factoryName'])); | 404 _allocate(_getJSConstructorFromName(msg['factoryName'])); |
680 var serializedReplyTo = msg['replyTo']; | 405 var serializedReplyTo = msg['replyTo']; |
681 _globalState.topEventLoop.enqueue(new IsolateContext(), function() { | 406 globalState.topEventLoop.enqueue(new IsolateContext(), function() { |
682 var replyTo = _deserializeMessage(serializedReplyTo); | 407 var replyTo = _deserializeMessage(serializedReplyTo); |
683 _startIsolate(runnerObject, replyTo); | 408 _startIsolate(runnerObject, replyTo); |
684 }, 'worker-start'); | 409 }, 'worker-start'); |
685 _globalState.topEventLoop.run(); | 410 globalState.topEventLoop.run(); |
686 break; | 411 break; |
687 case 'start2': | 412 case 'start2': |
688 _globalState.currentWorkerId = msg['id']; | 413 globalState.currentWorkerId = msg['id']; |
689 Function entryPoint = _getJSFunctionFromName(msg['functionName']); | 414 Function entryPoint = _getJSFunctionFromName(msg['functionName']); |
690 var replyTo = _deserializeMessage(msg['replyTo']); | 415 var replyTo = _deserializeMessage(msg['replyTo']); |
691 _globalState.topEventLoop.enqueue(new IsolateContext(), function() { | 416 globalState.topEventLoop.enqueue(new IsolateContext(), function() { |
692 _startIsolate2(entryPoint, replyTo); | 417 _startIsolate2(entryPoint, replyTo); |
693 }, 'worker-start'); | 418 }, 'worker-start'); |
694 _globalState.topEventLoop.run(); | 419 globalState.topEventLoop.run(); |
695 break; | 420 break; |
696 // TODO(sigmund): delete after we migrate to Isolate2 | 421 // TODO(sigmund): delete after we migrate to Isolate2 |
697 case 'spawn-worker': | 422 case 'spawn-worker': |
698 _spawnWorker(msg['factoryName'], msg['replyPort']); | 423 _spawnWorker(msg['factoryName'], msg['replyPort']); |
699 break; | 424 break; |
700 case 'spawn-worker2': | 425 case 'spawn-worker2': |
701 _spawnWorker2(msg['functionName'], msg['uri'], msg['replyPort']); | 426 _spawnWorker2(msg['functionName'], msg['uri'], msg['replyPort']); |
702 break; | 427 break; |
703 case 'message': | 428 case 'message': |
704 msg['port'].send(msg['msg'], msg['replyTo']); | 429 msg['port'].send(msg['msg'], msg['replyTo']); |
705 _globalState.topEventLoop.run(); | 430 globalState.topEventLoop.run(); |
706 break; | 431 break; |
707 case 'close': | 432 case 'close': |
708 _log("Closing Worker"); | 433 _log("Closing Worker"); |
709 _globalState.workers.remove(sender.id); | 434 globalState.workers.remove(sender.id); |
710 sender.terminate(); | 435 sender.terminate(); |
711 _globalState.topEventLoop.run(); | 436 globalState.topEventLoop.run(); |
712 break; | 437 break; |
713 case 'log': | 438 case 'log': |
714 _log(msg['msg']); | 439 _log(msg['msg']); |
715 break; | 440 break; |
716 case 'print': | 441 case 'print': |
717 if (_globalState.isWorker) { | 442 if (globalState.isWorker) { |
718 _globalState.mainWorker.postMessage( | 443 globalState.mainWorker.postMessage( |
719 _serializeMessage({'command': 'print', 'msg': msg})); | 444 _serializeMessage({'command': 'print', 'msg': msg})); |
720 } else { | 445 } else { |
721 print(msg['msg']); | 446 print(msg['msg']); |
722 } | 447 } |
723 break; | 448 break; |
724 case 'error': | 449 case 'error': |
725 throw msg['msg']; | 450 throw msg['msg']; |
726 } | 451 } |
727 } | 452 } |
728 | 453 |
729 /** Log a message, forwarding to the main worker if appropriate. */ | 454 /** Log a message, forwarding to the main worker if appropriate. */ |
730 static _log(msg) { | 455 static _log(msg) { |
731 if (_globalState.isWorker) { | 456 if (globalState.isWorker) { |
732 _globalState.mainWorker.postMessage( | 457 globalState.mainWorker.postMessage( |
733 _serializeMessage({'command': 'log', 'msg': msg })); | 458 _serializeMessage({'command': 'log', 'msg': msg })); |
734 } else { | 459 } else { |
735 try { | 460 try { |
736 _consoleLog(msg); | 461 _consoleLog(msg); |
737 } catch(e, trace) { | 462 } catch(e, trace) { |
738 throw new Exception(trace); | 463 throw new Exception(trace); |
739 } | 464 } |
740 } | 465 } |
741 } | 466 } |
742 | 467 |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
774 /** Starts a non-worker isolate. */ | 499 /** Starts a non-worker isolate. */ |
775 static SendPort _startNonWorker(Isolate runnable, SendPort replyTo) { | 500 static SendPort _startNonWorker(Isolate runnable, SendPort replyTo) { |
776 // Spawn a new isolate and create the receive port in it. | 501 // Spawn a new isolate and create the receive port in it. |
777 final spawned = new IsolateContext(); | 502 final spawned = new IsolateContext(); |
778 | 503 |
779 // Instead of just running the provided runnable, we create a | 504 // Instead of just running the provided runnable, we create a |
780 // new cloned instance of it with a fresh state in the spawned | 505 // new cloned instance of it with a fresh state in the spawned |
781 // isolate. This way, we do not get cross-isolate references | 506 // isolate. This way, we do not get cross-isolate references |
782 // through the runnable. | 507 // through the runnable. |
783 final ctor = _getJSConstructor(runnable); | 508 final ctor = _getJSConstructor(runnable); |
784 _globalState.topEventLoop.enqueue(spawned, function() { | 509 globalState.topEventLoop.enqueue(spawned, function() { |
785 _startIsolate(_allocate(ctor), replyTo); | 510 _startIsolate(_allocate(ctor), replyTo); |
786 }, 'nonworker start'); | 511 }, 'nonworker start'); |
787 } | 512 } |
788 | 513 |
789 /** Given a ready-to-start runnable, start running it. */ | 514 /** Given a ready-to-start runnable, start running it. */ |
790 static void _startIsolate(Isolate isolate, SendPort replyTo) { | 515 static void _startIsolate(Isolate isolate, SendPort replyTo) { |
791 _fillStatics(_globalState.currentContext); | 516 fillStatics(globalState.currentContext); |
792 ReceivePort port = new ReceivePort(); | 517 ReceivePort port = new ReceivePort(); |
793 replyTo.send(_SPAWNED_SIGNAL, port.toSendPort()); | 518 replyTo.send(_SPAWNED_SIGNAL, port.toSendPort()); |
794 isolate._run(port); | 519 isolate._run(port); |
795 } | 520 } |
796 | 521 |
797 // TODO(sigmund): clean up above, after we make the new API the default: | 522 // TODO(sigmund): clean up above, after we make the new API the default: |
798 | 523 |
799 static _spawn2(String functionName, String uri, bool isLight) { | 524 static _spawn2(String functionName, String uri, bool isLight) { |
800 Completer<SendPort> completer = new Completer<SendPort>(); | 525 Completer<SendPort> completer = new Completer<SendPort>(); |
801 ReceivePort port = new ReceivePort.singleShot(); | 526 ReceivePort port = new ReceivePort.singleShot(); |
802 port.receive((msg, SendPort replyPort) { | 527 port.receive((msg, SendPort replyPort) { |
803 assert(msg == _SPAWNED_SIGNAL); | 528 assert(msg == _SPAWNED_SIGNAL); |
804 completer.complete(replyPort); | 529 completer.complete(replyPort); |
805 }); | 530 }); |
806 | 531 |
807 SendPort signalReply = port.toSendPort(); | 532 SendPort signalReply = port.toSendPort(); |
808 | 533 |
809 if (_globalState.useWorkers && !isLight) { | 534 if (globalState.useWorkers && !isLight) { |
810 _startWorker2(functionName, uri, signalReply); | 535 _startWorker2(functionName, uri, signalReply); |
811 } else { | 536 } else { |
812 _startNonWorker2(functionName, uri, signalReply); | 537 _startNonWorker2(functionName, uri, signalReply); |
813 } | 538 } |
814 return new BufferingSendPort( | 539 return new _BufferingSendPort( |
815 _globalState.currentContext.id, completer.future); | 540 globalState.currentContext.id, completer.future); |
816 } | 541 } |
817 | 542 |
818 static SendPort _startWorker2( | 543 static SendPort _startWorker2( |
819 String functionName, String uri, SendPort replyPort) { | 544 String functionName, String uri, SendPort replyPort) { |
820 if (_globalState.isWorker) { | 545 if (globalState.isWorker) { |
821 _globalState.mainWorker.postMessage(_serializeMessage({ | 546 globalState.mainWorker.postMessage(_serializeMessage({ |
822 'command': 'spawn-worker2', | 547 'command': 'spawn-worker2', |
823 'functionName': functionName, | 548 'functionName': functionName, |
824 'uri': uri, | 549 'uri': uri, |
825 'replyPort': replyPort})); | 550 'replyPort': replyPort})); |
826 } else { | 551 } else { |
827 _spawnWorker2(functionName, uri, replyPort); | 552 _spawnWorker2(functionName, uri, replyPort); |
828 } | 553 } |
829 } | 554 } |
830 | 555 |
831 static SendPort _startNonWorker2( | 556 static SendPort _startNonWorker2( |
832 String functionName, String uri, SendPort replyPort) { | 557 String functionName, String uri, SendPort replyPort) { |
833 // TODO(eub): support IE9 using an iframe -- Dart issue 1702. | 558 // TODO(eub): support IE9 using an iframe -- Dart issue 1702. |
834 if (uri != null) throw new UnsupportedOperationException( | 559 if (uri != null) throw new UnsupportedOperationException( |
835 "Currently Isolate2.fromUri is not supported without web workers."); | 560 "Currently Isolate2.fromUri is not supported without web workers."); |
836 _globalState.topEventLoop.enqueue(new IsolateContext(), function() { | 561 globalState.topEventLoop.enqueue(new IsolateContext(), function() { |
837 final func = _getJSFunctionFromName(functionName); | 562 final func = _getJSFunctionFromName(functionName); |
838 _startIsolate2(func, replyPort); | 563 _startIsolate2(func, replyPort); |
839 }, 'nonworker start'); | 564 }, 'nonworker start'); |
840 } | 565 } |
841 | 566 |
842 static void _startIsolate2(Function topLevel, SendPort replyTo) { | 567 static void _startIsolate2(Function topLevel, SendPort replyTo) { |
843 _fillStatics(_globalState.currentContext); | 568 fillStatics(globalState.currentContext); |
844 final port = new ReceivePort(); | 569 final port = new ReceivePort(); |
845 replyTo.send(_SPAWNED_SIGNAL, port.toSendPort()); | 570 replyTo.send(_SPAWNED_SIGNAL, port.toSendPort()); |
846 topLevel(port); | 571 topLevel(port); |
847 } | 572 } |
848 | 573 |
849 /** | 574 /** |
850 * Spawns an isolate in a worker. [factoryName] is the Javascript constructor | 575 * Spawns an isolate in a worker. [factoryName] is the Javascript constructor |
851 * name for the isolate entry point class. | 576 * name for the isolate entry point class. |
852 */ | 577 */ |
853 static void _spawnWorker2(functionName, uri, replyPort) { | 578 static void _spawnWorker2(functionName, uri, replyPort) { |
854 // TODO(eub): convert to 'main' once we switch back to port at top-level. | 579 // TODO(eub): convert to 'main' once we switch back to port at top-level. |
855 if (functionName == null) functionName = 'isolateMain'; | 580 if (functionName == null) functionName = 'isolateMain'; |
856 if (uri == null) uri = _thisScript; | 581 if (uri == null) uri = _thisScript; |
857 final worker = _newWorker(uri); | 582 final worker = _newWorker(uri); |
858 worker.onmessage = (e) { _processWorkerMessage(worker, e); }; | 583 worker.onmessage = (e) { _processWorkerMessage(worker, e); }; |
859 var workerId = _globalState.nextWorkerId++; | 584 var workerId = globalState.nextWorkerId++; |
860 // We also store the id on the worker itself so that we can unregister it. | 585 // We also store the id on the worker itself so that we can unregister it. |
861 worker.id = workerId; | 586 worker.id = workerId; |
862 _globalState.workers[workerId] = worker; | 587 globalState.workers[workerId] = worker; |
863 worker.postMessage(_serializeMessage({ | 588 worker.postMessage(_serializeMessage({ |
864 'command': 'start2', | 589 'command': 'start2', |
865 'id': workerId, | 590 'id': workerId, |
866 // Note: we serialize replyPort twice because the child worker needs to | 591 // Note: we serialize replyPort twice because the child worker needs to |
867 // first deserialize the worker id, before it can correctly deserialize | 592 // first deserialize the worker id, before it can correctly deserialize |
868 // the port (port deserialization is sensitive to what is the current | 593 // the port (port deserialization is sensitive to what is the current |
869 // workerId). | 594 // workerId). |
870 'replyTo': _serializeMessage(replyPort), | 595 'replyTo': _serializeMessage(replyPort), |
871 'functionName': functionName })); | 596 'functionName': functionName })); |
872 } | 597 } |
873 } | 598 } |
874 | |
875 class Isolate2Impl implements Isolate2 { | |
876 SendPort sendPort; | |
877 | |
878 Isolate2Impl(this.sendPort); | |
879 | |
880 void stop() {} | |
881 } | |
882 | |
883 class IsolateFactory implements Isolate2 { | |
884 | |
885 factory Isolate2.fromCode(Function topLevelFunction) { | |
886 final name = IsolateNatives._getJSFunctionName(topLevelFunction); | |
887 if (name == null) { | |
888 throw new UnsupportedOperationException( | |
889 "only top-level functions can be spawned."); | |
890 } | |
891 return new Isolate2Impl(IsolateNatives._spawn2(name, null, false)); | |
892 } | |
893 | |
894 factory Isolate2.fromUri(String uri) { | |
895 return new Isolate2Impl(IsolateNatives._spawn2(null, uri, false)); | |
896 } | |
897 } | |
OLD | NEW |