OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 #library("isolate.frog.ports"); |
| 6 |
| 7 #import("dart:isolate"); |
| 8 #import("messages.dart", prefix: "messages"); |
| 9 #import("isolateimpl.dart"); |
| 10 |
| 11 /** Common functionality to all send ports. */ |
| 12 class BaseSendPort implements SendPort { |
| 13 /** Id for the destination isolate. */ |
| 14 final int _isolateId; |
| 15 |
| 16 BaseSendPort(this._isolateId); |
| 17 |
| 18 ReceivePortSingleShotImpl call(var message) { |
| 19 final result = new ReceivePortSingleShotImpl(); |
| 20 this.send(message, result.toSendPort()); |
| 21 return result; |
| 22 } |
| 23 |
| 24 static void checkReplyTo(SendPort replyTo) { |
| 25 if (replyTo !== null |
| 26 && replyTo is! NativeJsSendPort |
| 27 && replyTo is! WorkerSendPort |
| 28 && replyTo is! BufferingSendPort) { |
| 29 throw new Exception("SendPort.send: Illegal replyTo port type"); |
| 30 } |
| 31 } |
| 32 |
| 33 // TODO(sigmund): replace the current SendPort.call with the following: |
| 34 //Future call(var message) { |
| 35 // final completer = new Completer(); |
| 36 // final port = new ReceivePort.singleShot(); |
| 37 // send(message, port.toSendPort()); |
| 38 // port.receive((value, ignoreReplyTo) { |
| 39 // if (value is Exception) { |
| 40 // completer.completeException(value); |
| 41 // } else { |
| 42 // completer.complete(value); |
| 43 // } |
| 44 // }); |
| 45 // return completer.future; |
| 46 //} |
| 47 |
| 48 abstract void send(var message, [SendPort replyTo]); |
| 49 abstract bool operator ==(var other); |
| 50 abstract int hashCode(); |
| 51 } |
| 52 |
| 53 /** A send port that delivers messages in-memory via native JavaScript calls. */ |
| 54 class NativeJsSendPort extends BaseSendPort implements SendPort { |
| 55 final ReceivePortImpl _receivePort; |
| 56 |
| 57 const NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId); |
| 58 |
| 59 void send(var message, [SendPort replyTo = null]) { |
| 60 _waitForPendingPorts([message, replyTo], () { |
| 61 checkReplyTo(replyTo); |
| 62 // Check that the isolate still runs and the port is still open |
| 63 final isolate = globalState.isolates[_isolateId]; |
| 64 if (isolate == null) return; |
| 65 if (_receivePort._callback == null) return; |
| 66 |
| 67 // We force serialization/deserialization as a simple way to ensure |
| 68 // isolate communication restrictions are respected between isolates that |
| 69 // live in the same worker. NativeJsSendPort delivers both messages from |
| 70 // the same worker and messages from other workers. In particular, |
| 71 // messages sent from a worker via a WorkerSendPort are received at |
| 72 // [_processWorkerMessage] and forwarded to a native port. In such cases, |
| 73 // here we'll see [globalState.currentContext == null]. |
| 74 final shouldSerialize = globalState.currentContext != null |
| 75 && globalState.currentContext.id != _isolateId; |
| 76 var msg = message; |
| 77 var reply = replyTo; |
| 78 if (shouldSerialize) { |
| 79 msg = messages.serialize(msg); |
| 80 reply = messages.serialize(reply); |
| 81 } |
| 82 globalState.topEventLoop.enqueue(isolate, () { |
| 83 if (_receivePort._callback != null) { |
| 84 if (shouldSerialize) { |
| 85 msg = messages.deserialize(msg); |
| 86 reply = messages.deserialize(reply); |
| 87 } |
| 88 _receivePort._callback(msg, reply); |
| 89 } |
| 90 }, 'receive ' + message); |
| 91 }); |
| 92 } |
| 93 |
| 94 bool operator ==(var other) => (other is NativeJsSendPort) && |
| 95 (_receivePort == other._receivePort); |
| 96 |
| 97 int hashCode() => _receivePort._id; |
| 98 } |
| 99 |
| 100 /** A send port that delivers messages via worker.postMessage. */ |
| 101 class WorkerSendPort extends BaseSendPort implements SendPort { |
| 102 final int _workerId; |
| 103 final int _receivePortId; |
| 104 |
| 105 const WorkerSendPort(this._workerId, int isolateId, this._receivePortId) |
| 106 : super(isolateId); |
| 107 |
| 108 void send(var message, [SendPort replyTo = null]) { |
| 109 _waitForPendingPorts([message, replyTo], () { |
| 110 checkReplyTo(replyTo); |
| 111 final workerMessage = messages.serialize({ |
| 112 'command': 'message', |
| 113 'port': this, |
| 114 'msg': message, |
| 115 'replyTo': replyTo}); |
| 116 |
| 117 if (globalState.isWorker) { |
| 118 // communication from one worker to another go through the main worker: |
| 119 globalState.mainWorker.postMessage(workerMessage); |
| 120 } else { |
| 121 globalState.workers[_workerId].postMessage(workerMessage); |
| 122 } |
| 123 }); |
| 124 } |
| 125 |
| 126 bool operator ==(var other) { |
| 127 return (other is WorkerSendPort) && |
| 128 (_workerId == other._workerId) && |
| 129 (_isolateId == other._isolateId) && |
| 130 (_receivePortId == other._receivePortId); |
| 131 } |
| 132 |
| 133 int hashCode() { |
| 134 // TODO(sigmund): use a standard hash when we get one available in corelib. |
| 135 return (_workerId << 16) ^ (_isolateId << 8) ^ _receivePortId; |
| 136 } |
| 137 } |
| 138 |
| 139 /** A port that buffers messages until an underlying port gets resolved. */ |
| 140 class BufferingSendPort extends BaseSendPort implements SendPort { |
| 141 /** Internal counter to assign unique ids to each port. */ |
| 142 static int _idCount = 0; |
| 143 |
| 144 /** For implementing equals and hashcode. */ |
| 145 final int _id; |
| 146 |
| 147 /** Underlying port, when resolved. */ |
| 148 SendPort _port; |
| 149 |
| 150 /** |
| 151 * Future of the underlying port, so that we can detect when this port can be |
| 152 * sent on messages. |
| 153 */ |
| 154 Future<SendPort> _futurePort; |
| 155 |
| 156 /** Pending messages (and reply ports). */ |
| 157 List pending; |
| 158 |
| 159 BufferingSendPort(isolateId, this._futurePort) |
| 160 : super(isolateId), _id = _idCount, pending = [] { |
| 161 _idCount++; |
| 162 _futurePort.then((p) { |
| 163 _port = p; |
| 164 for (final item in pending) { |
| 165 p.send(item['message'], item['replyTo']); |
| 166 } |
| 167 pending = null; |
| 168 }); |
| 169 } |
| 170 |
| 171 BufferingSendPort.fromPort(isolateId, this._port) |
| 172 : super(isolateId), _id = _idCount { |
| 173 _idCount++; |
| 174 } |
| 175 |
| 176 void send(var message, [SendPort replyTo]) { |
| 177 if (_port != null) { |
| 178 _port.send(message, replyTo); |
| 179 } else { |
| 180 pending.add({'message': message, 'replyTo': replyTo}); |
| 181 } |
| 182 } |
| 183 |
| 184 bool operator ==(var other) => other is BufferingSendPort && _id == other._id; |
| 185 int hashCode() => _id; |
| 186 } |
| 187 |
| 188 /** Default factory for receive ports. */ |
| 189 class ReceivePortFactory { |
| 190 |
| 191 factory ReceivePort() { |
| 192 return new ReceivePortImpl(); |
| 193 } |
| 194 |
| 195 factory ReceivePort.singleShot() { |
| 196 return new ReceivePortSingleShotImpl(); |
| 197 } |
| 198 } |
| 199 |
| 200 /** Implementation of a multi-use [ReceivePort] on top of JavaScript. */ |
| 201 class ReceivePortImpl implements ReceivePort { |
| 202 int _id; |
| 203 Function _callback; |
| 204 static int _nextFreeId = 1; |
| 205 |
| 206 ReceivePortImpl() |
| 207 : _id = _nextFreeId++ { |
| 208 globalState.currentContext.register(_id, this); |
| 209 } |
| 210 |
| 211 void receive(void onMessage(var message, SendPort replyTo)) { |
| 212 _callback = onMessage; |
| 213 } |
| 214 |
| 215 void close() { |
| 216 _callback = null; |
| 217 globalState.currentContext.unregister(_id); |
| 218 } |
| 219 |
| 220 SendPort toSendPort() { |
| 221 return new NativeJsSendPort(this, globalState.currentContext.id); |
| 222 } |
| 223 } |
| 224 |
| 225 /** Implementation of a single-shot [ReceivePort]. */ |
| 226 class ReceivePortSingleShotImpl implements ReceivePort { |
| 227 |
| 228 ReceivePortSingleShotImpl() : _port = new ReceivePortImpl() { } |
| 229 |
| 230 void receive(void callback(var message, SendPort replyTo)) { |
| 231 _port.receive((var message, SendPort replyTo) { |
| 232 _port.close(); |
| 233 callback(message, replyTo); |
| 234 }); |
| 235 } |
| 236 |
| 237 void close() { |
| 238 _port.close(); |
| 239 } |
| 240 |
| 241 SendPort toSendPort() => _port.toSendPort(); |
| 242 |
| 243 final ReceivePortImpl _port; |
| 244 } |
| 245 |
| 246 /** Wait until all ports in a message are resolved. */ |
| 247 _waitForPendingPorts(var message, void callback()) { |
| 248 final finder = new _PendingSendPortFinder(); |
| 249 finder.traverse(message); |
| 250 Futures.wait(finder.ports).then((_) => callback()); |
| 251 } |
| 252 |
| 253 |
| 254 /** Visitor that finds all unresolved [SendPort]s in a message. */ |
| 255 class _PendingSendPortFinder extends messages.MessageTraverser { |
| 256 List<Future<SendPort>> ports; |
| 257 _PendingSendPortFinder() : super(), ports = []; |
| 258 |
| 259 visitPrimitive(x) {} |
| 260 visitNativeJsSendPort(NativeJsSendPort port) {} |
| 261 visitWorkerSendPort(WorkerSendPort port) {} |
| 262 visitReceivePort(ReceivePortImpl port) {} |
| 263 visitReceivePortSingleShot(ReceivePortSingleShotImpl port) {} |
| 264 |
| 265 visitList(List list) { |
| 266 final visited = _getInfo(list); |
| 267 if (visited !== null) return; |
| 268 _attachInfo(list, true); |
| 269 // TODO(sigmund): replace with the following: (bug #1660) |
| 270 // list.forEach(_dispatch); |
| 271 list.forEach((e) => _dispatch(e)); |
| 272 } |
| 273 |
| 274 visitMap(Map map) { |
| 275 final visited = _getInfo(map); |
| 276 if (visited !== null) return; |
| 277 |
| 278 _attachInfo(map, true); |
| 279 // TODO(sigmund): replace with the following: (bug #1660) |
| 280 // map.getValues().forEach(_dispatch); |
| 281 map.getValues().forEach((e) => _dispatch(e)); |
| 282 } |
| 283 |
| 284 visitBufferingSendPort(BufferingSendPort port) { |
| 285 if (port._port == null) { |
| 286 ports.add(port._futurePort); |
| 287 } |
| 288 } |
| 289 } |
OLD | NEW |