Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(556)

Side by Side Diff: lib/isolate/frog/ports.dart

Issue 9422019: isolates refactor: this change introduces 'dart:isolate' as a library. This is a (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: '' Created 8 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « lib/isolate/frog/natives.js ('k') | lib/isolate/isolate_api.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 /** Common functionality to all send ports. */
6 class _BaseSendPort implements SendPort {
7 /** Id for the destination isolate. */
8 final int _isolateId;
9
10 _BaseSendPort(this._isolateId);
11
12 _ReceivePortSingleShotImpl call(var message) {
13 final result = new _ReceivePortSingleShotImpl();
14 this.send(message, result.toSendPort());
15 return result;
16 }
17
18 static void checkReplyTo(SendPort replyTo) {
19 if (replyTo !== null
20 && replyTo is! _NativeJsSendPort
21 && replyTo is! _WorkerSendPort
22 && replyTo is! _BufferingSendPort) {
23 throw new Exception("SendPort.send: Illegal replyTo port type");
24 }
25 }
26
27 // TODO(sigmund): replace the current SendPort.call with the following:
28 //Future call(var message) {
29 //  final completer = new Completer();
30 //  final port = new _ReceivePort.singleShot();
31 //  send(message, port.toSendPort());
32 //  port.receive((value, ignoreReplyTo) {
33 //    if (value is Exception) {
34 //  completer.completeException(value);
35 // } else {
36 // completer.complete(value);
37 // }
38 // });
39 //  return completer.future;
40 //}
41
42 abstract void send(var message, [SendPort replyTo]);
43 abstract bool operator ==(var other);
44 abstract int hashCode();
45 }
46
47 /** A send port that delivers messages in-memory via native JavaScript calls. */
48 class _NativeJsSendPort extends _BaseSendPort implements SendPort {
49 final _ReceivePortImpl _receivePort;
50
51 const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId);
52
53 void send(var message, [SendPort replyTo = null]) {
54 _waitForPendingPorts([message, replyTo], () {
55 checkReplyTo(replyTo);
56 // Check that the isolate still runs and the port is still open
57 final isolate = globalState.isolates[_isolateId];
58 if (isolate == null) return;
59 if (_receivePort._callback == null) return;
60
61 // We force serialization/deserialization as a simple way to ensure
62 // isolate communication restrictions are respected between isolates that
63 // live in the same worker. [_NativeJsSendPort] delivers both messages
64 // from the same worker and messages from other workers. In particular,
65 // messages sent from a worker via a [_WorkerSendPort] are received at
66 // [_processWorkerMessage] and forwarded to a native port. In such cases,
67 // here we'll see [globalState.currentContext == null].
68 final shouldSerialize = globalState.currentContext != null
69 && globalState.currentContext.id != _isolateId;
70 var msg = message;
71 var reply = replyTo;
72 if (shouldSerialize) {
73 msg = _serializeMessage(msg);
74 reply = _serializeMessage(reply);
75 }
76 globalState.topEventLoop.enqueue(isolate, () {
77 if (_receivePort._callback != null) {
78 if (shouldSerialize) {
79 msg = _deserializeMessage(msg);
80 reply = _deserializeMessage(reply);
81 }
82 _receivePort._callback(msg, reply);
83 }
84 }, 'receive ' + message);
85 });
86 }
87
88 bool operator ==(var other) => (other is _NativeJsSendPort) &&
89 (_receivePort == other._receivePort);
90
91 int hashCode() => _receivePort._id;
92 }
93
94 /** A send port that delivers messages via worker.postMessage. */
95 class _WorkerSendPort extends _BaseSendPort implements SendPort {
96 final int _workerId;
97 final int _receivePortId;
98
99 const _WorkerSendPort(this._workerId, int isolateId, this._receivePortId)
100 : super(isolateId);
101
102 void send(var message, [SendPort replyTo = null]) {
103 _waitForPendingPorts([message, replyTo], () {
104 checkReplyTo(replyTo);
105 final workerMessage = _serializeMessage({
106 'command': 'message',
107 'port': this,
108 'msg': message,
109 'replyTo': replyTo});
110
111 if (globalState.isWorker) {
112 // communication from one worker to another go through the main worker:
113 globalState.mainWorker.postMessage(workerMessage);
114 } else {
115 globalState.workers[_workerId].postMessage(workerMessage);
116 }
117 });
118 }
119
120 bool operator ==(var other) {
121 return (other is _WorkerSendPort) &&
122 (_workerId == other._workerId) &&
123 (_isolateId == other._isolateId) &&
124 (_receivePortId == other._receivePortId);
125 }
126
127 int hashCode() {
128 // TODO(sigmund): use a standard hash when we get one available in corelib.
129 return (_workerId << 16) ^ (_isolateId << 8) ^ _receivePortId;
130 }
131 }
132
133 /** A port that buffers messages until an underlying port gets resolved. */
134 class _BufferingSendPort extends _BaseSendPort implements SendPort {
135 /** Internal counter to assign unique ids to each port. */
136 static int _idCount = 0;
137
138 /** For implementing equals and hashcode. */
139 final int _id;
140
141 /** Underlying port, when resolved. */
142 SendPort _port;
143
144 /**
145 * Future of the underlying port, so that we can detect when this port can be
146 * sent on messages.
147 */
148 Future<SendPort> _futurePort;
149
150 /** Pending messages (and reply ports). */
151 List pending;
152
153 _BufferingSendPort(isolateId, this._futurePort)
154 : super(isolateId), _id = _idCount, pending = [] {
155 _idCount++;
156 _futurePort.then((p) {
157 _port = p;
158 for (final item in pending) {
159 p.send(item['message'], item['replyTo']);
160 }
161 pending = null;
162 });
163 }
164
165 _BufferingSendPort.fromPort(isolateId, this._port)
166 : super(isolateId), _id = _idCount {
167 _idCount++;
168 }
169
170 void send(var message, [SendPort replyTo]) {
171 if (_port != null) {
172 _port.send(message, replyTo);
173 } else {
174 pending.add({'message': message, 'replyTo': replyTo});
175 }
176 }
177
178 bool operator ==(var other) =>
179 other is _BufferingSendPort && _id == other._id;
180 int hashCode() => _id;
181 }
182
183 /** Default factory for receive ports. */
184 class _ReceivePortFactory {
185
186 factory ReceivePort() {
187 return new _ReceivePortImpl();
188 }
189
190 factory ReceivePort.singleShot() {
191 return new _ReceivePortSingleShotImpl();
192 }
193 }
194
195 /** Implementation of a multi-use [ReceivePort] on top of JavaScript. */
196 class _ReceivePortImpl implements ReceivePort {
197 int _id;
198 Function _callback;
199 static int _nextFreeId = 1;
200
201 _ReceivePortImpl()
202 : _id = _nextFreeId++ {
203 globalState.currentContext.register(_id, this);
204 }
205
206 void receive(void onMessage(var message, SendPort replyTo)) {
207 _callback = onMessage;
208 }
209
210 void close() {
211 _callback = null;
212 globalState.currentContext.unregister(_id);
213 }
214
215 SendPort toSendPort() {
216 return new _NativeJsSendPort(this, globalState.currentContext.id);
217 }
218 }
219
220 /** Implementation of a single-shot [ReceivePort]. */
221 class _ReceivePortSingleShotImpl implements ReceivePort {
222
223 _ReceivePortSingleShotImpl() : _port = new _ReceivePortImpl() { }
224
225 void receive(void callback(var message, SendPort replyTo)) {
226 _port.receive((var message, SendPort replyTo) {
227 _port.close();
228 callback(message, replyTo);
229 });
230 }
231
232 void close() {
233 _port.close();
234 }
235
236 SendPort toSendPort() => _port.toSendPort();
237
238 final _ReceivePortImpl _port;
239 }
240
241 /** Wait until all ports in a message are resolved. */
242 _waitForPendingPorts(var message, void callback()) {
243 final finder = new _PendingSendPortFinder();
244 finder.traverse(message);
245 Futures.wait(finder.ports).then((_) => callback());
246 }
247
248
249 /** Visitor that finds all unresolved [SendPort]s in a message. */
250 class _PendingSendPortFinder extends _MessageTraverser {
251 List<Future<SendPort>> ports;
252 _PendingSendPortFinder() : super(), ports = [];
253
254 visitPrimitive(x) {}
255 visitNativeJsSendPort(_NativeJsSendPort port) {}
256 visitWorkerSendPort(_WorkerSendPort port) {}
257 visitReceivePort(_ReceivePortImpl port) {}
258 visitReceivePortSingleShot(_ReceivePortSingleShotImpl port) {}
259
260 visitList(List list) {
261 final visited = _getInfo(list);
262 if (visited !== null) return;
263 _attachInfo(list, true);
264 // TODO(sigmund): replace with the following: (bug #1660)
265 // list.forEach(_dispatch);
266 list.forEach((e) => _dispatch(e));
267 }
268
269 visitMap(Map map) {
270 final visited = _getInfo(map);
271 if (visited !== null) return;
272
273 _attachInfo(map, true);
274 // TODO(sigmund): replace with the following: (bug #1660)
275 // map.getValues().forEach(_dispatch);
276 map.getValues().forEach((e) => _dispatch(e));
277 }
278
279 visitBufferingSendPort(_BufferingSendPort port) {
280 if (port._port == null) {
281 ports.add(port._futurePort);
282 }
283 }
284 }
OLDNEW
« no previous file with comments | « lib/isolate/frog/natives.js ('k') | lib/isolate/isolate_api.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698