Index: appengine/cmd/dm/mutate/add_deps.go |
diff --git a/appengine/cmd/dm/mutate/add_deps.go b/appengine/cmd/dm/mutate/add_deps.go |
index ef06a67d1d817aa971d94d1504d17937c3c031da..d1b888ad24e054086369850887cf6bcf5a17fee7 100644 |
--- a/appengine/cmd/dm/mutate/add_deps.go |
+++ b/appengine/cmd/dm/mutate/add_deps.go |
@@ -11,19 +11,19 @@ import ( |
"github.com/luci/luci-go/common/api/dm/service/v1" |
"github.com/luci/luci-go/common/bit_field" |
"github.com/luci/luci-go/common/grpcutil" |
+ "github.com/luci/luci-go/common/logging" |
"golang.org/x/net/context" |
"google.golang.org/grpc/codes" |
) |
// AddDeps transactionally stops the current execution and adds one or more |
-// dependencies. It assumes that, prior to execution, all Quests named by Deps |
-// have already been recorded globally. |
+// dependencies. |
type AddDeps struct { |
Auth *dm.Execution_Auth |
Quests []*model.Quest |
- // Atmpts is attempts we think are missing from the global graph. |
- Atmpts *dm.AttemptList |
+ // Attempts is attempts we think are missing from the global graph. |
+ Attempts *dm.AttemptList |
// Deps are fwddeps we think are missing from the auth'd attempt. |
Deps *dm.AttemptList |
@@ -31,12 +31,12 @@ type AddDeps struct { |
// Root implements tumble.Mutation |
func (a *AddDeps) Root(c context.Context) *datastore.Key { |
- return datastore.Get(c).KeyForObj(&model.Attempt{ID: *a.Auth.Id.AttemptID()}) |
+ return model.AttemptKeyFromID(c, a.Auth.Id.AttemptID()) |
} |
// RollForward implements tumble.Mutation |
// |
-// This mutation is called directly, so we use return grpc errors. |
+// This mutation is called directly, so we return grpc errors. |
func (a *AddDeps) RollForward(c context.Context) (muts []tumble.Mutation, err error) { |
// Invalidate the execution key so that they can't make more API calls. |
atmpt, _, err := model.InvalidateExecution(c, a.Auth) |
@@ -52,9 +52,8 @@ func (a *AddDeps) RollForward(c context.Context) (muts []tumble.Mutation, err er |
ds := datastore.Get(c) |
- atmpt.AddingDepsBitmap = bf.Make(uint32(len(fwdDeps))) |
- atmpt.WaitingDepBitmap = bf.Make(uint32(len(fwdDeps))) |
- atmpt.MustModifyState(c, dm.Attempt_ADDING_DEPS) |
+ logging.Fields{"aid": atmpt.ID, "count": len(fwdDeps)}.Infof(c, "added deps") |
+ atmpt.DepMap = bf.Make(uint32(len(fwdDeps))) |
for i, fdp := range fwdDeps { |
fdp.BitIndex = uint32(i) |
@@ -70,22 +69,42 @@ func (a *AddDeps) RollForward(c context.Context) (muts []tumble.Mutation, err er |
return |
} |
- muts = make([]tumble.Mutation, 0, len(fwdDeps)+len(a.Atmpts.GetTo())) |
- for _, d := range fwdDeps { |
- if nums, ok := a.Atmpts.GetTo()[d.Dependee.Quest]; ok { |
+ mergeQuestMap := map[string]*MergeQuest(nil) |
+ if len(a.Quests) > 0 { |
+ mergeQuestMap = make(map[string]*MergeQuest, len(a.Quests)) |
+ for _, q := range a.Quests { |
+ mergeQuestMap[q.ID] = &MergeQuest{Quest: q} |
+ } |
+ } |
+ |
+ muts = make([]tumble.Mutation, 0, len(fwdDeps)+len(a.Attempts.GetTo())+len(a.Quests)) |
+ for _, dep := range fwdDeps { |
+ toAppend := &muts |
+ if mq := mergeQuestMap[dep.Dependee.Quest]; mq != nil { |
+ toAppend = &mq.AndThen |
+ } |
+ |
+ if nums, ok := a.Attempts.GetTo()[dep.Dependee.Quest]; ok { |
for _, n := range nums.Nums { |
- if n == d.Dependee.Id { |
- muts = append(muts, &EnsureAttempt{ID: &d.Dependee}) |
+ if n == dep.Dependee.Id { |
+ *toAppend = append(*toAppend, &EnsureAttempt{ID: &dep.Dependee}) |
break |
} |
} |
} |
- muts = append(muts, &AddBackDep{ |
- Dep: d.Edge(), |
+ *toAppend = append(*toAppend, &AddBackDep{ |
+ Dep: dep.Edge(), |
NeedsAck: true, |
}) |
} |
+ // TODO(iannucci): This could run into datastore transaction limits. We could |
+ // allieviate this by only emitting a single mutation which does tail-calls to |
+ // decrease its own, unprocessed size by emitting new MergeQuest mutations. |
+ for _, mut := range mergeQuestMap { |
+ muts = append(muts, mut) |
+ } |
+ |
return |
} |