Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package model | 5 package model |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "crypto/subtle" | 8 "crypto/subtle" |
| 9 "encoding/hex" | 9 "encoding/hex" |
| 10 "errors" | 10 "errors" |
| 11 "fmt" | 11 "fmt" |
| 12 "time" | 12 "time" |
| 13 | 13 |
| 14 "google.golang.org/grpc/codes" | 14 "google.golang.org/grpc/codes" |
| 15 | 15 |
| 16 "golang.org/x/net/context" | 16 "golang.org/x/net/context" |
| 17 | 17 |
| 18 "github.com/luci/gae/service/datastore" | 18 "github.com/luci/gae/service/datastore" |
| 19 » "github.com/luci/luci-go/common/api/dm/service/v1" | 19 » dm "github.com/luci/luci-go/common/api/dm/service/v1" |
| 20 » "github.com/luci/luci-go/common/clock" | |
| 21 » "github.com/luci/luci-go/common/cryptorand" | |
| 20 "github.com/luci/luci-go/common/grpcutil" | 22 "github.com/luci/luci-go/common/grpcutil" |
| 21 "github.com/luci/luci-go/common/logging" | 23 "github.com/luci/luci-go/common/logging" |
| 22 google_pb "github.com/luci/luci-go/common/proto/google" | 24 google_pb "github.com/luci/luci-go/common/proto/google" |
| 23 ) | 25 ) |
| 24 | 26 |
| 25 const ek = logging.ErrorKey | 27 const ek = logging.ErrorKey |
| 26 | 28 |
| 27 // Execution represents either an ongoing execution on the Quest's specified | 29 // Execution represents either an ongoing execution on the Quest's specified |
| 28 // distributor, or is a placeholder for an already-completed Execution. | 30 // distributor, or is a placeholder for an already-completed Execution. |
| 29 type Execution struct { | 31 type Execution struct { |
| 30 ID uint32 `gae:"$id"` | 32 ID uint32 `gae:"$id"` |
| 31 Attempt *datastore.Key `gae:"$parent"` | 33 Attempt *datastore.Key `gae:"$parent"` |
| 32 | 34 |
| 33 » State dm.Execution_State | 35 » Created time.Time |
| 34 » StateReason string `gae:",noindex"` | 36 » Modified time.Time |
| 35 | 37 |
| 36 » Created time.Time | 38 » DistributorConfigName string |
| 37 » DistributorToken string | 39 » DistributorConfigVersion string |
| 38 » DistributorURL string `gae:",noindex"` | 40 » DistributorToken string |
| 41 | |
| 42 » State dm.Execution_State | |
| 43 | |
| 44 » // Only valid in the ABNORMAL_FINISHED state. | |
| 45 » AbnormalFinish dm.AbnormalFinish | |
| 46 | |
| 47 » // Only valid in the FINISHED state. | |
| 48 » ResultPersistentState string | |
| 49 | |
| 50 » // These are DM's internal mechanism for performing timeout actions on | |
| 51 » // Executions. | |
| 52 » // | |
| 53 » // The TimeTo* variables are filled in by the distributor when this Exec ution | |
| 54 » // is created. | |
| 55 » // | |
| 56 » // The Timeout is only active when the Execution is in a non-terminal st ate. | |
| 57 » TimeToStart time.Duration `gae:",noindex"` // SCHEDULING -> RUNNING | |
| 58 » TimeToRun time.Duration `gae:",noindex"` // RUNNING -> STOPPING | |
| 59 » TimeToStop time.Duration `gae:",noindex"` // STOPPING -> FINISHED | |
|
iannucci
2016/06/08 02:54:24
timeouts!
| |
| 39 | 60 |
| 40 // Token is a randomized nonce that's used to verify that RPCs verify fr om the | 61 // Token is a randomized nonce that's used to verify that RPCs verify fr om the |
| 41 // expected client (the client that's currently running the Execution). The | 62 // expected client (the client that's currently running the Execution). The |
| 42 // Token has 2 modes. | 63 // Token has 2 modes. |
| 43 // | 64 // |
| 44 // When the Execution is handed to the distributor, the Token is randoml y | 65 // When the Execution is handed to the distributor, the Token is randoml y |
| 45 // generated by DM and passed to the distributor. The State of the Execu tion | 66 // generated by DM and passed to the distributor. The State of the Execu tion |
| 46 » // starts as Scheduled. This token may be used by the client to "activat e" the | 67 » // starts as SCHEDULED. This token may be used by the client to "activat e" the |
| 47 // Execution with the ActivateExecution rpc. At that point, the client | 68 // Execution with the ActivateExecution rpc. At that point, the client |
| 48 » // provides a new random token, the Execution State moves from Scheduled to | 69 » // provides a new random token, the Execution State moves from SCHEDULED to |
| 49 » // Running, and Token assumes the new value. As long as the Execution St ate is | 70 » // RUNNING, and Token assumes the new value. As long as the Execution St ate is |
| 50 » // running, the client may continue to use that new Token value to | 71 » // RUNNING, the client may continue to use that new Token value to |
| 51 // authenticate other rpc's like AddDeps and FinishAttempt. | 72 // authenticate other rpc's like AddDeps and FinishAttempt. |
| 52 // | 73 // |
| 53 » // As soon as the Execution is no longer supposed to have access, Token will | 74 » // As soon as the Execution is in the STOPPING, ABNORMAL_FINISHED or FIN ISHED |
| 54 » // be nil'd out. | 75 » // state, this will be nil'd out. |
| 55 Token []byte `gae:",noindex"` | 76 Token []byte `gae:",noindex"` |
| 56 } | 77 } |
| 57 | 78 |
| 58 // Revoke will null-out the Token and Put this Execution to the datastore. | 79 // MakeExecution makes a new Execution in the SCHEDULING state, with a new |
| 80 // random Token. | |
| 81 func MakeExecution(c context.Context, e *dm.Execution_ID, cfgName, cfgVers strin g) *Execution { | |
| 82 » now := clock.Now(c).UTC() | |
| 83 » ret := &Execution{ | |
| 84 » » ID: e.Id, | |
| 85 » » Attempt: datastore.Get(c).MakeKey("Attempt", e.AttemptID().DMEnc oded()), | |
|
dnj (Google)
2016/06/09 18:00:56
AttemptKeyFromID?
iannucci
2016/06/15 00:46:01
good catch, I tried to find these all but apparent
| |
| 86 | |
| 87 » » Created: now, | |
| 88 » » Modified: now, | |
| 89 | |
| 90 » » DistributorConfigName: cfgName, | |
| 91 » » DistributorConfigVersion: cfgVers, | |
| 92 | |
| 93 » » Token: MakeRandomToken(c, dm.MinimumActivationTokenLength), | |
| 94 » } | |
| 95 » return ret | |
| 96 } | |
| 97 | |
| 98 // ModifyState changes the current state of this Execution and updates its | |
| 99 // Modified timestamp. | |
| 100 func (e *Execution) ModifyState(c context.Context, newState dm.Execution_State) error { | |
| 101 » if e.State == newState { | |
| 102 » » return nil | |
| 103 » } | |
| 104 » if err := e.State.Evolve(newState); err != nil { | |
| 105 » » return err | |
| 106 » } | |
| 107 » now := clock.Now(c).UTC() | |
| 108 » if now.After(e.Modified) { | |
| 109 » » e.Modified = now | |
| 110 » } else { | |
| 111 » » // Microsecond is the smallest granularity that datastore can st ore | |
| 112 » » // timestamps, so use that to disambiguate: the goal here is tha t any | |
| 113 » » // modification always increments the modified time, and never d ecrements | |
| 114 » » // it. | |
| 115 » » e.Modified = e.Modified.Add(time.Microsecond) | |
| 116 » } | |
| 117 » return nil | |
| 118 } | |
| 119 | |
| 120 // MakeRandomToken creates a cryptographically random byte slice of the | |
| 121 // specified length. It panics if the specified length cannot be read in full. | |
| 122 func MakeRandomToken(c context.Context, l uint32) []byte { | |
| 123 » rtok := make([]byte, l) | |
| 124 » if _, err := cryptorand.Read(c, rtok); err != nil { | |
| 125 » » panic(err) | |
| 126 » } | |
| 127 » return rtok | |
| 128 } | |
| 129 | |
| 130 // Revoke will null-out the Token and Put this Execution to the datastore. This | |
|
dnj (Google)
2016/06/09 18:00:56
nit: "nil-out" in above documentation, "null-out"
iannucci
2016/06/15 00:46:01
Done.
| |
| 131 // action requires the Execution to be in the RUNNING state, and causes it to | |
| 132 // enter the STOPPING state. | |
| 59 func (e *Execution) Revoke(c context.Context) error { | 133 func (e *Execution) Revoke(c context.Context) error { |
| 60 e.Token = nil | 134 e.Token = nil |
| 135 if err := e.ModifyState(c, dm.Execution_STOPPING); err != nil { | |
| 136 return err | |
| 137 } | |
| 61 return datastore.Get(c).Put(e) | 138 return datastore.Get(c).Put(e) |
| 62 } | 139 } |
| 63 | 140 |
| 64 func loadExecution(c context.Context, eid *dm.Execution_ID) (a *Attempt, e *Exec ution, err error) { | 141 func loadExecution(c context.Context, eid *dm.Execution_ID) (a *Attempt, e *Exec ution, err error) { |
| 65 ds := datastore.Get(c) | 142 ds := datastore.Get(c) |
| 66 | 143 |
| 67 a = &Attempt{ID: *eid.AttemptID()} | 144 a = &Attempt{ID: *eid.AttemptID()} |
| 68 e = &Execution{ID: eid.Id, Attempt: ds.KeyForObj(a)} | 145 e = &Execution{ID: eid.Id, Attempt: ds.KeyForObj(a)} |
| 69 err = datastore.Get(c).GetMulti([]interface{}{a, e}) | 146 err = datastore.Get(c).GetMulti([]interface{}{a, e}) |
| 70 | 147 |
| 71 if err != nil { | 148 if err != nil { |
| 72 » » err = fmt.Errorf("couldn't get attempt %v or its execution %d: % s", a.ID, e.ID, err) | 149 » » err = grpcutil.Errf(codes.Internal, |
| 150 » » » "couldn't get attempt %v or its execution %d: %s", a.ID, e.ID, err) | |
| 73 return | 151 return |
| 74 } | 152 } |
| 75 | 153 |
| 76 if a.CurExecution != e.ID { | 154 if a.CurExecution != e.ID { |
| 77 err = fmt.Errorf("verifying incorrect execution %d, expected %d" , a.CurExecution, e.ID) | 155 err = fmt.Errorf("verifying incorrect execution %d, expected %d" , a.CurExecution, e.ID) |
| 78 return | 156 return |
| 79 } | 157 } |
| 80 return | 158 return |
| 81 } | 159 } |
| 82 | 160 |
| (...skipping 12 matching lines...) Expand all Loading... | |
| 95 err = errors.New("Execution is not running") | 173 err = errors.New("Execution is not running") |
| 96 return | 174 return |
| 97 } | 175 } |
| 98 | 176 |
| 99 if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 { | 177 if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 { |
| 100 err = fmt.Errorf("incorrect Token: %x", hex.EncodeToString(auth. Token)) | 178 err = fmt.Errorf("incorrect Token: %x", hex.EncodeToString(auth. Token)) |
| 101 } | 179 } |
| 102 return | 180 return |
| 103 } | 181 } |
| 104 | 182 |
| 183 func makeError(err error, msg string) error { | |
| 184 code := grpcutil.Code(err) | |
| 185 if code == codes.Unknown { | |
| 186 code = codes.Unauthenticated | |
|
dnj (Google)
2016/06/09 18:00:56
Does this make sense? Unauthenticated specifically
iannucci
2016/06/15 00:46:01
Yeah, basically: unless we specifically tagged thi
| |
| 187 } | |
| 188 return grpcutil.Errf(code, msg) | |
| 189 } | |
| 190 | |
| 105 // AuthenticateExecution verifies that the Attempt is executing, and that evkey | 191 // AuthenticateExecution verifies that the Attempt is executing, and that evkey |
| 106 // matches the execution key of the current Execution for this Attempt. | 192 // matches the execution key of the current Execution for this Attempt. |
| 107 // | 193 // |
| 108 // As a bonus, it will return the loaded Attempt and Execution. | 194 // As a bonus, it will return the loaded Attempt and Execution. |
| 109 func AuthenticateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attem pt, e *Execution, err error) { | 195 func AuthenticateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attem pt, e *Execution, err error) { |
| 110 a, e, err = verifyExecutionAndCheckExTok(c, auth) | 196 a, e, err = verifyExecutionAndCheckExTok(c, auth) |
| 111 if err != nil { | 197 if err != nil { |
| 112 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to ver ify execution") | 198 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to ver ify execution") |
| 113 » » err = grpcutil.Errf(codes.Unauthenticated, "requires execution A uth") | 199 » » err = makeError(err, "requires execution Auth") |
| 114 } | 200 } |
| 115 return a, e, err | 201 return a, e, err |
| 116 } | 202 } |
| 117 | 203 |
| 118 // InvalidateExecution verifies that the execution key is valid, and then | 204 // InvalidateExecution verifies that the execution key is valid, and then |
| 119 // revokes the execution key. | 205 // revokes the execution key. |
| 120 // | 206 // |
| 121 // As a bonus, it will return the loaded Attempt and Execution. | 207 // As a bonus, it will return the loaded Attempt and Execution. |
| 122 func InvalidateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attempt , e *Execution, err error) { | 208 func InvalidateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attempt , e *Execution, err error) { |
| 123 if a, e, err = verifyExecutionAndCheckExTok(c, auth); err != nil { | 209 if a, e, err = verifyExecutionAndCheckExTok(c, auth); err != nil { |
| 124 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to ver ify execution") | 210 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to ver ify execution") |
| 125 » » err = grpcutil.Errf(codes.Unauthenticated, "requires execution A uth") | 211 » » err = makeError(err, "requires execution Auth") |
| 126 return | 212 return |
| 127 } | 213 } |
| 128 | 214 |
| 129 err = e.Revoke(c) | 215 err = e.Revoke(c) |
| 130 if err != nil { | 216 if err != nil { |
| 131 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to rev oke execution") | 217 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to rev oke execution") |
| 132 » » err = grpcutil.Errf(codes.Internal, "unable to invalidate Auth") | 218 » » err = makeError(err, "unable to invalidate Auth") |
| 133 } | 219 } |
| 134 return | 220 return |
| 135 } | 221 } |
| 136 | 222 |
| 137 func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actT ok []byte) (a *Attempt, e *Execution, err error) { | 223 func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actT ok []byte) (a *Attempt, e *Execution, err error) { |
| 138 a, e, err = loadExecution(c, auth.Id) | 224 a, e, err = loadExecution(c, auth.Id) |
| 139 if err != nil { | 225 if err != nil { |
| 140 return | 226 return |
| 141 } | 227 } |
| 142 | 228 |
| 143 if a.State != dm.Attempt_EXECUTING { | 229 if a.State != dm.Attempt_EXECUTING { |
| 144 err = errors.New("Attempt is in wrong state") | 230 err = errors.New("Attempt is in wrong state") |
| 145 return | 231 return |
| 146 } | 232 } |
| 147 | 233 |
| 148 switch e.State { | 234 switch e.State { |
| 149 » case dm.Execution_SCHEDULED: | 235 » case dm.Execution_SCHEDULING: |
| 150 if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 { | 236 if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 { |
| 151 err = errors.New("incorrect ActivationToken") | 237 err = errors.New("incorrect ActivationToken") |
| 152 return | 238 return |
| 153 } | 239 } |
| 154 | 240 |
| 155 e.State.MustEvolve(dm.Execution_RUNNING) | 241 e.State.MustEvolve(dm.Execution_RUNNING) |
| 156 e.Token = actTok | 242 e.Token = actTok |
| 157 err = datastore.Get(c).Put(e) | 243 err = datastore.Get(c).Put(e) |
| 244 logging.Infof(c, "activated execution %s: was SCHEDULING now RUN NING", auth.Id) | |
| 158 | 245 |
| 159 case dm.Execution_RUNNING: | 246 case dm.Execution_RUNNING: |
| 160 if subtle.ConstantTimeCompare(e.Token, actTok) != 1 { | 247 if subtle.ConstantTimeCompare(e.Token, actTok) != 1 { |
| 161 err = errors.New("incorrect Token") | 248 err = errors.New("incorrect Token") |
| 162 } | 249 } |
| 163 // either the Token matched, in which case this is simply a retr y | 250 // either the Token matched, in which case this is simply a retr y |
| 164 // by the same client, so there's no error, or it's wrong which means it's | 251 // by the same client, so there's no error, or it's wrong which means it's |
| 165 // a retry by a different client. | 252 // a retry by a different client. |
| 166 | 253 |
| 254 logging.Infof(c, "already activated execution %s", auth.Id) | |
| 255 | |
| 167 default: | 256 default: |
| 168 err = fmt.Errorf("Execution is in wrong state") | 257 err = fmt.Errorf("Execution is in wrong state") |
| 169 } | 258 } |
| 170 return | 259 return |
| 171 } | 260 } |
| 172 | 261 |
| 173 // ActivateExecution validates that the execution is unactivated and that | 262 // ActivateExecution validates that the execution is unactivated and that |
| 174 // the activation token matches and then sets the token to the new | 263 // the activation token matches and then sets the token to the new |
| 175 // value. | 264 // value. |
| 176 // | 265 // |
| 177 // It's OK to retry this. Subsequent invocations with the same Token | 266 // It's OK to retry this. Subsequent invocations with the same Token |
| 178 // will recognize this case and not return an error. | 267 // will recognize this case and not return an error. |
| 179 func ActivateExecution(c context.Context, auth *dm.Execution_Auth, actToken []by te) (a *Attempt, e *Execution, err error) { | 268 func ActivateExecution(c context.Context, auth *dm.Execution_Auth, actToken []by te) (a *Attempt, e *Execution, err error) { |
| 180 a, e, err = verifyExecutionAndActivate(c, auth, actToken) | 269 a, e, err = verifyExecutionAndActivate(c, auth, actToken) |
| 181 if err != nil { | 270 if err != nil { |
| 182 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to act ivate execution") | 271 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to act ivate execution") |
| 183 » » err = grpcutil.Errf(codes.Unauthenticated, "failed to activate e xecution Auth") | 272 » » err = makeError(err, "failed to activate execution Auth") |
| 184 } | 273 } |
| 185 return a, e, err | 274 return a, e, err |
| 186 } | 275 } |
| 187 | 276 |
| 277 // GetEID gets an Execution_ID for this Execution. It panics if the Execution | |
| 278 // is in an invalid state. | |
| 279 func (e *Execution) GetEID() *dm.Execution_ID { | |
| 280 aid := &dm.Attempt_ID{} | |
| 281 if e.ID == 0 { | |
| 282 panic("cannot create valid Execution_ID with 0-value ID field") | |
| 283 } | |
| 284 if err := aid.SetDMEncoded(e.Attempt.StringID()); err != nil { | |
| 285 panic(err) | |
| 286 } | |
| 287 return dm.NewExecutionID(aid.Quest, aid.Id, e.ID) | |
| 288 } | |
| 289 | |
| 188 // ToProto returns a dm proto version of this Execution. | 290 // ToProto returns a dm proto version of this Execution. |
| 189 func (e *Execution) ToProto(includeID bool) *dm.Execution { | 291 func (e *Execution) ToProto(includeID bool) *dm.Execution { |
| 190 » ret := &dm.Execution{ | 292 » ret := &dm.Execution{Data: e.DataProto()} |
| 191 » » Data: &dm.Execution_Data{ | |
| 192 » » » State: e.State, | |
| 193 » » » StateReason: e.StateReason, | |
| 194 » » » Created: google_pb.NewTimestamp(e.Created), | |
| 195 » » » DistributorToken: e.DistributorToken, | |
| 196 » » » DistributorInfoUrl: e.DistributorURL, | |
| 197 » » }, | |
| 198 » } | |
| 199 if includeID { | 293 if includeID { |
| 200 » » aid := &dm.Attempt_ID{} | 294 » » ret.Id = e.GetEID() |
| 201 » » if err := aid.SetDMEncoded(e.Attempt.StringID()); err != nil { | |
| 202 » » » panic(err) | |
| 203 » » } | |
| 204 » » ret.Id = dm.NewExecutionID(aid.Quest, aid.Id, e.ID) | |
| 205 } | 295 } |
| 206 return ret | 296 return ret |
| 207 } | 297 } |
| 298 | |
| 299 // DataProto returns an Execution.Data message for this Execution. | |
| 300 // | |
| 301 // This omits the DistributorInfo.Url portion, which must be filled in elsewhere for | |
| 302 // package cyclical import reasons. | |
| 303 func (e *Execution) DataProto() (ret *dm.Execution_Data) { | |
| 304 switch e.State { | |
| 305 case dm.Execution_SCHEDULING: | |
| 306 ret = dm.NewExecutionScheduling().Data | |
| 307 case dm.Execution_RUNNING: | |
| 308 ret = dm.NewExecutionRunning().Data | |
| 309 case dm.Execution_STOPPING: | |
| 310 ret = dm.NewExecutionStopping().Data | |
| 311 case dm.Execution_FINISHED: | |
| 312 ret = dm.NewExecutionFinished(string(e.ResultPersistentState)).D ata | |
| 313 case dm.Execution_ABNORMAL_FINISHED: | |
| 314 ret = dm.NewExecutionAbnormalFinish(&e.AbnormalFinish).Data | |
| 315 default: | |
| 316 panic(fmt.Errorf("unknown Execution_State: %s", e.State)) | |
| 317 } | |
| 318 ret.Created = google_pb.NewTimestamp(e.Created) | |
| 319 ret.Modified = google_pb.NewTimestamp(e.Modified) | |
| 320 ret.DistributorInfo = &dm.Execution_Data_DistributorInfo{ | |
| 321 ConfigName: e.DistributorConfigName, | |
| 322 ConfigVersion: e.DistributorConfigVersion, | |
| 323 Token: e.DistributorToken, | |
| 324 } | |
| 325 return ret | |
| 326 } | |
| OLD | NEW |