OLD | NEW |
---|---|
1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package model | 5 package model |
6 | 6 |
7 import ( | 7 import ( |
8 "crypto/subtle" | 8 "crypto/subtle" |
9 "encoding/hex" | 9 "encoding/hex" |
10 "errors" | 10 "errors" |
11 "fmt" | 11 "fmt" |
12 "time" | 12 "time" |
13 | 13 |
14 "google.golang.org/grpc/codes" | 14 "google.golang.org/grpc/codes" |
15 | 15 |
16 "golang.org/x/net/context" | 16 "golang.org/x/net/context" |
17 | 17 |
18 "github.com/luci/gae/service/datastore" | 18 "github.com/luci/gae/service/datastore" |
19 » "github.com/luci/luci-go/common/api/dm/service/v1" | 19 » dm "github.com/luci/luci-go/common/api/dm/service/v1" |
20 » "github.com/luci/luci-go/common/clock" | |
21 » "github.com/luci/luci-go/common/cryptorand" | |
20 "github.com/luci/luci-go/common/grpcutil" | 22 "github.com/luci/luci-go/common/grpcutil" |
21 "github.com/luci/luci-go/common/logging" | 23 "github.com/luci/luci-go/common/logging" |
22 google_pb "github.com/luci/luci-go/common/proto/google" | 24 google_pb "github.com/luci/luci-go/common/proto/google" |
23 ) | 25 ) |
24 | 26 |
25 const ek = logging.ErrorKey | 27 const ek = logging.ErrorKey |
26 | 28 |
27 // Execution represents either an ongoing execution on the Quest's specified | 29 // Execution represents either an ongoing execution on the Quest's specified |
28 // distributor, or is a placeholder for an already-completed Execution. | 30 // distributor, or is a placeholder for an already-completed Execution. |
29 type Execution struct { | 31 type Execution struct { |
30 ID uint32 `gae:"$id"` | 32 ID uint32 `gae:"$id"` |
31 Attempt *datastore.Key `gae:"$parent"` | 33 Attempt *datastore.Key `gae:"$parent"` |
32 | 34 |
33 » State dm.Execution_State | 35 » Created time.Time |
34 » StateReason string `gae:",noindex"` | 36 » Modified time.Time |
35 | 37 |
36 » Created time.Time | 38 » DistributorConfigName string |
37 » DistributorToken string | 39 » DistributorConfigVersion string |
38 » DistributorURL string `gae:",noindex"` | 40 » DistributorToken string |
41 | |
42 » State dm.Execution_State | |
43 | |
44 » // Only valid in the ABNORMAL_FINISHED state. | |
45 » AbnormalFinish dm.AbnormalFinish | |
46 | |
47 » // Only valid in the FINISHED state. | |
48 » ResultPersistentState string | |
49 | |
50 » // These are DM's internal mechanism for performing timeout actions on | |
51 » // Executions. | |
52 » // | |
53 » // The TimeTo* variables are filled in by the distributor when this Exec ution | |
54 » // is created. | |
55 » // | |
56 » // The Timeout is only active when the Execution is in a non-terminal st ate. | |
57 » TimeToStart time.Duration `gae:",noindex"` // SCHEDULING -> RUNNING | |
58 » TimeToRun time.Duration `gae:",noindex"` // RUNNING -> STOPPING | |
59 » TimeToStop time.Duration `gae:",noindex"` // STOPPING -> FINISHED | |
iannucci
2016/06/08 02:54:24
timeouts!
| |
39 | 60 |
40 // Token is a randomized nonce that's used to verify that RPCs verify fr om the | 61 // Token is a randomized nonce that's used to verify that RPCs verify fr om the |
41 // expected client (the client that's currently running the Execution). The | 62 // expected client (the client that's currently running the Execution). The |
42 // Token has 2 modes. | 63 // Token has 2 modes. |
43 // | 64 // |
44 // When the Execution is handed to the distributor, the Token is randoml y | 65 // When the Execution is handed to the distributor, the Token is randoml y |
45 // generated by DM and passed to the distributor. The State of the Execu tion | 66 // generated by DM and passed to the distributor. The State of the Execu tion |
46 » // starts as Scheduled. This token may be used by the client to "activat e" the | 67 » // starts as SCHEDULED. This token may be used by the client to "activat e" the |
47 // Execution with the ActivateExecution rpc. At that point, the client | 68 // Execution with the ActivateExecution rpc. At that point, the client |
48 » // provides a new random token, the Execution State moves from Scheduled to | 69 » // provides a new random token, the Execution State moves from SCHEDULED to |
49 » // Running, and Token assumes the new value. As long as the Execution St ate is | 70 » // RUNNING, and Token assumes the new value. As long as the Execution St ate is |
50 » // running, the client may continue to use that new Token value to | 71 » // RUNNING, the client may continue to use that new Token value to |
51 // authenticate other rpc's like AddDeps and FinishAttempt. | 72 // authenticate other rpc's like AddDeps and FinishAttempt. |
52 // | 73 // |
53 » // As soon as the Execution is no longer supposed to have access, Token will | 74 » // As soon as the Execution is in the STOPPING, ABNORMAL_FINISHED or FIN ISHED |
54 » // be nil'd out. | 75 » // state, this will be nil'd out. |
55 Token []byte `gae:",noindex"` | 76 Token []byte `gae:",noindex"` |
56 } | 77 } |
57 | 78 |
58 // Revoke will null-out the Token and Put this Execution to the datastore. | 79 // MakeExecution makes a new Execution in the SCHEDULING state, with a new |
80 // random Token. | |
81 func MakeExecution(c context.Context, e *dm.Execution_ID, cfgName, cfgVers strin g) *Execution { | |
82 » now := clock.Now(c).UTC() | |
83 » ret := &Execution{ | |
84 » » ID: e.Id, | |
85 » » Attempt: datastore.Get(c).MakeKey("Attempt", e.AttemptID().DMEnc oded()), | |
dnj (Google)
2016/06/09 18:00:56
AttemptKeyFromID?
iannucci
2016/06/15 00:46:01
good catch, I tried to find these all but apparent
| |
86 | |
87 » » Created: now, | |
88 » » Modified: now, | |
89 | |
90 » » DistributorConfigName: cfgName, | |
91 » » DistributorConfigVersion: cfgVers, | |
92 | |
93 » » Token: MakeRandomToken(c, dm.MinimumActivationTokenLength), | |
94 » } | |
95 » return ret | |
96 } | |
97 | |
98 // ModifyState changes the current state of this Execution and updates its | |
99 // Modified timestamp. | |
100 func (e *Execution) ModifyState(c context.Context, newState dm.Execution_State) error { | |
101 » if e.State == newState { | |
102 » » return nil | |
103 » } | |
104 » if err := e.State.Evolve(newState); err != nil { | |
105 » » return err | |
106 » } | |
107 » now := clock.Now(c).UTC() | |
108 » if now.After(e.Modified) { | |
109 » » e.Modified = now | |
110 » } else { | |
111 » » // Microsecond is the smallest granularity that datastore can st ore | |
112 » » // timestamps, so use that to disambiguate: the goal here is tha t any | |
113 » » // modification always increments the modified time, and never d ecrements | |
114 » » // it. | |
115 » » e.Modified = e.Modified.Add(time.Microsecond) | |
116 » } | |
117 » return nil | |
118 } | |
119 | |
120 // MakeRandomToken creates a cryptographically random byte slice of the | |
121 // specified length. It panics if the specified length cannot be read in full. | |
122 func MakeRandomToken(c context.Context, l uint32) []byte { | |
123 » rtok := make([]byte, l) | |
124 » if _, err := cryptorand.Read(c, rtok); err != nil { | |
125 » » panic(err) | |
126 » } | |
127 » return rtok | |
128 } | |
129 | |
130 // Revoke will null-out the Token and Put this Execution to the datastore. This | |
dnj (Google)
2016/06/09 18:00:56
nit: "nil-out" in above documentation, "null-out"
iannucci
2016/06/15 00:46:01
Done.
| |
131 // action requires the Execution to be in the RUNNING state, and causes it to | |
132 // enter the STOPPING state. | |
59 func (e *Execution) Revoke(c context.Context) error { | 133 func (e *Execution) Revoke(c context.Context) error { |
60 e.Token = nil | 134 e.Token = nil |
135 if err := e.ModifyState(c, dm.Execution_STOPPING); err != nil { | |
136 return err | |
137 } | |
61 return datastore.Get(c).Put(e) | 138 return datastore.Get(c).Put(e) |
62 } | 139 } |
63 | 140 |
64 func loadExecution(c context.Context, eid *dm.Execution_ID) (a *Attempt, e *Exec ution, err error) { | 141 func loadExecution(c context.Context, eid *dm.Execution_ID) (a *Attempt, e *Exec ution, err error) { |
65 ds := datastore.Get(c) | 142 ds := datastore.Get(c) |
66 | 143 |
67 a = &Attempt{ID: *eid.AttemptID()} | 144 a = &Attempt{ID: *eid.AttemptID()} |
68 e = &Execution{ID: eid.Id, Attempt: ds.KeyForObj(a)} | 145 e = &Execution{ID: eid.Id, Attempt: ds.KeyForObj(a)} |
69 err = datastore.Get(c).GetMulti([]interface{}{a, e}) | 146 err = datastore.Get(c).GetMulti([]interface{}{a, e}) |
70 | 147 |
71 if err != nil { | 148 if err != nil { |
72 » » err = fmt.Errorf("couldn't get attempt %v or its execution %d: % s", a.ID, e.ID, err) | 149 » » err = grpcutil.Errf(codes.Internal, |
150 » » » "couldn't get attempt %v or its execution %d: %s", a.ID, e.ID, err) | |
73 return | 151 return |
74 } | 152 } |
75 | 153 |
76 if a.CurExecution != e.ID { | 154 if a.CurExecution != e.ID { |
77 err = fmt.Errorf("verifying incorrect execution %d, expected %d" , a.CurExecution, e.ID) | 155 err = fmt.Errorf("verifying incorrect execution %d, expected %d" , a.CurExecution, e.ID) |
78 return | 156 return |
79 } | 157 } |
80 return | 158 return |
81 } | 159 } |
82 | 160 |
(...skipping 12 matching lines...) Expand all Loading... | |
95 err = errors.New("Execution is not running") | 173 err = errors.New("Execution is not running") |
96 return | 174 return |
97 } | 175 } |
98 | 176 |
99 if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 { | 177 if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 { |
100 err = fmt.Errorf("incorrect Token: %x", hex.EncodeToString(auth. Token)) | 178 err = fmt.Errorf("incorrect Token: %x", hex.EncodeToString(auth. Token)) |
101 } | 179 } |
102 return | 180 return |
103 } | 181 } |
104 | 182 |
183 func makeError(err error, msg string) error { | |
184 code := grpcutil.Code(err) | |
185 if code == codes.Unknown { | |
186 code = codes.Unauthenticated | |
dnj (Google)
2016/06/09 18:00:56
Does this make sense? Unauthenticated specifically
iannucci
2016/06/15 00:46:01
Yeah, basically: unless we specifically tagged thi
| |
187 } | |
188 return grpcutil.Errf(code, msg) | |
189 } | |
190 | |
105 // AuthenticateExecution verifies that the Attempt is executing, and that evkey | 191 // AuthenticateExecution verifies that the Attempt is executing, and that evkey |
106 // matches the execution key of the current Execution for this Attempt. | 192 // matches the execution key of the current Execution for this Attempt. |
107 // | 193 // |
108 // As a bonus, it will return the loaded Attempt and Execution. | 194 // As a bonus, it will return the loaded Attempt and Execution. |
109 func AuthenticateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attem pt, e *Execution, err error) { | 195 func AuthenticateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attem pt, e *Execution, err error) { |
110 a, e, err = verifyExecutionAndCheckExTok(c, auth) | 196 a, e, err = verifyExecutionAndCheckExTok(c, auth) |
111 if err != nil { | 197 if err != nil { |
112 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to ver ify execution") | 198 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to ver ify execution") |
113 » » err = grpcutil.Errf(codes.Unauthenticated, "requires execution A uth") | 199 » » err = makeError(err, "requires execution Auth") |
114 } | 200 } |
115 return a, e, err | 201 return a, e, err |
116 } | 202 } |
117 | 203 |
118 // InvalidateExecution verifies that the execution key is valid, and then | 204 // InvalidateExecution verifies that the execution key is valid, and then |
119 // revokes the execution key. | 205 // revokes the execution key. |
120 // | 206 // |
121 // As a bonus, it will return the loaded Attempt and Execution. | 207 // As a bonus, it will return the loaded Attempt and Execution. |
122 func InvalidateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attempt , e *Execution, err error) { | 208 func InvalidateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attempt , e *Execution, err error) { |
123 if a, e, err = verifyExecutionAndCheckExTok(c, auth); err != nil { | 209 if a, e, err = verifyExecutionAndCheckExTok(c, auth); err != nil { |
124 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to ver ify execution") | 210 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to ver ify execution") |
125 » » err = grpcutil.Errf(codes.Unauthenticated, "requires execution A uth") | 211 » » err = makeError(err, "requires execution Auth") |
126 return | 212 return |
127 } | 213 } |
128 | 214 |
129 err = e.Revoke(c) | 215 err = e.Revoke(c) |
130 if err != nil { | 216 if err != nil { |
131 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to rev oke execution") | 217 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to rev oke execution") |
132 » » err = grpcutil.Errf(codes.Internal, "unable to invalidate Auth") | 218 » » err = makeError(err, "unable to invalidate Auth") |
133 } | 219 } |
134 return | 220 return |
135 } | 221 } |
136 | 222 |
137 func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actT ok []byte) (a *Attempt, e *Execution, err error) { | 223 func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actT ok []byte) (a *Attempt, e *Execution, err error) { |
138 a, e, err = loadExecution(c, auth.Id) | 224 a, e, err = loadExecution(c, auth.Id) |
139 if err != nil { | 225 if err != nil { |
140 return | 226 return |
141 } | 227 } |
142 | 228 |
143 if a.State != dm.Attempt_EXECUTING { | 229 if a.State != dm.Attempt_EXECUTING { |
144 err = errors.New("Attempt is in wrong state") | 230 err = errors.New("Attempt is in wrong state") |
145 return | 231 return |
146 } | 232 } |
147 | 233 |
148 switch e.State { | 234 switch e.State { |
149 » case dm.Execution_SCHEDULED: | 235 » case dm.Execution_SCHEDULING: |
150 if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 { | 236 if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 { |
151 err = errors.New("incorrect ActivationToken") | 237 err = errors.New("incorrect ActivationToken") |
152 return | 238 return |
153 } | 239 } |
154 | 240 |
155 e.State.MustEvolve(dm.Execution_RUNNING) | 241 e.State.MustEvolve(dm.Execution_RUNNING) |
156 e.Token = actTok | 242 e.Token = actTok |
157 err = datastore.Get(c).Put(e) | 243 err = datastore.Get(c).Put(e) |
244 logging.Infof(c, "activated execution %s: was SCHEDULING now RUN NING", auth.Id) | |
158 | 245 |
159 case dm.Execution_RUNNING: | 246 case dm.Execution_RUNNING: |
160 if subtle.ConstantTimeCompare(e.Token, actTok) != 1 { | 247 if subtle.ConstantTimeCompare(e.Token, actTok) != 1 { |
161 err = errors.New("incorrect Token") | 248 err = errors.New("incorrect Token") |
162 } | 249 } |
163 // either the Token matched, in which case this is simply a retr y | 250 // either the Token matched, in which case this is simply a retr y |
164 // by the same client, so there's no error, or it's wrong which means it's | 251 // by the same client, so there's no error, or it's wrong which means it's |
165 // a retry by a different client. | 252 // a retry by a different client. |
166 | 253 |
254 logging.Infof(c, "already activated execution %s", auth.Id) | |
255 | |
167 default: | 256 default: |
168 err = fmt.Errorf("Execution is in wrong state") | 257 err = fmt.Errorf("Execution is in wrong state") |
169 } | 258 } |
170 return | 259 return |
171 } | 260 } |
172 | 261 |
173 // ActivateExecution validates that the execution is unactivated and that | 262 // ActivateExecution validates that the execution is unactivated and that |
174 // the activation token matches and then sets the token to the new | 263 // the activation token matches and then sets the token to the new |
175 // value. | 264 // value. |
176 // | 265 // |
177 // It's OK to retry this. Subsequent invocations with the same Token | 266 // It's OK to retry this. Subsequent invocations with the same Token |
178 // will recognize this case and not return an error. | 267 // will recognize this case and not return an error. |
179 func ActivateExecution(c context.Context, auth *dm.Execution_Auth, actToken []by te) (a *Attempt, e *Execution, err error) { | 268 func ActivateExecution(c context.Context, auth *dm.Execution_Auth, actToken []by te) (a *Attempt, e *Execution, err error) { |
180 a, e, err = verifyExecutionAndActivate(c, auth, actToken) | 269 a, e, err = verifyExecutionAndActivate(c, auth, actToken) |
181 if err != nil { | 270 if err != nil { |
182 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to act ivate execution") | 271 logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to act ivate execution") |
183 » » err = grpcutil.Errf(codes.Unauthenticated, "failed to activate e xecution Auth") | 272 » » err = makeError(err, "failed to activate execution Auth") |
184 } | 273 } |
185 return a, e, err | 274 return a, e, err |
186 } | 275 } |
187 | 276 |
277 // GetEID gets an Execution_ID for this Execution. It panics if the Execution | |
278 // is in an invalid state. | |
279 func (e *Execution) GetEID() *dm.Execution_ID { | |
280 aid := &dm.Attempt_ID{} | |
281 if e.ID == 0 { | |
282 panic("cannot create valid Execution_ID with 0-value ID field") | |
283 } | |
284 if err := aid.SetDMEncoded(e.Attempt.StringID()); err != nil { | |
285 panic(err) | |
286 } | |
287 return dm.NewExecutionID(aid.Quest, aid.Id, e.ID) | |
288 } | |
289 | |
188 // ToProto returns a dm proto version of this Execution. | 290 // ToProto returns a dm proto version of this Execution. |
189 func (e *Execution) ToProto(includeID bool) *dm.Execution { | 291 func (e *Execution) ToProto(includeID bool) *dm.Execution { |
190 » ret := &dm.Execution{ | 292 » ret := &dm.Execution{Data: e.DataProto()} |
191 » » Data: &dm.Execution_Data{ | |
192 » » » State: e.State, | |
193 » » » StateReason: e.StateReason, | |
194 » » » Created: google_pb.NewTimestamp(e.Created), | |
195 » » » DistributorToken: e.DistributorToken, | |
196 » » » DistributorInfoUrl: e.DistributorURL, | |
197 » » }, | |
198 » } | |
199 if includeID { | 293 if includeID { |
200 » » aid := &dm.Attempt_ID{} | 294 » » ret.Id = e.GetEID() |
201 » » if err := aid.SetDMEncoded(e.Attempt.StringID()); err != nil { | |
202 » » » panic(err) | |
203 » » } | |
204 » » ret.Id = dm.NewExecutionID(aid.Quest, aid.Id, e.ID) | |
205 } | 295 } |
206 return ret | 296 return ret |
207 } | 297 } |
298 | |
299 // DataProto returns an Execution.Data message for this Execution. | |
300 // | |
301 // This omits the DistributorInfo.Url portion, which must be filled in elsewhere for | |
302 // package cyclical import reasons. | |
303 func (e *Execution) DataProto() (ret *dm.Execution_Data) { | |
304 switch e.State { | |
305 case dm.Execution_SCHEDULING: | |
306 ret = dm.NewExecutionScheduling().Data | |
307 case dm.Execution_RUNNING: | |
308 ret = dm.NewExecutionRunning().Data | |
309 case dm.Execution_STOPPING: | |
310 ret = dm.NewExecutionStopping().Data | |
311 case dm.Execution_FINISHED: | |
312 ret = dm.NewExecutionFinished(string(e.ResultPersistentState)).D ata | |
313 case dm.Execution_ABNORMAL_FINISHED: | |
314 ret = dm.NewExecutionAbnormalFinish(&e.AbnormalFinish).Data | |
315 default: | |
316 panic(fmt.Errorf("unknown Execution_State: %s", e.State)) | |
317 } | |
318 ret.Created = google_pb.NewTimestamp(e.Created) | |
319 ret.Modified = google_pb.NewTimestamp(e.Modified) | |
320 ret.DistributorInfo = &dm.Execution_Data_DistributorInfo{ | |
321 ConfigName: e.DistributorConfigName, | |
322 ConfigVersion: e.DistributorConfigVersion, | |
323 Token: e.DistributorToken, | |
324 } | |
325 return ret | |
326 } | |
OLD | NEW |