Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(196)

Side by Side Diff: appengine/cmd/dm/model/attempt.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: self review Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package model 5 package model
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "time" 9 "time"
10 10
11 "golang.org/x/net/context" 11 "golang.org/x/net/context"
12 12
13 » "github.com/luci/luci-go/common/api/dm/service/v1" 13 » dm "github.com/luci/luci-go/common/api/dm/service/v1"
14 » "github.com/luci/luci-go/common/bit_field" 14 » bf "github.com/luci/luci-go/common/bit_field"
15 "github.com/luci/luci-go/common/clock" 15 "github.com/luci/luci-go/common/clock"
16 google_pb "github.com/luci/luci-go/common/proto/google" 16 google_pb "github.com/luci/luci-go/common/proto/google"
17 ) 17 )
18 18
19 // AttemptRetryState indicates the current state of the Attempt's retry
20 // counters.
21 type AttemptRetryState struct {
22 Failed uint32
23 Expired uint32
24 TimedOut uint32
25 Crashed uint32
26 }
27
28 // Reset resets all of the AttemptRetryState counters.
29 func (a *AttemptRetryState) Reset() {
30 a.Failed = 0
dnj (Google) 2016/06/09 18:00:56 *a = AttemptRetryState{} !
iannucci 2016/06/15 00:46:01 mmmm tasty.
31 a.Expired = 0
32 a.TimedOut = 0
33 a.Crashed = 0
34 }
35
19 // Attempt is the datastore model for a DM Attempt. It has no parent key, but 36 // Attempt is the datastore model for a DM Attempt. It has no parent key, but
20 // it may have the following children entities: 37 // it may have the following children entities:
21 // * FwdDep 38 // * FwdDep
22 // * AttemptResult 39 // * AttemptResult
23 // 40 //
24 // Additionally, every Attempt has an associated BackDepGroup whose ID equals 41 // Additionally, every Attempt has an associated BackDepGroup whose ID equals
25 // the ID of this Attempt. 42 // the ID of this Attempt.
26 type Attempt struct { 43 type Attempt struct {
27 ID dm.Attempt_ID `gae:"$id"` 44 ID dm.Attempt_ID `gae:"$id"`
28 45
29 Created time.Time 46 Created time.Time
30 Modified time.Time 47 Modified time.Time
31 48
32 » State dm.Attempt_State 49 » State dm.Attempt_State
50 » RetryState AttemptRetryState
51
52 » // Only valid when State == ABNORMAL_FINISHED
53 » AbnormalFinish dm.AbnormalFinish
dnj (Google) 2016/06/09 18:00:56 Why not make these pointers?
iannucci 2016/06/15 00:46:01 because luci/gae doesn't allow embed-by-pointer in
54
55 » // Only valid when State == FINISHED
56 » ResultExpiration time.Time `gae:",noindex"`
57 » ResultSize uint32 `gae:",noindex"`
58
59 » // PersistentState is the last successful execution's returned
60 » // PersistentState. It is set whenever an execution for this Attempt fin ishes
61 » // sucessfully. This is denormalized with the Execution's
62 » // ResultPersistentState field.
63 » PersistentState string `gae:",noindex"`
33 64
34 // TODO(iannucci): Use CurExecution as a 'deps block version' 65 // TODO(iannucci): Use CurExecution as a 'deps block version'
35 // then we can have an 'ANY' directive which executes the attempt as soo n 66 // then we can have an 'ANY' directive which executes the attempt as soo n
36 // as any of the dependencies are ready. If it adds more deps in ANY mod e, 67 // as any of the dependencies are ready. If it adds more deps in ANY mod e,
37 // the bitmaps get /extended/, and new deps bit indices are added to the 68 // the bitmaps get /extended/, and new deps bit indices are added to the
38 // existing max. 69 // existing max.
39 // If it adds more deps in ALL mode, it just converts from ANY -> ALL mo de 70 // If it adds more deps in ALL mode, it just converts from ANY -> ALL mo de
40 // and follows the current behavior. 71 // and follows the current behavior.
41 72
42 // CurExecution is the maximum Execution ID for this Attempt so far. Exe cution 73 // CurExecution is the maximum Execution ID for this Attempt so far. Exe cution
43 // IDs are contiguous from [1, CurExecution]. If the State is not curren tly 74 // IDs are contiguous from [1, CurExecution]. If the State is not curren tly
44 // Executing, then CurExecution represents the execution that JUST finis hed 75 // Executing, then CurExecution represents the execution that JUST finis hed
45 // (or 0 if no Executions have been made yet). 76 // (or 0 if no Executions have been made yet).
46 CurExecution uint32 77 CurExecution uint32
47 78
48 » // AddingDepsBitmap is valid only while Attempt is in 'AddingDeps'. 79 » // DepMap is valid only while Attempt is in a State of EXECUTING or WAIT ING.
49 » // A field value of 0 means the backdep hasn't been added yet. 80 » //
50 » AddingDepsBitmap bf.BitField `gae:",noindex" json:"-"` 81 » // The size of this field is inspected to deteremine what the next state after
51 82 » // EXECUTING is. If the size == 0, it means the Attempt should move to t he
52 » // WaitingDepBitmap is valid only while Attempt is in a Status of 'Addin gDeps' 83 » // FINISHED state. Otherwise it means that the Attempt should move to th e
53 » // or 'Blocked'. 84 » // WAITING state.
54 » // A field value of 0 means that the dep is currently waiting. 85 » //
55 » WaitingDepBitmap bf.BitField `gae:",noindex" json:"-"` 86 » // A bit field value of 0 means that the dep is currently waiting, and a bit
56 87 » // value of 1 means that the coresponding dep is satisfined. The Attempt can
57 » // Only valid while Attempt is Finished 88 » // be unblocked from WAITING back to SCHEDULING when all bits are set to 1.
58 » ResultExpiration time.Time 89 » DepMap bf.BitField `gae:",noindex" json:"-"`
iannucci 2016/06/08 02:54:24 this is part of the state machine simplification;
59 » ResultSize uint32
60 90
61 // A lazily-updated boolean to reflect that this Attempt is expired for 91 // A lazily-updated boolean to reflect that this Attempt is expired for
62 // queries. 92 // queries.
63 » Expired bool 93 » ResultExpired bool
64 } 94 }
65 95
66 // MakeAttempt is a convenience function to create a new Attempt model in 96 // MakeAttempt is a convenience function to create a new Attempt model in
67 // the NeedsExecution state. 97 // the NeedsExecution state.
68 func MakeAttempt(c context.Context, aid *dm.Attempt_ID) *Attempt { 98 func MakeAttempt(c context.Context, aid *dm.Attempt_ID) *Attempt {
69 now := clock.Now(c).UTC() 99 now := clock.Now(c).UTC()
70 return &Attempt{ 100 return &Attempt{
71 ID: *aid, 101 ID: *aid,
72 Created: now, 102 Created: now,
73 Modified: now, 103 Modified: now,
74 } 104 }
75 } 105 }
76 106
77 // ModifyState changes the current state of this Attempt and updates its 107 // ModifyState changes the current state of this Attempt and updates its
78 // Modified timestamp. 108 // Modified timestamp.
79 func (a *Attempt) ModifyState(c context.Context, newState dm.Attempt_State) erro r { 109 func (a *Attempt) ModifyState(c context.Context, newState dm.Attempt_State) erro r {
110 if a.State == newState {
111 return nil
112 }
80 if err := a.State.Evolve(newState); err != nil { 113 if err := a.State.Evolve(newState); err != nil {
81 return err 114 return err
82 } 115 }
83 now := clock.Now(c).UTC() 116 now := clock.Now(c).UTC()
84 if now.After(a.Modified) { 117 if now.After(a.Modified) {
85 a.Modified = now 118 a.Modified = now
86 } else { 119 } else {
87 // Microsecond is the smallest granularity that datastore can st ore 120 // Microsecond is the smallest granularity that datastore can st ore
88 // timestamps, so use that to disambiguate: the goal here is tha t any 121 // timestamps, so use that to disambiguate: the goal here is tha t any
89 // modification always increments the modified time, and never d ecrements 122 // modification always increments the modified time, and never d ecrements
90 // it. 123 // it.
91 a.Modified = a.Modified.Add(time.Microsecond) 124 a.Modified = a.Modified.Add(time.Microsecond)
92 } 125 }
93 return nil 126 return nil
94 } 127 }
95 128
96 // MustModifyState is the same as ModifyState, except that it panics if the
97 // state transition is invalid.
98 func (a *Attempt) MustModifyState(c context.Context, newState dm.Attempt_State) {
99 err := a.ModifyState(c, newState)
100 if err != nil {
101 panic(err)
102 }
103 }
104
105 // ToProto returns a dm proto version of this Attempt. 129 // ToProto returns a dm proto version of this Attempt.
106 func (a *Attempt) ToProto(withData bool) *dm.Attempt { 130 func (a *Attempt) ToProto(withData bool) *dm.Attempt {
107 ret := dm.Attempt{Id: &a.ID} 131 ret := dm.Attempt{Id: &a.ID}
108 if withData { 132 if withData {
109 ret.Data = a.DataProto() 133 ret.Data = a.DataProto()
110 } 134 }
111 return &ret 135 return &ret
112 } 136 }
113 137
114 // DataProto returns an Attempt.Data message for this Attempt. 138 // DataProto returns an Attempt.Data message for this Attempt.
115 func (a *Attempt) DataProto() *dm.Attempt_Data { 139 func (a *Attempt) DataProto() (ret *dm.Attempt_Data) {
116 » ret := (*dm.Attempt_Data)(nil)
117 switch a.State { 140 switch a.State {
118 » case dm.Attempt_NEEDS_EXECUTION: 141 » case dm.Attempt_SCHEDULING:
iannucci 2016/06/08 02:54:24 these changes are due to the state machine simplif
119 » » ret = dm.NewAttemptNeedsExecution(a.Modified).Data 142 » » ret = dm.NewAttemptScheduling().Data
120
121 case dm.Attempt_EXECUTING: 143 case dm.Attempt_EXECUTING:
122 ret = dm.NewAttemptExecuting(a.CurExecution).Data 144 ret = dm.NewAttemptExecuting(a.CurExecution).Data
123 145 » case dm.Attempt_WAITING:
124 » case dm.Attempt_ADDING_DEPS: 146 » » ret = dm.NewAttemptWaiting(a.DepMap.Size() - a.DepMap.CountSet() ).Data
125 » » addset := a.AddingDepsBitmap
126 » » waitset := a.WaitingDepBitmap
127 » » setlen := addset.Size()
128
129 » » ret = dm.NewAttemptAddingDeps(
130 » » » setlen-addset.CountSet(), setlen-waitset.CountSet()).Dat a
131
132 » case dm.Attempt_BLOCKED:
133 » » waitset := a.WaitingDepBitmap
134 » » setlen := waitset.Size()
135
136 » » ret = dm.NewAttemptBlocked(setlen - waitset.CountSet()).Data
137
138 case dm.Attempt_FINISHED: 147 case dm.Attempt_FINISHED:
139 » » ret = dm.NewAttemptFinished(a.ResultExpiration, a.ResultSize, "" ).Data 148 » » ret = dm.NewAttemptFinished(a.ResultExpiration, a.ResultSize, "" ,
140 149 » » » string(a.PersistentState)).Data
150 » case dm.Attempt_ABNORMAL_FINISHED:
151 » » ret = dm.NewAttemptAbnormalFinish(&a.AbnormalFinish).Data
141 default: 152 default:
142 panic(fmt.Errorf("unknown Attempt_State: %s", a.State)) 153 panic(fmt.Errorf("unknown Attempt_State: %s", a.State))
143 } 154 }
144 ret.Created = google_pb.NewTimestamp(a.Created) 155 ret.Created = google_pb.NewTimestamp(a.Created)
145 ret.Modified = google_pb.NewTimestamp(a.Modified) 156 ret.Modified = google_pb.NewTimestamp(a.Modified)
146 ret.NumExecutions = a.CurExecution 157 ret.NumExecutions = a.CurExecution
147 return ret 158 return ret
148 } 159 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698