OLD | NEW |
1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package model | 5 package model |
6 | 6 |
7 import ( | 7 import ( |
8 "crypto/subtle" | 8 "crypto/subtle" |
9 "encoding/hex" | 9 "encoding/hex" |
10 "errors" | 10 "errors" |
11 "fmt" | 11 "fmt" |
12 "time" | 12 "time" |
13 | 13 |
14 "google.golang.org/grpc/codes" | 14 "google.golang.org/grpc/codes" |
15 | 15 |
16 "golang.org/x/net/context" | 16 "golang.org/x/net/context" |
17 | 17 |
18 "github.com/luci/gae/service/datastore" | 18 "github.com/luci/gae/service/datastore" |
19 » "github.com/luci/luci-go/common/api/dm/service/v1" | 19 » dm "github.com/luci/luci-go/common/api/dm/service/v1" |
| 20 » "github.com/luci/luci-go/common/clock" |
| 21 » "github.com/luci/luci-go/common/cryptorand" |
20 "github.com/luci/luci-go/common/grpcutil" | 22 "github.com/luci/luci-go/common/grpcutil" |
21 "github.com/luci/luci-go/common/logging" | 23 "github.com/luci/luci-go/common/logging" |
22 google_pb "github.com/luci/luci-go/common/proto/google" | 24 google_pb "github.com/luci/luci-go/common/proto/google" |
23 ) | 25 ) |
24 | 26 |
25 const ek = logging.ErrorKey | 27 const ek = logging.ErrorKey |
26 | 28 |
27 // Execution represents either an ongoing execution on the Quest's specified | 29 // Execution represents either an ongoing execution on the Quest's specified |
28 // distributor, or is a placeholder for an already-completed Execution. | 30 // distributor, or is a placeholder for an already-completed Execution. |
29 type Execution struct { | 31 type Execution struct { |
30 ID uint32 `gae:"$id"` | 32 ID uint32 `gae:"$id"` |
31 Attempt *datastore.Key `gae:"$parent"` | 33 Attempt *datastore.Key `gae:"$parent"` |
32 | 34 |
33 » State dm.Execution_State | 35 » Created time.Time |
34 » StateReason string `gae:",noindex"` | 36 » Modified time.Time |
35 | 37 |
36 » Created time.Time | 38 » DistributorConfigName string |
37 » DistributorToken string | 39 » DistributorConfigVersion string |
38 » DistributorURL string `gae:",noindex"` | 40 » DistributorToken string |
| 41 |
| 42 » State dm.Execution_State |
| 43 |
| 44 » // Only valid in the ABNORMAL_FINISHED state. |
| 45 » AbnormalFinish dm.AbnormalFinish |
| 46 |
| 47 » // Only valid in the FINISHED state. |
| 48 » ResultPersistentState []byte `gae:",noindex"` |
| 49 |
| 50 » // These are DM's internal mechanism for performing timeout actions on |
| 51 » // Executions. |
| 52 » // |
| 53 » // The TimeTo* variables are filled in by the distributor when this Exec
ution |
| 54 » // is created. |
| 55 » // |
| 56 » // The Timeout is only active when the Execution is in a non-terminal st
ate. |
| 57 » TimeToStart time.Duration `gae:",noindex"` // SCHEDULING -> RUNNING |
| 58 » TimeToRun time.Duration `gae:",noindex"` // RUNNING -> STOPPING |
| 59 » TimeToStop time.Duration `gae:",noindex"` // STOPPING -> FINISHED |
39 | 60 |
40 // Token is a randomized nonce that's used to verify that RPCs verify fr
om the | 61 // Token is a randomized nonce that's used to verify that RPCs verify fr
om the |
41 // expected client (the client that's currently running the Execution).
The | 62 // expected client (the client that's currently running the Execution).
The |
42 // Token has 2 modes. | 63 // Token has 2 modes. |
43 // | 64 // |
44 // When the Execution is handed to the distributor, the Token is randoml
y | 65 // When the Execution is handed to the distributor, the Token is randoml
y |
45 // generated by DM and passed to the distributor. The State of the Execu
tion | 66 // generated by DM and passed to the distributor. The State of the Execu
tion |
46 » // starts as Scheduled. This token may be used by the client to "activat
e" the | 67 » // starts as SCHEDULED. This token may be used by the client to "activat
e" the |
47 // Execution with the ActivateExecution rpc. At that point, the client | 68 // Execution with the ActivateExecution rpc. At that point, the client |
48 » // provides a new random token, the Execution State moves from Scheduled
to | 69 » // provides a new random token, the Execution State moves from SCHEDULED
to |
49 » // Running, and Token assumes the new value. As long as the Execution St
ate is | 70 » // RUNNING, and Token assumes the new value. As long as the Execution St
ate is |
50 » // running, the client may continue to use that new Token value to | 71 » // RUNNING, the client may continue to use that new Token value to |
51 // authenticate other rpc's like AddDeps and FinishAttempt. | 72 // authenticate other rpc's like AddDeps and FinishAttempt. |
52 // | 73 // |
53 » // As soon as the Execution is no longer supposed to have access, Token
will | 74 » // As soon as the Execution is in the STOPPING, ABNORMAL_FINISHED or FIN
ISHED |
54 » // be nil'd out. | 75 » // state, this will be nil'd out. |
55 Token []byte `gae:",noindex"` | 76 Token []byte `gae:",noindex"` |
56 } | 77 } |
57 | 78 |
58 // Revoke will null-out the Token and Put this Execution to the datastore. | 79 // MakeExecution makes a new Execution in the SCHEDULING state, with a new |
| 80 // random Token. |
| 81 func MakeExecution(c context.Context, e *dm.Execution_ID, cfgName, cfgVers strin
g) *Execution { |
| 82 » now := clock.Now(c).UTC() |
| 83 » ret := &Execution{ |
| 84 » » ID: e.Id, |
| 85 » » Attempt: AttemptKeyFromID(c, e.AttemptID()), |
| 86 |
| 87 » » Created: now, |
| 88 » » Modified: now, |
| 89 |
| 90 » » DistributorConfigName: cfgName, |
| 91 » » DistributorConfigVersion: cfgVers, |
| 92 |
| 93 » » Token: MakeRandomToken(c, dm.MinimumActivationTokenLength), |
| 94 » } |
| 95 » return ret |
| 96 } |
| 97 |
| 98 // ModifyState changes the current state of this Execution and updates its |
| 99 // Modified timestamp. |
| 100 func (e *Execution) ModifyState(c context.Context, newState dm.Execution_State)
error { |
| 101 » if e.State == newState { |
| 102 » » return nil |
| 103 » } |
| 104 » if err := e.State.Evolve(newState); err != nil { |
| 105 » » return err |
| 106 » } |
| 107 » now := clock.Now(c).UTC() |
| 108 » if now.After(e.Modified) { |
| 109 » » e.Modified = now |
| 110 » } else { |
| 111 » » // Microsecond is the smallest granularity that datastore can st
ore |
| 112 » » // timestamps, so use that to disambiguate: the goal here is tha
t any |
| 113 » » // modification always increments the modified time, and never d
ecrements |
| 114 » » // it. |
| 115 » » e.Modified = e.Modified.Add(time.Microsecond) |
| 116 » } |
| 117 » return nil |
| 118 } |
| 119 |
| 120 // MakeRandomToken creates a cryptographically random byte slice of the |
| 121 // specified length. It panics if the specified length cannot be read in full. |
| 122 func MakeRandomToken(c context.Context, l uint32) []byte { |
| 123 » rtok := make([]byte, l) |
| 124 » if _, err := cryptorand.Read(c, rtok); err != nil { |
| 125 » » panic(err) |
| 126 » } |
| 127 » return rtok |
| 128 } |
| 129 |
| 130 // Revoke will clear the Token and Put this Execution to the datastore. This |
| 131 // action requires the Execution to be in the RUNNING state, and causes it to |
| 132 // enter the STOPPING state. |
59 func (e *Execution) Revoke(c context.Context) error { | 133 func (e *Execution) Revoke(c context.Context) error { |
60 e.Token = nil | 134 e.Token = nil |
| 135 if err := e.ModifyState(c, dm.Execution_STOPPING); err != nil { |
| 136 return err |
| 137 } |
61 return datastore.Get(c).Put(e) | 138 return datastore.Get(c).Put(e) |
62 } | 139 } |
63 | 140 |
64 func loadExecution(c context.Context, eid *dm.Execution_ID) (a *Attempt, e *Exec
ution, err error) { | 141 func loadExecution(c context.Context, eid *dm.Execution_ID) (a *Attempt, e *Exec
ution, err error) { |
65 ds := datastore.Get(c) | 142 ds := datastore.Get(c) |
66 | 143 |
67 a = &Attempt{ID: *eid.AttemptID()} | 144 a = &Attempt{ID: *eid.AttemptID()} |
68 e = &Execution{ID: eid.Id, Attempt: ds.KeyForObj(a)} | 145 e = &Execution{ID: eid.Id, Attempt: ds.KeyForObj(a)} |
69 err = datastore.Get(c).Get(a, e) | 146 err = datastore.Get(c).Get(a, e) |
70 | 147 |
71 if err != nil { | 148 if err != nil { |
72 » » err = fmt.Errorf("couldn't get attempt %v or its execution %d: %
s", a.ID, e.ID, err) | 149 » » err = grpcutil.Errf(codes.Internal, |
| 150 » » » "couldn't get attempt %v or its execution %d: %s", a.ID,
e.ID, err) |
73 return | 151 return |
74 } | 152 } |
75 | 153 |
76 if a.CurExecution != e.ID { | 154 if a.CurExecution != e.ID { |
77 err = fmt.Errorf("verifying incorrect execution %d, expected %d"
, a.CurExecution, e.ID) | 155 err = fmt.Errorf("verifying incorrect execution %d, expected %d"
, a.CurExecution, e.ID) |
78 return | 156 return |
79 } | 157 } |
80 return | 158 return |
81 } | 159 } |
82 | 160 |
(...skipping 12 matching lines...) Expand all Loading... |
95 err = errors.New("Execution is not running") | 173 err = errors.New("Execution is not running") |
96 return | 174 return |
97 } | 175 } |
98 | 176 |
99 if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 { | 177 if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 { |
100 err = fmt.Errorf("incorrect Token: %x", hex.EncodeToString(auth.
Token)) | 178 err = fmt.Errorf("incorrect Token: %x", hex.EncodeToString(auth.
Token)) |
101 } | 179 } |
102 return | 180 return |
103 } | 181 } |
104 | 182 |
| 183 func makeError(err error, msg string) error { |
| 184 code := grpcutil.Code(err) |
| 185 if code == codes.Unknown { |
| 186 code = codes.PermissionDenied |
| 187 } |
| 188 return grpcutil.Errf(code, msg) |
| 189 } |
| 190 |
105 // AuthenticateExecution verifies that the Attempt is executing, and that evkey | 191 // AuthenticateExecution verifies that the Attempt is executing, and that evkey |
106 // matches the execution key of the current Execution for this Attempt. | 192 // matches the execution key of the current Execution for this Attempt. |
107 // | 193 // |
108 // As a bonus, it will return the loaded Attempt and Execution. | 194 // As a bonus, it will return the loaded Attempt and Execution. |
109 func AuthenticateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attem
pt, e *Execution, err error) { | 195 func AuthenticateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attem
pt, e *Execution, err error) { |
110 a, e, err = verifyExecutionAndCheckExTok(c, auth) | 196 a, e, err = verifyExecutionAndCheckExTok(c, auth) |
111 if err != nil { | 197 if err != nil { |
112 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to ver
ify execution") | 198 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to ver
ify execution") |
113 » » err = grpcutil.Errf(codes.Unauthenticated, "requires execution A
uth") | 199 » » err = makeError(err, "requires execution Auth") |
114 } | 200 } |
115 return a, e, err | 201 return a, e, err |
116 } | 202 } |
117 | 203 |
118 // InvalidateExecution verifies that the execution key is valid, and then | 204 // InvalidateExecution verifies that the execution key is valid, and then |
119 // revokes the execution key. | 205 // revokes the execution key. |
120 // | 206 // |
121 // As a bonus, it will return the loaded Attempt and Execution. | 207 // As a bonus, it will return the loaded Attempt and Execution. |
122 func InvalidateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attempt
, e *Execution, err error) { | 208 func InvalidateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attempt
, e *Execution, err error) { |
123 if a, e, err = verifyExecutionAndCheckExTok(c, auth); err != nil { | 209 if a, e, err = verifyExecutionAndCheckExTok(c, auth); err != nil { |
124 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to ver
ify execution") | 210 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to ver
ify execution") |
125 » » err = grpcutil.Errf(codes.Unauthenticated, "requires execution A
uth") | 211 » » err = makeError(err, "requires execution Auth") |
126 return | 212 return |
127 } | 213 } |
128 | 214 |
129 err = e.Revoke(c) | 215 err = e.Revoke(c) |
130 if err != nil { | 216 if err != nil { |
131 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to rev
oke execution") | 217 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to rev
oke execution") |
132 » » err = grpcutil.Errf(codes.Internal, "unable to invalidate Auth") | 218 » » err = makeError(err, "unable to invalidate Auth") |
133 } | 219 } |
134 return | 220 return |
135 } | 221 } |
136 | 222 |
137 func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actT
ok []byte) (a *Attempt, e *Execution, err error) { | 223 func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actT
ok []byte) (a *Attempt, e *Execution, err error) { |
138 a, e, err = loadExecution(c, auth.Id) | 224 a, e, err = loadExecution(c, auth.Id) |
139 if err != nil { | 225 if err != nil { |
140 return | 226 return |
141 } | 227 } |
142 | 228 |
143 if a.State != dm.Attempt_EXECUTING { | 229 if a.State != dm.Attempt_EXECUTING { |
144 err = errors.New("Attempt is in wrong state") | 230 err = errors.New("Attempt is in wrong state") |
145 return | 231 return |
146 } | 232 } |
147 | 233 |
148 switch e.State { | 234 switch e.State { |
149 » case dm.Execution_SCHEDULED: | 235 » case dm.Execution_SCHEDULING: |
150 if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 { | 236 if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 { |
151 err = errors.New("incorrect ActivationToken") | 237 err = errors.New("incorrect ActivationToken") |
152 return | 238 return |
153 } | 239 } |
154 | 240 |
155 e.State.MustEvolve(dm.Execution_RUNNING) | 241 e.State.MustEvolve(dm.Execution_RUNNING) |
156 e.Token = actTok | 242 e.Token = actTok |
157 err = datastore.Get(c).Put(e) | 243 err = datastore.Get(c).Put(e) |
| 244 logging.Infof(c, "activated execution %s: was SCHEDULING now RUN
NING", auth.Id) |
158 | 245 |
159 case dm.Execution_RUNNING: | 246 case dm.Execution_RUNNING: |
160 if subtle.ConstantTimeCompare(e.Token, actTok) != 1 { | 247 if subtle.ConstantTimeCompare(e.Token, actTok) != 1 { |
161 err = errors.New("incorrect Token") | 248 err = errors.New("incorrect Token") |
162 } | 249 } |
163 // either the Token matched, in which case this is simply a retr
y | 250 // either the Token matched, in which case this is simply a retr
y |
164 // by the same client, so there's no error, or it's wrong which
means it's | 251 // by the same client, so there's no error, or it's wrong which
means it's |
165 // a retry by a different client. | 252 // a retry by a different client. |
166 | 253 |
| 254 logging.Infof(c, "already activated execution %s", auth.Id) |
| 255 |
167 default: | 256 default: |
168 err = fmt.Errorf("Execution is in wrong state") | 257 err = fmt.Errorf("Execution is in wrong state") |
169 } | 258 } |
170 return | 259 return |
171 } | 260 } |
172 | 261 |
173 // ActivateExecution validates that the execution is unactivated and that | 262 // ActivateExecution validates that the execution is unactivated and that |
174 // the activation token matches and then sets the token to the new | 263 // the activation token matches and then sets the token to the new |
175 // value. | 264 // value. |
176 // | 265 // |
177 // It's OK to retry this. Subsequent invocations with the same Token | 266 // It's OK to retry this. Subsequent invocations with the same Token |
178 // will recognize this case and not return an error. | 267 // will recognize this case and not return an error. |
179 func ActivateExecution(c context.Context, auth *dm.Execution_Auth, actToken []by
te) (a *Attempt, e *Execution, err error) { | 268 func ActivateExecution(c context.Context, auth *dm.Execution_Auth, actToken []by
te) (a *Attempt, e *Execution, err error) { |
180 a, e, err = verifyExecutionAndActivate(c, auth, actToken) | 269 a, e, err = verifyExecutionAndActivate(c, auth, actToken) |
181 if err != nil { | 270 if err != nil { |
182 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to act
ivate execution") | 271 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to act
ivate execution") |
183 » » err = grpcutil.Errf(codes.Unauthenticated, "failed to activate e
xecution Auth") | 272 » » err = makeError(err, "failed to activate execution Auth") |
184 } | 273 } |
185 return a, e, err | 274 return a, e, err |
186 } | 275 } |
187 | 276 |
| 277 // GetEID gets an Execution_ID for this Execution. It panics if the Execution |
| 278 // is in an invalid state. |
| 279 func (e *Execution) GetEID() *dm.Execution_ID { |
| 280 aid := &dm.Attempt_ID{} |
| 281 if e.ID == 0 { |
| 282 panic("cannot create valid Execution_ID with 0-value ID field") |
| 283 } |
| 284 if err := aid.SetDMEncoded(e.Attempt.StringID()); err != nil { |
| 285 panic(err) |
| 286 } |
| 287 return dm.NewExecutionID(aid.Quest, aid.Id, e.ID) |
| 288 } |
| 289 |
188 // ToProto returns a dm proto version of this Execution. | 290 // ToProto returns a dm proto version of this Execution. |
189 func (e *Execution) ToProto(includeID bool) *dm.Execution { | 291 func (e *Execution) ToProto(includeID bool) *dm.Execution { |
190 » ret := &dm.Execution{ | 292 » ret := &dm.Execution{Data: e.DataProto()} |
191 » » Data: &dm.Execution_Data{ | |
192 » » » State: e.State, | |
193 » » » StateReason: e.StateReason, | |
194 » » » Created: google_pb.NewTimestamp(e.Created), | |
195 » » » DistributorToken: e.DistributorToken, | |
196 » » » DistributorInfoUrl: e.DistributorURL, | |
197 » » }, | |
198 » } | |
199 if includeID { | 293 if includeID { |
200 » » aid := &dm.Attempt_ID{} | 294 » » ret.Id = e.GetEID() |
201 » » if err := aid.SetDMEncoded(e.Attempt.StringID()); err != nil { | |
202 » » » panic(err) | |
203 » » } | |
204 » » ret.Id = dm.NewExecutionID(aid.Quest, aid.Id, e.ID) | |
205 } | 295 } |
206 return ret | 296 return ret |
207 } | 297 } |
| 298 |
| 299 // DataProto returns an Execution.Data message for this Execution. |
| 300 // |
| 301 // This omits the DistributorInfo.Url portion, which must be filled in elsewhere
for |
| 302 // package cyclical import reasons. |
| 303 func (e *Execution) DataProto() (ret *dm.Execution_Data) { |
| 304 switch e.State { |
| 305 case dm.Execution_SCHEDULING: |
| 306 ret = dm.NewExecutionScheduling().Data |
| 307 case dm.Execution_RUNNING: |
| 308 ret = dm.NewExecutionRunning().Data |
| 309 case dm.Execution_STOPPING: |
| 310 ret = dm.NewExecutionStopping().Data |
| 311 case dm.Execution_FINISHED: |
| 312 ret = dm.NewExecutionFinished(string(e.ResultPersistentState)).D
ata |
| 313 case dm.Execution_ABNORMAL_FINISHED: |
| 314 ret = dm.NewExecutionAbnormalFinish(&e.AbnormalFinish).Data |
| 315 default: |
| 316 panic(fmt.Errorf("unknown Execution_State: %s", e.State)) |
| 317 } |
| 318 ret.Created = google_pb.NewTimestamp(e.Created) |
| 319 ret.Modified = google_pb.NewTimestamp(e.Modified) |
| 320 ret.DistributorInfo = &dm.Execution_Data_DistributorInfo{ |
| 321 ConfigName: e.DistributorConfigName, |
| 322 ConfigVersion: e.DistributorConfigVersion, |
| 323 Token: e.DistributorToken, |
| 324 } |
| 325 return ret |
| 326 } |
OLD | NEW |