Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(8)

Side by Side Diff: appengine/cmd/dm/model/execution.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: fix imports and make dummy.go a real file Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/cmd/dm/model/attempt_test.go ('k') | appengine/cmd/dm/model/execution_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The LUCI Authors. All rights reserved. 1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package model 5 package model
6 6
7 import ( 7 import (
8 "crypto/subtle" 8 "crypto/subtle"
9 "encoding/hex" 9 "encoding/hex"
10 "errors" 10 "errors"
11 "fmt" 11 "fmt"
12 "time" 12 "time"
13 13
14 "google.golang.org/grpc/codes" 14 "google.golang.org/grpc/codes"
15 15
16 "golang.org/x/net/context" 16 "golang.org/x/net/context"
17 17
18 "github.com/luci/gae/service/datastore" 18 "github.com/luci/gae/service/datastore"
19 » "github.com/luci/luci-go/common/api/dm/service/v1" 19 » dm "github.com/luci/luci-go/common/api/dm/service/v1"
20 » "github.com/luci/luci-go/common/clock"
21 » "github.com/luci/luci-go/common/cryptorand"
20 "github.com/luci/luci-go/common/grpcutil" 22 "github.com/luci/luci-go/common/grpcutil"
21 "github.com/luci/luci-go/common/logging" 23 "github.com/luci/luci-go/common/logging"
22 google_pb "github.com/luci/luci-go/common/proto/google" 24 google_pb "github.com/luci/luci-go/common/proto/google"
23 ) 25 )
24 26
25 const ek = logging.ErrorKey 27 const ek = logging.ErrorKey
26 28
27 // Execution represents either an ongoing execution on the Quest's specified 29 // Execution represents either an ongoing execution on the Quest's specified
28 // distributor, or is a placeholder for an already-completed Execution. 30 // distributor, or is a placeholder for an already-completed Execution.
29 type Execution struct { 31 type Execution struct {
30 ID uint32 `gae:"$id"` 32 ID uint32 `gae:"$id"`
31 Attempt *datastore.Key `gae:"$parent"` 33 Attempt *datastore.Key `gae:"$parent"`
32 34
33 » State dm.Execution_State 35 » Created time.Time
34 » StateReason string `gae:",noindex"` 36 » Modified time.Time
35 37
36 » Created time.Time 38 » DistributorConfigName string
37 » DistributorToken string 39 » DistributorConfigVersion string
38 » DistributorURL string `gae:",noindex"` 40 » DistributorToken string
41
42 » State dm.Execution_State
43
44 » // Only valid in the ABNORMAL_FINISHED state.
45 » AbnormalFinish dm.AbnormalFinish
46
47 » // Only valid in the FINISHED state.
48 » ResultPersistentState []byte `gae:",noindex"`
49
50 » // These are DM's internal mechanism for performing timeout actions on
51 » // Executions.
52 » //
53 » // The TimeTo* variables are filled in by the distributor when this Exec ution
54 » // is created.
55 » //
56 » // The Timeout is only active when the Execution is in a non-terminal st ate.
57 » TimeToStart time.Duration `gae:",noindex"` // SCHEDULING -> RUNNING
58 » TimeToRun time.Duration `gae:",noindex"` // RUNNING -> STOPPING
59 » TimeToStop time.Duration `gae:",noindex"` // STOPPING -> FINISHED
39 60
40 // Token is a randomized nonce that's used to verify that RPCs verify fr om the 61 // Token is a randomized nonce that's used to verify that RPCs verify fr om the
41 // expected client (the client that's currently running the Execution). The 62 // expected client (the client that's currently running the Execution). The
42 // Token has 2 modes. 63 // Token has 2 modes.
43 // 64 //
44 // When the Execution is handed to the distributor, the Token is randoml y 65 // When the Execution is handed to the distributor, the Token is randoml y
45 // generated by DM and passed to the distributor. The State of the Execu tion 66 // generated by DM and passed to the distributor. The State of the Execu tion
46 » // starts as Scheduled. This token may be used by the client to "activat e" the 67 » // starts as SCHEDULED. This token may be used by the client to "activat e" the
47 // Execution with the ActivateExecution rpc. At that point, the client 68 // Execution with the ActivateExecution rpc. At that point, the client
48 » // provides a new random token, the Execution State moves from Scheduled to 69 » // provides a new random token, the Execution State moves from SCHEDULED to
49 » // Running, and Token assumes the new value. As long as the Execution St ate is 70 » // RUNNING, and Token assumes the new value. As long as the Execution St ate is
50 » // running, the client may continue to use that new Token value to 71 » // RUNNING, the client may continue to use that new Token value to
51 // authenticate other rpc's like AddDeps and FinishAttempt. 72 // authenticate other rpc's like AddDeps and FinishAttempt.
52 // 73 //
53 » // As soon as the Execution is no longer supposed to have access, Token will 74 » // As soon as the Execution is in the STOPPING, ABNORMAL_FINISHED or FIN ISHED
54 » // be nil'd out. 75 » // state, this will be nil'd out.
55 Token []byte `gae:",noindex"` 76 Token []byte `gae:",noindex"`
56 } 77 }
57 78
58 // Revoke will null-out the Token and Put this Execution to the datastore. 79 // MakeExecution makes a new Execution in the SCHEDULING state, with a new
80 // random Token.
81 func MakeExecution(c context.Context, e *dm.Execution_ID, cfgName, cfgVers strin g) *Execution {
82 » now := clock.Now(c).UTC()
83 » ret := &Execution{
84 » » ID: e.Id,
85 » » Attempt: AttemptKeyFromID(c, e.AttemptID()),
86
87 » » Created: now,
88 » » Modified: now,
89
90 » » DistributorConfigName: cfgName,
91 » » DistributorConfigVersion: cfgVers,
92
93 » » Token: MakeRandomToken(c, dm.MinimumActivationTokenLength),
94 » }
95 » return ret
96 }
97
98 // ModifyState changes the current state of this Execution and updates its
99 // Modified timestamp.
100 func (e *Execution) ModifyState(c context.Context, newState dm.Execution_State) error {
101 » if e.State == newState {
102 » » return nil
103 » }
104 » if err := e.State.Evolve(newState); err != nil {
105 » » return err
106 » }
107 » now := clock.Now(c).UTC()
108 » if now.After(e.Modified) {
109 » » e.Modified = now
110 » } else {
111 » » // Microsecond is the smallest granularity that datastore can st ore
112 » » // timestamps, so use that to disambiguate: the goal here is tha t any
113 » » // modification always increments the modified time, and never d ecrements
114 » » // it.
115 » » e.Modified = e.Modified.Add(time.Microsecond)
116 » }
117 » return nil
118 }
119
120 // MakeRandomToken creates a cryptographically random byte slice of the
121 // specified length. It panics if the specified length cannot be read in full.
122 func MakeRandomToken(c context.Context, l uint32) []byte {
123 » rtok := make([]byte, l)
124 » if _, err := cryptorand.Read(c, rtok); err != nil {
125 » » panic(err)
126 » }
127 » return rtok
128 }
129
130 // Revoke will clear the Token and Put this Execution to the datastore. This
131 // action requires the Execution to be in the RUNNING state, and causes it to
132 // enter the STOPPING state.
59 func (e *Execution) Revoke(c context.Context) error { 133 func (e *Execution) Revoke(c context.Context) error {
60 e.Token = nil 134 e.Token = nil
135 if err := e.ModifyState(c, dm.Execution_STOPPING); err != nil {
136 return err
137 }
61 return datastore.Get(c).Put(e) 138 return datastore.Get(c).Put(e)
62 } 139 }
63 140
64 func loadExecution(c context.Context, eid *dm.Execution_ID) (a *Attempt, e *Exec ution, err error) { 141 func loadExecution(c context.Context, eid *dm.Execution_ID) (a *Attempt, e *Exec ution, err error) {
65 ds := datastore.Get(c) 142 ds := datastore.Get(c)
66 143
67 a = &Attempt{ID: *eid.AttemptID()} 144 a = &Attempt{ID: *eid.AttemptID()}
68 e = &Execution{ID: eid.Id, Attempt: ds.KeyForObj(a)} 145 e = &Execution{ID: eid.Id, Attempt: ds.KeyForObj(a)}
69 err = datastore.Get(c).Get(a, e) 146 err = datastore.Get(c).Get(a, e)
70 147
71 if err != nil { 148 if err != nil {
72 » » err = fmt.Errorf("couldn't get attempt %v or its execution %d: % s", a.ID, e.ID, err) 149 » » err = grpcutil.Errf(codes.Internal,
150 » » » "couldn't get attempt %v or its execution %d: %s", a.ID, e.ID, err)
73 return 151 return
74 } 152 }
75 153
76 if a.CurExecution != e.ID { 154 if a.CurExecution != e.ID {
77 err = fmt.Errorf("verifying incorrect execution %d, expected %d" , a.CurExecution, e.ID) 155 err = fmt.Errorf("verifying incorrect execution %d, expected %d" , a.CurExecution, e.ID)
78 return 156 return
79 } 157 }
80 return 158 return
81 } 159 }
82 160
(...skipping 12 matching lines...) Expand all
95 err = errors.New("Execution is not running") 173 err = errors.New("Execution is not running")
96 return 174 return
97 } 175 }
98 176
99 if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 { 177 if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 {
100 err = fmt.Errorf("incorrect Token: %x", hex.EncodeToString(auth. Token)) 178 err = fmt.Errorf("incorrect Token: %x", hex.EncodeToString(auth. Token))
101 } 179 }
102 return 180 return
103 } 181 }
104 182
183 func makeError(err error, msg string) error {
184 code := grpcutil.Code(err)
185 if code == codes.Unknown {
186 code = codes.PermissionDenied
187 }
188 return grpcutil.Errf(code, msg)
189 }
190
105 // AuthenticateExecution verifies that the Attempt is executing, and that evkey 191 // AuthenticateExecution verifies that the Attempt is executing, and that evkey
106 // matches the execution key of the current Execution for this Attempt. 192 // matches the execution key of the current Execution for this Attempt.
107 // 193 //
108 // As a bonus, it will return the loaded Attempt and Execution. 194 // As a bonus, it will return the loaded Attempt and Execution.
109 func AuthenticateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attem pt, e *Execution, err error) { 195 func AuthenticateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attem pt, e *Execution, err error) {
110 a, e, err = verifyExecutionAndCheckExTok(c, auth) 196 a, e, err = verifyExecutionAndCheckExTok(c, auth)
111 if err != nil { 197 if err != nil {
112 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to ver ify execution") 198 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to ver ify execution")
113 » » err = grpcutil.Errf(codes.Unauthenticated, "requires execution A uth") 199 » » err = makeError(err, "requires execution Auth")
114 } 200 }
115 return a, e, err 201 return a, e, err
116 } 202 }
117 203
118 // InvalidateExecution verifies that the execution key is valid, and then 204 // InvalidateExecution verifies that the execution key is valid, and then
119 // revokes the execution key. 205 // revokes the execution key.
120 // 206 //
121 // As a bonus, it will return the loaded Attempt and Execution. 207 // As a bonus, it will return the loaded Attempt and Execution.
122 func InvalidateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attempt , e *Execution, err error) { 208 func InvalidateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attempt , e *Execution, err error) {
123 if a, e, err = verifyExecutionAndCheckExTok(c, auth); err != nil { 209 if a, e, err = verifyExecutionAndCheckExTok(c, auth); err != nil {
124 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to ver ify execution") 210 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to ver ify execution")
125 » » err = grpcutil.Errf(codes.Unauthenticated, "requires execution A uth") 211 » » err = makeError(err, "requires execution Auth")
126 return 212 return
127 } 213 }
128 214
129 err = e.Revoke(c) 215 err = e.Revoke(c)
130 if err != nil { 216 if err != nil {
131 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to rev oke execution") 217 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to rev oke execution")
132 » » err = grpcutil.Errf(codes.Internal, "unable to invalidate Auth") 218 » » err = makeError(err, "unable to invalidate Auth")
133 } 219 }
134 return 220 return
135 } 221 }
136 222
137 func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actT ok []byte) (a *Attempt, e *Execution, err error) { 223 func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actT ok []byte) (a *Attempt, e *Execution, err error) {
138 a, e, err = loadExecution(c, auth.Id) 224 a, e, err = loadExecution(c, auth.Id)
139 if err != nil { 225 if err != nil {
140 return 226 return
141 } 227 }
142 228
143 if a.State != dm.Attempt_EXECUTING { 229 if a.State != dm.Attempt_EXECUTING {
144 err = errors.New("Attempt is in wrong state") 230 err = errors.New("Attempt is in wrong state")
145 return 231 return
146 } 232 }
147 233
148 switch e.State { 234 switch e.State {
149 » case dm.Execution_SCHEDULED: 235 » case dm.Execution_SCHEDULING:
150 if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 { 236 if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 {
151 err = errors.New("incorrect ActivationToken") 237 err = errors.New("incorrect ActivationToken")
152 return 238 return
153 } 239 }
154 240
155 e.State.MustEvolve(dm.Execution_RUNNING) 241 e.State.MustEvolve(dm.Execution_RUNNING)
156 e.Token = actTok 242 e.Token = actTok
157 err = datastore.Get(c).Put(e) 243 err = datastore.Get(c).Put(e)
244 logging.Infof(c, "activated execution %s: was SCHEDULING now RUN NING", auth.Id)
158 245
159 case dm.Execution_RUNNING: 246 case dm.Execution_RUNNING:
160 if subtle.ConstantTimeCompare(e.Token, actTok) != 1 { 247 if subtle.ConstantTimeCompare(e.Token, actTok) != 1 {
161 err = errors.New("incorrect Token") 248 err = errors.New("incorrect Token")
162 } 249 }
163 // either the Token matched, in which case this is simply a retr y 250 // either the Token matched, in which case this is simply a retr y
164 // by the same client, so there's no error, or it's wrong which means it's 251 // by the same client, so there's no error, or it's wrong which means it's
165 // a retry by a different client. 252 // a retry by a different client.
166 253
254 logging.Infof(c, "already activated execution %s", auth.Id)
255
167 default: 256 default:
168 err = fmt.Errorf("Execution is in wrong state") 257 err = fmt.Errorf("Execution is in wrong state")
169 } 258 }
170 return 259 return
171 } 260 }
172 261
173 // ActivateExecution validates that the execution is unactivated and that 262 // ActivateExecution validates that the execution is unactivated and that
174 // the activation token matches and then sets the token to the new 263 // the activation token matches and then sets the token to the new
175 // value. 264 // value.
176 // 265 //
177 // It's OK to retry this. Subsequent invocations with the same Token 266 // It's OK to retry this. Subsequent invocations with the same Token
178 // will recognize this case and not return an error. 267 // will recognize this case and not return an error.
179 func ActivateExecution(c context.Context, auth *dm.Execution_Auth, actToken []by te) (a *Attempt, e *Execution, err error) { 268 func ActivateExecution(c context.Context, auth *dm.Execution_Auth, actToken []by te) (a *Attempt, e *Execution, err error) {
180 a, e, err = verifyExecutionAndActivate(c, auth, actToken) 269 a, e, err = verifyExecutionAndActivate(c, auth, actToken)
181 if err != nil { 270 if err != nil {
182 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to act ivate execution") 271 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to act ivate execution")
183 » » err = grpcutil.Errf(codes.Unauthenticated, "failed to activate e xecution Auth") 272 » » err = makeError(err, "failed to activate execution Auth")
184 } 273 }
185 return a, e, err 274 return a, e, err
186 } 275 }
187 276
277 // GetEID gets an Execution_ID for this Execution. It panics if the Execution
278 // is in an invalid state.
279 func (e *Execution) GetEID() *dm.Execution_ID {
280 aid := &dm.Attempt_ID{}
281 if e.ID == 0 {
282 panic("cannot create valid Execution_ID with 0-value ID field")
283 }
284 if err := aid.SetDMEncoded(e.Attempt.StringID()); err != nil {
285 panic(err)
286 }
287 return dm.NewExecutionID(aid.Quest, aid.Id, e.ID)
288 }
289
188 // ToProto returns a dm proto version of this Execution. 290 // ToProto returns a dm proto version of this Execution.
189 func (e *Execution) ToProto(includeID bool) *dm.Execution { 291 func (e *Execution) ToProto(includeID bool) *dm.Execution {
190 » ret := &dm.Execution{ 292 » ret := &dm.Execution{Data: e.DataProto()}
191 » » Data: &dm.Execution_Data{
192 » » » State: e.State,
193 » » » StateReason: e.StateReason,
194 » » » Created: google_pb.NewTimestamp(e.Created),
195 » » » DistributorToken: e.DistributorToken,
196 » » » DistributorInfoUrl: e.DistributorURL,
197 » » },
198 » }
199 if includeID { 293 if includeID {
200 » » aid := &dm.Attempt_ID{} 294 » » ret.Id = e.GetEID()
201 » » if err := aid.SetDMEncoded(e.Attempt.StringID()); err != nil {
202 » » » panic(err)
203 » » }
204 » » ret.Id = dm.NewExecutionID(aid.Quest, aid.Id, e.ID)
205 } 295 }
206 return ret 296 return ret
207 } 297 }
298
299 // DataProto returns an Execution.Data message for this Execution.
300 //
301 // This omits the DistributorInfo.Url portion, which must be filled in elsewhere for
302 // package cyclical import reasons.
303 func (e *Execution) DataProto() (ret *dm.Execution_Data) {
304 switch e.State {
305 case dm.Execution_SCHEDULING:
306 ret = dm.NewExecutionScheduling().Data
307 case dm.Execution_RUNNING:
308 ret = dm.NewExecutionRunning().Data
309 case dm.Execution_STOPPING:
310 ret = dm.NewExecutionStopping().Data
311 case dm.Execution_FINISHED:
312 ret = dm.NewExecutionFinished(string(e.ResultPersistentState)).D ata
313 case dm.Execution_ABNORMAL_FINISHED:
314 ret = dm.NewExecutionAbnormalFinish(&e.AbnormalFinish).Data
315 default:
316 panic(fmt.Errorf("unknown Execution_State: %s", e.State))
317 }
318 ret.Created = google_pb.NewTimestamp(e.Created)
319 ret.Modified = google_pb.NewTimestamp(e.Modified)
320 ret.DistributorInfo = &dm.Execution_Data_DistributorInfo{
321 ConfigName: e.DistributorConfigName,
322 ConfigVersion: e.DistributorConfigVersion,
323 Token: e.DistributorToken,
324 }
325 return ret
326 }
OLDNEW
« no previous file with comments | « appengine/cmd/dm/model/attempt_test.go ('k') | appengine/cmd/dm/model/execution_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698