OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package mutate | 5 package mutate |
6 | 6 |
7 import ( | 7 import ( |
8 "github.com/luci/gae/service/datastore" | 8 "github.com/luci/gae/service/datastore" |
9 "github.com/luci/luci-go/appengine/cmd/dm/model" | 9 "github.com/luci/luci-go/appengine/cmd/dm/model" |
10 "github.com/luci/luci-go/appengine/tumble" | 10 "github.com/luci/luci-go/appengine/tumble" |
11 "github.com/luci/luci-go/common/api/dm/service/v1" | 11 "github.com/luci/luci-go/common/api/dm/service/v1" |
12 "golang.org/x/net/context" | 12 "golang.org/x/net/context" |
13 ) | 13 ) |
14 | 14 |
15 // Attempt to complete attempts 64-at-a-time. Rely on tumble's | 15 // Attempt to complete attempts 64-at-a-time. Rely on tumble's |
16 // tail-call optimization to save on transactions. | 16 // tail-call optimization to save on transactions. |
17 const completionLimit = 64 | 17 const completionLimit = 64 |
18 | 18 |
19 // RecordCompletion marks that fact that an Attempt is completed (Finished) on | 19 // RecordCompletion marks that fact that an Attempt is completed (Finished) on |
20 // its corresponding BackDepGroup, and fires off additional AckFwdDep mutations | 20 // its corresponding BackDepGroup, and fires off additional AckFwdDep mutations |
21 // for each incoming dependency that is blocked. | 21 // for each incoming dependency that is blocked. |
22 // | 22 // |
23 // In the case where an Attempt has hundreds or thousands of incoming | 23 // In the case where an Attempt has hundreds or thousands of incoming |
24 // dependencies, the naieve implementation of this mutation could easily | 24 // dependencies, the naive implementation of this mutation could easily overfill |
25 // overfill a single datastore transaction. For that reason, the implementation | 25 // a single datastore transaction. For that reason, the implementation here |
26 // here unblocks things 64 edges at a time, and keeps returning itself as a | 26 // unblocks things 64 edges at a time, and keeps returning itself as a mutation |
27 // mutation until it unblocks less than 64 things (e.g. it does a tail-call). | 27 // until it unblocks less than 64 things (e.g. it does a tail-call). |
28 // | 28 // |
29 // This relies on tumble's tail-call optimization to be performant in terms of | 29 // This relies on tumble's tail-call optimization to be performant in terms of |
30 // the number of transactions, otherwise this would take 1 transaction per | 30 // the number of transactions, otherwise this would take 1 transaction per |
31 // 64 dependencies. With the TCO, it could do hundreds or thousands of | 31 // 64 dependencies. With the TCO, it could do hundreds or thousands of |
32 // dependencies, but it will also be fair to other work (e.g. it will allow | 32 // dependencies, but it will also be fair to other work (e.g. it will allow |
33 // other Attempts to take dependencies on this Attempt while RecordCompletion | 33 // other Attempts to take dependencies on this Attempt while RecordCompletion |
34 // is in between tail-calls). | 34 // is in between tail-calls). |
35 type RecordCompletion struct { | 35 type RecordCompletion struct { |
36 » For *dm.Attempt_ID `datastore:",noindex"` | 36 » For *dm.Attempt_ID |
37 } | 37 } |
38 | 38 |
39 // Root implements tumble.Mutation. | 39 // Root implements tumble.Mutation. |
40 func (r *RecordCompletion) Root(c context.Context) *datastore.Key { | 40 func (r *RecordCompletion) Root(c context.Context) *datastore.Key { |
41 return datastore.Get(c).KeyForObj(&model.BackDepGroup{Dependee: *r.For}) | 41 return datastore.Get(c).KeyForObj(&model.BackDepGroup{Dependee: *r.For}) |
42 } | 42 } |
43 | 43 |
44 // RollForward implements tumble.Mutation. | 44 // RollForward implements tumble.Mutation. |
45 func (r *RecordCompletion) RollForward(c context.Context) (muts []tumble.Mutatio
n, err error) { | 45 func (r *RecordCompletion) RollForward(c context.Context) (muts []tumble.Mutatio
n, err error) { |
46 ds := datastore.Get(c) | 46 ds := datastore.Get(c) |
(...skipping 12 matching lines...) Expand all Loading... |
59 | 59 |
60 if err = ds.GetAll(q, &needProp); err != nil { | 60 if err = ds.GetAll(q, &needProp); err != nil { |
61 return | 61 return |
62 } | 62 } |
63 | 63 |
64 if len(needProp) > 0 { | 64 if len(needProp) > 0 { |
65 muts = make([]tumble.Mutation, len(needProp)) | 65 muts = make([]tumble.Mutation, len(needProp)) |
66 | 66 |
67 for i, bdep := range needProp { | 67 for i, bdep := range needProp { |
68 bdep.Propagated = true | 68 bdep.Propagated = true |
69 » » » muts[i] = &AckFwdDep{ | 69 » » » muts[i] = &AckFwdDep{bdep.Edge()} |
70 » » » » Dep: bdep.Edge(), | |
71 » » » » DepIsFinished: true, | |
72 » » » } | |
73 } | 70 } |
74 | 71 |
75 if len(needProp) == completionLimit { | 72 if len(needProp) == completionLimit { |
76 // Append ourself if there might be more to do! | 73 // Append ourself if there might be more to do! |
77 muts = append(muts, r) | 74 muts = append(muts, r) |
78 } | 75 } |
79 | 76 |
80 if err = ds.Put(needProp); err != nil { | 77 if err = ds.Put(needProp); err != nil { |
81 return | 78 return |
82 } | 79 } |
83 } | 80 } |
84 | 81 |
85 if !bdg.AttemptFinished { | 82 if !bdg.AttemptFinished { |
86 bdg.AttemptFinished = true | 83 bdg.AttemptFinished = true |
87 if err = ds.Put(bdg); err != nil { | 84 if err = ds.Put(bdg); err != nil { |
88 return | 85 return |
89 } | 86 } |
90 } | 87 } |
91 | 88 |
92 return | 89 return |
93 } | 90 } |
94 | 91 |
95 func init() { | 92 func init() { |
96 tumble.Register((*RecordCompletion)(nil)) | 93 tumble.Register((*RecordCompletion)(nil)) |
97 } | 94 } |
OLD | NEW |