Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(409)

Side by Side Diff: appengine/cmd/dm/mutate/record_completion.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: fix imports and make dummy.go a real file Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package mutate 5 package mutate
6 6
7 import ( 7 import (
8 "github.com/luci/gae/service/datastore" 8 "github.com/luci/gae/service/datastore"
9 "github.com/luci/luci-go/appengine/cmd/dm/model" 9 "github.com/luci/luci-go/appengine/cmd/dm/model"
10 "github.com/luci/luci-go/appengine/tumble" 10 "github.com/luci/luci-go/appengine/tumble"
11 "github.com/luci/luci-go/common/api/dm/service/v1" 11 "github.com/luci/luci-go/common/api/dm/service/v1"
12 "golang.org/x/net/context" 12 "golang.org/x/net/context"
13 ) 13 )
14 14
15 // Attempt to complete attempts 64-at-a-time. Rely on tumble's 15 // Attempt to complete attempts 64-at-a-time. Rely on tumble's
16 // tail-call optimization to save on transactions. 16 // tail-call optimization to save on transactions.
17 const completionLimit = 64 17 const completionLimit = 64
18 18
19 // RecordCompletion marks that fact that an Attempt is completed (Finished) on 19 // RecordCompletion marks that fact that an Attempt is completed (Finished) on
20 // its corresponding BackDepGroup, and fires off additional AckFwdDep mutations 20 // its corresponding BackDepGroup, and fires off additional AckFwdDep mutations
21 // for each incoming dependency that is blocked. 21 // for each incoming dependency that is blocked.
22 // 22 //
23 // In the case where an Attempt has hundreds or thousands of incoming 23 // In the case where an Attempt has hundreds or thousands of incoming
24 // dependencies, the naieve implementation of this mutation could easily 24 // dependencies, the naive implementation of this mutation could easily overfill
25 // overfill a single datastore transaction. For that reason, the implementation 25 // a single datastore transaction. For that reason, the implementation here
26 // here unblocks things 64 edges at a time, and keeps returning itself as a 26 // unblocks things 64 edges at a time, and keeps returning itself as a mutation
27 // mutation until it unblocks less than 64 things (e.g. it does a tail-call). 27 // until it unblocks less than 64 things (e.g. it does a tail-call).
28 // 28 //
29 // This relies on tumble's tail-call optimization to be performant in terms of 29 // This relies on tumble's tail-call optimization to be performant in terms of
30 // the number of transactions, otherwise this would take 1 transaction per 30 // the number of transactions, otherwise this would take 1 transaction per
31 // 64 dependencies. With the TCO, it could do hundreds or thousands of 31 // 64 dependencies. With the TCO, it could do hundreds or thousands of
32 // dependencies, but it will also be fair to other work (e.g. it will allow 32 // dependencies, but it will also be fair to other work (e.g. it will allow
33 // other Attempts to take dependencies on this Attempt while RecordCompletion 33 // other Attempts to take dependencies on this Attempt while RecordCompletion
34 // is in between tail-calls). 34 // is in between tail-calls).
35 type RecordCompletion struct { 35 type RecordCompletion struct {
36 » For *dm.Attempt_ID `datastore:",noindex"` 36 » For *dm.Attempt_ID
37 } 37 }
38 38
39 // Root implements tumble.Mutation. 39 // Root implements tumble.Mutation.
40 func (r *RecordCompletion) Root(c context.Context) *datastore.Key { 40 func (r *RecordCompletion) Root(c context.Context) *datastore.Key {
41 return datastore.Get(c).KeyForObj(&model.BackDepGroup{Dependee: *r.For}) 41 return datastore.Get(c).KeyForObj(&model.BackDepGroup{Dependee: *r.For})
42 } 42 }
43 43
44 // RollForward implements tumble.Mutation. 44 // RollForward implements tumble.Mutation.
45 func (r *RecordCompletion) RollForward(c context.Context) (muts []tumble.Mutatio n, err error) { 45 func (r *RecordCompletion) RollForward(c context.Context) (muts []tumble.Mutatio n, err error) {
46 ds := datastore.Get(c) 46 ds := datastore.Get(c)
(...skipping 12 matching lines...) Expand all
59 59
60 if err = ds.GetAll(q, &needProp); err != nil { 60 if err = ds.GetAll(q, &needProp); err != nil {
61 return 61 return
62 } 62 }
63 63
64 if len(needProp) > 0 { 64 if len(needProp) > 0 {
65 muts = make([]tumble.Mutation, len(needProp)) 65 muts = make([]tumble.Mutation, len(needProp))
66 66
67 for i, bdep := range needProp { 67 for i, bdep := range needProp {
68 bdep.Propagated = true 68 bdep.Propagated = true
69 » » » muts[i] = &AckFwdDep{ 69 » » » muts[i] = &AckFwdDep{bdep.Edge()}
70 » » » » Dep: bdep.Edge(),
71 » » » » DepIsFinished: true,
72 » » » }
73 } 70 }
74 71
75 if len(needProp) == completionLimit { 72 if len(needProp) == completionLimit {
76 // Append ourself if there might be more to do! 73 // Append ourself if there might be more to do!
77 muts = append(muts, r) 74 muts = append(muts, r)
78 } 75 }
79 76
80 if err = ds.Put(needProp); err != nil { 77 if err = ds.Put(needProp); err != nil {
81 return 78 return
82 } 79 }
83 } 80 }
84 81
85 if !bdg.AttemptFinished { 82 if !bdg.AttemptFinished {
86 bdg.AttemptFinished = true 83 bdg.AttemptFinished = true
87 if err = ds.Put(bdg); err != nil { 84 if err = ds.Put(bdg); err != nil {
88 return 85 return
89 } 86 }
90 } 87 }
91 88
92 return 89 return
93 } 90 }
94 91
95 func init() { 92 func init() {
96 tumble.Register((*RecordCompletion)(nil)) 93 tumble.Register((*RecordCompletion)(nil))
97 } 94 }
OLDNEW
« no previous file with comments | « appengine/cmd/dm/mutate/merge_quest_test.go ('k') | appengine/cmd/dm/mutate/record_completion_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698