Index: utils/archive/input_stream.dart |
diff --git a/utils/archive/input_stream.dart b/utils/archive/input_stream.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..4ccb00034bda4a5de72d09ab3de773d360c2093a |
--- /dev/null |
+++ b/utils/archive/input_stream.dart |
@@ -0,0 +1,141 @@ |
+// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+#library("input_stream"); |
+ |
+#import("entry.dart"); |
+#import("read_request.dart"); |
+#import("utils.dart"); |
+ |
+// TODO(nweiz): ensure that the C struct associated with an input stream gets |
+// freed if the stream gets garbage collected before it's closed. |
+/** |
+ * A stream of [ArchiveEntry]s being read from an archive. |
+ * |
+ * This is accessible via [ArchiveReader]. |
+ */ |
+class ArchiveInputStream { |
+ /** |
+ * The id of the underlying archive. |
+ * |
+ * This will be set to null once the input stream has finished reading from |
+ * the archive. |
+ */ |
+ int _id; |
+ |
+ /** A [Completer] that will fire once the [_onEntry] callback is set. */ |
+ final Completer<Function> _onEntryCompleter; |
+ |
+ /** The callback to call when the input stream is closed. */ |
+ Function _onClosed; |
+ |
+ /** The callback to call when an error occurs. */ |
+ Function _onError; |
+ |
+ /** The entry that is currently eligible to read data from the archive. */ |
+ ArchiveEntry _currentEntry; |
+ |
+ ArchiveInputStream(this._id) : _onEntryCompleter = new Completer<Function>() { |
+ var future = _consumeHeaders(); |
+ future.handleException((e) { |
+ if (_onError != null) { |
+ _onError(e, future.stackTrace); |
+ return true; |
+ } else { |
+ throw e; |
+ } |
+ }); |
+ |
+ future.then((_) { |
+ close(); |
+ if (_onClosed != null) _onClosed(); |
+ }); |
+ } |
+ |
+ /** Whether this stream has finished reading entries. */ |
+ bool get closed() => _id == null; |
+ |
+ /** |
+ * Sets a callback to call when a new entry is read from the archive. |
+ * |
+ * The [ArchiveEntry] that's read from an archive initially only contains |
+ * header information such as the filename and permissions. To get the actual |
+ * data contained in the entry, use [ArchiveEntry.openInputStream]. |
+ * |
+ * Since the entries are read in sequence from the archive, the data stream |
+ * for one entry must be opened before the next entry is read from the |
+ * archive. The next entry will not be read until the return value of |
+ * [callback] has completed. |
+ * |
+ * If [callback] calls [ArchiveEntry.openInputStream] before it returns, or if |
+ * it doesn't want to read the contents of [entry], it can return null. |
+ */ |
+ void set onEntry(Future callback(ArchiveEntry entry)) { |
+ _onEntryCompleter.complete(callback); |
+ } |
+ |
+ /** |
+ * Sets a callback to call when the input stream is done emitting entries. |
+ */ |
+ void set onClosed(void callback()) { |
+ _onClosed = callback; |
+ } |
+ |
+ /** |
+ * Sets a callback to call if an error occurs while extracting the archive. |
+ * |
+ * [e] is the error that occured and [stack] is the stack trace of the error. |
+ */ |
+ void set onError(void callback(e, stack)) { |
+ _onError = callback; |
+ } |
+ |
+ /** |
+ * Closes the input stream. No more entries will be emitted. |
+ */ |
+ void close() { |
+ if (closed) return; |
+ call(FREE, _id).then((_) {}); |
+ _id = null; |
+ if (_currentEntry != null) _currentEntry.close(); |
+ if (!_onEntryCompleter.future.isComplete) _onEntryCompleter.complete(null); |
+ } |
+ |
+ /** |
+ * Consumes and emits all [ArchiveEntries] in this archive. |
+ */ |
+ Future _consumeHeaders() { |
+ if (closed) return new Future.immediate(null); |
+ var data; |
+ return call(NEXT_HEADER, _id).chain((_data) { |
+ data = _data; |
+ if (data == null) return new Future.immediate(null); |
+ return _emit(new ArchiveEntry(_id, data)). |
+ chain((_) => _consumeHeaders()); |
+ }); |
+ } |
+ |
+ /** |
+ * Emits [entry] to the [onEntry] callback. Returns a [Future] that will |
+ * complete once the callback's return value completes and the entry's data |
+ * has been fully consumed. |
+ */ |
+ Future _emit(ArchiveEntry entry) { |
+ _currentEntry = entry; |
+ var future = _onEntryCompleter.future.chain((onEntry) { |
+ if (closed) return new Future.immediate(null); |
+ var result = onEntry(entry); |
+ if (result is Future) return result; |
+ return new Future.immediate(null); |
+ }).chain((_) { |
+ if (entry.isInputOpen) return entry.inputComplete; |
+ return new Future.immediate(null); |
+ }); |
+ future.onComplete((_) { |
+ _currentEntry = null; |
+ entry.close(); |
+ }); |
+ return future; |
+ } |
+} |