OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package mutate | 5 package mutate |
6 | 6 |
7 import ( | 7 import ( |
8 "github.com/luci/gae/service/datastore" | 8 "github.com/luci/gae/service/datastore" |
9 "github.com/luci/luci-go/appengine/cmd/dm/model" | 9 "github.com/luci/luci-go/appengine/cmd/dm/model" |
10 "github.com/luci/luci-go/appengine/tumble" | 10 "github.com/luci/luci-go/appengine/tumble" |
11 "github.com/luci/luci-go/common/api/dm/service/v1" | 11 "github.com/luci/luci-go/common/api/dm/service/v1" |
12 "golang.org/x/net/context" | 12 "golang.org/x/net/context" |
13 ) | 13 ) |
14 | 14 |
15 // AckFwdDep records the fact that a BackDep was successfully created for this | 15 // AckFwdDep records the fact that a dependency was completed. |
16 // FwdDep. It may also propagate Finished information (e.g. that the depended-on | |
17 // Attempt was actually already completed at the time that the dependency was | |
18 // taken on it). | |
19 // | |
20 // AckFwdDep is also used to propagate the fact that an Attempt (A) completed | |
21 // back to another Attempt (B) that's blocked on A. | |
22 type AckFwdDep struct { | 16 type AckFwdDep struct { |
23 » Dep *model.FwdEdge | 17 » Dep *model.FwdEdge |
24 » DepIsFinished bool | |
25 } | 18 } |
26 | 19 |
27 // Root implements tumble.Mutation. | 20 // Root implements tumble.Mutation. |
28 func (f *AckFwdDep) Root(c context.Context) *datastore.Key { | 21 func (f *AckFwdDep) Root(c context.Context) *datastore.Key { |
29 return datastore.Get(c).MakeKey("Attempt", f.Dep.From.DMEncoded()) | 22 return datastore.Get(c).MakeKey("Attempt", f.Dep.From.DMEncoded()) |
30 } | 23 } |
31 | 24 |
32 // RollForward implements tumble.Mutation. | 25 // RollForward implements tumble.Mutation. |
33 func (f *AckFwdDep) RollForward(c context.Context) (muts []tumble.Mutation, err
error) { | 26 func (f *AckFwdDep) RollForward(c context.Context) (muts []tumble.Mutation, err
error) { |
34 ds := datastore.Get(c) | 27 ds := datastore.Get(c) |
35 | 28 |
36 atmpt, fdep := f.Dep.Fwd(c) | 29 atmpt, fdep := f.Dep.Fwd(c) |
37 err = ds.GetMulti([]interface{}{atmpt, fdep}) | 30 err = ds.GetMulti([]interface{}{atmpt, fdep}) |
38 if err != nil { | 31 if err != nil { |
39 return | 32 return |
40 } | 33 } |
41 | 34 |
42 » // if the attempt and fdep aren't on the same execution, then bail | 35 » if atmpt.State != dm.Attempt_WAITING || atmpt.CurExecution != fdep.ForEx
ecution { |
43 » if atmpt.CurExecution != fdep.ForExecution { | |
44 return | 36 return |
45 } | 37 } |
46 | 38 |
47 needPut := false | |
48 | |
49 idx := uint32(fdep.BitIndex) | 39 idx := uint32(fdep.BitIndex) |
50 | 40 |
51 » if !atmpt.AddingDepsBitmap.IsSet(idx) { | 41 » if !atmpt.DepMap.IsSet(idx) { |
52 » » atmpt.AddingDepsBitmap.Set(idx) | 42 » » atmpt.DepMap.Set(idx) |
53 | 43 |
54 » » if atmpt.AddingDepsBitmap.All(true) { | 44 » » if atmpt.DepMap.All(true) { |
55 » » » atmpt.MustModifyState(c, dm.Attempt_BLOCKED) | 45 » » » if err = atmpt.ModifyState(c, dm.Attempt_SCHEDULING); er
r != nil { |
| 46 » » » » return |
| 47 » » » } |
| 48 » » » atmpt.DepMap.Reset() |
| 49 » » » muts = append(muts, &ScheduleExecution{For: f.Dep.From}) |
56 } | 50 } |
57 | 51 |
58 needPut = true | |
59 } | |
60 | |
61 if f.DepIsFinished { | |
62 if !atmpt.WaitingDepBitmap.IsSet(idx) { | |
63 atmpt.WaitingDepBitmap.Set(idx) | |
64 | |
65 if atmpt.WaitingDepBitmap.All(true) { | |
66 atmpt.MustModifyState(c, dm.Attempt_NEEDS_EXECUT
ION) | |
67 muts = append(muts, &ScheduleExecution{For: f.De
p.From}) | |
68 } | |
69 | |
70 needPut = true | |
71 } | |
72 } | |
73 | |
74 if needPut { | |
75 err = ds.Put(atmpt) | 52 err = ds.Put(atmpt) |
76 } | 53 } |
77 | 54 |
78 return | 55 return |
79 } | 56 } |
80 | 57 |
81 func init() { | 58 func init() { |
82 tumble.Register((*AckFwdDep)(nil)) | 59 tumble.Register((*AckFwdDep)(nil)) |
83 } | 60 } |
OLD | NEW |