Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2335)

Unified Diff: appengine/cmd/dm/mutate/add_deps.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: fix imports and make dummy.go a real file Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « appengine/cmd/dm/mutate/add_backdep_test.go ('k') | appengine/cmd/dm/mutate/add_deps_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: appengine/cmd/dm/mutate/add_deps.go
diff --git a/appengine/cmd/dm/mutate/add_deps.go b/appengine/cmd/dm/mutate/add_deps.go
index ef06a67d1d817aa971d94d1504d17937c3c031da..d1b888ad24e054086369850887cf6bcf5a17fee7 100644
--- a/appengine/cmd/dm/mutate/add_deps.go
+++ b/appengine/cmd/dm/mutate/add_deps.go
@@ -11,19 +11,19 @@ import (
"github.com/luci/luci-go/common/api/dm/service/v1"
"github.com/luci/luci-go/common/bit_field"
"github.com/luci/luci-go/common/grpcutil"
+ "github.com/luci/luci-go/common/logging"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
)
// AddDeps transactionally stops the current execution and adds one or more
-// dependencies. It assumes that, prior to execution, all Quests named by Deps
-// have already been recorded globally.
+// dependencies.
type AddDeps struct {
Auth *dm.Execution_Auth
Quests []*model.Quest
- // Atmpts is attempts we think are missing from the global graph.
- Atmpts *dm.AttemptList
+ // Attempts is attempts we think are missing from the global graph.
+ Attempts *dm.AttemptList
// Deps are fwddeps we think are missing from the auth'd attempt.
Deps *dm.AttemptList
@@ -31,12 +31,12 @@ type AddDeps struct {
// Root implements tumble.Mutation
func (a *AddDeps) Root(c context.Context) *datastore.Key {
- return datastore.Get(c).KeyForObj(&model.Attempt{ID: *a.Auth.Id.AttemptID()})
+ return model.AttemptKeyFromID(c, a.Auth.Id.AttemptID())
}
// RollForward implements tumble.Mutation
//
-// This mutation is called directly, so we use return grpc errors.
+// This mutation is called directly, so we return grpc errors.
func (a *AddDeps) RollForward(c context.Context) (muts []tumble.Mutation, err error) {
// Invalidate the execution key so that they can't make more API calls.
atmpt, _, err := model.InvalidateExecution(c, a.Auth)
@@ -52,9 +52,8 @@ func (a *AddDeps) RollForward(c context.Context) (muts []tumble.Mutation, err er
ds := datastore.Get(c)
- atmpt.AddingDepsBitmap = bf.Make(uint32(len(fwdDeps)))
- atmpt.WaitingDepBitmap = bf.Make(uint32(len(fwdDeps)))
- atmpt.MustModifyState(c, dm.Attempt_ADDING_DEPS)
+ logging.Fields{"aid": atmpt.ID, "count": len(fwdDeps)}.Infof(c, "added deps")
+ atmpt.DepMap = bf.Make(uint32(len(fwdDeps)))
for i, fdp := range fwdDeps {
fdp.BitIndex = uint32(i)
@@ -70,22 +69,42 @@ func (a *AddDeps) RollForward(c context.Context) (muts []tumble.Mutation, err er
return
}
- muts = make([]tumble.Mutation, 0, len(fwdDeps)+len(a.Atmpts.GetTo()))
- for _, d := range fwdDeps {
- if nums, ok := a.Atmpts.GetTo()[d.Dependee.Quest]; ok {
+ mergeQuestMap := map[string]*MergeQuest(nil)
+ if len(a.Quests) > 0 {
+ mergeQuestMap = make(map[string]*MergeQuest, len(a.Quests))
+ for _, q := range a.Quests {
+ mergeQuestMap[q.ID] = &MergeQuest{Quest: q}
+ }
+ }
+
+ muts = make([]tumble.Mutation, 0, len(fwdDeps)+len(a.Attempts.GetTo())+len(a.Quests))
+ for _, dep := range fwdDeps {
+ toAppend := &muts
+ if mq := mergeQuestMap[dep.Dependee.Quest]; mq != nil {
+ toAppend = &mq.AndThen
+ }
+
+ if nums, ok := a.Attempts.GetTo()[dep.Dependee.Quest]; ok {
for _, n := range nums.Nums {
- if n == d.Dependee.Id {
- muts = append(muts, &EnsureAttempt{ID: &d.Dependee})
+ if n == dep.Dependee.Id {
+ *toAppend = append(*toAppend, &EnsureAttempt{ID: &dep.Dependee})
break
}
}
}
- muts = append(muts, &AddBackDep{
- Dep: d.Edge(),
+ *toAppend = append(*toAppend, &AddBackDep{
+ Dep: dep.Edge(),
NeedsAck: true,
})
}
+ // TODO(iannucci): This could run into datastore transaction limits. We could
+ // allieviate this by only emitting a single mutation which does tail-calls to
+ // decrease its own, unprocessed size by emitting new MergeQuest mutations.
+ for _, mut := range mergeQuestMap {
+ muts = append(muts, mut)
+ }
+
return
}
« no previous file with comments | « appengine/cmd/dm/mutate/add_backdep_test.go ('k') | appengine/cmd/dm/mutate/add_deps_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698