Index: lib/protobuf/runtime/PbInputStreamReader.dart |
diff --git a/lib/protobuf/runtime/PbInputStreamReader.dart b/lib/protobuf/runtime/PbInputStreamReader.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..138aa339241e2dbb7ea2a81817cbc3868a6cef65 |
--- /dev/null |
+++ b/lib/protobuf/runtime/PbInputStreamReader.dart |
@@ -0,0 +1,116 @@ |
+// Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+class PbInputStreamReader implements PbReader { |
+ |
+ PbInputStreamReader(InputStream this._inputStream) |
+ : _buffer = new PbByteBuffer(); |
+ |
+ bool get closed() => _inputStream.closed && _buffer.isEmpty; |
+ |
+ int get position() => _position; |
+ |
+ Future<int> readByte() => |
+ readBytes(1).transform((List<int> b) => b.length > 0 ? b[0] : null); |
+ |
+ Future<List<int>> readBytes([int count = -1]) { |
+ if (_completer != null) throw new PbBufferBusyStateException(); |
+ if (count == 0) { |
+ return new Future.immediate(<int>[]); |
+ } else if ((count > 0 && _buffer.length >= count) || _inputStream.closed) { |
+ return new Future.immediate(_buffer.readBytes(count)); |
+ } else { |
+ _completer = new Completer<List<int>>(); |
+ _bytesToReturn = count; |
+ _bytesToRead = (count >= 0) && (_bytesToRead >= 0) ? |
+ Math.max(count, _bytesToRead) : -1; |
+ _readIntoPbByteBuffer(); |
+ return _completer.future; |
+ } |
+ } |
+ |
+ void setReadAhead([int count]) { |
+ _bytesToRead = (count >= 0) && (_bytesToRead >= 0) ? |
+ Math.max(count, _bytesToRead) : -1; |
+ _readIntoPbByteBuffer(); |
+ } |
+ |
+ void _fulfillRequest() { |
+ if (_completer != null && |
+ ((_bytesToReturn > 0 && _buffer.length >= _bytesToReturn) || |
+ _inputStream.closed)) { |
+ Completer c = _completer; |
+ _completer = null; |
+ |
+ List<int> bytes = _buffer.readBytes(_bytesToReturn); |
+ _position += bytes.length; |
+ c.complete(bytes); |
+ } |
+ } |
+ |
+ void _handleDataReadToLength() { |
+ int availableBytes = _inputStream.available(); |
+ if (availableBytes > 0) { |
+ int readSize = Math.min(availableBytes, _bytesToRead); |
+ List<int> data = _inputStream.read(readSize); |
+ if (null != data) { |
+ _buffer.add(data); |
+ _bytesToRead -= data.length; |
+ if (_bytesToRead == 0) { |
+ _reset(); |
+ } |
+ } |
+ } |
+ _fulfillRequest(); |
+ } |
+ |
+ void _handleDataReadToEnd() { |
+ if (_inputStream.available() > 0) { |
+ List<int> data = _inputStream.read(); |
+ if (null !== data) { |
+ _buffer.add(data); |
+ } |
+ } |
+ _fulfillRequest(); |
+ } |
+ |
+ void _handleClose() { |
+ _reset(); |
+ _fulfillRequest(); |
+ } |
+ |
+ void _handleError(Exception e) { |
+ _reset(); |
+ _completer.completeException(e); |
+ } |
+ |
+ /** |
+ * Reads the bytes requested and mark their availability with the |
+ * returned Future. |
+ */ |
+ void _readIntoPbByteBuffer() { |
+ if(_bytesToRead < 0) { |
+ _inputStream.onClosed = _handleClose; |
+ _inputStream.onError = _handleError; |
+ _inputStream.onData = _handleDataReadToEnd; |
+ } else if (_bytesToRead > 0) { |
+ _inputStream.onClosed = _handleClose; |
+ _inputStream.onError = _handleError; |
+ _inputStream.onData = _handleDataReadToLength; |
+ } |
+ } |
+ |
+ void _reset() { |
+ _inputStream.onData = null; |
+ _inputStream.onClosed = null; |
+ _inputStream.onError = null; |
+ } |
+ |
+ InputStream _inputStream; |
+ PbByteBuffer _buffer; |
+ int _bytesToReturn; |
+ int _bytesToRead = 0; |
+ Completer<List<int>> _completer; |
+ int _position = 0; |
+} |