Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(358)

Side by Side Diff: appengine/cmd/dm/mutate/add_deps.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: fix imports and make dummy.go a real file Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/cmd/dm/mutate/add_backdep_test.go ('k') | appengine/cmd/dm/mutate/add_deps_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package mutate 5 package mutate
6 6
7 import ( 7 import (
8 "github.com/luci/gae/service/datastore" 8 "github.com/luci/gae/service/datastore"
9 "github.com/luci/luci-go/appengine/cmd/dm/model" 9 "github.com/luci/luci-go/appengine/cmd/dm/model"
10 "github.com/luci/luci-go/appengine/tumble" 10 "github.com/luci/luci-go/appengine/tumble"
11 "github.com/luci/luci-go/common/api/dm/service/v1" 11 "github.com/luci/luci-go/common/api/dm/service/v1"
12 "github.com/luci/luci-go/common/bit_field" 12 "github.com/luci/luci-go/common/bit_field"
13 "github.com/luci/luci-go/common/grpcutil" 13 "github.com/luci/luci-go/common/grpcutil"
14 "github.com/luci/luci-go/common/logging"
14 "golang.org/x/net/context" 15 "golang.org/x/net/context"
15 "google.golang.org/grpc/codes" 16 "google.golang.org/grpc/codes"
16 ) 17 )
17 18
18 // AddDeps transactionally stops the current execution and adds one or more 19 // AddDeps transactionally stops the current execution and adds one or more
19 // dependencies. It assumes that, prior to execution, all Quests named by Deps 20 // dependencies.
20 // have already been recorded globally.
21 type AddDeps struct { 21 type AddDeps struct {
22 Auth *dm.Execution_Auth 22 Auth *dm.Execution_Auth
23 Quests []*model.Quest 23 Quests []*model.Quest
24 24
25 » // Atmpts is attempts we think are missing from the global graph. 25 » // Attempts is attempts we think are missing from the global graph.
26 » Atmpts *dm.AttemptList 26 » Attempts *dm.AttemptList
27 27
28 // Deps are fwddeps we think are missing from the auth'd attempt. 28 // Deps are fwddeps we think are missing from the auth'd attempt.
29 Deps *dm.AttemptList 29 Deps *dm.AttemptList
30 } 30 }
31 31
32 // Root implements tumble.Mutation 32 // Root implements tumble.Mutation
33 func (a *AddDeps) Root(c context.Context) *datastore.Key { 33 func (a *AddDeps) Root(c context.Context) *datastore.Key {
34 » return datastore.Get(c).KeyForObj(&model.Attempt{ID: *a.Auth.Id.AttemptI D()}) 34 » return model.AttemptKeyFromID(c, a.Auth.Id.AttemptID())
35 } 35 }
36 36
37 // RollForward implements tumble.Mutation 37 // RollForward implements tumble.Mutation
38 // 38 //
39 // This mutation is called directly, so we use return grpc errors. 39 // This mutation is called directly, so we return grpc errors.
40 func (a *AddDeps) RollForward(c context.Context) (muts []tumble.Mutation, err er ror) { 40 func (a *AddDeps) RollForward(c context.Context) (muts []tumble.Mutation, err er ror) {
41 // Invalidate the execution key so that they can't make more API calls. 41 // Invalidate the execution key so that they can't make more API calls.
42 atmpt, _, err := model.InvalidateExecution(c, a.Auth) 42 atmpt, _, err := model.InvalidateExecution(c, a.Auth)
43 if err != nil { 43 if err != nil {
44 return 44 return
45 } 45 }
46 46
47 fwdDeps, err := filterExisting(c, model.FwdDepsFromList(c, a.Auth.Id.Att emptID(), a.Deps)) 47 fwdDeps, err := filterExisting(c, model.FwdDepsFromList(c, a.Auth.Id.Att emptID(), a.Deps))
48 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "while filtering deps ") 48 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "while filtering deps ")
49 if err != nil || len(fwdDeps) == 0 { 49 if err != nil || len(fwdDeps) == 0 {
50 return 50 return
51 } 51 }
52 52
53 ds := datastore.Get(c) 53 ds := datastore.Get(c)
54 54
55 » atmpt.AddingDepsBitmap = bf.Make(uint32(len(fwdDeps))) 55 » logging.Fields{"aid": atmpt.ID, "count": len(fwdDeps)}.Infof(c, "added d eps")
56 » atmpt.WaitingDepBitmap = bf.Make(uint32(len(fwdDeps))) 56 » atmpt.DepMap = bf.Make(uint32(len(fwdDeps)))
57 » atmpt.MustModifyState(c, dm.Attempt_ADDING_DEPS)
58 57
59 for i, fdp := range fwdDeps { 58 for i, fdp := range fwdDeps {
60 fdp.BitIndex = uint32(i) 59 fdp.BitIndex = uint32(i)
61 fdp.ForExecution = atmpt.CurExecution 60 fdp.ForExecution = atmpt.CurExecution
62 } 61 }
63 62
64 if err = ds.Put(fwdDeps); err != nil { 63 if err = ds.Put(fwdDeps); err != nil {
65 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "error puttin g new fwdDeps") 64 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "error puttin g new fwdDeps")
66 return 65 return
67 } 66 }
68 if err = ds.Put(atmpt); err != nil { 67 if err = ds.Put(atmpt); err != nil {
69 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "error puttin g attempt") 68 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "error puttin g attempt")
70 return 69 return
71 } 70 }
72 71
73 » muts = make([]tumble.Mutation, 0, len(fwdDeps)+len(a.Atmpts.GetTo())) 72 » mergeQuestMap := map[string]*MergeQuest(nil)
74 » for _, d := range fwdDeps { 73 » if len(a.Quests) > 0 {
75 » » if nums, ok := a.Atmpts.GetTo()[d.Dependee.Quest]; ok { 74 » » mergeQuestMap = make(map[string]*MergeQuest, len(a.Quests))
75 » » for _, q := range a.Quests {
76 » » » mergeQuestMap[q.ID] = &MergeQuest{Quest: q}
77 » » }
78 » }
79
80 » muts = make([]tumble.Mutation, 0, len(fwdDeps)+len(a.Attempts.GetTo())+l en(a.Quests))
81 » for _, dep := range fwdDeps {
82 » » toAppend := &muts
83 » » if mq := mergeQuestMap[dep.Dependee.Quest]; mq != nil {
84 » » » toAppend = &mq.AndThen
85 » » }
86
87 » » if nums, ok := a.Attempts.GetTo()[dep.Dependee.Quest]; ok {
76 for _, n := range nums.Nums { 88 for _, n := range nums.Nums {
77 » » » » if n == d.Dependee.Id { 89 » » » » if n == dep.Dependee.Id {
78 » » » » » muts = append(muts, &EnsureAttempt{ID: & d.Dependee}) 90 » » » » » *toAppend = append(*toAppend, &EnsureAtt empt{ID: &dep.Dependee})
79 break 91 break
80 } 92 }
81 } 93 }
82 } 94 }
83 » » muts = append(muts, &AddBackDep{ 95 » » *toAppend = append(*toAppend, &AddBackDep{
84 » » » Dep: d.Edge(), 96 » » » Dep: dep.Edge(),
85 NeedsAck: true, 97 NeedsAck: true,
86 }) 98 })
87 } 99 }
88 100
101 // TODO(iannucci): This could run into datastore transaction limits. We could
102 // allieviate this by only emitting a single mutation which does tail-ca lls to
103 // decrease its own, unprocessed size by emitting new MergeQuest mutatio ns.
104 for _, mut := range mergeQuestMap {
105 muts = append(muts, mut)
106 }
107
89 return 108 return
90 } 109 }
91 110
92 func init() { 111 func init() {
93 tumble.Register((*AddDeps)(nil)) 112 tumble.Register((*AddDeps)(nil))
94 } 113 }
OLDNEW
« no previous file with comments | « appengine/cmd/dm/mutate/add_backdep_test.go ('k') | appengine/cmd/dm/mutate/add_deps_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698