Index: appengine/cmd/dm/mutate/ack_fwd_dep.go |
diff --git a/appengine/cmd/dm/mutate/ack_fwd_dep.go b/appengine/cmd/dm/mutate/ack_fwd_dep.go |
index b982cc9a6445dad718d6b5c2e987e4477ed99996..1dbd67e6bf85d01a8be7fda12d6db457144f0df6 100644 |
--- a/appengine/cmd/dm/mutate/ack_fwd_dep.go |
+++ b/appengine/cmd/dm/mutate/ack_fwd_dep.go |
@@ -12,16 +12,9 @@ import ( |
"golang.org/x/net/context" |
) |
-// AckFwdDep records the fact that a BackDep was successfully created for this |
-// FwdDep. It may also propagate Finished information (e.g. that the depended-on |
-// Attempt was actually already completed at the time that the dependency was |
-// taken on it). |
-// |
-// AckFwdDep is also used to propagate the fact that an Attempt (A) completed |
-// back to another Attempt (B) that's blocked on A. |
+// AckFwdDep records the fact that a dependency was completed. |
type AckFwdDep struct { |
- Dep *model.FwdEdge |
- DepIsFinished bool |
+ Dep *model.FwdEdge |
} |
// Root implements tumble.Mutation. |
@@ -39,39 +32,23 @@ func (f *AckFwdDep) RollForward(c context.Context) (muts []tumble.Mutation, err |
return |
} |
- // if the attempt and fdep aren't on the same execution, then bail |
- if atmpt.CurExecution != fdep.ForExecution { |
+ if atmpt.State != dm.Attempt_WAITING || atmpt.CurExecution != fdep.ForExecution { |
return |
} |
- needPut := false |
- |
idx := uint32(fdep.BitIndex) |
- if !atmpt.AddingDepsBitmap.IsSet(idx) { |
- atmpt.AddingDepsBitmap.Set(idx) |
- |
- if atmpt.AddingDepsBitmap.All(true) { |
- atmpt.MustModifyState(c, dm.Attempt_BLOCKED) |
- } |
- |
- needPut = true |
- } |
- |
- if f.DepIsFinished { |
- if !atmpt.WaitingDepBitmap.IsSet(idx) { |
- atmpt.WaitingDepBitmap.Set(idx) |
+ if !atmpt.DepMap.IsSet(idx) { |
+ atmpt.DepMap.Set(idx) |
- if atmpt.WaitingDepBitmap.All(true) { |
- atmpt.MustModifyState(c, dm.Attempt_NEEDS_EXECUTION) |
- muts = append(muts, &ScheduleExecution{For: f.Dep.From}) |
+ if atmpt.DepMap.All(true) { |
+ if err = atmpt.ModifyState(c, dm.Attempt_SCHEDULING); err != nil { |
+ return |
} |
- |
- needPut = true |
+ atmpt.DepMap.Reset() |
+ muts = append(muts, &ScheduleExecution{For: f.Dep.From}) |
} |
- } |
- if needPut { |
err = ds.Put(atmpt) |
} |