Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(332)

Unified Diff: runtime/bin/stream_buffer.dart

Issue 10836177: Add StreamBuffer to dart:io. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Rebase, without changes. Created 8 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | tests/standalone/io/stream_buffer_test.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: runtime/bin/stream_buffer.dart
diff --git a/runtime/bin/stream_buffer.dart b/runtime/bin/stream_buffer.dart
new file mode 100644
index 0000000000000000000000000000000000000000..5df701c93c2abceffa14b867fc842095e2f97ae1
--- /dev/null
+++ b/runtime/bin/stream_buffer.dart
@@ -0,0 +1,249 @@
+// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+
+/**
+ * A [StreamBuffer] is an unbounded buffer that is written to through the
+ * [OutputStream] [toBuffer] and is read from the [InputStream] [fromBuffer].
+ */
+class StreamBuffer {
+ StreamBuffer()
+ : _data = [],
+ _closed = false,
+ _destroyed = false,
+ _available = 0,
+ fromBuffer = new _StreamBufferInputStream(),
+ toBuffer = new _StreamBufferOutputStream() {
+ fromBuffer._buffer = this;
+ toBuffer._buffer = this;
+ }
+
+ void destroy() {
+ _closed = true;
+ _closePending = true;
+ _data.clear();
+ if (!_destroyed) {
+ new Timer(0, toBuffer._doOnDestroy);
+ }
+ _destroyed = true;
+ _available = 0;
+ if (fromBuffer._onDataScheduledEvent != null) {
+ fromBuffer._onDataScheduledEvent.cancel();
+ }
+ if (fromBuffer._onClosedScheduledEvent != null) {
+ fromBuffer._onClosedScheduledEvent.cancel();
+ }
+ }
+
+ final _StreamBufferInputStream fromBuffer;
+ final _StreamBufferOutputStream toBuffer;
+ final List<List<int>> _data;
+ bool _closePending;
+ bool _closed;
+ bool _destroyed;
+ int _available;
+}
+
+class _StreamBufferInputStream implements InputStream {
+ int available() => _buffer._available;
+ bool get closed() => _buffer._closed;
+
+ void set onData(void callback()) {
+ _onData = callback;
+ _scheduleCallbacks();
+ }
+
+ void set onClosed(void callback()) {
+ _onClosed = callback;
+ _scheduleCallbacks();
+ }
+
+ void set onError(void callback(e)) {
+ throw new StreamException(
+ "Error events not supported on StreamBuffer");
+ }
+
+ List<int> read([int len]) {
+ if (closed || available() == 0) {
+ return const [];
+ }
+ if (len == null || len > available()) {
+ len = available();
+ }
+ _buffer._available -= len;
+ var data = _buffer._data;
+ List<int> result;
+ if (len == data[0].length) {
+ result = data[0];
+ data.removeRange(0, 1);
+ } else {
+ result = new List<int>(len);
+ int chunkIndex = 0;
+ int written = 0;
+ // Copy all the chunks except a possible partial last chunk.
+ while (len - written > 0 &&
+ data[chunkIndex].length <= len - written) {
+ result.setRange(written, data[chunkIndex].length, data[chunkIndex]);
+ written += data[chunkIndex].length;
+ ++chunkIndex;
+ }
+ if (len - written > 0) {
+ // Copy the last partial chunk.
+ var last = data[chunkIndex];
+ result.setRange(written, len - written, data[chunkIndex]);
+ data[chunkIndex].removeRange(0, len - written);
+ }
+ data.removeRange(0, chunkIndex);
+ }
+ _scheduleCallbacks();
+ return result;
+ }
+
+ int readInto(List<int> buffer, [int offset = 0, int len]) {
+ if (closed || available() == 0) {
+ return 0;
+ }
+ if (len == null) len = buffer.length - offset;
+ if (len > available()) {
+ len = available();
+ }
+
+ _buffer._available -= len;
+ var data = _buffer._data;
+ int pos = offset;
+ int remaining = len;
+ int chunkIndex = 0;
+ int removedChunks = 0;
+ while (remaining > 0) {
+ var chunk = data[chunkIndex];
+ int bytes = chunk.length;
+ if (remaining >= bytes) {
+ buffer.setRange(pos, bytes, chunk);
+ pos += bytes;
+ remaining -= bytes;
+ ++removedChunks;
+ ++chunkIndex;
+ } else {
+ buffer.setRange(pos, remaining, chunk);
+ chunk.removeRange(0, remaining);
+ remaining = 0;
+ }
+ }
+ data.removeRange(0, removedChunks);
+ _scheduleCallbacks();
+ return len;
+ }
+
+ void close() {
+ _buffer.destroy();
+ }
+
+ void pipe(OutputStream output, [bool close]) {
+ // Not implemented yet.
+ }
+
+ void _scheduleCallbacks() {
+ if (closed) return;
+ if (available() > 0) {
+ if (_onDataScheduledEvent == null) {
+ _onDataScheduledEvent = new Timer(0, _doOnData);
+ }
+ } else {
+ if (_onDataScheduledEvent != null) {
+ _onDataScheduledEvent.cancel();
+ }
+ if (_buffer._closePending && _onClosedScheduledEvent == null) {
+ _onClosedScheduledEvent = new Timer(0, _doOnClosed);
+ }
+ }
+ }
+
+ void _doOnData(_) {
+ _onDataScheduledEvent = null;
+ if (_onData != null) {
+ _onData();
+ }
+ }
+
+ void _doOnClosed(_) {
+ _buffer._closed = true;
+ _onClosedScheduledEvent = null;
+ if (_onClosed != null) {
+ _onClosed();
+ }
+ }
+
+ StreamBuffer _buffer;
+ Function _onData;
+ Function _onClosed;
+ Timer _onDataScheduledEvent;
+ Timer _onClosedScheduledEvent;
+
+
+}
+
+class _StreamBufferOutputStream implements OutputStream {
+ bool write(List<int> buffer, [bool copyBuffer = true]) {
+ if (_buffer._destroyed) {
+ throw new StreamException("Writing to a destroyed StreamBuffer");
+ }
+ if (_buffer._closePending) {
+ throw new StreamException.streamClosed();
+ }
+
+ if (copyBuffer) {
+ _buffer._data.add(new List<int>.from(buffer));
+ } else {
+ _buffer._data.add(buffer);
+ }
+ _buffer._available += buffer.length;
+ _buffer.fromBuffer._scheduleCallbacks();
+ return true;
+ }
+
+ bool writeFrom(List<int> buffer, [int offset, int len]) {
+ if (offset == null) offset = 0;
+ if (len > buffer.length - offset) len = buffer.length - offset;
+ return write(buffer.getRange(offset, len), copyBuffer: false);
+ }
+
+ bool writeString(String string, [Encoding encoding]) {
+ // Not implemented yet.
+ return true;
+ }
+
+ void close() {
+ _buffer._closePending = true;
+ _buffer.fromBuffer._scheduleCallbacks();
+ }
+
+ void destroy() {
+ _buffer.destroy();
+ }
+
+ void flush() { }
+
+ void set onClosed(void callback()) {
+ _onDestroy = callback;
+ }
+
+ void set onError(void callback(e)) {
+ throw new StreamException(
+ "Error events not supported on StreamBuffer");
+ }
+
+ void set onNoPendingWrites(void callback()) {
+ throw new StreamException(
+ "onNoPendingWrites handlers not supported on StreamBuffer");
+ }
+
+ void _doOnDestroy(_) {
+ if (_onDestroy != null) {
+ _onDestroy();
+ }
+ }
+
+ Function _onDestroy;
+ StreamBuffer _buffer;
+}
« no previous file with comments | « no previous file | tests/standalone/io/stream_buffer_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698