OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 /** Common functionality to all send ports. */ |
| 6 class _BaseSendPort implements SendPort { |
| 7 /** Id for the destination isolate. */ |
| 8 final int _isolateId; |
| 9 |
| 10 _BaseSendPort(this._isolateId); |
| 11 |
| 12 _ReceivePortSingleShotImpl call(var message) { |
| 13 final result = new _ReceivePortSingleShotImpl(); |
| 14 this.send(message, result.toSendPort()); |
| 15 return result; |
| 16 } |
| 17 |
| 18 static void checkReplyTo(SendPort replyTo) { |
| 19 if (replyTo !== null |
| 20 && replyTo is! _NativeJsSendPort |
| 21 && replyTo is! _WorkerSendPort |
| 22 && replyTo is! _BufferingSendPort) { |
| 23 throw new Exception("SendPort.send: Illegal replyTo port type"); |
| 24 } |
| 25 } |
| 26 |
| 27 // TODO(sigmund): replace the current SendPort.call with the following: |
| 28 //Future call(var message) { |
| 29 // final completer = new Completer(); |
| 30 // final port = new _ReceivePort.singleShot(); |
| 31 // send(message, port.toSendPort()); |
| 32 // port.receive((value, ignoreReplyTo) { |
| 33 // if (value is Exception) { |
| 34 // completer.completeException(value); |
| 35 // } else { |
| 36 // completer.complete(value); |
| 37 // } |
| 38 // }); |
| 39 // return completer.future; |
| 40 //} |
| 41 |
| 42 abstract void send(var message, [SendPort replyTo]); |
| 43 abstract bool operator ==(var other); |
| 44 abstract int hashCode(); |
| 45 } |
| 46 |
| 47 /** A send port that delivers messages in-memory via native JavaScript calls. */ |
| 48 class _NativeJsSendPort extends _BaseSendPort implements SendPort { |
| 49 final _ReceivePortImpl _receivePort; |
| 50 |
| 51 const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId); |
| 52 |
| 53 void send(var message, [SendPort replyTo = null]) { |
| 54 _waitForPendingPorts([message, replyTo], () { |
| 55 checkReplyTo(replyTo); |
| 56 // Check that the isolate still runs and the port is still open |
| 57 final isolate = globalState.isolates[_isolateId]; |
| 58 if (isolate == null) return; |
| 59 if (_receivePort._callback == null) return; |
| 60 |
| 61 // We force serialization/deserialization as a simple way to ensure |
| 62 // isolate communication restrictions are respected between isolates that |
| 63 // live in the same worker. [_NativeJsSendPort] delivers both messages |
| 64 // from the same worker and messages from other workers. In particular, |
| 65 // messages sent from a worker via a [_WorkerSendPort] are received at |
| 66 // [_processWorkerMessage] and forwarded to a native port. In such cases, |
| 67 // here we'll see [globalState.currentContext == null]. |
| 68 final shouldSerialize = globalState.currentContext != null |
| 69 && globalState.currentContext.id != _isolateId; |
| 70 var msg = message; |
| 71 var reply = replyTo; |
| 72 if (shouldSerialize) { |
| 73 msg = _serializeMessage(msg); |
| 74 reply = _serializeMessage(reply); |
| 75 } |
| 76 globalState.topEventLoop.enqueue(isolate, () { |
| 77 if (_receivePort._callback != null) { |
| 78 if (shouldSerialize) { |
| 79 msg = _deserializeMessage(msg); |
| 80 reply = _deserializeMessage(reply); |
| 81 } |
| 82 _receivePort._callback(msg, reply); |
| 83 } |
| 84 }, 'receive ' + message); |
| 85 }); |
| 86 } |
| 87 |
| 88 bool operator ==(var other) => (other is _NativeJsSendPort) && |
| 89 (_receivePort == other._receivePort); |
| 90 |
| 91 int hashCode() => _receivePort._id; |
| 92 } |
| 93 |
| 94 /** A send port that delivers messages via worker.postMessage. */ |
| 95 class _WorkerSendPort extends _BaseSendPort implements SendPort { |
| 96 final int _workerId; |
| 97 final int _receivePortId; |
| 98 |
| 99 const _WorkerSendPort(this._workerId, int isolateId, this._receivePortId) |
| 100 : super(isolateId); |
| 101 |
| 102 void send(var message, [SendPort replyTo = null]) { |
| 103 _waitForPendingPorts([message, replyTo], () { |
| 104 checkReplyTo(replyTo); |
| 105 final workerMessage = _serializeMessage({ |
| 106 'command': 'message', |
| 107 'port': this, |
| 108 'msg': message, |
| 109 'replyTo': replyTo}); |
| 110 |
| 111 if (globalState.isWorker) { |
| 112 // communication from one worker to another go through the main worker: |
| 113 globalState.mainWorker.postMessage(workerMessage); |
| 114 } else { |
| 115 globalState.workers[_workerId].postMessage(workerMessage); |
| 116 } |
| 117 }); |
| 118 } |
| 119 |
| 120 bool operator ==(var other) { |
| 121 return (other is _WorkerSendPort) && |
| 122 (_workerId == other._workerId) && |
| 123 (_isolateId == other._isolateId) && |
| 124 (_receivePortId == other._receivePortId); |
| 125 } |
| 126 |
| 127 int hashCode() { |
| 128 // TODO(sigmund): use a standard hash when we get one available in corelib. |
| 129 return (_workerId << 16) ^ (_isolateId << 8) ^ _receivePortId; |
| 130 } |
| 131 } |
| 132 |
| 133 /** A port that buffers messages until an underlying port gets resolved. */ |
| 134 class _BufferingSendPort extends _BaseSendPort implements SendPort { |
| 135 /** Internal counter to assign unique ids to each port. */ |
| 136 static int _idCount = 0; |
| 137 |
| 138 /** For implementing equals and hashcode. */ |
| 139 final int _id; |
| 140 |
| 141 /** Underlying port, when resolved. */ |
| 142 SendPort _port; |
| 143 |
| 144 /** |
| 145 * Future of the underlying port, so that we can detect when this port can be |
| 146 * sent on messages. |
| 147 */ |
| 148 Future<SendPort> _futurePort; |
| 149 |
| 150 /** Pending messages (and reply ports). */ |
| 151 List pending; |
| 152 |
| 153 _BufferingSendPort(isolateId, this._futurePort) |
| 154 : super(isolateId), _id = _idCount, pending = [] { |
| 155 _idCount++; |
| 156 _futurePort.then((p) { |
| 157 _port = p; |
| 158 for (final item in pending) { |
| 159 p.send(item['message'], item['replyTo']); |
| 160 } |
| 161 pending = null; |
| 162 }); |
| 163 } |
| 164 |
| 165 _BufferingSendPort.fromPort(isolateId, this._port) |
| 166 : super(isolateId), _id = _idCount { |
| 167 _idCount++; |
| 168 } |
| 169 |
| 170 void send(var message, [SendPort replyTo]) { |
| 171 if (_port != null) { |
| 172 _port.send(message, replyTo); |
| 173 } else { |
| 174 pending.add({'message': message, 'replyTo': replyTo}); |
| 175 } |
| 176 } |
| 177 |
| 178 bool operator ==(var other) => |
| 179 other is _BufferingSendPort && _id == other._id; |
| 180 int hashCode() => _id; |
| 181 } |
| 182 |
| 183 /** Default factory for receive ports. */ |
| 184 class _ReceivePortFactory { |
| 185 |
| 186 factory ReceivePort() { |
| 187 return new _ReceivePortImpl(); |
| 188 } |
| 189 |
| 190 factory ReceivePort.singleShot() { |
| 191 return new _ReceivePortSingleShotImpl(); |
| 192 } |
| 193 } |
| 194 |
| 195 /** Implementation of a multi-use [ReceivePort] on top of JavaScript. */ |
| 196 class _ReceivePortImpl implements ReceivePort { |
| 197 int _id; |
| 198 Function _callback; |
| 199 static int _nextFreeId = 1; |
| 200 |
| 201 _ReceivePortImpl() |
| 202 : _id = _nextFreeId++ { |
| 203 globalState.currentContext.register(_id, this); |
| 204 } |
| 205 |
| 206 void receive(void onMessage(var message, SendPort replyTo)) { |
| 207 _callback = onMessage; |
| 208 } |
| 209 |
| 210 void close() { |
| 211 _callback = null; |
| 212 globalState.currentContext.unregister(_id); |
| 213 } |
| 214 |
| 215 SendPort toSendPort() { |
| 216 return new _NativeJsSendPort(this, globalState.currentContext.id); |
| 217 } |
| 218 } |
| 219 |
| 220 /** Implementation of a single-shot [ReceivePort]. */ |
| 221 class _ReceivePortSingleShotImpl implements ReceivePort { |
| 222 |
| 223 _ReceivePortSingleShotImpl() : _port = new _ReceivePortImpl() { } |
| 224 |
| 225 void receive(void callback(var message, SendPort replyTo)) { |
| 226 _port.receive((var message, SendPort replyTo) { |
| 227 _port.close(); |
| 228 callback(message, replyTo); |
| 229 }); |
| 230 } |
| 231 |
| 232 void close() { |
| 233 _port.close(); |
| 234 } |
| 235 |
| 236 SendPort toSendPort() => _port.toSendPort(); |
| 237 |
| 238 final _ReceivePortImpl _port; |
| 239 } |
| 240 |
| 241 /** Wait until all ports in a message are resolved. */ |
| 242 _waitForPendingPorts(var message, void callback()) { |
| 243 final finder = new _PendingSendPortFinder(); |
| 244 finder.traverse(message); |
| 245 Futures.wait(finder.ports).then((_) => callback()); |
| 246 } |
| 247 |
| 248 |
| 249 /** Visitor that finds all unresolved [SendPort]s in a message. */ |
| 250 class _PendingSendPortFinder extends _MessageTraverser { |
| 251 List<Future<SendPort>> ports; |
| 252 _PendingSendPortFinder() : super(), ports = []; |
| 253 |
| 254 visitPrimitive(x) {} |
| 255 visitNativeJsSendPort(_NativeJsSendPort port) {} |
| 256 visitWorkerSendPort(_WorkerSendPort port) {} |
| 257 visitReceivePort(_ReceivePortImpl port) {} |
| 258 visitReceivePortSingleShot(_ReceivePortSingleShotImpl port) {} |
| 259 |
| 260 visitList(List list) { |
| 261 final visited = _getInfo(list); |
| 262 if (visited !== null) return; |
| 263 _attachInfo(list, true); |
| 264 // TODO(sigmund): replace with the following: (bug #1660) |
| 265 // list.forEach(_dispatch); |
| 266 list.forEach((e) => _dispatch(e)); |
| 267 } |
| 268 |
| 269 visitMap(Map map) { |
| 270 final visited = _getInfo(map); |
| 271 if (visited !== null) return; |
| 272 |
| 273 _attachInfo(map, true); |
| 274 // TODO(sigmund): replace with the following: (bug #1660) |
| 275 // map.getValues().forEach(_dispatch); |
| 276 map.getValues().forEach((e) => _dispatch(e)); |
| 277 } |
| 278 |
| 279 visitBufferingSendPort(_BufferingSendPort port) { |
| 280 if (port._port == null) { |
| 281 ports.add(port._futurePort); |
| 282 } |
| 283 } |
| 284 } |
OLD | NEW |