Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(197)

Side by Side Diff: appengine/cmd/dm/model/attempt.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: fix imports and make dummy.go a real file Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/cmd/dm/frontend/init.go ('k') | appengine/cmd/dm/model/attempt_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package model 5 package model
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "time" 9 "time"
10 10
11 "golang.org/x/net/context" 11 "golang.org/x/net/context"
12 12
13 » "github.com/luci/luci-go/common/api/dm/service/v1" 13 » dm "github.com/luci/luci-go/common/api/dm/service/v1"
14 » "github.com/luci/luci-go/common/bit_field" 14 » bf "github.com/luci/luci-go/common/bit_field"
15 "github.com/luci/luci-go/common/clock" 15 "github.com/luci/luci-go/common/clock"
16 google_pb "github.com/luci/luci-go/common/proto/google" 16 google_pb "github.com/luci/luci-go/common/proto/google"
17 ) 17 )
18 18
19 // AttemptRetryState indicates the current state of the Attempt's retry
20 // counters.
21 type AttemptRetryState struct {
22 Failed uint32
23 Expired uint32
24 TimedOut uint32
25 Crashed uint32
26 }
27
28 // Reset resets all of the AttemptRetryState counters.
29 func (a *AttemptRetryState) Reset() {
30 *a = AttemptRetryState{}
31 }
32
19 // Attempt is the datastore model for a DM Attempt. It has no parent key, but 33 // Attempt is the datastore model for a DM Attempt. It has no parent key, but
20 // it may have the following children entities: 34 // it may have the following children entities:
21 // * FwdDep 35 // * FwdDep
22 // * AttemptResult 36 // * AttemptResult
23 // 37 //
24 // Additionally, every Attempt has an associated BackDepGroup whose ID equals 38 // Additionally, every Attempt has an associated BackDepGroup whose ID equals
25 // the ID of this Attempt. 39 // the ID of this Attempt.
26 type Attempt struct { 40 type Attempt struct {
27 ID dm.Attempt_ID `gae:"$id"` 41 ID dm.Attempt_ID `gae:"$id"`
28 42
29 Created time.Time 43 Created time.Time
30 Modified time.Time 44 Modified time.Time
31 45
32 » State dm.Attempt_State 46 » State dm.Attempt_State
47 » RetryState AttemptRetryState
48
49 » // Only valid when State == ABNORMAL_FINISHED
50 » AbnormalFinish dm.AbnormalFinish
51
52 » // Only valid when State == FINISHED
53 » ResultExpiration time.Time `gae:",noindex"`
54 » ResultSize uint32 `gae:",noindex"`
55
56 » // PersistentState is the last successful execution's returned
57 » // PersistentState. It is set whenever an execution for this Attempt fin ishes
58 » // sucessfully. This is denormalized with the Execution's
59 » // ResultPersistentState field.
60 » PersistentState []byte `gae:",noindex"`
33 61
34 // TODO(iannucci): Use CurExecution as a 'deps block version' 62 // TODO(iannucci): Use CurExecution as a 'deps block version'
35 // then we can have an 'ANY' directive which executes the attempt as soo n 63 // then we can have an 'ANY' directive which executes the attempt as soo n
36 // as any of the dependencies are ready. If it adds more deps in ANY mod e, 64 // as any of the dependencies are ready. If it adds more deps in ANY mod e,
37 // the bitmaps get /extended/, and new deps bit indices are added to the 65 // the bitmaps get /extended/, and new deps bit indices are added to the
38 // existing max. 66 // existing max.
39 // If it adds more deps in ALL mode, it just converts from ANY -> ALL mo de 67 // If it adds more deps in ALL mode, it just converts from ANY -> ALL mo de
40 // and follows the current behavior. 68 // and follows the current behavior.
41 69
42 // CurExecution is the maximum Execution ID for this Attempt so far. Exe cution 70 // CurExecution is the maximum Execution ID for this Attempt so far. Exe cution
43 // IDs are contiguous from [1, CurExecution]. If the State is not curren tly 71 // IDs are contiguous from [1, CurExecution]. If the State is not curren tly
44 // Executing, then CurExecution represents the execution that JUST finis hed 72 // Executing, then CurExecution represents the execution that JUST finis hed
45 // (or 0 if no Executions have been made yet). 73 // (or 0 if no Executions have been made yet).
46 CurExecution uint32 74 CurExecution uint32
47 75
48 » // AddingDepsBitmap is valid only while Attempt is in 'AddingDeps'. 76 » // DepMap is valid only while Attempt is in a State of EXECUTING or WAIT ING.
49 » // A field value of 0 means the backdep hasn't been added yet. 77 » //
50 » AddingDepsBitmap bf.BitField `gae:",noindex" json:"-"` 78 » // The size of this field is inspected to deteremine what the next state after
51 79 » // EXECUTING is. If the size == 0, it means the Attempt should move to t he
52 » // WaitingDepBitmap is valid only while Attempt is in a Status of 'Addin gDeps' 80 » // FINISHED state. Otherwise it means that the Attempt should move to th e
53 » // or 'Blocked'. 81 » // WAITING state.
54 » // A field value of 0 means that the dep is currently waiting. 82 » //
55 » WaitingDepBitmap bf.BitField `gae:",noindex" json:"-"` 83 » // A bit field value of 0 means that the dep is currently waiting, and a bit
56 84 » // value of 1 means that the coresponding dep is satisfined. The Attempt can
57 » // Only valid while Attempt is Finished 85 » // be unblocked from WAITING back to SCHEDULING when all bits are set to 1.
58 » ResultExpiration time.Time 86 » DepMap bf.BitField `gae:",noindex" json:"-"`
59 » ResultSize uint32
60 87
61 // A lazily-updated boolean to reflect that this Attempt is expired for 88 // A lazily-updated boolean to reflect that this Attempt is expired for
62 // queries. 89 // queries.
63 » Expired bool 90 » ResultExpired bool
64 } 91 }
65 92
66 // MakeAttempt is a convenience function to create a new Attempt model in 93 // MakeAttempt is a convenience function to create a new Attempt model in
67 // the NeedsExecution state. 94 // the NeedsExecution state.
68 func MakeAttempt(c context.Context, aid *dm.Attempt_ID) *Attempt { 95 func MakeAttempt(c context.Context, aid *dm.Attempt_ID) *Attempt {
69 now := clock.Now(c).UTC() 96 now := clock.Now(c).UTC()
70 return &Attempt{ 97 return &Attempt{
71 ID: *aid, 98 ID: *aid,
72 Created: now, 99 Created: now,
73 Modified: now, 100 Modified: now,
74 } 101 }
75 } 102 }
76 103
77 // ModifyState changes the current state of this Attempt and updates its 104 // ModifyState changes the current state of this Attempt and updates its
78 // Modified timestamp. 105 // Modified timestamp.
79 func (a *Attempt) ModifyState(c context.Context, newState dm.Attempt_State) erro r { 106 func (a *Attempt) ModifyState(c context.Context, newState dm.Attempt_State) erro r {
107 if a.State == newState {
108 return nil
109 }
80 if err := a.State.Evolve(newState); err != nil { 110 if err := a.State.Evolve(newState); err != nil {
81 return err 111 return err
82 } 112 }
83 now := clock.Now(c).UTC() 113 now := clock.Now(c).UTC()
84 if now.After(a.Modified) { 114 if now.After(a.Modified) {
85 a.Modified = now 115 a.Modified = now
86 } else { 116 } else {
87 // Microsecond is the smallest granularity that datastore can st ore 117 // Microsecond is the smallest granularity that datastore can st ore
88 // timestamps, so use that to disambiguate: the goal here is tha t any 118 // timestamps, so use that to disambiguate: the goal here is tha t any
89 // modification always increments the modified time, and never d ecrements 119 // modification always increments the modified time, and never d ecrements
90 // it. 120 // it.
91 a.Modified = a.Modified.Add(time.Microsecond) 121 a.Modified = a.Modified.Add(time.Microsecond)
92 } 122 }
93 return nil 123 return nil
94 } 124 }
95 125
96 // MustModifyState is the same as ModifyState, except that it panics if the
97 // state transition is invalid.
98 func (a *Attempt) MustModifyState(c context.Context, newState dm.Attempt_State) {
99 err := a.ModifyState(c, newState)
100 if err != nil {
101 panic(err)
102 }
103 }
104
105 // ToProto returns a dm proto version of this Attempt. 126 // ToProto returns a dm proto version of this Attempt.
106 func (a *Attempt) ToProto(withData bool) *dm.Attempt { 127 func (a *Attempt) ToProto(withData bool) *dm.Attempt {
107 ret := dm.Attempt{Id: &a.ID} 128 ret := dm.Attempt{Id: &a.ID}
108 if withData { 129 if withData {
109 ret.Data = a.DataProto() 130 ret.Data = a.DataProto()
110 } 131 }
111 return &ret 132 return &ret
112 } 133 }
113 134
114 // DataProto returns an Attempt.Data message for this Attempt. 135 // DataProto returns an Attempt.Data message for this Attempt.
115 func (a *Attempt) DataProto() *dm.Attempt_Data { 136 func (a *Attempt) DataProto() (ret *dm.Attempt_Data) {
116 » ret := (*dm.Attempt_Data)(nil)
117 switch a.State { 137 switch a.State {
118 » case dm.Attempt_NEEDS_EXECUTION: 138 » case dm.Attempt_SCHEDULING:
119 » » ret = dm.NewAttemptNeedsExecution(a.Modified).Data 139 » » ret = dm.NewAttemptScheduling().Data
120
121 case dm.Attempt_EXECUTING: 140 case dm.Attempt_EXECUTING:
122 ret = dm.NewAttemptExecuting(a.CurExecution).Data 141 ret = dm.NewAttemptExecuting(a.CurExecution).Data
123 142 » case dm.Attempt_WAITING:
124 » case dm.Attempt_ADDING_DEPS: 143 » » ret = dm.NewAttemptWaiting(a.DepMap.Size() - a.DepMap.CountSet() ).Data
125 » » addset := a.AddingDepsBitmap
126 » » waitset := a.WaitingDepBitmap
127 » » setlen := addset.Size()
128
129 » » ret = dm.NewAttemptAddingDeps(
130 » » » setlen-addset.CountSet(), setlen-waitset.CountSet()).Dat a
131
132 » case dm.Attempt_BLOCKED:
133 » » waitset := a.WaitingDepBitmap
134 » » setlen := waitset.Size()
135
136 » » ret = dm.NewAttemptBlocked(setlen - waitset.CountSet()).Data
137
138 case dm.Attempt_FINISHED: 144 case dm.Attempt_FINISHED:
139 » » ret = dm.NewAttemptFinished(a.ResultExpiration, a.ResultSize, "" ).Data 145 » » ret = dm.NewAttemptFinished(a.ResultExpiration, a.ResultSize, "" ,
140 146 » » » a.PersistentState).Data
147 » case dm.Attempt_ABNORMAL_FINISHED:
148 » » ret = dm.NewAttemptAbnormalFinish(&a.AbnormalFinish).Data
141 default: 149 default:
142 panic(fmt.Errorf("unknown Attempt_State: %s", a.State)) 150 panic(fmt.Errorf("unknown Attempt_State: %s", a.State))
143 } 151 }
144 ret.Created = google_pb.NewTimestamp(a.Created) 152 ret.Created = google_pb.NewTimestamp(a.Created)
145 ret.Modified = google_pb.NewTimestamp(a.Modified) 153 ret.Modified = google_pb.NewTimestamp(a.Modified)
146 ret.NumExecutions = a.CurExecution 154 ret.NumExecutions = a.CurExecution
147 return ret 155 return ret
148 } 156 }
OLDNEW
« no previous file with comments | « appengine/cmd/dm/frontend/init.go ('k') | appengine/cmd/dm/model/attempt_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698