| Index: appengine/cmd/dm/mutate/add_deps.go
|
| diff --git a/appengine/cmd/dm/mutate/add_deps.go b/appengine/cmd/dm/mutate/add_deps.go
|
| index ef06a67d1d817aa971d94d1504d17937c3c031da..d1b888ad24e054086369850887cf6bcf5a17fee7 100644
|
| --- a/appengine/cmd/dm/mutate/add_deps.go
|
| +++ b/appengine/cmd/dm/mutate/add_deps.go
|
| @@ -11,19 +11,19 @@ import (
|
| "github.com/luci/luci-go/common/api/dm/service/v1"
|
| "github.com/luci/luci-go/common/bit_field"
|
| "github.com/luci/luci-go/common/grpcutil"
|
| + "github.com/luci/luci-go/common/logging"
|
| "golang.org/x/net/context"
|
| "google.golang.org/grpc/codes"
|
| )
|
|
|
| // AddDeps transactionally stops the current execution and adds one or more
|
| -// dependencies. It assumes that, prior to execution, all Quests named by Deps
|
| -// have already been recorded globally.
|
| +// dependencies.
|
| type AddDeps struct {
|
| Auth *dm.Execution_Auth
|
| Quests []*model.Quest
|
|
|
| - // Atmpts is attempts we think are missing from the global graph.
|
| - Atmpts *dm.AttemptList
|
| + // Attempts is attempts we think are missing from the global graph.
|
| + Attempts *dm.AttemptList
|
|
|
| // Deps are fwddeps we think are missing from the auth'd attempt.
|
| Deps *dm.AttemptList
|
| @@ -31,12 +31,12 @@ type AddDeps struct {
|
|
|
| // Root implements tumble.Mutation
|
| func (a *AddDeps) Root(c context.Context) *datastore.Key {
|
| - return datastore.Get(c).KeyForObj(&model.Attempt{ID: *a.Auth.Id.AttemptID()})
|
| + return model.AttemptKeyFromID(c, a.Auth.Id.AttemptID())
|
| }
|
|
|
| // RollForward implements tumble.Mutation
|
| //
|
| -// This mutation is called directly, so we use return grpc errors.
|
| +// This mutation is called directly, so we return grpc errors.
|
| func (a *AddDeps) RollForward(c context.Context) (muts []tumble.Mutation, err error) {
|
| // Invalidate the execution key so that they can't make more API calls.
|
| atmpt, _, err := model.InvalidateExecution(c, a.Auth)
|
| @@ -52,9 +52,8 @@ func (a *AddDeps) RollForward(c context.Context) (muts []tumble.Mutation, err er
|
|
|
| ds := datastore.Get(c)
|
|
|
| - atmpt.AddingDepsBitmap = bf.Make(uint32(len(fwdDeps)))
|
| - atmpt.WaitingDepBitmap = bf.Make(uint32(len(fwdDeps)))
|
| - atmpt.MustModifyState(c, dm.Attempt_ADDING_DEPS)
|
| + logging.Fields{"aid": atmpt.ID, "count": len(fwdDeps)}.Infof(c, "added deps")
|
| + atmpt.DepMap = bf.Make(uint32(len(fwdDeps)))
|
|
|
| for i, fdp := range fwdDeps {
|
| fdp.BitIndex = uint32(i)
|
| @@ -70,22 +69,42 @@ func (a *AddDeps) RollForward(c context.Context) (muts []tumble.Mutation, err er
|
| return
|
| }
|
|
|
| - muts = make([]tumble.Mutation, 0, len(fwdDeps)+len(a.Atmpts.GetTo()))
|
| - for _, d := range fwdDeps {
|
| - if nums, ok := a.Atmpts.GetTo()[d.Dependee.Quest]; ok {
|
| + mergeQuestMap := map[string]*MergeQuest(nil)
|
| + if len(a.Quests) > 0 {
|
| + mergeQuestMap = make(map[string]*MergeQuest, len(a.Quests))
|
| + for _, q := range a.Quests {
|
| + mergeQuestMap[q.ID] = &MergeQuest{Quest: q}
|
| + }
|
| + }
|
| +
|
| + muts = make([]tumble.Mutation, 0, len(fwdDeps)+len(a.Attempts.GetTo())+len(a.Quests))
|
| + for _, dep := range fwdDeps {
|
| + toAppend := &muts
|
| + if mq := mergeQuestMap[dep.Dependee.Quest]; mq != nil {
|
| + toAppend = &mq.AndThen
|
| + }
|
| +
|
| + if nums, ok := a.Attempts.GetTo()[dep.Dependee.Quest]; ok {
|
| for _, n := range nums.Nums {
|
| - if n == d.Dependee.Id {
|
| - muts = append(muts, &EnsureAttempt{ID: &d.Dependee})
|
| + if n == dep.Dependee.Id {
|
| + *toAppend = append(*toAppend, &EnsureAttempt{ID: &dep.Dependee})
|
| break
|
| }
|
| }
|
| }
|
| - muts = append(muts, &AddBackDep{
|
| - Dep: d.Edge(),
|
| + *toAppend = append(*toAppend, &AddBackDep{
|
| + Dep: dep.Edge(),
|
| NeedsAck: true,
|
| })
|
| }
|
|
|
| + // TODO(iannucci): This could run into datastore transaction limits. We could
|
| + // allieviate this by only emitting a single mutation which does tail-calls to
|
| + // decrease its own, unprocessed size by emitting new MergeQuest mutations.
|
| + for _, mut := range mergeQuestMap {
|
| + muts = append(muts, mut)
|
| + }
|
| +
|
| return
|
| }
|
|
|
|
|