Index: runtime/bin/stream_buffer.dart |
diff --git a/runtime/bin/stream_buffer.dart b/runtime/bin/stream_buffer.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..5df701c93c2abceffa14b867fc842095e2f97ae1 |
--- /dev/null |
+++ b/runtime/bin/stream_buffer.dart |
@@ -0,0 +1,249 @@ |
+// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+ |
+/** |
+ * A [StreamBuffer] is an unbounded buffer that is written to through the |
+ * [OutputStream] [toBuffer] and is read from the [InputStream] [fromBuffer]. |
+ */ |
+class StreamBuffer { |
+ StreamBuffer() |
+ : _data = [], |
+ _closed = false, |
+ _destroyed = false, |
+ _available = 0, |
+ fromBuffer = new _StreamBufferInputStream(), |
+ toBuffer = new _StreamBufferOutputStream() { |
+ fromBuffer._buffer = this; |
+ toBuffer._buffer = this; |
+ } |
+ |
+ void destroy() { |
+ _closed = true; |
+ _closePending = true; |
+ _data.clear(); |
+ if (!_destroyed) { |
+ new Timer(0, toBuffer._doOnDestroy); |
+ } |
+ _destroyed = true; |
+ _available = 0; |
+ if (fromBuffer._onDataScheduledEvent != null) { |
+ fromBuffer._onDataScheduledEvent.cancel(); |
+ } |
+ if (fromBuffer._onClosedScheduledEvent != null) { |
+ fromBuffer._onClosedScheduledEvent.cancel(); |
+ } |
+ } |
+ |
+ final _StreamBufferInputStream fromBuffer; |
+ final _StreamBufferOutputStream toBuffer; |
+ final List<List<int>> _data; |
+ bool _closePending; |
+ bool _closed; |
+ bool _destroyed; |
+ int _available; |
+} |
+ |
+class _StreamBufferInputStream implements InputStream { |
+ int available() => _buffer._available; |
+ bool get closed() => _buffer._closed; |
+ |
+ void set onData(void callback()) { |
+ _onData = callback; |
+ _scheduleCallbacks(); |
+ } |
+ |
+ void set onClosed(void callback()) { |
+ _onClosed = callback; |
+ _scheduleCallbacks(); |
+ } |
+ |
+ void set onError(void callback(e)) { |
+ throw new StreamException( |
+ "Error events not supported on StreamBuffer"); |
+ } |
+ |
+ List<int> read([int len]) { |
+ if (closed || available() == 0) { |
+ return const []; |
+ } |
+ if (len == null || len > available()) { |
+ len = available(); |
+ } |
+ _buffer._available -= len; |
+ var data = _buffer._data; |
+ List<int> result; |
+ if (len == data[0].length) { |
+ result = data[0]; |
+ data.removeRange(0, 1); |
+ } else { |
+ result = new List<int>(len); |
+ int chunkIndex = 0; |
+ int written = 0; |
+ // Copy all the chunks except a possible partial last chunk. |
+ while (len - written > 0 && |
+ data[chunkIndex].length <= len - written) { |
+ result.setRange(written, data[chunkIndex].length, data[chunkIndex]); |
+ written += data[chunkIndex].length; |
+ ++chunkIndex; |
+ } |
+ if (len - written > 0) { |
+ // Copy the last partial chunk. |
+ var last = data[chunkIndex]; |
+ result.setRange(written, len - written, data[chunkIndex]); |
+ data[chunkIndex].removeRange(0, len - written); |
+ } |
+ data.removeRange(0, chunkIndex); |
+ } |
+ _scheduleCallbacks(); |
+ return result; |
+ } |
+ |
+ int readInto(List<int> buffer, [int offset = 0, int len]) { |
+ if (closed || available() == 0) { |
+ return 0; |
+ } |
+ if (len == null) len = buffer.length - offset; |
+ if (len > available()) { |
+ len = available(); |
+ } |
+ |
+ _buffer._available -= len; |
+ var data = _buffer._data; |
+ int pos = offset; |
+ int remaining = len; |
+ int chunkIndex = 0; |
+ int removedChunks = 0; |
+ while (remaining > 0) { |
+ var chunk = data[chunkIndex]; |
+ int bytes = chunk.length; |
+ if (remaining >= bytes) { |
+ buffer.setRange(pos, bytes, chunk); |
+ pos += bytes; |
+ remaining -= bytes; |
+ ++removedChunks; |
+ ++chunkIndex; |
+ } else { |
+ buffer.setRange(pos, remaining, chunk); |
+ chunk.removeRange(0, remaining); |
+ remaining = 0; |
+ } |
+ } |
+ data.removeRange(0, removedChunks); |
+ _scheduleCallbacks(); |
+ return len; |
+ } |
+ |
+ void close() { |
+ _buffer.destroy(); |
+ } |
+ |
+ void pipe(OutputStream output, [bool close]) { |
+ // Not implemented yet. |
+ } |
+ |
+ void _scheduleCallbacks() { |
+ if (closed) return; |
+ if (available() > 0) { |
+ if (_onDataScheduledEvent == null) { |
+ _onDataScheduledEvent = new Timer(0, _doOnData); |
+ } |
+ } else { |
+ if (_onDataScheduledEvent != null) { |
+ _onDataScheduledEvent.cancel(); |
+ } |
+ if (_buffer._closePending && _onClosedScheduledEvent == null) { |
+ _onClosedScheduledEvent = new Timer(0, _doOnClosed); |
+ } |
+ } |
+ } |
+ |
+ void _doOnData(_) { |
+ _onDataScheduledEvent = null; |
+ if (_onData != null) { |
+ _onData(); |
+ } |
+ } |
+ |
+ void _doOnClosed(_) { |
+ _buffer._closed = true; |
+ _onClosedScheduledEvent = null; |
+ if (_onClosed != null) { |
+ _onClosed(); |
+ } |
+ } |
+ |
+ StreamBuffer _buffer; |
+ Function _onData; |
+ Function _onClosed; |
+ Timer _onDataScheduledEvent; |
+ Timer _onClosedScheduledEvent; |
+ |
+ |
+} |
+ |
+class _StreamBufferOutputStream implements OutputStream { |
+ bool write(List<int> buffer, [bool copyBuffer = true]) { |
+ if (_buffer._destroyed) { |
+ throw new StreamException("Writing to a destroyed StreamBuffer"); |
+ } |
+ if (_buffer._closePending) { |
+ throw new StreamException.streamClosed(); |
+ } |
+ |
+ if (copyBuffer) { |
+ _buffer._data.add(new List<int>.from(buffer)); |
+ } else { |
+ _buffer._data.add(buffer); |
+ } |
+ _buffer._available += buffer.length; |
+ _buffer.fromBuffer._scheduleCallbacks(); |
+ return true; |
+ } |
+ |
+ bool writeFrom(List<int> buffer, [int offset, int len]) { |
+ if (offset == null) offset = 0; |
+ if (len > buffer.length - offset) len = buffer.length - offset; |
+ return write(buffer.getRange(offset, len), copyBuffer: false); |
+ } |
+ |
+ bool writeString(String string, [Encoding encoding]) { |
+ // Not implemented yet. |
+ return true; |
+ } |
+ |
+ void close() { |
+ _buffer._closePending = true; |
+ _buffer.fromBuffer._scheduleCallbacks(); |
+ } |
+ |
+ void destroy() { |
+ _buffer.destroy(); |
+ } |
+ |
+ void flush() { } |
+ |
+ void set onClosed(void callback()) { |
+ _onDestroy = callback; |
+ } |
+ |
+ void set onError(void callback(e)) { |
+ throw new StreamException( |
+ "Error events not supported on StreamBuffer"); |
+ } |
+ |
+ void set onNoPendingWrites(void callback()) { |
+ throw new StreamException( |
+ "onNoPendingWrites handlers not supported on StreamBuffer"); |
+ } |
+ |
+ void _doOnDestroy(_) { |
+ if (_onDestroy != null) { |
+ _onDestroy(); |
+ } |
+ } |
+ |
+ Function _onDestroy; |
+ StreamBuffer _buffer; |
+} |