Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(729)

Side by Side Diff: runtime/bin/socket_stream_impl.dart

Issue 10262031: Add a web socket client (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Addressed review comments Created 8 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « runtime/bin/http_parser.dart ('k') | runtime/bin/websocket.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 class _SocketInputStream implements SocketInputStream { 5 class _SocketInputStream implements SocketInputStream {
6 _SocketInputStream(Socket socket) : _socket = socket { 6 _SocketInputStream(Socket socket) : _socket = socket {
7 if (_socket._id == -1) _closed = true; 7 if (_socket._id == -1) _closed = true;
8 _socket.onClosed = _onClosed; 8 _socket.onClosed = _onClosed;
9 } 9 }
10 10
(...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after
105 105
106 bool writeFrom(List<int> buffer, [int offset = 0, int len]) { 106 bool writeFrom(List<int> buffer, [int offset = 0, int len]) {
107 return _write( 107 return _write(
108 buffer, offset, (len == null) ? buffer.length - offset : len, true); 108 buffer, offset, (len == null) ? buffer.length - offset : len, true);
109 } 109 }
110 110
111 void close() { 111 void close() {
112 if (!_pendingWrites.isEmpty()) { 112 if (!_pendingWrites.isEmpty()) {
113 // Mark the socket for close when all data is written. 113 // Mark the socket for close when all data is written.
114 _closing = true; 114 _closing = true;
115 _socket._onWrite = _onWrite; 115 _setupWriteHander();
116 } else { 116 } else {
117 // Close the socket for writing. 117 // Close the socket for writing.
118 _socket._closeWrite(); 118 _socket._closeWrite();
119 _closed = true; 119 _closed = true;
120 } 120 }
121 } 121 }
122 122
123 void destroy() { 123 void destroy() {
124 _socket.onWrite = null; 124 _socket.onWrite = null;
125 _pendingWrites.clear(); 125 _pendingWrites.clear();
126 _socket.close(); 126 _socket.close();
127 _closed = true; 127 _closed = true;
128 } 128 }
129 129
130 void set onNoPendingWrites(void callback()) { 130 void set onNoPendingWrites(void callback()) {
131 if (_noPendingWritesTimer != null) {
132 _noPendingWritesTimer.cancel();
133 _noPendingWritesTimer = null;
134 }
131 _onNoPendingWrites = callback; 135 _onNoPendingWrites = callback;
132 if (_onNoPendingWrites != null) { 136 if (_onNoPendingWrites != null) {
133 _socket._onWrite = _onWrite; 137 if (_pendingWrites.isEmpty()) {
138 _noPendingWritesTimer = new Timer(0, (t) {
139 if (_onNoPendingWrites != null) _onNoPendingWrites();
140 });
141 } else {
142 _setupWriteHander();
143 }
134 } 144 }
135 } 145 }
136 146
137 bool _write(List<int> buffer, int offset, int len, bool copyBuffer) { 147 bool _write(List<int> buffer, int offset, int len, bool copyBuffer) {
138 if (_closing || _closed) throw new StreamException("Stream closed"); 148 if (_closing || _closed) throw new StreamException("Stream closed");
139 int bytesWritten = 0; 149 int bytesWritten = 0;
140 if (_pendingWrites.isEmpty()) { 150 if (_pendingWrites.isEmpty()) {
141 // If nothing is buffered write as much as possible and buffer 151 // If nothing is buffered write as much as possible and buffer
142 // the rest. 152 // the rest.
143 bytesWritten = _socket.writeList(buffer, offset, len); 153 bytesWritten = _socket.writeList(buffer, offset, len);
144 if (bytesWritten == len) return true; 154 if (bytesWritten == len) return true;
145 } 155 }
146 156
147 // Place remaining data on the pending writes queue. 157 // Place remaining data on the pending writes queue.
148 int notWrittenOffset = offset + bytesWritten; 158 int notWrittenOffset = offset + bytesWritten;
149 if (copyBuffer) { 159 if (copyBuffer) {
150 List<int> newBuffer = 160 List<int> newBuffer =
151 buffer.getRange(notWrittenOffset, len - bytesWritten); 161 buffer.getRange(notWrittenOffset, len - bytesWritten);
152 _pendingWrites.add(newBuffer); 162 _pendingWrites.add(newBuffer);
153 } else { 163 } else {
154 assert(offset + len == buffer.length); 164 assert(offset + len == buffer.length);
155 _pendingWrites.add(buffer, notWrittenOffset); 165 _pendingWrites.add(buffer, notWrittenOffset);
156 } 166 }
157 _socket._onWrite = _onWrite; 167 _setupWriteHander();
158 return false; 168 return false;
159 } 169 }
160 170
161 void _onWrite() { 171 void _onWrite() {
162 // Write as much buffered data to the socket as possible. 172 // Write as much buffered data to the socket as possible.
163 while (!_pendingWrites.isEmpty()) { 173 while (!_pendingWrites.isEmpty()) {
164 List<int> buffer = _pendingWrites.first; 174 List<int> buffer = _pendingWrites.first;
165 int offset = _pendingWrites.index; 175 int offset = _pendingWrites.index;
166 int bytesToWrite = buffer.length - offset; 176 int bytesToWrite = buffer.length - offset;
167 int bytesWritten = _socket.writeList(buffer, offset, bytesToWrite); 177 int bytesWritten = _socket.writeList(buffer, offset, bytesToWrite);
(...skipping 11 matching lines...) Expand all
179 } else { 189 } else {
180 if (_onNoPendingWrites != null) _onNoPendingWrites(); 190 if (_onNoPendingWrites != null) _onNoPendingWrites();
181 } 191 }
182 if (_onNoPendingWrites == null) { 192 if (_onNoPendingWrites == null) {
183 _socket._onWrite = null; 193 _socket._onWrite = null;
184 } else { 194 } else {
185 _socket._onWrite = _onWrite; 195 _socket._onWrite = _onWrite;
186 } 196 }
187 } 197 }
188 198
199 void _setupWriteHander() {
200 // Set up the callback for writing the pending data as the
201 // underlying socket becomes ready for writing.
202 if (_noPendingWritesTimer != null) {
203 _noPendingWritesTimer.cancel();
204 _noPendingWritesTimer = null;
205 }
206 _socket._onWrite = _onWrite;
207 }
208
189 bool _onSocketError(e) { 209 bool _onSocketError(e) {
190 close(); 210 close();
191 if (_onError != null) { 211 if (_onError != null) {
192 _onError(e); 212 _onError(e);
193 return true; 213 return true;
194 } else { 214 } else {
195 return false; 215 return false;
196 } 216 }
197 } 217 }
198 218
199 Socket _socket; 219 Socket _socket;
200 _BufferList _pendingWrites; 220 _BufferList _pendingWrites;
201 var _onNoPendingWrites; 221 Function _onNoPendingWrites;
222 Timer _noPendingWritesTimer;
202 bool _closing = false; 223 bool _closing = false;
203 bool _closed = false; 224 bool _closed = false;
204 } 225 }
OLDNEW
« no previous file with comments | « runtime/bin/http_parser.dart ('k') | runtime/bin/websocket.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698