OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | |
2 // Use of this source code is governed under the Apache License, Version 2.0 | |
3 // that can be found in the LICENSE file. | |
4 | |
5 package fake | |
6 | |
7 import ( | |
8 "fmt" | |
9 "net/http" | |
10 "sync" | |
11 "time" | |
12 | |
13 "github.com/golang/protobuf/proto" | |
14 "github.com/luci/luci-go/appengine/cmd/dm/distributor" | |
15 "github.com/luci/luci-go/appengine/cmd/dm/model" | |
16 "github.com/luci/luci-go/appengine/tumble" | |
17 dm "github.com/luci/luci-go/common/api/dm/service/v1" | |
18 googlepb "github.com/luci/luci-go/common/proto/google" | |
19 "github.com/luci/luci-go/common/testing/assertions" | |
20 "github.com/luci/luci-go/server/secrets/testsecrets" | |
21 "github.com/smartystreets/goconvey/convey" | |
22 "golang.org/x/net/context" | |
23 ) | |
24 | |
25 // Setup creates a new combination of testing and context objects: | |
26 // * ttest - a tumble.Testing to allow you to control tumble's processing | |
27 // state | |
28 // * c - a context which includes a testing distributor registry, testsecrets, | |
29 // as well as everything that tumble.Testing.Context adds (datastore, | |
30 // memcache, etc.) | |
31 // * dist - a fake Distributor implementation with a RunTask method that | |
32 // allows your test to 'run' a scheduled task with the Distributor. This | |
33 // will automatically notify the deps service (by calling `fn`). | |
34 // * reg - a distributor Testing registry, pre-registerd with `dist` using the | |
35 // configuration name 'fakeDistributor'. | |
36 // | |
37 // You should pass mutate.FinishExecutionFn for fn. It's not done automatically | |
38 // in order to break an import cycle. You could provide your own, but YMMV. | |
39 // | |
40 // Usage: | |
41 // ttest, c, dist, reg := fake.Setup(mutate.FinishExecutionFn) | |
42 // s := deps.NewDecoratedServer(reg) | |
43 // # your tests | |
44 func Setup(fn distributor.FinishExecutionFn) (ttest *tumble.Testing, c context.C ontext, dist *Distributor, reg distributor.Registry) { | |
45 ttest = &tumble.Testing{} | |
46 c = ttest.Context() | |
47 c = testsecrets.Use(c) | |
48 dist = &Distributor{} | |
49 reg = distributor.NewTestingRegistry(map[string]distributor.D{ | |
50 "fakeDistributor": dist, | |
51 }, fn) | |
52 c = distributor.WithRegistry(c, reg) | |
53 return | |
54 } | |
55 | |
56 // DistributorData is the blob of data that the fake.Distributor keeps when DM | |
57 // calls its Run method. This is roughly equivalent to the state that | |
58 // a distributor (like swarming) would store in its own datastore about a job. | |
59 type DistributorData struct { | |
60 NotifyTopic string | |
61 NotifyAuth string | |
62 | |
63 Auth *dm.Execution_Auth | |
64 Desc *dm.Quest_Desc | |
65 | |
66 State distributor.PersistentState | |
67 | |
68 done bool | |
69 abnorm *dm.AbnormalFinish | |
70 } | |
71 | |
72 // Task is the detail that the distributor task would get. This is roughly | |
73 // equivalent to the input that the swarming task/recipe engine would get. | |
74 type Task struct { | |
75 Auth *dm.Execution_Auth | |
76 Desc *dm.Quest_Desc | |
77 // State is read/writable. | |
78 State distributor.PersistentState | |
79 } | |
80 | |
81 // Activate does the activation handshake with the provided DepsServer and | |
82 // returns an ActivatedTask. | |
83 func (t *Task) Activate(c context.Context, s dm.DepsServer) (*ActivatedTask, err or) { | |
84 newTok := model.MakeRandomToken(c, 32) | |
85 _, err := s.ActivateExecution(c, &dm.ActivateExecutionReq{ | |
86 Auth: t.Auth, ExecutionToken: newTok}) | |
87 if err != nil { | |
88 return nil, err | |
89 } | |
90 | |
91 return &ActivatedTask{ | |
92 s, | |
93 c, | |
94 &dm.Execution_Auth{Id: t.Auth.Id, Token: newTok}, | |
95 t.Desc, | |
96 &t.State, | |
97 }, nil | |
98 } | |
99 | |
100 // MustActivate does the same thing as Activate, but panics if err != nil. | |
101 func (t *Task) MustActivate(c context.Context, s dm.DepsServer) *ActivatedTask { | |
102 ret, err := t.Activate(c, s) | |
103 panicIf(err) | |
104 return ret | |
105 } | |
106 | |
107 // ActivatedTask is like a Task, but exists after calling Task.MustActivate, and | |
108 // contains an activated authentication token. This may be used to either add | |
109 // new dependencies or to provide a finished result. | |
110 // | |
111 // The implementation of DepsServer also automatically populates all outgoing | |
112 // RPCs with the activated Auth value. | |
113 type ActivatedTask struct { | |
114 s dm.DepsServer | |
115 c context.Context | |
116 | |
117 Auth *dm.Execution_Auth | |
118 Desc *dm.Quest_Desc | |
119 // State is read/writable. | |
120 State *distributor.PersistentState | |
121 } | |
122 | |
123 // WalkGraph calls the bound DepsServer's WalkGraph method with the activated | |
124 // Auth field. | |
125 func (t *ActivatedTask) WalkGraph(req *dm.WalkGraphReq) (*dm.GraphData, error) { | |
126 newReq := *req | |
127 newReq.Auth = t.Auth | |
128 return t.s.WalkGraph(t.c, &newReq) | |
129 } | |
130 | |
131 // EnsureGraphData calls the bound DepsServer's EnsureGraphData method with the | |
132 // activated Auth field in ForExecution. | |
133 func (t *ActivatedTask) EnsureGraphData(req *dm.EnsureGraphDataReq) (*dm.EnsureG raphDataRsp, error) { | |
134 newReq := *req | |
135 newReq.ForExecution = t.Auth | |
136 return t.s.EnsureGraphData(t.c, &newReq) | |
137 } | |
138 | |
139 // DepOn is a shorthand for EnsureGraphData which allows you to depend on | |
140 // multiple existing quests by attempt id. The definitions for these quests must | |
141 // already have been added to the deps server (probably with an EnsureGraphData | |
142 // call). | |
143 func (t *ActivatedTask) DepOn(to ...*dm.Attempt_ID) (halt bool, err error) { | |
dnj (Google)
2016/06/09 18:00:55
nit: don't use named return parameters here if you
iannucci
2016/06/15 00:45:59
Done.
| |
144 req := &dm.EnsureGraphDataReq{Attempts: dm.NewAttemptList(nil)} | |
145 req.Attempts.AddAIDs(to...) | |
146 | |
147 rsp, err := t.EnsureGraphData(req) | |
148 return rsp.ShouldHalt, err | |
149 } | |
150 | |
151 // MustDepOn is the same as DepOn but will panic if DepOn would have returned | |
152 // a non-nil error. | |
153 func (t *ActivatedTask) MustDepOn(to ...*dm.Attempt_ID) (halt bool) { | |
154 halt, err := t.DepOn(to...) | |
155 panicIf(err) | |
156 return | |
157 } | |
158 | |
159 // Finish calls FinishAttempt with the provided json body and optional | |
dnj (Google)
2016/06/09 18:00:54
nit: JSON
iannucci
2016/06/15 00:45:59
Done.
| |
160 // expiration time. | |
161 // | |
162 // This will panic if you provide more than one expiration time (so don't do | |
163 // that). | |
164 func (t *ActivatedTask) Finish(resultJSON string, expire ...time.Time) { | |
165 req := &dm.FinishAttemptReq{ | |
166 Auth: t.Auth, | |
167 JsonResult: resultJSON, | |
168 } | |
169 if len(expire) > 0 { | |
dnj (Google)
2016/06/09 18:00:55
Suggest: switch len(expire) { case 0: case 1: defa
iannucci
2016/06/15 00:45:59
Done.
| |
170 if len(expire) > 1 { | |
171 panic("may only specify 0 or 1 expire values") | |
172 } | |
173 req.Expiration = googlepb.NewTimestamp(expire[0]) | |
174 } | |
175 | |
176 _, err := t.s.FinishAttempt(t.c, req) | |
177 panicIf(err) | |
178 } | |
179 | |
180 // WalkShouldReturn is a shorthand for the package-level WalkShouldReturn which | |
181 // binds the activated auth to the WalkGraph request, but otherwise behaves | |
182 // identically. | |
183 // | |
184 // Use this method like: | |
185 // req := &dm.WalkGraphReq{...} | |
186 // So(req, activated.WalkShouldReturn, &dm.GraphData{ | |
187 // ... | |
188 // }) | |
189 func (t *ActivatedTask) WalkShouldReturn(request interface{}, expect ...interfac e{}) string { | |
190 r := *request.(*dm.WalkGraphReq) | |
191 r.Auth = t.Auth | |
192 return WalkShouldReturn(t.c, t.s)(&r, expect...) | |
193 } | |
194 | |
195 // Distributor implements distributor.D, and provides a method (RunTask) to | |
196 // allow a test to actually run a task which has been scheduled on this | |
197 // Distributor, and correctly notify the deps server that the execution is | |
198 // complete. | |
199 type Distributor struct { | |
200 // RunError can be set to make Run return this error when it's invoked. | |
201 RunError error | |
202 // These values can be set to make Run return them when it's invoked. | |
203 TimeToStart time.Duration | |
204 TimeToRun time.Duration | |
205 TimeToStop time.Duration | |
206 | |
207 sync.Mutex | |
208 tasks map[distributor.Token]*DistributorData | |
209 } | |
210 | |
211 // MkToken makes a distributor Token out of an Execution_ID. In this | |
212 // implementation of a Distributor there's a 1:1 mapping between Execution_ID | |
213 // and distributor task. This is not always the case for real distributor | |
214 // implementations. | |
215 func MkToken(eid *dm.Execution_ID) distributor.Token { | |
216 return distributor.Token(fmt.Sprintf("fakeDistributor:%s|%d|%d", eid.Que st, | |
217 eid.Attempt, eid.Id)) | |
218 } | |
219 | |
220 // Run implements distributor.D | |
221 func (f *Distributor) Run(desc *distributor.TaskDescription) (tok distributor.To ken, timeToStart, timeToRun, timeToStop time.Duration, err error) { | |
222 if f.RunError != nil { | |
dnj (Google)
2016/06/09 18:00:55
Suggest:
if err = f.RunError; err != nil { return
iannucci
2016/06/15 00:45:59
Done.
| |
223 err = f.RunError | |
224 return | |
225 } | |
226 timeToStart = f.TimeToStart | |
227 timeToRun = f.TimeToRun | |
228 timeToStop = f.TimeToStop | |
229 | |
230 f.Lock() | |
dnj (Google)
2016/06/09 18:00:55
nit: do all task creation outside of Lock, save Lo
iannucci
2016/06/15 00:45:59
done. this testing code changed a bunch
| |
231 exAuth := desc.ExecutionAuth() | |
232 tok = MkToken(exAuth.Id) | |
233 if f.tasks == nil { | |
234 f.tasks = map[distributor.Token]*DistributorData{} | |
235 } | |
236 tsk := &DistributorData{ | |
237 Auth: exAuth, | |
238 Desc: desc.Payload(), | |
239 State: desc.PreviousState(), | |
240 } | |
241 tsk.NotifyTopic, tsk.NotifyAuth, err = desc.PrepareTopic() | |
242 panicIf(err) | |
243 f.tasks[tok] = tsk | |
244 f.Unlock() | |
245 return | |
246 } | |
247 | |
248 // Cancel implements distributor.D | |
249 func (f *Distributor) Cancel(tok distributor.Token) (err error) { | |
250 f.Lock() | |
251 if tsk, ok := f.tasks[tok]; ok { | |
dnj (Google)
2016/06/09 18:00:54
nit: defer f.Unlock()
iannucci
2016/06/15 00:45:59
Done.
| |
252 tsk.done = true | |
253 tsk.abnorm = &dm.AbnormalFinish{ | |
254 Status: dm.AbnormalFinish_CANCELLED, | |
255 Reason: "cancelled via Cancel()"} | |
256 } else { | |
257 err = fmt.Errorf("MISSING task %s", tok) | |
dnj (Google)
2016/06/09 18:00:55
nit: %q
iannucci
2016/06/15 00:45:59
Done.
| |
258 } | |
259 f.Unlock() | |
260 return | |
261 } | |
262 | |
263 // GetStatus implements distributor.D | |
264 func (f *Distributor) GetStatus(tok distributor.Token) (rslt *distributor.TaskRe sult, err error) { | |
265 f.Lock() | |
dnj (Google)
2016/06/09 18:00:55
nit: defer f.Unlock()
iannucci
2016/06/15 00:45:59
Done.
| |
266 if tsk, ok := f.tasks[tok]; ok { | |
267 if tsk.done { | |
268 if tsk.abnorm != nil { | |
269 rslt = &distributor.TaskResult{AbnormalFinish: t sk.abnorm} | |
270 } else { | |
271 rslt = &distributor.TaskResult{PersistentState: tsk.State} | |
272 } | |
273 } | |
274 } else { | |
275 rslt = &distributor.TaskResult{ | |
276 AbnormalFinish: &dm.AbnormalFinish{ | |
277 Status: dm.AbnormalFinish_MISSING, | |
278 Reason: fmt.Sprintf("unknown token: %s", tok)}, | |
279 } | |
280 } | |
281 f.Unlock() | |
282 return | |
283 } | |
284 | |
285 // InfoURL implements distributor.D | |
286 func (f *Distributor) InfoURL(tok distributor.Token) string { | |
287 return "https://info.example.com/" + string(tok) | |
288 } | |
289 | |
290 // HandleNotification implements distributor.D | |
291 func (f *Distributor) HandleNotification(n *distributor.Notification) (rslt *dis tributor.TaskResult, err error) { | |
292 return f.GetStatus(distributor.Token(n.Attrs["token"])) | |
293 } | |
294 | |
295 // HandleTaskQueueTask is not implemented, and shouldn't be needed for most | |
296 // tests. It could be implemented if some new test required it, however. | |
297 func (f *Distributor) HandleTaskQueueTask(r *http.Request) ([]*distributor.Notif ication, error) { | |
298 panic("not implemented") | |
299 } | |
300 | |
301 // Validate implements distributor.D (by returning a nil error for every | |
302 // payload). | |
303 func (f *Distributor) Validate(payload string) error { | |
304 return nil | |
305 } | |
306 | |
307 // RunTask allows you to run the task associated with the provided execution id. | |
308 // | |
309 // If the task corresponding to `eid` returns an error, or if the distributor | |
310 // itself actually has an error, this method will return an error. Notably, if | |
311 // `cb` returns an error, it will simply mark the corresponding task as FAILED, | |
312 // but will return nil here. | |
313 // | |
314 // If the task exists and hasn't been run yet, cb will be called, and can do | |
315 // anything that you may want to a test to do. Think of the callback as the | |
316 // recipe engine; it has the opportunity to do anything it wants to, interact | |
317 // with the deps server (or not), succeed (or not), etc. | |
318 // | |
319 // If the callback needs to maintain state between executions, Task.State is | |
320 // read+write; when the callback exits, the final value of Task.State will be | |
321 // passed back to the DM instance under test. A re-execution of the attempt will | |
322 // start with the new value. | |
323 func (f *Distributor) RunTask(c context.Context, eid *dm.Execution_ID, cb func(* Task) error) (err error) { | |
324 tok := MkToken(eid) | |
325 | |
326 f.Lock() | |
327 tsk := f.tasks[tok] | |
328 if tsk == nil { | |
329 err = fmt.Errorf("cannot RunTask(%q): doesn't exist", tok) | |
330 } else { | |
331 if tsk.done { | |
332 err = fmt.Errorf("cannot RunTask(%q): running twice", to k) | |
333 } else { | |
334 tsk.done = true | |
335 } | |
336 } | |
337 f.Unlock() | |
338 | |
339 if err != nil { | |
340 return | |
341 } | |
342 | |
343 abnorm := (*dm.AbnormalFinish)(nil) | |
344 | |
345 usrTsk := &Task{ | |
346 tsk.Auth, | |
347 tsk.Desc, | |
348 tsk.State, | |
349 } | |
350 | |
351 defer func() { | |
352 f.Lock() | |
353 { | |
354 tsk.abnorm = abnorm | |
355 tsk.State = usrTsk.State | |
356 | |
357 if r := recover(); r != nil { | |
358 tsk.abnorm = &dm.AbnormalFinish{ | |
359 Status: dm.AbnormalFinish_CRASHED, | |
360 Reason: fmt.Sprintf("caught panic: %q", r), | |
361 } | |
362 } | |
363 } | |
364 f.Unlock() | |
365 | |
366 err = tumble.RunMutation(c, &distributor.NotifyExecution{ | |
367 CfgName: "fakeDistributor", | |
368 Notification: &distributor.Notification{ | |
369 ID: tsk.Auth.Id, | |
370 Attrs: map[string]string{"token": string(tok)}}, | |
371 }) | |
372 }() | |
373 | |
374 err = cb(usrTsk) | |
375 if err != nil { | |
376 err = nil | |
377 abnorm = &dm.AbnormalFinish{ | |
378 Status: dm.AbnormalFinish_FAILED, | |
379 Reason: fmt.Sprintf("cb error: %q", err), | |
380 } | |
381 } | |
382 return | |
383 } | |
384 | |
385 func panicIf(err error) { | |
386 if err != nil { | |
387 panic(err) | |
388 } | |
389 } | |
390 | |
391 var _ distributor.D = (*Distributor)(nil) | |
392 | |
393 // QuestDesc generates a normalized generic QuestDesc of the form: | |
394 // Quest_Desc{ | |
395 // DistributorConfigName: "fakeDistributor", | |
396 // JsonPayload: `{"name":"$name"}`, | |
397 // } | |
398 func QuestDesc(name string) *dm.Quest_Desc { | |
399 desc := &dm.Quest_Desc{ | |
400 DistributorConfigName: "fakeDistributor", | |
401 JsonPayload: fmt.Sprintf(`{"name": "%s"}`, name), | |
dnj (Google)
2016/06/09 18:00:55
This will have weird escaping issues if "name" has
iannucci
2016/06/15 00:45:59
Done.
| |
402 } | |
403 panicIf(desc.Normalize()) | |
404 return desc | |
405 } | |
406 | |
407 // WalkShouldReturn is a convey-style assertion factory to assert that a given | |
408 // WalkGraph request object results in the provided GraphData. | |
409 // | |
410 // If keepTimestamps (a singular, optional boolean) is provided and true, | |
411 // WalkShouldReturn will not remove timestamps from the compared GraphData. If | |
412 // it is absent or false, GraphData.PurgeTimestamps will be called on the | |
413 // returned GraphData before comparing it to the expected value. | |
414 // | |
415 // Use this function like: | |
416 // req := &dm.WalkGraphReq{...} | |
417 // So(req, WalkShouldReturn(c, s), &dm.GraphData{ | |
418 // ... | |
419 // }) | |
420 func WalkShouldReturn(c context.Context, s dm.DepsServer, keepTimestamps ...bool ) func(request interface{}, expect ...interface{}) string { | |
421 kt := len(keepTimestamps) > 0 && keepTimestamps[0] | |
422 if len(keepTimestamps) > 1 { | |
423 panic("may only specify 0 or 1 keepTimestamps values") | |
424 } | |
425 | |
426 normalize := func(gd *dm.GraphData) *dm.GraphData { | |
427 data, err := proto.Marshal(gd) | |
428 panicIf(err) | |
429 ret := &dm.GraphData{} | |
430 | |
431 panicIf(proto.Unmarshal(data, ret)) | |
432 | |
433 if !kt { | |
434 ret.PurgeTimestamps() | |
435 } | |
436 return ret | |
437 } | |
438 | |
439 return func(request interface{}, expect ...interface{}) string { | |
440 r := request.(*dm.WalkGraphReq) | |
dnj (Google)
2016/06/09 18:00:55
I guess you should check "expect" length here.
iannucci
2016/06/15 00:45:59
Done.
| |
441 e := expect[0].(*dm.GraphData) | |
442 ret, err := s.WalkGraph(c, r) | |
443 if nilExpect := assertions.ShouldErrLike(err, nil); nilExpect != "" { | |
444 return nilExpect | |
445 } | |
446 return convey.ShouldResemble(normalize(ret), e) | |
447 } | |
448 } | |
OLD | NEW |