Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(413)

Side by Side Diff: lib/isolate/frog/ports.dart

Issue 9422019: isolates refactor: this change introduces 'dart:isolate' as a library. This is a (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: '' Created 8 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 #library("isolate.frog.ports");
6
7 #import("dart:isolate");
8 #import("messages.dart", prefix: "messages");
9 #import("isolateimpl.dart");
10
11 /** Common functionality to all send ports. */
12 class BaseSendPort implements SendPort {
13 /** Id for the destination isolate. */
14 final int _isolateId;
15
16 BaseSendPort(this._isolateId);
17
18 ReceivePortSingleShotImpl call(var message) {
19 final result = new ReceivePortSingleShotImpl();
20 this.send(message, result.toSendPort());
21 return result;
22 }
23
24 static void checkReplyTo(SendPort replyTo) {
25 if (replyTo !== null
26 && replyTo is! NativeJsSendPort
27 && replyTo is! WorkerSendPort
28 && replyTo is! BufferingSendPort) {
29 throw new Exception("SendPort.send: Illegal replyTo port type");
30 }
31 }
32
33 // TODO(sigmund): replace the current SendPort.call with the following:
34 //Future call(var message) {
35 //  final completer = new Completer();
36 //  final port = new ReceivePort.singleShot();
37 //  send(message, port.toSendPort());
38 //  port.receive((value, ignoreReplyTo) {
39 //    if (value is Exception) {
40 //  completer.completeException(value);
41 // } else {
42 // completer.complete(value);
43 // }
44 // });
45 //  return completer.future;
46 //}
47
48 abstract void send(var message, [SendPort replyTo]);
49 abstract bool operator ==(var other);
50 abstract int hashCode();
51 }
52
53 /** A send port that delivers messages in-memory via native JavaScript calls. */
54 class NativeJsSendPort extends BaseSendPort implements SendPort {
55 final ReceivePortImpl _receivePort;
56
57 const NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId);
58
59 void send(var message, [SendPort replyTo = null]) {
60 _waitForPendingPorts([message, replyTo], () {
61 checkReplyTo(replyTo);
62 // Check that the isolate still runs and the port is still open
63 final isolate = globalState.isolates[_isolateId];
64 if (isolate == null) return;
65 if (_receivePort._callback == null) return;
66
67 // We force serialization/deserialization as a simple way to ensure
68 // isolate communication restrictions are respected between isolates that
69 // live in the same worker. NativeJsSendPort delivers both messages from
70 // the same worker and messages from other workers. In particular,
71 // messages sent from a worker via a WorkerSendPort are received at
72 // [_processWorkerMessage] and forwarded to a native port. In such cases,
73 // here we'll see [globalState.currentContext == null].
74 final shouldSerialize = globalState.currentContext != null
75 && globalState.currentContext.id != _isolateId;
76 var msg = message;
77 var reply = replyTo;
78 if (shouldSerialize) {
79 msg = messages.serialize(msg);
80 reply = messages.serialize(reply);
81 }
82 globalState.topEventLoop.enqueue(isolate, () {
83 if (_receivePort._callback != null) {
84 if (shouldSerialize) {
85 msg = messages.deserialize(msg);
86 reply = messages.deserialize(reply);
87 }
88 _receivePort._callback(msg, reply);
89 }
90 }, 'receive ' + message);
91 });
92 }
93
94 bool operator ==(var other) => (other is NativeJsSendPort) &&
95 (_receivePort == other._receivePort);
96
97 int hashCode() => _receivePort._id;
98 }
99
100 /** A send port that delivers messages via worker.postMessage. */
101 class WorkerSendPort extends BaseSendPort implements SendPort {
102 final int _workerId;
103 final int _receivePortId;
104
105 const WorkerSendPort(this._workerId, int isolateId, this._receivePortId)
106 : super(isolateId);
107
108 void send(var message, [SendPort replyTo = null]) {
109 _waitForPendingPorts([message, replyTo], () {
110 checkReplyTo(replyTo);
111 final workerMessage = messages.serialize({
112 'command': 'message',
113 'port': this,
114 'msg': message,
115 'replyTo': replyTo});
116
117 if (globalState.isWorker) {
118 // communication from one worker to another go through the main worker:
119 globalState.mainWorker.postMessage(workerMessage);
120 } else {
121 globalState.workers[_workerId].postMessage(workerMessage);
122 }
123 });
124 }
125
126 bool operator ==(var other) {
127 return (other is WorkerSendPort) &&
128 (_workerId == other._workerId) &&
129 (_isolateId == other._isolateId) &&
130 (_receivePortId == other._receivePortId);
131 }
132
133 int hashCode() {
134 // TODO(sigmund): use a standard hash when we get one available in corelib.
135 return (_workerId << 16) ^ (_isolateId << 8) ^ _receivePortId;
136 }
137 }
138
139 /** A port that buffers messages until an underlying port gets resolved. */
140 class BufferingSendPort extends BaseSendPort implements SendPort {
141 /** Internal counter to assign unique ids to each port. */
142 static int _idCount = 0;
143
144 /** For implementing equals and hashcode. */
145 final int _id;
146
147 /** Underlying port, when resolved. */
148 SendPort _port;
149
150 /**
151 * Future of the underlying port, so that we can detect when this port can be
152 * sent on messages.
153 */
154 Future<SendPort> _futurePort;
155
156 /** Pending messages (and reply ports). */
157 List pending;
158
159 BufferingSendPort(isolateId, this._futurePort)
160 : super(isolateId), _id = _idCount, pending = [] {
161 _idCount++;
162 _futurePort.then((p) {
163 _port = p;
164 for (final item in pending) {
165 p.send(item['message'], item['replyTo']);
166 }
167 pending = null;
168 });
169 }
170
171 BufferingSendPort.fromPort(isolateId, this._port)
172 : super(isolateId), _id = _idCount {
173 _idCount++;
174 }
175
176 void send(var message, [SendPort replyTo]) {
177 if (_port != null) {
178 _port.send(message, replyTo);
179 } else {
180 pending.add({'message': message, 'replyTo': replyTo});
181 }
182 }
183
184 bool operator ==(var other) => other is BufferingSendPort && _id == other._id;
185 int hashCode() => _id;
186 }
187
188 /** Default factory for receive ports. */
189 class ReceivePortFactory {
190
191 factory ReceivePort() {
192 return new ReceivePortImpl();
193 }
194
195 factory ReceivePort.singleShot() {
196 return new ReceivePortSingleShotImpl();
197 }
198 }
199
200 /** Implementation of a multi-use [ReceivePort] on top of JavaScript. */
201 class ReceivePortImpl implements ReceivePort {
202 int _id;
203 Function _callback;
204 static int _nextFreeId = 1;
205
206 ReceivePortImpl()
207 : _id = _nextFreeId++ {
208 globalState.currentContext.register(_id, this);
209 }
210
211 void receive(void onMessage(var message, SendPort replyTo)) {
212 _callback = onMessage;
213 }
214
215 void close() {
216 _callback = null;
217 globalState.currentContext.unregister(_id);
218 }
219
220 SendPort toSendPort() {
221 return new NativeJsSendPort(this, globalState.currentContext.id);
222 }
223 }
224
225 /** Implementation of a single-shot [ReceivePort]. */
226 class ReceivePortSingleShotImpl implements ReceivePort {
227
228 ReceivePortSingleShotImpl() : _port = new ReceivePortImpl() { }
229
230 void receive(void callback(var message, SendPort replyTo)) {
231 _port.receive((var message, SendPort replyTo) {
232 _port.close();
233 callback(message, replyTo);
234 });
235 }
236
237 void close() {
238 _port.close();
239 }
240
241 SendPort toSendPort() => _port.toSendPort();
242
243 final ReceivePortImpl _port;
244 }
245
246 /** Wait until all ports in a message are resolved. */
247 _waitForPendingPorts(var message, void callback()) {
248 final finder = new _PendingSendPortFinder();
249 finder.traverse(message);
250 Futures.wait(finder.ports).then((_) => callback());
251 }
252
253
254 /** Visitor that finds all unresolved [SendPort]s in a message. */
255 class _PendingSendPortFinder extends messages.MessageTraverser {
256 List<Future<SendPort>> ports;
257 _PendingSendPortFinder() : super(), ports = [];
258
259 visitPrimitive(x) {}
260 visitNativeJsSendPort(NativeJsSendPort port) {}
261 visitWorkerSendPort(WorkerSendPort port) {}
262 visitReceivePort(ReceivePortImpl port) {}
263 visitReceivePortSingleShot(ReceivePortSingleShotImpl port) {}
264
265 visitList(List list) {
266 final visited = _getInfo(list);
267 if (visited !== null) return;
268 _attachInfo(list, true);
269 // TODO(sigmund): replace with the following: (bug #1660)
270 // list.forEach(_dispatch);
271 list.forEach((e) => _dispatch(e));
272 }
273
274 visitMap(Map map) {
275 final visited = _getInfo(map);
276 if (visited !== null) return;
277
278 _attachInfo(map, true);
279 // TODO(sigmund): replace with the following: (bug #1660)
280 // map.getValues().forEach(_dispatch);
281 map.getValues().forEach((e) => _dispatch(e));
282 }
283
284 visitBufferingSendPort(BufferingSendPort port) {
285 if (port._port == null) {
286 ports.add(port._futurePort);
287 }
288 }
289 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698