| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package mutate | 5 package mutate |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "github.com/luci/gae/service/datastore" | 8 "github.com/luci/gae/service/datastore" |
| 9 "github.com/luci/luci-go/appengine/cmd/dm/model" | 9 "github.com/luci/luci-go/appengine/cmd/dm/model" |
| 10 "github.com/luci/luci-go/appengine/tumble" | 10 "github.com/luci/luci-go/appengine/tumble" |
| 11 "github.com/luci/luci-go/common/api/dm/service/v1" | 11 "github.com/luci/luci-go/common/api/dm/service/v1" |
| 12 "golang.org/x/net/context" | 12 "golang.org/x/net/context" |
| 13 ) | 13 ) |
| 14 | 14 |
| 15 // AckFwdDep records the fact that a BackDep was successfully created for this | 15 // AckFwdDep records the fact that a dependency was completed. |
| 16 // FwdDep. It may also propagate Finished information (e.g. that the depended-on | |
| 17 // Attempt was actually already completed at the time that the dependency was | |
| 18 // taken on it). | |
| 19 // | |
| 20 // AckFwdDep is also used to propagate the fact that an Attempt (A) completed | |
| 21 // back to another Attempt (B) that's blocked on A. | |
| 22 type AckFwdDep struct { | 16 type AckFwdDep struct { |
| 23 » Dep *model.FwdEdge | 17 » Dep *model.FwdEdge |
| 24 » DepIsFinished bool | |
| 25 } | 18 } |
| 26 | 19 |
| 27 // Root implements tumble.Mutation. | 20 // Root implements tumble.Mutation. |
| 28 func (f *AckFwdDep) Root(c context.Context) *datastore.Key { | 21 func (f *AckFwdDep) Root(c context.Context) *datastore.Key { |
| 29 return datastore.Get(c).MakeKey("Attempt", f.Dep.From.DMEncoded()) | 22 return datastore.Get(c).MakeKey("Attempt", f.Dep.From.DMEncoded()) |
| 30 } | 23 } |
| 31 | 24 |
| 32 // RollForward implements tumble.Mutation. | 25 // RollForward implements tumble.Mutation. |
| 33 func (f *AckFwdDep) RollForward(c context.Context) (muts []tumble.Mutation, err
error) { | 26 func (f *AckFwdDep) RollForward(c context.Context) (muts []tumble.Mutation, err
error) { |
| 34 ds := datastore.Get(c) | 27 ds := datastore.Get(c) |
| 35 | 28 |
| 36 atmpt, fdep := f.Dep.Fwd(c) | 29 atmpt, fdep := f.Dep.Fwd(c) |
| 37 err = ds.GetMulti([]interface{}{atmpt, fdep}) | 30 err = ds.GetMulti([]interface{}{atmpt, fdep}) |
| 38 if err != nil { | 31 if err != nil { |
| 39 return | 32 return |
| 40 } | 33 } |
| 41 | 34 |
| 42 » // if the attempt and fdep aren't on the same execution, then bail | 35 » if atmpt.State != dm.Attempt_WAITING || atmpt.CurExecution != fdep.ForEx
ecution { |
| 43 » if atmpt.CurExecution != fdep.ForExecution { | |
| 44 return | 36 return |
| 45 } | 37 } |
| 46 | 38 |
| 47 needPut := false | |
| 48 | |
| 49 idx := uint32(fdep.BitIndex) | 39 idx := uint32(fdep.BitIndex) |
| 50 | 40 |
| 51 » if !atmpt.AddingDepsBitmap.IsSet(idx) { | 41 » if !atmpt.DepMap.IsSet(idx) { |
| 52 » » atmpt.AddingDepsBitmap.Set(idx) | 42 » » atmpt.DepMap.Set(idx) |
| 53 | 43 |
| 54 » » if atmpt.AddingDepsBitmap.All(true) { | 44 » » if atmpt.DepMap.All(true) { |
| 55 » » » atmpt.MustModifyState(c, dm.Attempt_BLOCKED) | 45 » » » if err = atmpt.ModifyState(c, dm.Attempt_SCHEDULING); er
r != nil { |
| 46 » » » » return |
| 47 » » » } |
| 48 » » » atmpt.DepMap.Reset() |
| 49 » » » muts = append(muts, &ScheduleExecution{For: f.Dep.From}) |
| 56 } | 50 } |
| 57 | 51 |
| 58 needPut = true | |
| 59 } | |
| 60 | |
| 61 if f.DepIsFinished { | |
| 62 if !atmpt.WaitingDepBitmap.IsSet(idx) { | |
| 63 atmpt.WaitingDepBitmap.Set(idx) | |
| 64 | |
| 65 if atmpt.WaitingDepBitmap.All(true) { | |
| 66 atmpt.MustModifyState(c, dm.Attempt_NEEDS_EXECUT
ION) | |
| 67 muts = append(muts, &ScheduleExecution{For: f.De
p.From}) | |
| 68 } | |
| 69 | |
| 70 needPut = true | |
| 71 } | |
| 72 } | |
| 73 | |
| 74 if needPut { | |
| 75 err = ds.Put(atmpt) | 52 err = ds.Put(atmpt) |
| 76 } | 53 } |
| 77 | 54 |
| 78 return | 55 return |
| 79 } | 56 } |
| 80 | 57 |
| 81 func init() { | 58 func init() { |
| 82 tumble.Register((*AckFwdDep)(nil)) | 59 tumble.Register((*AckFwdDep)(nil)) |
| 83 } | 60 } |
| OLD | NEW |