| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package mutate | 5 package mutate |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "github.com/luci/gae/service/datastore" | 8 "github.com/luci/gae/service/datastore" |
| 9 "github.com/luci/luci-go/appengine/cmd/dm/model" | 9 "github.com/luci/luci-go/appengine/cmd/dm/model" |
| 10 "github.com/luci/luci-go/appengine/tumble" | 10 "github.com/luci/luci-go/appengine/tumble" |
| 11 "github.com/luci/luci-go/common/api/dm/service/v1" | 11 "github.com/luci/luci-go/common/api/dm/service/v1" |
| 12 "golang.org/x/net/context" | 12 "golang.org/x/net/context" |
| 13 ) | 13 ) |
| 14 | 14 |
| 15 // Attempt to complete attempts 64-at-a-time. Rely on tumble's | 15 // Attempt to complete attempts 64-at-a-time. Rely on tumble's |
| 16 // tail-call optimization to save on transactions. | 16 // tail-call optimization to save on transactions. |
| 17 const completionLimit = 64 | 17 const completionLimit = 64 |
| 18 | 18 |
| 19 // RecordCompletion marks that fact that an Attempt is completed (Finished) on | 19 // RecordCompletion marks that fact that an Attempt is completed (Finished) on |
| 20 // its corresponding BackDepGroup, and fires off additional AckFwdDep mutations | 20 // its corresponding BackDepGroup, and fires off additional AckFwdDep mutations |
| 21 // for each incoming dependency that is blocked. | 21 // for each incoming dependency that is blocked. |
| 22 // | 22 // |
| 23 // In the case where an Attempt has hundreds or thousands of incoming | 23 // In the case where an Attempt has hundreds or thousands of incoming |
| 24 // dependencies, the naieve implementation of this mutation could easily | 24 // dependencies, the naive implementation of this mutation could easily overfill |
| 25 // overfill a single datastore transaction. For that reason, the implementation | 25 // a single datastore transaction. For that reason, the implementation here |
| 26 // here unblocks things 64 edges at a time, and keeps returning itself as a | 26 // unblocks things 64 edges at a time, and keeps returning itself as a mutation |
| 27 // mutation until it unblocks less than 64 things (e.g. it does a tail-call). | 27 // until it unblocks less than 64 things (e.g. it does a tail-call). |
| 28 // | 28 // |
| 29 // This relies on tumble's tail-call optimization to be performant in terms of | 29 // This relies on tumble's tail-call optimization to be performant in terms of |
| 30 // the number of transactions, otherwise this would take 1 transaction per | 30 // the number of transactions, otherwise this would take 1 transaction per |
| 31 // 64 dependencies. With the TCO, it could do hundreds or thousands of | 31 // 64 dependencies. With the TCO, it could do hundreds or thousands of |
| 32 // dependencies, but it will also be fair to other work (e.g. it will allow | 32 // dependencies, but it will also be fair to other work (e.g. it will allow |
| 33 // other Attempts to take dependencies on this Attempt while RecordCompletion | 33 // other Attempts to take dependencies on this Attempt while RecordCompletion |
| 34 // is in between tail-calls). | 34 // is in between tail-calls). |
| 35 type RecordCompletion struct { | 35 type RecordCompletion struct { |
| 36 » For *dm.Attempt_ID `datastore:",noindex"` | 36 » For *dm.Attempt_ID |
| 37 } | 37 } |
| 38 | 38 |
| 39 // Root implements tumble.Mutation. | 39 // Root implements tumble.Mutation. |
| 40 func (r *RecordCompletion) Root(c context.Context) *datastore.Key { | 40 func (r *RecordCompletion) Root(c context.Context) *datastore.Key { |
| 41 return datastore.Get(c).KeyForObj(&model.BackDepGroup{Dependee: *r.For}) | 41 return datastore.Get(c).KeyForObj(&model.BackDepGroup{Dependee: *r.For}) |
| 42 } | 42 } |
| 43 | 43 |
| 44 // RollForward implements tumble.Mutation. | 44 // RollForward implements tumble.Mutation. |
| 45 func (r *RecordCompletion) RollForward(c context.Context) (muts []tumble.Mutatio
n, err error) { | 45 func (r *RecordCompletion) RollForward(c context.Context) (muts []tumble.Mutatio
n, err error) { |
| 46 ds := datastore.Get(c) | 46 ds := datastore.Get(c) |
| (...skipping 12 matching lines...) Expand all Loading... |
| 59 | 59 |
| 60 if err = ds.GetAll(q, &needProp); err != nil { | 60 if err = ds.GetAll(q, &needProp); err != nil { |
| 61 return | 61 return |
| 62 } | 62 } |
| 63 | 63 |
| 64 if len(needProp) > 0 { | 64 if len(needProp) > 0 { |
| 65 muts = make([]tumble.Mutation, len(needProp)) | 65 muts = make([]tumble.Mutation, len(needProp)) |
| 66 | 66 |
| 67 for i, bdep := range needProp { | 67 for i, bdep := range needProp { |
| 68 bdep.Propagated = true | 68 bdep.Propagated = true |
| 69 » » » muts[i] = &AckFwdDep{ | 69 » » » muts[i] = &AckFwdDep{bdep.Edge()} |
| 70 » » » » Dep: bdep.Edge(), | |
| 71 » » » » DepIsFinished: true, | |
| 72 » » » } | |
| 73 } | 70 } |
| 74 | 71 |
| 75 if len(needProp) == completionLimit { | 72 if len(needProp) == completionLimit { |
| 76 // Append ourself if there might be more to do! | 73 // Append ourself if there might be more to do! |
| 77 muts = append(muts, r) | 74 muts = append(muts, r) |
| 78 } | 75 } |
| 79 | 76 |
| 80 if err = ds.Put(needProp); err != nil { | 77 if err = ds.Put(needProp); err != nil { |
| 81 return | 78 return |
| 82 } | 79 } |
| 83 } | 80 } |
| 84 | 81 |
| 85 if !bdg.AttemptFinished { | 82 if !bdg.AttemptFinished { |
| 86 bdg.AttemptFinished = true | 83 bdg.AttemptFinished = true |
| 87 if err = ds.Put(bdg); err != nil { | 84 if err = ds.Put(bdg); err != nil { |
| 88 return | 85 return |
| 89 } | 86 } |
| 90 } | 87 } |
| 91 | 88 |
| 92 return | 89 return |
| 93 } | 90 } |
| 94 | 91 |
| 95 func init() { | 92 func init() { |
| 96 tumble.Register((*RecordCompletion)(nil)) | 93 tumble.Register((*RecordCompletion)(nil)) |
| 97 } | 94 } |
| OLD | NEW |