Index: pkg/barback/lib/src/phase.dart |
diff --git a/pkg/barback/lib/src/phase.dart b/pkg/barback/lib/src/phase.dart |
index e16f94b7a8e9e3aa7a4a175a7ae0be3ff81484d6..4d2afb4dabcb9b929b904fe157cbf78789e7eb97 100644 |
--- a/pkg/barback/lib/src/phase.dart |
+++ b/pkg/barback/lib/src/phase.dart |
@@ -14,6 +14,7 @@ import 'asset_set.dart'; |
import 'errors.dart'; |
import 'transform_node.dart'; |
import 'transformer.dart'; |
+import 'utils.dart'; |
/// One phase in the ordered series of transformations in an [AssetCascade]. |
/// |
@@ -45,19 +46,32 @@ class Phase { |
/// phases, they will be the outputs from the previous phase. |
final inputs = new Map<AssetId, AssetNode>(); |
- /// The transforms currently applicable to assets in [inputs]. |
+ /// The transforms currently applicable to assets in [inputs], indexed by |
+ /// the ids of their primary inputs. |
/// |
/// These are the transforms that have been "wired up": they represent a |
/// repeatable transformation of a single concrete set of inputs. "dart2js" |
/// is a transformer. "dart2js on web/main.dart" is a transform. |
- final _transforms = new Set<TransformNode>(); |
+ final _transforms = new Map<AssetId, Set<TransformNode>>(); |
- /// The nodes that are new in this phase since the last time [process] was |
- /// called. |
+ /// Futures that will complete once the transformers that can consume a given |
+ /// asset are determined. |
/// |
- /// When we process, we'll check these to see if we can hang new transforms |
- /// off them. |
- final _newInputs = new Set<AssetNode>(); |
+ /// Whenever an asset is added or modified, we need to asynchronously |
+ /// determine which transformers can use it as their primary input. We can't |
+ /// start processing until we know which transformers to run, and this allows |
+ /// us to wait until we do. |
+ var _adjustTransformersFutures = new Map<AssetId, Future>(); |
+ |
+ /// New asset nodes that were added while [_adjustTransformers] was still |
+ /// being run on an old version of that asset. |
+ var _pendingNewInputs = new Map<AssetId, AssetNode>(); |
+ |
+ /// The ids of assets that are emitted by transforms in this phase. |
+ /// |
+ /// This is used to detect collisions where multiple transforms emit the same |
+ /// output. |
+ final _outputs = new Set<AssetId>(); |
/// The phase after this one. |
/// |
@@ -66,133 +80,217 @@ class Phase { |
Phase(this.cascade, this._index, this._transformers, this._next); |
- /// Updates the phase's inputs with [updated] and removes [removed]. |
+ /// Adds a new asset as an input for this phase. |
/// |
- /// This marks any affected [transforms] as dirty or discards them if their |
- /// inputs are removed. |
- void updateInputs(AssetSet updated, Set<AssetId> removed) { |
- // Remove any nodes that are no longer being output. Handle removals first |
- // in case there are assets that were removed by one transform but updated |
- // by another. In that case, the update should win. |
- for (var id in removed) { |
- var node = inputs.remove(id); |
- |
- // Every transform that was using it is dirty now. |
- if (node != null) { |
- node.consumers.forEach((consumer) => consumer.dirty()); |
- } |
- } |
+ /// [node] doesn't have to be [AssetState.AVAILABLE]. Once it is, the phase |
+ /// will automatically begin determining which transforms can consume it as a |
+ /// primary input. The transforms themselves won't be applied until [process] |
+ /// is called, however. |
+ /// |
+ /// This should only be used for brand-new assets or assets that have been |
+ /// removed and re-created. The phase will automatically handle updated assets |
+ /// using the [AssetNode.onStateChange] stream. |
+ void addInput(AssetNode node) { |
+ // We remove [node.id] from [inputs] as soon as the node is removed rather |
+ // than at the same time [node.id] is removed from [_transforms] so we don't |
+ // have to wait on [_adjustTransformers]. It's important that [inputs] is |
+ // always up-to-date so that the [AssetCascade] can look there for available |
+ // assets. |
+ inputs[node.id] = node; |
+ node.whenRemoved.then((_) => inputs.remove(node.id)); |
- // Update and new or modified assets. |
- for (var asset in updated) { |
- var node = inputs[asset.id]; |
- if (node == null) { |
- // It's a new node. Add it and remember it so we can see if any new |
- // transforms will consume it. |
- node = new AssetNode(asset); |
- inputs[asset.id] = node; |
- _newInputs.add(node); |
- } else { |
- node.updateAsset(asset); |
- } |
+ if (!_adjustTransformersFutures.containsKey(node.id)) { |
+ _transforms[node.id] = new Set<TransformNode>(); |
+ _adjustTransformers(node); |
+ return; |
} |
+ |
+ // If an input is added while the same input is still being processed, |
+ // that means that the asset was removed and recreated while |
+ // [_adjustTransformers] was being run on the old value. We have to wait |
+ // until that finishes, then run it again on whatever the newest version |
+ // of that asset is. |
+ |
+ // We may already be waiting for the existing [_adjustTransformers] call to |
+ // finish. If so, all we need to do is change the node that will be loaded |
+ // after it completes. |
+ var containedKey = _pendingNewInputs.containsKey(node.id); |
+ _pendingNewInputs[node.id] = node; |
+ if (containedKey) return; |
+ |
+ // If we aren't already waiting, start doing so. |
+ _adjustTransformersFutures[node.id].then((_) { |
+ assert(!_adjustTransformersFutures.containsKey(node.id)); |
+ assert(_pendingNewInputs.containsKey(node.id)); |
+ _transforms[node.id] = new Set<TransformNode>(); |
+ _adjustTransformers(_pendingNewInputs.remove(node.id)); |
+ }, onError: (_) { |
+ // If there was a programmatic error while processing the old input, |
+ // we don't want to just ignore it; it may have left the system in an |
+ // inconsistent state. We also don't want to top-level it, so we |
+ // ignore it here but don't start processing the new input. That way |
+ // when [process] is called, the error will be piped through its |
+ // return value. |
+ }).catchError((e) { |
+ // If our code above has a programmatic error, ensure it will be piped |
+ // through [process] by putting it into [_adjustTransformersFutures]. |
+ _adjustTransformersFutures[node.id] = new Future.error(e); |
+ }); |
} |
- /// Processes this phase. |
+ /// Returns the input for this phase with the given [id], but only if that |
+ /// input is known not to be consumed as a transformer's primary input. |
/// |
- /// For all new inputs, it tries to see if there are transformers that can |
- /// consume them. Then all applicable transforms are applied. |
+ /// If the input is unavailable, or if the phase hasn't determined whether or |
+ /// not any transformers will consume it as a primary input, null will be |
+ /// returned instead. This means that the return value is guaranteed to always |
+ /// be [AssetState.AVAILABLE]. |
+ AssetNode getUnconsumedInput(AssetId id) { |
+ if (!inputs.containsKey(id)) return null; |
+ |
+ // If the asset has transforms, it's not unconsumed. |
+ if (!_transforms[id].isEmpty) return null; |
+ |
+ // If we're working on figuring out if the asset has transforms, we can't |
+ // prove that it's unconsumed. |
+ if (_adjustTransformersFutures.containsKey(id)) return null; |
+ |
+ // The asset should be available. If it were removed, it wouldn't be in |
+ // _inputs, and if it were dirty, it'd be in _adjustTransformersFutures. |
+ assert(inputs[id].state.isAvailable); |
+ return inputs[id]; |
+ } |
+ |
+ /// Asynchronously determines which transformers can consume [node] as a |
+ /// primary input and creates transforms for them. |
/// |
- /// Returns a future that completes when processing is done. If there is |
- /// nothing to process, returns `null`. |
- Future process() { |
- var future = _processNewInputs(); |
- if (future == null) { |
- return _processTransforms(); |
- } |
+ /// This ensures that if [node] is modified or removed during or after the |
+ /// time it takes to adjust its transformers, they're appropriately |
+ /// re-adjusted. Its progress can be tracked in [_adjustTransformersFutures]. |
+ void _adjustTransformers(AssetNode node) { |
+ // Once the input is available, hook up transformers for it. If it changes |
+ // while that's happening, try again. |
+ _adjustTransformersFutures[node.id] = node.tryUntilStable((asset) { |
+ var oldTransformers = _transforms[node.id] |
+ .map((transform) => transform.transformer).toSet(); |
- return future.then((_) => _processTransforms()); |
+ return _removeStaleTransforms(asset) |
+ .then((_) => _addFreshTransforms(node, oldTransformers)); |
+ }).then((_) { |
+ // Now all the transforms are set up correctly and the asset is available |
+ // for the time being. Set up handlers for when the asset changes in the |
+ // future. |
+ node.onStateChange.first.then((state) { |
+ if (state.isRemoved) { |
+ _transforms.remove(node.id); |
+ } else { |
+ _adjustTransformers(node); |
+ } |
+ }).catchError((e) { |
+ _adjustTransformersFutures[node.id] = new Future.error(e); |
+ }); |
+ }).catchError((error) { |
+ if (error is! AssetNotFoundException || error.id != node.id) throw error; |
+ |
+ // If the asset is removed, [tryUntilStable] will throw an |
+ // [AssetNotFoundException]. In that case, just remove all transforms for |
+ // the node. |
+ _transforms.remove(node.id); |
+ }).whenComplete(() { |
+ _adjustTransformersFutures.remove(node.id); |
+ }); |
+ |
+ // Don't top-level errors coming from the input processing. Any errors will |
+ // eventually be piped through [process]'s returned Future. |
+ _adjustTransformersFutures[node.id].catchError((_) {}); |
} |
- /// Creates new transforms for any new inputs that are applicable. |
- Future _processNewInputs() { |
- if (_newInputs.isEmpty) return null; |
- |
- var futures = []; |
- for (var node in _newInputs) { |
- for (var transformer in _transformers) { |
- // TODO(rnystrom): Catch all errors from isPrimary() and redirect |
- // to results. |
- futures.add(transformer.isPrimary(node.asset).then((isPrimary) { |
- if (!isPrimary) return; |
- var transform = new TransformNode(this, transformer, node); |
- node.consumers.add(transform); |
- _transforms.add(transform); |
- })); |
- } |
- } |
+ // Remove any old transforms that used to have [asset] as a primary asset but |
+ // no longer apply to its new contents. |
+ Future _removeStaleTransforms(Asset asset) { |
+ return Future.wait(_transforms[asset.id].map((transform) { |
+ // TODO(rnystrom): Catch all errors from isPrimary() and redirect to |
+ // results. |
+ return transform.transformer.isPrimary(asset).then((isPrimary) { |
+ if (isPrimary) return; |
+ _transforms[asset.id].remove(transform); |
+ transform.remove(); |
+ }); |
+ })); |
+ } |
- _newInputs.clear(); |
+ // Add new transforms for transformers that consider [node]'s asset to be a |
+ // primary input. |
+ // |
+ // [oldTransformers] is the set of transformers that had [node] as a primary |
+ // input prior to this. They don't need to be checked, since they were removed |
+ // or preserved in [_removeStaleTransforms]. |
+ Future _addFreshTransforms(AssetNode node, Set<Transformer> oldTransformers) { |
+ return Future.wait(_transformers.map((transformer) { |
+ if (oldTransformers.contains(transformer)) return new Future.value(); |
- return Future.wait(futures); |
+ // If the asset is unavailable, the results of this [_adjustTransformers] |
+ // run will be discarded, so we can just short-circuit. |
+ if (node.asset == null) return new Future.value(); |
+ |
+ // We can safely access [node.asset] here even though it might have |
+ // changed since (as above) if it has, [_adjustTransformers] will just be |
+ // re-run. |
+ // TODO(rnystrom): Catch all errors from isPrimary() and redirect to |
+ // results. |
+ return transformer.isPrimary(node.asset).then((isPrimary) { |
+ if (!isPrimary) return; |
+ _transforms[node.id].add(new TransformNode(this, transformer, node)); |
+ }); |
+ })); |
} |
- /// Applies all currently wired up and dirty transforms. |
+ /// Processes this phase. |
/// |
- /// Passes their outputs to the next phase. |
+ /// Returns a future that completes when processing is done. If there is |
+ /// nothing to process, returns `null`. |
+ Future process() { |
+ if (_adjustTransformersFutures.isEmpty) return _processTransforms(); |
+ return _waitForInputs().then((_) => _processTransforms()); |
+ } |
+ |
+ Future _waitForInputs() { |
+ if (_adjustTransformersFutures.isEmpty) return new Future.value(); |
+ return Future.wait(_adjustTransformersFutures.values) |
+ .then((_) => _waitForInputs()); |
+ } |
+ |
+ /// Applies all currently wired up and dirty transforms. |
Future _processTransforms() { |
// Convert this to a list so we can safely modify _transforms while |
// iterating over it. |
- var dirtyTransforms = _transforms.where((transform) => transform.isDirty) |
- .toList(); |
+ var dirtyTransforms = |
+ flatten(_transforms.values.map((transforms) => transforms.toList())) |
+ .where((transform) => transform.isDirty).toList(); |
if (dirtyTransforms.isEmpty) return null; |
- return Future.wait(dirtyTransforms.map((transform) { |
- if (inputs.containsKey(transform.primary.id)) return transform.apply(); |
- |
- // If the primary input for the transform has been removed, get rid of it |
- // and all its outputs. |
- _transforms.remove(transform); |
- return new Future.value( |
- new TransformOutputs(new AssetSet(), transform.outputs)); |
- })).then((transformOutputs) { |
- // Collect all of the outputs. Since the transforms are run in parallel, |
- // we have to be careful here to ensure that the result is deterministic |
- // and not influenced by the order that transforms complete. |
- var updated = new AssetSet(); |
- var removed = new Set<AssetId>(); |
- var collisions = new Set<AssetId>(); |
+ return Future.wait(dirtyTransforms.map((transform) => transform.apply())) |
+ .then((allNewOutputs) { |
+ var newOutputs = allNewOutputs.reduce((set1, set2) => set1.union(set2)); |
- // Handle the generated outputs of all transforms first. |
- for (var outputs in transformOutputs) { |
- // Collect the outputs of all transformers together. |
- for (var asset in outputs.updated) { |
- if (updated.containsId(asset.id)) { |
- // Report a collision. |
- collisions.add(asset.id); |
- } else { |
- // TODO(rnystrom): In the case of a collision, the asset that |
- // "wins" is chosen non-deterministically. Do something better. |
- updated.add(asset); |
- } |
+ var collisions = new Set<AssetId>(); |
+ for (var newOutput in newOutputs) { |
+ if (_outputs.contains(newOutput.id)) { |
+ collisions.add(newOutput.id); |
+ } else { |
+ _next.addInput(newOutput); |
+ _outputs.add(newOutput.id); |
+ newOutput.whenRemoved.then((_) => _outputs.remove(newOutput.id)); |
} |
- |
- // Track any assets no longer output by this transform. We don't |
- // handle the case where *another* transform generates the asset |
- // no longer generated by this one. updateInputs() handles that. |
- removed.addAll(outputs.removed); |
} |
- // Report any collisions in deterministic order. |
+ // Report collisions in a deterministic order. |
collisions = collisions.toList(); |
collisions.sort((a, b) => a.toString().compareTo(b.toString())); |
for (var collision in collisions) { |
cascade.reportError(new AssetCollisionException(collision)); |
// TODO(rnystrom): Define what happens after a collision occurs. |
} |
- |
- // Pass the outputs to the next phase. |
- _next.updateInputs(updated, removed); |
}); |
} |
} |