Index: runtime/bin/stream_buffer.dart |
diff --git a/runtime/bin/stream_buffer.dart b/runtime/bin/stream_buffer.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..5df701c93c2abceffa14b867fc842095e2f97ae1 |
--- /dev/null |
+++ b/runtime/bin/stream_buffer.dart |
@@ -0,0 +1,249 @@ |
+// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+ |
+/** |
+ * A [StreamBuffer] is an unbounded buffer that is written to through the |
+ * [OutputStream] [toBuffer] and is read from the [InputStream] [fromBuffer]. |
+ */ |
+class StreamBuffer { |
+ StreamBuffer() |
Søren Gjesse
2012/08/10 07:58:10
I think the Dart style calls for four space indent
|
+ : _data = [], |
+ _closed = false, |
+ _destroyed = false, |
+ _available = 0, |
+ fromBuffer = new _StreamBufferInputStream(), |
+ toBuffer = new _StreamBufferOutputStream() { |
+ fromBuffer._buffer = this; |
+ toBuffer._buffer = this; |
nweiz
2012/08/09 22:03:45
Style nit: I think this should be intended only fo
|
+ } |
+ |
+ void destroy() { |
+ _closed = true; |
+ _closePending = true; |
+ _data.clear(); |
+ if (!_destroyed) { |
+ new Timer(0, toBuffer._doOnDestroy); |
+ } |
+ _destroyed = true; |
+ _available = 0; |
+ if (fromBuffer._onDataScheduledEvent != null) { |
+ fromBuffer._onDataScheduledEvent.cancel(); |
+ } |
+ if (fromBuffer._onClosedScheduledEvent != null) { |
+ fromBuffer._onClosedScheduledEvent.cancel(); |
nweiz
2012/08/09 22:03:45
As I mentioned in issue 4222, I don't understand w
Søren Gjesse
2012/08/10 07:58:10
Shouldn't it call onError? Destroy sounds like jus
nweiz
2012/09/04 23:50:31
That's not how InputStream#close or OutputStream#d
|
+ } |
+ } |
+ |
+ final _StreamBufferInputStream fromBuffer; |
+ final _StreamBufferOutputStream toBuffer; |
nweiz
2012/08/09 22:03:45
For tooling/documentation reasons, the types of th
|
+ final List<List<int>> _data; |
nweiz
2012/08/09 22:03:45
Would it be easier to make this a [_BufferList]? I
Søren Gjesse
2012/08/10 07:58:10
I agree _BufferList was made to fit this purpose.
|
+ bool _closePending; |
+ bool _closed; |
+ bool _destroyed; |
+ int _available; |
+} |
+ |
+class _StreamBufferInputStream implements InputStream { |
nweiz
2012/08/09 22:03:45
Why not extend [_BaseDataInputStream]?
|
+ int available() => _buffer._available; |
+ bool get closed() => _buffer._closed; |
nweiz
2012/08/09 22:03:45
I'm a little confused how this relates to what you
Bill Hesse
2012/09/04 23:17:13
When close() is called on the output stream, _clos
nweiz
2012/09/04 23:50:31
It seems to be that (for an InputStream) exactly o
|
+ |
+ void set onData(void callback()) { |
+ _onData = callback; |
+ _scheduleCallbacks(); |
+ } |
+ |
+ void set onClosed(void callback()) { |
+ _onClosed = callback; |
+ _scheduleCallbacks(); |
+ } |
+ |
+ void set onError(void callback(e)) { |
+ throw new StreamException( |
+ "Error events not supported on StreamBuffer"); |
nweiz
2012/08/09 22:03:45
It would be great if errors were supported (see is
|
+ } |
+ |
+ List<int> read([int len]) { |
+ if (closed || available() == 0) { |
+ return const []; |
+ } |
+ if (len == null || len > available()) { |
+ len = available(); |
+ } |
+ _buffer._available -= len; |
+ var data = _buffer._data; |
+ List<int> result; |
+ if (len == data[0].length) { |
+ result = data[0]; |
+ data.removeRange(0, 1); |
+ } else { |
+ result = new List<int>(len); |
+ int chunkIndex = 0; |
+ int written = 0; |
+ // Copy all the chunks except a possible partial last chunk. |
+ while (len - written > 0 && |
+ data[chunkIndex].length <= len - written) { |
+ result.setRange(written, data[chunkIndex].length, data[chunkIndex]); |
+ written += data[chunkIndex].length; |
+ ++chunkIndex; |
+ } |
+ if (len - written > 0) { |
+ // Copy the last partial chunk. |
+ var last = data[chunkIndex]; |
+ result.setRange(written, len - written, data[chunkIndex]); |
+ data[chunkIndex].removeRange(0, len - written); |
+ } |
+ data.removeRange(0, chunkIndex); |
+ } |
+ _scheduleCallbacks(); |
+ return result; |
+ } |
+ |
+ int readInto(List<int> buffer, [int offset = 0, int len]) { |
+ if (closed || available() == 0) { |
+ return 0; |
+ } |
+ if (len == null) len = buffer.length - offset; |
+ if (len > available()) { |
+ len = available(); |
+ } |
+ |
+ _buffer._available -= len; |
+ var data = _buffer._data; |
+ int pos = offset; |
+ int remaining = len; |
+ int chunkIndex = 0; |
+ int removedChunks = 0; |
+ while (remaining > 0) { |
+ var chunk = data[chunkIndex]; |
+ int bytes = chunk.length; |
+ if (remaining >= bytes) { |
+ buffer.setRange(pos, bytes, chunk); |
+ pos += bytes; |
+ remaining -= bytes; |
+ ++removedChunks; |
+ ++chunkIndex; |
+ } else { |
+ buffer.setRange(pos, remaining, chunk); |
+ chunk.removeRange(0, remaining); |
+ remaining = 0; |
+ } |
+ } |
+ data.removeRange(0, removedChunks); |
+ _scheduleCallbacks(); |
+ return len; |
+ } |
+ |
+ void close() { |
+ _buffer.destroy(); |
+ } |
+ |
+ void pipe(OutputStream output, [bool close]) { |
nweiz
2012/08/09 22:03:45
Don't you get this for free from [_pipe] in stream
|
+ // Not implemented yet. |
+ } |
+ |
+ void _scheduleCallbacks() { |
+ if (closed) return; |
+ if (available() > 0) { |
+ if (_onDataScheduledEvent == null) { |
+ _onDataScheduledEvent = new Timer(0, _doOnData); |
+ } |
+ } else { |
+ if (_onDataScheduledEvent != null) { |
+ _onDataScheduledEvent.cancel(); |
+ } |
+ if (_buffer._closePending && _onClosedScheduledEvent == null) { |
+ _onClosedScheduledEvent = new Timer(0, _doOnClosed); |
+ } |
nweiz
2012/08/09 22:03:45
Extra closing brace
Bill Hesse
2012/09/04 23:17:13
Actually, wrong indentation.
|
+ } |
+ } |
+ |
+ void _doOnData(_) { |
+ _onDataScheduledEvent = null; |
+ if (_onData != null) { |
+ _onData(); |
+ } |
+ } |
+ |
+ void _doOnClosed(_) { |
+ _buffer._closed = true; |
+ _onClosedScheduledEvent = null; |
+ if (_onClosed != null) { |
+ _onClosed(); |
+ } |
+ } |
+ |
+ StreamBuffer _buffer; |
+ Function _onData; |
+ Function _onClosed; |
+ Timer _onDataScheduledEvent; |
+ Timer _onClosedScheduledEvent; |
+ |
+ |
nweiz
2012/08/09 22:03:45
Remove empty lines
|
+} |
+ |
+class _StreamBufferOutputStream implements OutputStream { |
+ bool write(List<int> buffer, [bool copyBuffer = true]) { |
+ if (_buffer._destroyed) { |
+ throw new StreamException("Writing to a destroyed StreamBuffer"); |
nweiz
2012/08/09 22:03:45
Is it useful to distinguish this error from the st
|
+ } |
+ if (_buffer._closePending) { |
+ throw new StreamException.streamClosed(); |
+ } |
+ |
+ if (copyBuffer) { |
+ _buffer._data.add(new List<int>.from(buffer)); |
+ } else { |
+ _buffer._data.add(buffer); |
+ } |
+ _buffer._available += buffer.length; |
+ _buffer.fromBuffer._scheduleCallbacks(); |
+ return true; |
+ } |
+ |
+ bool writeFrom(List<int> buffer, [int offset, int len]) { |
+ if (offset == null) offset = 0; |
+ if (len > buffer.length - offset) len = buffer.length - offset; |
+ return write(buffer.getRange(offset, len), copyBuffer: false); |
+ } |
+ |
+ bool writeString(String string, [Encoding encoding]) { |
+ // Not implemented yet. |
+ return true; |
+ } |
+ |
+ void close() { |
+ _buffer._closePending = true; |
+ _buffer.fromBuffer._scheduleCallbacks(); |
+ } |
+ |
+ void destroy() { |
+ _buffer.destroy(); |
+ } |
+ |
+ void flush() { } |
+ |
+ void set onClosed(void callback()) { |
nweiz
2012/08/09 22:03:45
The behavior of this callback doesn't match the de
|
+ _onDestroy = callback; |
+ } |
+ |
+ void set onError(void callback(e)) { |
+ throw new StreamException( |
+ "Error events not supported on StreamBuffer"); |
+ } |
+ |
+ void set onNoPendingWrites(void callback()) { |
nweiz
2012/08/09 22:03:45
It seems like this should have some implementation
|
+ throw new StreamException( |
+ "onNoPendingWrites handlers not supported on StreamBuffer"); |
+ } |
+ |
+ void _doOnDestroy(_) { |
+ if (_onDestroy != null) { |
+ _onDestroy(); |
+ } |
+ } |
+ |
+ Function _onDestroy; |
+ StreamBuffer _buffer; |
+} |
nweiz
2012/08/09 22:03:45
Now that my [OutputStream.closed] CL is submitted,
|