Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1046)

Side by Side Diff: appengine/cmd/dm/mutate/ack_fwd_dep.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: self review Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package mutate 5 package mutate
6 6
7 import ( 7 import (
8 "github.com/luci/gae/service/datastore" 8 "github.com/luci/gae/service/datastore"
9 "github.com/luci/luci-go/appengine/cmd/dm/model" 9 "github.com/luci/luci-go/appengine/cmd/dm/model"
10 "github.com/luci/luci-go/appengine/tumble" 10 "github.com/luci/luci-go/appengine/tumble"
11 "github.com/luci/luci-go/common/api/dm/service/v1" 11 "github.com/luci/luci-go/common/api/dm/service/v1"
12 "golang.org/x/net/context" 12 "golang.org/x/net/context"
13 ) 13 )
14 14
15 // AckFwdDep records the fact that a BackDep was successfully created for this 15 // AckFwdDep records the fact that a dependency was completed.
16 // FwdDep. It may also propagate Finished information (e.g. that the depended-on
17 // Attempt was actually already completed at the time that the dependency was
18 // taken on it).
19 //
20 // AckFwdDep is also used to propagate the fact that an Attempt (A) completed
21 // back to another Attempt (B) that's blocked on A.
22 type AckFwdDep struct { 16 type AckFwdDep struct {
23 » Dep *model.FwdEdge 17 » Dep *model.FwdEdge
24 » DepIsFinished bool
25 } 18 }
26 19
27 // Root implements tumble.Mutation. 20 // Root implements tumble.Mutation.
28 func (f *AckFwdDep) Root(c context.Context) *datastore.Key { 21 func (f *AckFwdDep) Root(c context.Context) *datastore.Key {
29 return datastore.Get(c).MakeKey("Attempt", f.Dep.From.DMEncoded()) 22 return datastore.Get(c).MakeKey("Attempt", f.Dep.From.DMEncoded())
30 } 23 }
31 24
32 // RollForward implements tumble.Mutation. 25 // RollForward implements tumble.Mutation.
33 func (f *AckFwdDep) RollForward(c context.Context) (muts []tumble.Mutation, err error) { 26 func (f *AckFwdDep) RollForward(c context.Context) (muts []tumble.Mutation, err error) {
34 ds := datastore.Get(c) 27 ds := datastore.Get(c)
35 28
36 atmpt, fdep := f.Dep.Fwd(c) 29 atmpt, fdep := f.Dep.Fwd(c)
37 err = ds.GetMulti([]interface{}{atmpt, fdep}) 30 err = ds.GetMulti([]interface{}{atmpt, fdep})
38 if err != nil { 31 if err != nil {
39 return 32 return
40 } 33 }
41 34
42 » // if the attempt and fdep aren't on the same execution, then bail 35 » if atmpt.State != dm.Attempt_WAITING || atmpt.CurExecution != fdep.ForEx ecution {
43 » if atmpt.CurExecution != fdep.ForExecution {
44 return 36 return
45 } 37 }
46 38
47 needPut := false
48
49 idx := uint32(fdep.BitIndex) 39 idx := uint32(fdep.BitIndex)
50 40
51 » if !atmpt.AddingDepsBitmap.IsSet(idx) { 41 » if !atmpt.DepMap.IsSet(idx) {
52 » » atmpt.AddingDepsBitmap.Set(idx) 42 » » atmpt.DepMap.Set(idx)
53 43
54 » » if atmpt.AddingDepsBitmap.All(true) { 44 » » if atmpt.DepMap.All(true) {
55 » » » atmpt.MustModifyState(c, dm.Attempt_BLOCKED) 45 » » » if err = atmpt.ModifyState(c, dm.Attempt_SCHEDULING); er r != nil {
46 » » » » return
47 » » » }
48 » » » atmpt.DepMap.Reset()
49 » » » muts = append(muts, &ScheduleExecution{For: f.Dep.From})
56 } 50 }
57 51
58 needPut = true
59 }
60
61 if f.DepIsFinished {
62 if !atmpt.WaitingDepBitmap.IsSet(idx) {
63 atmpt.WaitingDepBitmap.Set(idx)
64
65 if atmpt.WaitingDepBitmap.All(true) {
66 atmpt.MustModifyState(c, dm.Attempt_NEEDS_EXECUT ION)
67 muts = append(muts, &ScheduleExecution{For: f.De p.From})
68 }
69
70 needPut = true
71 }
72 }
73
74 if needPut {
75 err = ds.Put(atmpt) 52 err = ds.Put(atmpt)
76 } 53 }
77 54
78 return 55 return
79 } 56 }
80 57
81 func init() { 58 func init() {
82 tumble.Register((*AckFwdDep)(nil)) 59 tumble.Register((*AckFwdDep)(nil))
83 } 60 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698