Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package mutate | 5 package mutate |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "github.com/luci/gae/service/datastore" | 8 "github.com/luci/gae/service/datastore" |
| 9 "github.com/luci/luci-go/appengine/cmd/dm/model" | 9 "github.com/luci/luci-go/appengine/cmd/dm/model" |
| 10 "github.com/luci/luci-go/appengine/tumble" | 10 "github.com/luci/luci-go/appengine/tumble" |
| 11 "github.com/luci/luci-go/common/api/dm/service/v1" | 11 "github.com/luci/luci-go/common/api/dm/service/v1" |
| 12 "github.com/luci/luci-go/common/bit_field" | 12 "github.com/luci/luci-go/common/bit_field" |
| 13 "github.com/luci/luci-go/common/grpcutil" | 13 "github.com/luci/luci-go/common/grpcutil" |
| 14 "github.com/luci/luci-go/common/logging" | |
| 14 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
| 15 "google.golang.org/grpc/codes" | 16 "google.golang.org/grpc/codes" |
| 16 ) | 17 ) |
| 17 | 18 |
| 18 // AddDeps transactionally stops the current execution and adds one or more | 19 // AddDeps transactionally stops the current execution and adds one or more |
| 19 // dependencies. It assumes that, prior to execution, all Quests named by Deps | 20 // dependencies. |
| 20 // have already been recorded globally. | |
| 21 type AddDeps struct { | 21 type AddDeps struct { |
| 22 Auth *dm.Execution_Auth | 22 Auth *dm.Execution_Auth |
| 23 Quests []*model.Quest | 23 Quests []*model.Quest |
| 24 | 24 |
| 25 » // Atmpts is attempts we think are missing from the global graph. | 25 » // Attempts is attempts we think are missing from the global graph. |
| 26 » Atmpts *dm.AttemptList | 26 » Attempts *dm.AttemptList |
| 27 | 27 |
| 28 // Deps are fwddeps we think are missing from the auth'd attempt. | 28 // Deps are fwddeps we think are missing from the auth'd attempt. |
| 29 Deps *dm.AttemptList | 29 Deps *dm.AttemptList |
| 30 } | 30 } |
| 31 | 31 |
| 32 // Root implements tumble.Mutation | 32 // Root implements tumble.Mutation |
| 33 func (a *AddDeps) Root(c context.Context) *datastore.Key { | 33 func (a *AddDeps) Root(c context.Context) *datastore.Key { |
| 34 » return datastore.Get(c).KeyForObj(&model.Attempt{ID: *a.Auth.Id.AttemptI D()}) | 34 » return model.AttemptKeyFromID(c, a.Auth.Id.AttemptID()) |
| 35 } | 35 } |
| 36 | 36 |
| 37 // RollForward implements tumble.Mutation | 37 // RollForward implements tumble.Mutation |
| 38 // | 38 // |
| 39 // This mutation is called directly, so we use return grpc errors. | 39 // This mutation is called directly, so we return grpc errors. |
| 40 func (a *AddDeps) RollForward(c context.Context) (muts []tumble.Mutation, err er ror) { | 40 func (a *AddDeps) RollForward(c context.Context) (muts []tumble.Mutation, err er ror) { |
| 41 // Invalidate the execution key so that they can't make more API calls. | 41 // Invalidate the execution key so that they can't make more API calls. |
| 42 atmpt, _, err := model.InvalidateExecution(c, a.Auth) | 42 atmpt, _, err := model.InvalidateExecution(c, a.Auth) |
| 43 if err != nil { | 43 if err != nil { |
| 44 return | 44 return |
| 45 } | 45 } |
| 46 | 46 |
| 47 fwdDeps, err := filterExisting(c, model.FwdDepsFromList(c, a.Auth.Id.Att emptID(), a.Deps)) | 47 fwdDeps, err := filterExisting(c, model.FwdDepsFromList(c, a.Auth.Id.Att emptID(), a.Deps)) |
| 48 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "while filtering deps ") | 48 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "while filtering deps ") |
| 49 if err != nil || len(fwdDeps) == 0 { | 49 if err != nil || len(fwdDeps) == 0 { |
| 50 return | 50 return |
| 51 } | 51 } |
| 52 | 52 |
| 53 ds := datastore.Get(c) | 53 ds := datastore.Get(c) |
| 54 | 54 |
| 55 » atmpt.AddingDepsBitmap = bf.Make(uint32(len(fwdDeps))) | 55 » logging.Fields{"aid": atmpt.ID, "count": len(fwdDeps)}.Infof(c, "added d eps") |
| 56 » atmpt.WaitingDepBitmap = bf.Make(uint32(len(fwdDeps))) | 56 » atmpt.DepMap = bf.Make(uint32(len(fwdDeps))) |
| 57 » atmpt.MustModifyState(c, dm.Attempt_ADDING_DEPS) | |
| 58 | 57 |
| 59 for i, fdp := range fwdDeps { | 58 for i, fdp := range fwdDeps { |
| 60 fdp.BitIndex = uint32(i) | 59 fdp.BitIndex = uint32(i) |
| 61 fdp.ForExecution = atmpt.CurExecution | 60 fdp.ForExecution = atmpt.CurExecution |
| 62 } | 61 } |
| 63 | 62 |
| 64 if err = ds.PutMulti(fwdDeps); err != nil { | 63 if err = ds.PutMulti(fwdDeps); err != nil { |
| 65 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "error puttin g new fwdDeps") | 64 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "error puttin g new fwdDeps") |
| 66 return | 65 return |
| 67 } | 66 } |
| 68 if err = ds.Put(atmpt); err != nil { | 67 if err = ds.Put(atmpt); err != nil { |
| 69 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "error puttin g attempt") | 68 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "error puttin g attempt") |
| 70 return | 69 return |
| 71 } | 70 } |
| 72 | 71 |
| 73 » muts = make([]tumble.Mutation, 0, len(fwdDeps)+len(a.Atmpts.GetTo())) | 72 » mergeQuestMap := map[string]*MergeQuest{} |
|
dnj (Google)
2016/06/09 18:00:56
nit: avoid an allocation if not needed by surround
iannucci
2016/06/15 00:46:01
Done
| |
| 74 » for _, d := range fwdDeps { | 73 » for _, q := range a.Quests { |
| 75 » » if nums, ok := a.Atmpts.GetTo()[d.Dependee.Quest]; ok { | 74 » » mergeQuestMap[q.ID] = &MergeQuest{Quest: q} |
| 75 » } | |
| 76 | |
| 77 » muts = make([]tumble.Mutation, 0, len(fwdDeps)+len(a.Attempts.GetTo())+l en(a.Quests)) | |
| 78 » for _, dep := range fwdDeps { | |
| 79 » » toAppend := &muts | |
| 80 » » if mq := mergeQuestMap[dep.Dependee.Quest]; mq != nil { | |
| 81 » » » toAppend = &mq.AndThen | |
| 82 » » } | |
| 83 | |
| 84 » » if nums, ok := a.Attempts.GetTo()[dep.Dependee.Quest]; ok { | |
| 76 for _, n := range nums.Nums { | 85 for _, n := range nums.Nums { |
| 77 » » » » if n == d.Dependee.Id { | 86 » » » » if n == dep.Dependee.Id { |
| 78 » » » » » muts = append(muts, &EnsureAttempt{ID: & d.Dependee}) | 87 » » » » » *toAppend = append(*toAppend, &EnsureAtt empt{ID: &dep.Dependee}) |
| 79 break | 88 break |
| 80 } | 89 } |
| 81 } | 90 } |
| 82 } | 91 } |
| 83 » » muts = append(muts, &AddBackDep{ | 92 » » *toAppend = append(*toAppend, &AddBackDep{ |
| 84 » » » Dep: d.Edge(), | 93 » » » Dep: dep.Edge(), |
| 85 NeedsAck: true, | 94 NeedsAck: true, |
| 86 }) | 95 }) |
| 87 } | 96 } |
| 88 | 97 |
| 98 for _, mut := range mergeQuestMap { | |
|
dnj (Google)
2016/06/09 18:00:56
Any chance this will ever exceed maximum datastore
iannucci
2016/06/15 00:46:01
Possibly, but that would require a lot to hit (~de
| |
| 99 muts = append(muts, mut) | |
| 100 } | |
| 101 | |
| 89 return | 102 return |
| 90 } | 103 } |
| 91 | 104 |
| 92 func init() { | 105 func init() { |
| 93 tumble.Register((*AddDeps)(nil)) | 106 tumble.Register((*AddDeps)(nil)) |
| 94 } | 107 } |
| OLD | NEW |