| Index: pkg/barback/lib/src/asset_cascade.dart
 | 
| diff --git a/pkg/barback/lib/src/asset_cascade.dart b/pkg/barback/lib/src/asset_cascade.dart
 | 
| index 338910d5477b5b364ccfa31febf5a01d48856a5d..40a615cbd462356708b37dee8ba4f4e35500dac6 100644
 | 
| --- a/pkg/barback/lib/src/asset_cascade.dart
 | 
| +++ b/pkg/barback/lib/src/asset_cascade.dart
 | 
| @@ -11,7 +11,8 @@ import 'package:stack_trace/stack_trace.dart';
 | 
|  
 | 
|  import 'asset.dart';
 | 
|  import 'asset_id.dart';
 | 
| -import 'asset_set.dart';
 | 
| +import 'asset_node.dart';
 | 
| +import 'cancelable_future.dart';
 | 
|  import 'errors.dart';
 | 
|  import 'change_batch.dart';
 | 
|  import 'package_graph.dart';
 | 
| @@ -38,6 +39,17 @@ class AssetCascade {
 | 
|    /// the current app.
 | 
|    final PackageGraph _graph;
 | 
|  
 | 
| +  /// The controllers for the [AssetNode]s that provide information about this
 | 
| +  /// cascade's package's source assets.
 | 
| +  final _sourceControllerMap = new Map<AssetId, AssetNodeController>();
 | 
| +
 | 
| +  /// Futures for source assets that are currently being loaded.
 | 
| +  ///
 | 
| +  /// These futures are cancelable so that if an asset is updated after a load
 | 
| +  /// has been kicked off, the previous load can be ignored in favor of a new
 | 
| +  /// one.
 | 
| +  final _loadingSources = new Map<AssetId, CancelableFuture<Asset>>();
 | 
| +
 | 
|    final _phases = <Phase>[];
 | 
|  
 | 
|    /// A stream that emits a [BuildResult] each time the build is completed,
 | 
| @@ -68,7 +80,9 @@ class AssetCascade {
 | 
|    /// If no build it in progress, is `null`.
 | 
|    Future _processDone;
 | 
|  
 | 
| -  ChangeBatch _sourceChanges;
 | 
| +  /// Whether any source assets have been updated or removed since processing
 | 
| +  /// last began.
 | 
| +  var _newChanges = false;
 | 
|  
 | 
|    /// Creates a new [AssetCascade].
 | 
|    ///
 | 
| @@ -106,54 +120,95 @@ class AssetCascade {
 | 
|      // * [id] may be generated before the compilation is finished. We should
 | 
|      //   be able to quickly check whether there are any more in-place
 | 
|      //   transformations that can be run on it. If not, we can return it early.
 | 
| -    // * If everything is compiled, something that didn't output [id] is
 | 
| -    //   dirtied, and then [id] is requested, we can return it immediately,
 | 
| -    //   since anything overwriting it at that point is an error.
 | 
|      // * If [id] has never been generated and all active transformers provide
 | 
|      //   metadata about the file names of assets it can emit, we can prove that
 | 
|      //   none of them can emit [id] and fail early.
 | 
| -    return (_processDone == null ? new Future.value() : _processDone).then((_) {
 | 
| -      // Each phase's inputs are the outputs of the previous phase. Find the
 | 
| -      // last phase that contains the asset. Since the last phase has no
 | 
| -      // transformers, this will find the latest output for that id.
 | 
| -
 | 
| -      // TODO(rnystrom): Currently does not omit assets that are actually used
 | 
| -      // as inputs for transformers. This means you can request and get an
 | 
| -      // asset that should be "consumed" because it's used to generate the
 | 
| -      // real asset you care about. Need to figure out how we want to handle
 | 
| -      // that and what use cases there are related to it.
 | 
| -      for (var i = _phases.length - 1; i >= 0; i--) {
 | 
| -        var node = _phases[i].inputs[id];
 | 
| -        if (node != null) {
 | 
| -          // By the time we get here, the asset should have been built.
 | 
| -          assert(node.asset != null);
 | 
| -          return node.asset;
 | 
| -        }
 | 
| +    return newFuture(() {
 | 
| +      var node = _getAssetNode(id);
 | 
| +
 | 
| +      // If the requested asset is available, we can just return it.
 | 
| +      if (node != null) return node.asset;
 | 
| +
 | 
| +      // If there's a build running, that build might generate the asset, so we
 | 
| +      // wait for it to complete and then try again.
 | 
| +      if (_processDone != null) {
 | 
| +        return _processDone.then((_) => getAssetById(id));
 | 
|        }
 | 
|  
 | 
| -      // Couldn't find it.
 | 
| +      // If the asset hasn't been built and nothing is building now, the asset
 | 
| +      // won't be generated, so we throw an error.
 | 
|        throw new AssetNotFoundException(id);
 | 
|      });
 | 
|    }
 | 
|  
 | 
| +  // Returns the post-transformation asset node for [id], if one is available.
 | 
| +  //
 | 
| +  // This will only return a node that has an asset available, and only if that
 | 
| +  // node is guaranteed not to be consumed by any transforms. If the phase is
 | 
| +  // still working to figure out if a node will be consumed by a transformer,
 | 
| +  // that node won't be returned.
 | 
| +  AssetNode _getAssetNode(AssetId id) {
 | 
| +    // Each phase's inputs are the outputs of the previous phase. Find the last
 | 
| +    // phase that contains the asset. Since the last phase has no transformers,
 | 
| +    // this will find the latest output for that id.
 | 
| +    for (var i = _phases.length - 1; i >= 0; i--) {
 | 
| +      var node = _phases[i].getUnconsumedInput(id);
 | 
| +      if (node != null) return node;
 | 
| +    }
 | 
| +
 | 
| +    return null;
 | 
| +  }
 | 
| +
 | 
|    /// Adds [sources] to the graph's known set of source assets.
 | 
|    ///
 | 
|    /// Begins applying any transforms that can consume any of the sources. If a
 | 
|    /// given source is already known, it is considered modified and all
 | 
|    /// transforms that use it will be re-applied.
 | 
|    void updateSources(Iterable<AssetId> sources) {
 | 
| -    if (_sourceChanges == null) _sourceChanges = new ChangeBatch();
 | 
| -    assert(sources.every((id) => id.package == package));
 | 
| -    _sourceChanges.update(sources);
 | 
| +    _newChanges = true;
 | 
| +
 | 
| +    for (var id in sources) {
 | 
| +      var controller = _sourceControllerMap[id];
 | 
| +      if (controller != null) {
 | 
| +        controller.setDirty();
 | 
| +      } else {
 | 
| +        _sourceControllerMap[id] = new AssetNodeController(id);
 | 
| +        _phases.first.addInput(_sourceControllerMap[id].node);
 | 
| +      }
 | 
| +
 | 
| +      // If this source was already loading, cancel the old load, since it may
 | 
| +      // return out-of-date contents for the asset.
 | 
| +      if (_loadingSources.containsKey(id)) _loadingSources[id].cancel();
 | 
| +
 | 
| +      _loadingSources[id] =
 | 
| +          new CancelableFuture<Asset>(_graph.provider.getAsset(id));
 | 
| +      _loadingSources[id].whenComplete(() {
 | 
| +        _loadingSources.remove(id);
 | 
| +      }).then((asset) {
 | 
| +        var controller = _sourceControllerMap[id].setAvailable(asset);
 | 
| +      }).catchError((error) {
 | 
| +        reportError(error);
 | 
| +
 | 
| +        // TODO(nweiz): propagate error information through asset nodes.
 | 
| +        _sourceControllerMap.remove(id).setRemoved();
 | 
| +      });
 | 
| +    }
 | 
|  
 | 
|      _waitForProcess();
 | 
|    }
 | 
|  
 | 
|    /// Removes [removed] from the graph's known set of source assets.
 | 
|    void removeSources(Iterable<AssetId> removed) {
 | 
| -    if (_sourceChanges == null) _sourceChanges = new ChangeBatch();
 | 
| -    assert(removed.every((id) => id.package == package));
 | 
| -    _sourceChanges.remove(removed);
 | 
| +    _newChanges = true;
 | 
| +
 | 
| +    removed.forEach((id) {
 | 
| +      // If the source was being loaded, cancel that load.
 | 
| +      if (_loadingSources.containsKey(id)) _loadingSources.remove(id).cancel();
 | 
| +
 | 
| +      var controller = _sourceControllerMap.remove(id);
 | 
| +      // Don't choke if an id is double-removed for some reason.
 | 
| +      if (controller != null) controller.setRemoved();
 | 
| +    });
 | 
|  
 | 
|      _waitForProcess();
 | 
|    }
 | 
| @@ -198,7 +253,8 @@ class AssetCascade {
 | 
|    ///
 | 
|    /// Returns a future that completes when all assets have been processed.
 | 
|    Future _process() {
 | 
| -    return _processSourceChanges().then((_) {
 | 
| +    _newChanges = false;
 | 
| +    return newFuture(() {
 | 
|        // Find the first phase that has work to do and do it.
 | 
|        var future;
 | 
|        for (var phase in _phases) {
 | 
| @@ -209,7 +265,7 @@ class AssetCascade {
 | 
|        // If all phases are done and no new updates have come in, we're done.
 | 
|        if (future == null) {
 | 
|          // If changes have come in, start over.
 | 
| -        if (_sourceChanges != null) return _process();
 | 
| +        if (_newChanges) return _process();
 | 
|  
 | 
|          // Otherwise, everything is done.
 | 
|          return;
 | 
| @@ -219,42 +275,6 @@ class AssetCascade {
 | 
|        return future.then((_) => _process());
 | 
|      });
 | 
|    }
 | 
| -
 | 
| -  /// Processes the current batch of changes to source assets.
 | 
| -  Future _processSourceChanges() {
 | 
| -    // Always pump the event loop. This ensures a bunch of synchronous source
 | 
| -    // changes are processed in a single batch even when the first one starts
 | 
| -    // the build process.
 | 
| -    return newFuture(() {
 | 
| -      if (_sourceChanges == null) return null;
 | 
| -
 | 
| -      // Take the current batch to ensure it doesn't get added to while we're
 | 
| -      // processing it.
 | 
| -      var changes = _sourceChanges;
 | 
| -      _sourceChanges = null;
 | 
| -
 | 
| -      var updated = new AssetSet();
 | 
| -      var futures = [];
 | 
| -      for (var id in changes.updated) {
 | 
| -        // TODO(rnystrom): Catch all errors from provider and route to results.
 | 
| -        futures.add(_graph.provider.getAsset(id).then((asset) {
 | 
| -          updated.add(asset);
 | 
| -        }).catchError((error) {
 | 
| -          if (error is AssetNotFoundException) {
 | 
| -            // Handle missing asset errors like regular missing assets.
 | 
| -            reportError(error);
 | 
| -          } else {
 | 
| -            // It's an unexpected error, so rethrow it.
 | 
| -            throw error;
 | 
| -          }
 | 
| -        }));
 | 
| -      }
 | 
| -
 | 
| -      return Future.wait(futures).then((_) {
 | 
| -        _phases.first.updateInputs(updated, changes.removed);
 | 
| -      });
 | 
| -    });
 | 
| -  }
 | 
|  }
 | 
|  
 | 
|  /// An event indicating that the cascade has finished building all assets.
 | 
| 
 |