Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(109)

Side by Side Diff: runtime/bin/stream_buffer.dart

Issue 10836177: Add StreamBuffer to dart:io. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 8 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | tests/standalone/io/stream_buffer_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5
6 /**
7 * A [StreamBuffer] is an unbounded buffer that is written to through the
8 * [OutputStream] [toBuffer] and is read from the [InputStream] [fromBuffer].
9 */
10 class StreamBuffer {
11 StreamBuffer()
Søren Gjesse 2012/08/10 07:58:10 I think the Dart style calls for four space indent
12 : _data = [],
13 _closed = false,
14 _destroyed = false,
15 _available = 0,
16 fromBuffer = new _StreamBufferInputStream(),
17 toBuffer = new _StreamBufferOutputStream() {
18 fromBuffer._buffer = this;
19 toBuffer._buffer = this;
nweiz 2012/08/09 22:03:45 Style nit: I think this should be intended only fo
20 }
21
22 void destroy() {
23 _closed = true;
24 _closePending = true;
25 _data.clear();
26 if (!_destroyed) {
27 new Timer(0, toBuffer._doOnDestroy);
28 }
29 _destroyed = true;
30 _available = 0;
31 if (fromBuffer._onDataScheduledEvent != null) {
32 fromBuffer._onDataScheduledEvent.cancel();
33 }
34 if (fromBuffer._onClosedScheduledEvent != null) {
35 fromBuffer._onClosedScheduledEvent.cancel();
nweiz 2012/08/09 22:03:45 As I mentioned in issue 4222, I don't understand w
Søren Gjesse 2012/08/10 07:58:10 Shouldn't it call onError? Destroy sounds like jus
nweiz 2012/09/04 23:50:31 That's not how InputStream#close or OutputStream#d
36 }
37 }
38
39 final _StreamBufferInputStream fromBuffer;
40 final _StreamBufferOutputStream toBuffer;
nweiz 2012/08/09 22:03:45 For tooling/documentation reasons, the types of th
41 final List<List<int>> _data;
nweiz 2012/08/09 22:03:45 Would it be easier to make this a [_BufferList]? I
Søren Gjesse 2012/08/10 07:58:10 I agree _BufferList was made to fit this purpose.
42 bool _closePending;
43 bool _closed;
44 bool _destroyed;
45 int _available;
46 }
47
48 class _StreamBufferInputStream implements InputStream {
nweiz 2012/08/09 22:03:45 Why not extend [_BaseDataInputStream]?
49 int available() => _buffer._available;
50 bool get closed() => _buffer._closed;
nweiz 2012/08/09 22:03:45 I'm a little confused how this relates to what you
Bill Hesse 2012/09/04 23:17:13 When close() is called on the output stream, _clos
nweiz 2012/09/04 23:50:31 It seems to be that (for an InputStream) exactly o
51
52 void set onData(void callback()) {
53 _onData = callback;
54 _scheduleCallbacks();
55 }
56
57 void set onClosed(void callback()) {
58 _onClosed = callback;
59 _scheduleCallbacks();
60 }
61
62 void set onError(void callback(e)) {
63 throw new StreamException(
64 "Error events not supported on StreamBuffer");
nweiz 2012/08/09 22:03:45 It would be great if errors were supported (see is
65 }
66
67 List<int> read([int len]) {
68 if (closed || available() == 0) {
69 return const [];
70 }
71 if (len == null || len > available()) {
72 len = available();
73 }
74 _buffer._available -= len;
75 var data = _buffer._data;
76 List<int> result;
77 if (len == data[0].length) {
78 result = data[0];
79 data.removeRange(0, 1);
80 } else {
81 result = new List<int>(len);
82 int chunkIndex = 0;
83 int written = 0;
84 // Copy all the chunks except a possible partial last chunk.
85 while (len - written > 0 &&
86 data[chunkIndex].length <= len - written) {
87 result.setRange(written, data[chunkIndex].length, data[chunkIndex]);
88 written += data[chunkIndex].length;
89 ++chunkIndex;
90 }
91 if (len - written > 0) {
92 // Copy the last partial chunk.
93 var last = data[chunkIndex];
94 result.setRange(written, len - written, data[chunkIndex]);
95 data[chunkIndex].removeRange(0, len - written);
96 }
97 data.removeRange(0, chunkIndex);
98 }
99 _scheduleCallbacks();
100 return result;
101 }
102
103 int readInto(List<int> buffer, [int offset = 0, int len]) {
104 if (closed || available() == 0) {
105 return 0;
106 }
107 if (len == null) len = buffer.length - offset;
108 if (len > available()) {
109 len = available();
110 }
111
112 _buffer._available -= len;
113 var data = _buffer._data;
114 int pos = offset;
115 int remaining = len;
116 int chunkIndex = 0;
117 int removedChunks = 0;
118 while (remaining > 0) {
119 var chunk = data[chunkIndex];
120 int bytes = chunk.length;
121 if (remaining >= bytes) {
122 buffer.setRange(pos, bytes, chunk);
123 pos += bytes;
124 remaining -= bytes;
125 ++removedChunks;
126 ++chunkIndex;
127 } else {
128 buffer.setRange(pos, remaining, chunk);
129 chunk.removeRange(0, remaining);
130 remaining = 0;
131 }
132 }
133 data.removeRange(0, removedChunks);
134 _scheduleCallbacks();
135 return len;
136 }
137
138 void close() {
139 _buffer.destroy();
140 }
141
142 void pipe(OutputStream output, [bool close]) {
nweiz 2012/08/09 22:03:45 Don't you get this for free from [_pipe] in stream
143 // Not implemented yet.
144 }
145
146 void _scheduleCallbacks() {
147 if (closed) return;
148 if (available() > 0) {
149 if (_onDataScheduledEvent == null) {
150 _onDataScheduledEvent = new Timer(0, _doOnData);
151 }
152 } else {
153 if (_onDataScheduledEvent != null) {
154 _onDataScheduledEvent.cancel();
155 }
156 if (_buffer._closePending && _onClosedScheduledEvent == null) {
157 _onClosedScheduledEvent = new Timer(0, _doOnClosed);
158 }
nweiz 2012/08/09 22:03:45 Extra closing brace
Bill Hesse 2012/09/04 23:17:13 Actually, wrong indentation.
159 }
160 }
161
162 void _doOnData(_) {
163 _onDataScheduledEvent = null;
164 if (_onData != null) {
165 _onData();
166 }
167 }
168
169 void _doOnClosed(_) {
170 _buffer._closed = true;
171 _onClosedScheduledEvent = null;
172 if (_onClosed != null) {
173 _onClosed();
174 }
175 }
176
177 StreamBuffer _buffer;
178 Function _onData;
179 Function _onClosed;
180 Timer _onDataScheduledEvent;
181 Timer _onClosedScheduledEvent;
182
183
nweiz 2012/08/09 22:03:45 Remove empty lines
184 }
185
186 class _StreamBufferOutputStream implements OutputStream {
187 bool write(List<int> buffer, [bool copyBuffer = true]) {
188 if (_buffer._destroyed) {
189 throw new StreamException("Writing to a destroyed StreamBuffer");
nweiz 2012/08/09 22:03:45 Is it useful to distinguish this error from the st
190 }
191 if (_buffer._closePending) {
192 throw new StreamException.streamClosed();
193 }
194
195 if (copyBuffer) {
196 _buffer._data.add(new List<int>.from(buffer));
197 } else {
198 _buffer._data.add(buffer);
199 }
200 _buffer._available += buffer.length;
201 _buffer.fromBuffer._scheduleCallbacks();
202 return true;
203 }
204
205 bool writeFrom(List<int> buffer, [int offset, int len]) {
206 if (offset == null) offset = 0;
207 if (len > buffer.length - offset) len = buffer.length - offset;
208 return write(buffer.getRange(offset, len), copyBuffer: false);
209 }
210
211 bool writeString(String string, [Encoding encoding]) {
212 // Not implemented yet.
213 return true;
214 }
215
216 void close() {
217 _buffer._closePending = true;
218 _buffer.fromBuffer._scheduleCallbacks();
219 }
220
221 void destroy() {
222 _buffer.destroy();
223 }
224
225 void flush() { }
226
227 void set onClosed(void callback()) {
nweiz 2012/08/09 22:03:45 The behavior of this callback doesn't match the de
228 _onDestroy = callback;
229 }
230
231 void set onError(void callback(e)) {
232 throw new StreamException(
233 "Error events not supported on StreamBuffer");
234 }
235
236 void set onNoPendingWrites(void callback()) {
nweiz 2012/08/09 22:03:45 It seems like this should have some implementation
237 throw new StreamException(
238 "onNoPendingWrites handlers not supported on StreamBuffer");
239 }
240
241 void _doOnDestroy(_) {
242 if (_onDestroy != null) {
243 _onDestroy();
244 }
245 }
246
247 Function _onDestroy;
248 StreamBuffer _buffer;
249 }
nweiz 2012/08/09 22:03:45 Now that my [OutputStream.closed] CL is submitted,
OLDNEW
« no previous file with comments | « no previous file | tests/standalone/io/stream_buffer_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698