OLD | NEW |
---|---|
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package mutate | 5 package mutate |
6 | 6 |
7 import ( | 7 import ( |
8 "github.com/luci/gae/service/datastore" | 8 "github.com/luci/gae/service/datastore" |
9 "github.com/luci/luci-go/appengine/cmd/dm/model" | 9 "github.com/luci/luci-go/appengine/cmd/dm/model" |
10 "github.com/luci/luci-go/appengine/tumble" | 10 "github.com/luci/luci-go/appengine/tumble" |
11 "github.com/luci/luci-go/common/api/dm/service/v1" | 11 "github.com/luci/luci-go/common/api/dm/service/v1" |
12 "github.com/luci/luci-go/common/bit_field" | 12 "github.com/luci/luci-go/common/bit_field" |
13 "github.com/luci/luci-go/common/grpcutil" | 13 "github.com/luci/luci-go/common/grpcutil" |
14 "github.com/luci/luci-go/common/logging" | |
14 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
15 "google.golang.org/grpc/codes" | 16 "google.golang.org/grpc/codes" |
16 ) | 17 ) |
17 | 18 |
18 // AddDeps transactionally stops the current execution and adds one or more | 19 // AddDeps transactionally stops the current execution and adds one or more |
19 // dependencies. It assumes that, prior to execution, all Quests named by Deps | 20 // dependencies. |
20 // have already been recorded globally. | |
21 type AddDeps struct { | 21 type AddDeps struct { |
22 Auth *dm.Execution_Auth | 22 Auth *dm.Execution_Auth |
23 Quests []*model.Quest | 23 Quests []*model.Quest |
24 | 24 |
25 » // Atmpts is attempts we think are missing from the global graph. | 25 » // Attempts is attempts we think are missing from the global graph. |
26 » Atmpts *dm.AttemptList | 26 » Attempts *dm.AttemptList |
27 | 27 |
28 // Deps are fwddeps we think are missing from the auth'd attempt. | 28 // Deps are fwddeps we think are missing from the auth'd attempt. |
29 Deps *dm.AttemptList | 29 Deps *dm.AttemptList |
30 } | 30 } |
31 | 31 |
32 // Root implements tumble.Mutation | 32 // Root implements tumble.Mutation |
33 func (a *AddDeps) Root(c context.Context) *datastore.Key { | 33 func (a *AddDeps) Root(c context.Context) *datastore.Key { |
34 » return datastore.Get(c).KeyForObj(&model.Attempt{ID: *a.Auth.Id.AttemptI D()}) | 34 » return model.AttemptKeyFromID(c, a.Auth.Id.AttemptID()) |
35 } | 35 } |
36 | 36 |
37 // RollForward implements tumble.Mutation | 37 // RollForward implements tumble.Mutation |
38 // | 38 // |
39 // This mutation is called directly, so we use return grpc errors. | 39 // This mutation is called directly, so we return grpc errors. |
40 func (a *AddDeps) RollForward(c context.Context) (muts []tumble.Mutation, err er ror) { | 40 func (a *AddDeps) RollForward(c context.Context) (muts []tumble.Mutation, err er ror) { |
41 // Invalidate the execution key so that they can't make more API calls. | 41 // Invalidate the execution key so that they can't make more API calls. |
42 atmpt, _, err := model.InvalidateExecution(c, a.Auth) | 42 atmpt, _, err := model.InvalidateExecution(c, a.Auth) |
43 if err != nil { | 43 if err != nil { |
44 return | 44 return |
45 } | 45 } |
46 | 46 |
47 fwdDeps, err := filterExisting(c, model.FwdDepsFromList(c, a.Auth.Id.Att emptID(), a.Deps)) | 47 fwdDeps, err := filterExisting(c, model.FwdDepsFromList(c, a.Auth.Id.Att emptID(), a.Deps)) |
48 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "while filtering deps ") | 48 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "while filtering deps ") |
49 if err != nil || len(fwdDeps) == 0 { | 49 if err != nil || len(fwdDeps) == 0 { |
50 return | 50 return |
51 } | 51 } |
52 | 52 |
53 ds := datastore.Get(c) | 53 ds := datastore.Get(c) |
54 | 54 |
55 » atmpt.AddingDepsBitmap = bf.Make(uint32(len(fwdDeps))) | 55 » logging.Fields{"aid": atmpt.ID, "count": len(fwdDeps)}.Infof(c, "added d eps") |
56 » atmpt.WaitingDepBitmap = bf.Make(uint32(len(fwdDeps))) | 56 » atmpt.DepMap = bf.Make(uint32(len(fwdDeps))) |
57 » atmpt.MustModifyState(c, dm.Attempt_ADDING_DEPS) | |
58 | 57 |
59 for i, fdp := range fwdDeps { | 58 for i, fdp := range fwdDeps { |
60 fdp.BitIndex = uint32(i) | 59 fdp.BitIndex = uint32(i) |
61 fdp.ForExecution = atmpt.CurExecution | 60 fdp.ForExecution = atmpt.CurExecution |
62 } | 61 } |
63 | 62 |
64 if err = ds.PutMulti(fwdDeps); err != nil { | 63 if err = ds.PutMulti(fwdDeps); err != nil { |
65 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "error puttin g new fwdDeps") | 64 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "error puttin g new fwdDeps") |
66 return | 65 return |
67 } | 66 } |
68 if err = ds.Put(atmpt); err != nil { | 67 if err = ds.Put(atmpt); err != nil { |
69 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "error puttin g attempt") | 68 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "error puttin g attempt") |
70 return | 69 return |
71 } | 70 } |
72 | 71 |
73 » muts = make([]tumble.Mutation, 0, len(fwdDeps)+len(a.Atmpts.GetTo())) | 72 » mergeQuestMap := map[string]*MergeQuest{} |
dnj (Google)
2016/06/09 18:00:56
nit: avoid an allocation if not needed by surround
iannucci
2016/06/15 00:46:01
Done
| |
74 » for _, d := range fwdDeps { | 73 » for _, q := range a.Quests { |
75 » » if nums, ok := a.Atmpts.GetTo()[d.Dependee.Quest]; ok { | 74 » » mergeQuestMap[q.ID] = &MergeQuest{Quest: q} |
75 » } | |
76 | |
77 » muts = make([]tumble.Mutation, 0, len(fwdDeps)+len(a.Attempts.GetTo())+l en(a.Quests)) | |
78 » for _, dep := range fwdDeps { | |
79 » » toAppend := &muts | |
80 » » if mq := mergeQuestMap[dep.Dependee.Quest]; mq != nil { | |
81 » » » toAppend = &mq.AndThen | |
82 » » } | |
83 | |
84 » » if nums, ok := a.Attempts.GetTo()[dep.Dependee.Quest]; ok { | |
76 for _, n := range nums.Nums { | 85 for _, n := range nums.Nums { |
77 » » » » if n == d.Dependee.Id { | 86 » » » » if n == dep.Dependee.Id { |
78 » » » » » muts = append(muts, &EnsureAttempt{ID: & d.Dependee}) | 87 » » » » » *toAppend = append(*toAppend, &EnsureAtt empt{ID: &dep.Dependee}) |
79 break | 88 break |
80 } | 89 } |
81 } | 90 } |
82 } | 91 } |
83 » » muts = append(muts, &AddBackDep{ | 92 » » *toAppend = append(*toAppend, &AddBackDep{ |
84 » » » Dep: d.Edge(), | 93 » » » Dep: dep.Edge(), |
85 NeedsAck: true, | 94 NeedsAck: true, |
86 }) | 95 }) |
87 } | 96 } |
88 | 97 |
98 for _, mut := range mergeQuestMap { | |
dnj (Google)
2016/06/09 18:00:56
Any chance this will ever exceed maximum datastore
iannucci
2016/06/15 00:46:01
Possibly, but that would require a lot to hit (~de
| |
99 muts = append(muts, mut) | |
100 } | |
101 | |
89 return | 102 return |
90 } | 103 } |
91 | 104 |
92 func init() { | 105 func init() { |
93 tumble.Register((*AddDeps)(nil)) | 106 tumble.Register((*AddDeps)(nil)) |
94 } | 107 } |
OLD | NEW |