Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(252)

Side by Side Diff: appengine/cmd/dm/distributor/fake/fake.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: self review Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 package fake
6
7 import (
8 "fmt"
9 "net/http"
10 "sync"
11 "time"
12
13 "github.com/golang/protobuf/proto"
14 "github.com/luci/luci-go/appengine/cmd/dm/distributor"
15 "github.com/luci/luci-go/appengine/cmd/dm/model"
16 "github.com/luci/luci-go/appengine/tumble"
17 dm "github.com/luci/luci-go/common/api/dm/service/v1"
18 googlepb "github.com/luci/luci-go/common/proto/google"
19 "github.com/luci/luci-go/common/testing/assertions"
20 "github.com/luci/luci-go/server/secrets/testsecrets"
21 "github.com/smartystreets/goconvey/convey"
22 "golang.org/x/net/context"
23 )
24
25 // Setup creates a new combination of testing and context objects:
26 // * ttest - a tumble.Testing to allow you to control tumble's processing
27 // state
28 // * c - a context which includes a testing distributor registry, testsecrets,
29 // as well as everything that tumble.Testing.Context adds (datastore,
30 // memcache, etc.)
31 // * dist - a fake Distributor implementation with a RunTask method that
32 // allows your test to 'run' a scheduled task with the Distributor. This
33 // will automatically notify the deps service (by calling `fn`).
34 // * reg - a distributor Testing registry, pre-registerd with `dist` using the
35 // configuration name 'fakeDistributor'.
36 //
37 // You should pass mutate.FinishExecutionFn for fn. It's not done automatically
38 // in order to break an import cycle. You could provide your own, but YMMV.
39 //
40 // Usage:
41 // ttest, c, dist, reg := fake.Setup(mutate.FinishExecutionFn)
42 // s := deps.NewDecoratedServer(reg)
43 // # your tests
44 func Setup(fn distributor.FinishExecutionFn) (ttest *tumble.Testing, c context.C ontext, dist *Distributor, reg distributor.Registry) {
45 ttest = &tumble.Testing{}
46 c = ttest.Context()
47 c = testsecrets.Use(c)
48 dist = &Distributor{}
49 reg = distributor.NewTestingRegistry(map[string]distributor.D{
50 "fakeDistributor": dist,
51 }, fn)
52 c = distributor.WithRegistry(c, reg)
53 return
54 }
55
56 // DistributorData is the blob of data that the fake.Distributor keeps when DM
57 // calls its Run method. This is roughly equivalent to the state that
58 // a distributor (like swarming) would store in its own datastore about a job.
59 type DistributorData struct {
60 NotifyTopic string
61 NotifyAuth string
62
63 Auth *dm.Execution_Auth
64 Desc *dm.Quest_Desc
65
66 State distributor.PersistentState
67
68 done bool
69 abnorm *dm.AbnormalFinish
70 }
71
72 // Task is the detail that the distributor task would get. This is roughly
73 // equivalent to the input that the swarming task/recipe engine would get.
74 type Task struct {
75 Auth *dm.Execution_Auth
76 Desc *dm.Quest_Desc
77 // State is read/writable.
78 State distributor.PersistentState
79 }
80
81 // Activate does the activation handshake with the provided DepsServer and
82 // returns an ActivatedTask.
83 func (t *Task) Activate(c context.Context, s dm.DepsServer) (*ActivatedTask, err or) {
84 newTok := model.MakeRandomToken(c, 32)
85 _, err := s.ActivateExecution(c, &dm.ActivateExecutionReq{
86 Auth: t.Auth, ExecutionToken: newTok})
87 if err != nil {
88 return nil, err
89 }
90
91 return &ActivatedTask{
92 s,
93 c,
94 &dm.Execution_Auth{Id: t.Auth.Id, Token: newTok},
95 t.Desc,
96 &t.State,
97 }, nil
98 }
99
100 // MustActivate does the same thing as Activate, but panics if err != nil.
101 func (t *Task) MustActivate(c context.Context, s dm.DepsServer) *ActivatedTask {
102 ret, err := t.Activate(c, s)
103 panicIf(err)
104 return ret
105 }
106
107 // ActivatedTask is like a Task, but exists after calling Task.MustActivate, and
108 // contains an activated authentication token. This may be used to either add
109 // new dependencies or to provide a finished result.
110 //
111 // The implementation of DepsServer also automatically populates all outgoing
112 // RPCs with the activated Auth value.
113 type ActivatedTask struct {
114 s dm.DepsServer
115 c context.Context
116
117 Auth *dm.Execution_Auth
118 Desc *dm.Quest_Desc
119 // State is read/writable.
120 State *distributor.PersistentState
121 }
122
123 // WalkGraph calls the bound DepsServer's WalkGraph method with the activated
124 // Auth field.
125 func (t *ActivatedTask) WalkGraph(req *dm.WalkGraphReq) (*dm.GraphData, error) {
126 newReq := *req
127 newReq.Auth = t.Auth
128 return t.s.WalkGraph(t.c, &newReq)
129 }
130
131 // EnsureGraphData calls the bound DepsServer's EnsureGraphData method with the
132 // activated Auth field in ForExecution.
133 func (t *ActivatedTask) EnsureGraphData(req *dm.EnsureGraphDataReq) (*dm.EnsureG raphDataRsp, error) {
134 newReq := *req
135 newReq.ForExecution = t.Auth
136 return t.s.EnsureGraphData(t.c, &newReq)
137 }
138
139 // DepOn is a shorthand for EnsureGraphData which allows you to depend on
140 // multiple existing quests by attempt id. The definitions for these quests must
141 // already have been added to the deps server (probably with an EnsureGraphData
142 // call).
143 func (t *ActivatedTask) DepOn(to ...*dm.Attempt_ID) (halt bool, err error) {
dnj (Google) 2016/06/09 18:00:55 nit: don't use named return parameters here if you
iannucci 2016/06/15 00:45:59 Done.
144 req := &dm.EnsureGraphDataReq{Attempts: dm.NewAttemptList(nil)}
145 req.Attempts.AddAIDs(to...)
146
147 rsp, err := t.EnsureGraphData(req)
148 return rsp.ShouldHalt, err
149 }
150
151 // MustDepOn is the same as DepOn but will panic if DepOn would have returned
152 // a non-nil error.
153 func (t *ActivatedTask) MustDepOn(to ...*dm.Attempt_ID) (halt bool) {
154 halt, err := t.DepOn(to...)
155 panicIf(err)
156 return
157 }
158
159 // Finish calls FinishAttempt with the provided json body and optional
dnj (Google) 2016/06/09 18:00:54 nit: JSON
iannucci 2016/06/15 00:45:59 Done.
160 // expiration time.
161 //
162 // This will panic if you provide more than one expiration time (so don't do
163 // that).
164 func (t *ActivatedTask) Finish(resultJSON string, expire ...time.Time) {
165 req := &dm.FinishAttemptReq{
166 Auth: t.Auth,
167 JsonResult: resultJSON,
168 }
169 if len(expire) > 0 {
dnj (Google) 2016/06/09 18:00:55 Suggest: switch len(expire) { case 0: case 1: defa
iannucci 2016/06/15 00:45:59 Done.
170 if len(expire) > 1 {
171 panic("may only specify 0 or 1 expire values")
172 }
173 req.Expiration = googlepb.NewTimestamp(expire[0])
174 }
175
176 _, err := t.s.FinishAttempt(t.c, req)
177 panicIf(err)
178 }
179
180 // WalkShouldReturn is a shorthand for the package-level WalkShouldReturn which
181 // binds the activated auth to the WalkGraph request, but otherwise behaves
182 // identically.
183 //
184 // Use this method like:
185 // req := &dm.WalkGraphReq{...}
186 // So(req, activated.WalkShouldReturn, &dm.GraphData{
187 // ...
188 // })
189 func (t *ActivatedTask) WalkShouldReturn(request interface{}, expect ...interfac e{}) string {
190 r := *request.(*dm.WalkGraphReq)
191 r.Auth = t.Auth
192 return WalkShouldReturn(t.c, t.s)(&r, expect...)
193 }
194
195 // Distributor implements distributor.D, and provides a method (RunTask) to
196 // allow a test to actually run a task which has been scheduled on this
197 // Distributor, and correctly notify the deps server that the execution is
198 // complete.
199 type Distributor struct {
200 // RunError can be set to make Run return this error when it's invoked.
201 RunError error
202 // These values can be set to make Run return them when it's invoked.
203 TimeToStart time.Duration
204 TimeToRun time.Duration
205 TimeToStop time.Duration
206
207 sync.Mutex
208 tasks map[distributor.Token]*DistributorData
209 }
210
211 // MkToken makes a distributor Token out of an Execution_ID. In this
212 // implementation of a Distributor there's a 1:1 mapping between Execution_ID
213 // and distributor task. This is not always the case for real distributor
214 // implementations.
215 func MkToken(eid *dm.Execution_ID) distributor.Token {
216 return distributor.Token(fmt.Sprintf("fakeDistributor:%s|%d|%d", eid.Que st,
217 eid.Attempt, eid.Id))
218 }
219
220 // Run implements distributor.D
221 func (f *Distributor) Run(desc *distributor.TaskDescription) (tok distributor.To ken, timeToStart, timeToRun, timeToStop time.Duration, err error) {
222 if f.RunError != nil {
dnj (Google) 2016/06/09 18:00:55 Suggest: if err = f.RunError; err != nil { return
iannucci 2016/06/15 00:45:59 Done.
223 err = f.RunError
224 return
225 }
226 timeToStart = f.TimeToStart
227 timeToRun = f.TimeToRun
228 timeToStop = f.TimeToStop
229
230 f.Lock()
dnj (Google) 2016/06/09 18:00:55 nit: do all task creation outside of Lock, save Lo
iannucci 2016/06/15 00:45:59 done. this testing code changed a bunch
231 exAuth := desc.ExecutionAuth()
232 tok = MkToken(exAuth.Id)
233 if f.tasks == nil {
234 f.tasks = map[distributor.Token]*DistributorData{}
235 }
236 tsk := &DistributorData{
237 Auth: exAuth,
238 Desc: desc.Payload(),
239 State: desc.PreviousState(),
240 }
241 tsk.NotifyTopic, tsk.NotifyAuth, err = desc.PrepareTopic()
242 panicIf(err)
243 f.tasks[tok] = tsk
244 f.Unlock()
245 return
246 }
247
248 // Cancel implements distributor.D
249 func (f *Distributor) Cancel(tok distributor.Token) (err error) {
250 f.Lock()
251 if tsk, ok := f.tasks[tok]; ok {
dnj (Google) 2016/06/09 18:00:54 nit: defer f.Unlock()
iannucci 2016/06/15 00:45:59 Done.
252 tsk.done = true
253 tsk.abnorm = &dm.AbnormalFinish{
254 Status: dm.AbnormalFinish_CANCELLED,
255 Reason: "cancelled via Cancel()"}
256 } else {
257 err = fmt.Errorf("MISSING task %s", tok)
dnj (Google) 2016/06/09 18:00:55 nit: %q
iannucci 2016/06/15 00:45:59 Done.
258 }
259 f.Unlock()
260 return
261 }
262
263 // GetStatus implements distributor.D
264 func (f *Distributor) GetStatus(tok distributor.Token) (rslt *distributor.TaskRe sult, err error) {
265 f.Lock()
dnj (Google) 2016/06/09 18:00:55 nit: defer f.Unlock()
iannucci 2016/06/15 00:45:59 Done.
266 if tsk, ok := f.tasks[tok]; ok {
267 if tsk.done {
268 if tsk.abnorm != nil {
269 rslt = &distributor.TaskResult{AbnormalFinish: t sk.abnorm}
270 } else {
271 rslt = &distributor.TaskResult{PersistentState: tsk.State}
272 }
273 }
274 } else {
275 rslt = &distributor.TaskResult{
276 AbnormalFinish: &dm.AbnormalFinish{
277 Status: dm.AbnormalFinish_MISSING,
278 Reason: fmt.Sprintf("unknown token: %s", tok)},
279 }
280 }
281 f.Unlock()
282 return
283 }
284
285 // InfoURL implements distributor.D
286 func (f *Distributor) InfoURL(tok distributor.Token) string {
287 return "https://info.example.com/" + string(tok)
288 }
289
290 // HandleNotification implements distributor.D
291 func (f *Distributor) HandleNotification(n *distributor.Notification) (rslt *dis tributor.TaskResult, err error) {
292 return f.GetStatus(distributor.Token(n.Attrs["token"]))
293 }
294
295 // HandleTaskQueueTask is not implemented, and shouldn't be needed for most
296 // tests. It could be implemented if some new test required it, however.
297 func (f *Distributor) HandleTaskQueueTask(r *http.Request) ([]*distributor.Notif ication, error) {
298 panic("not implemented")
299 }
300
301 // Validate implements distributor.D (by returning a nil error for every
302 // payload).
303 func (f *Distributor) Validate(payload string) error {
304 return nil
305 }
306
307 // RunTask allows you to run the task associated with the provided execution id.
308 //
309 // If the task corresponding to `eid` returns an error, or if the distributor
310 // itself actually has an error, this method will return an error. Notably, if
311 // `cb` returns an error, it will simply mark the corresponding task as FAILED,
312 // but will return nil here.
313 //
314 // If the task exists and hasn't been run yet, cb will be called, and can do
315 // anything that you may want to a test to do. Think of the callback as the
316 // recipe engine; it has the opportunity to do anything it wants to, interact
317 // with the deps server (or not), succeed (or not), etc.
318 //
319 // If the callback needs to maintain state between executions, Task.State is
320 // read+write; when the callback exits, the final value of Task.State will be
321 // passed back to the DM instance under test. A re-execution of the attempt will
322 // start with the new value.
323 func (f *Distributor) RunTask(c context.Context, eid *dm.Execution_ID, cb func(* Task) error) (err error) {
324 tok := MkToken(eid)
325
326 f.Lock()
327 tsk := f.tasks[tok]
328 if tsk == nil {
329 err = fmt.Errorf("cannot RunTask(%q): doesn't exist", tok)
330 } else {
331 if tsk.done {
332 err = fmt.Errorf("cannot RunTask(%q): running twice", to k)
333 } else {
334 tsk.done = true
335 }
336 }
337 f.Unlock()
338
339 if err != nil {
340 return
341 }
342
343 abnorm := (*dm.AbnormalFinish)(nil)
344
345 usrTsk := &Task{
346 tsk.Auth,
347 tsk.Desc,
348 tsk.State,
349 }
350
351 defer func() {
352 f.Lock()
353 {
354 tsk.abnorm = abnorm
355 tsk.State = usrTsk.State
356
357 if r := recover(); r != nil {
358 tsk.abnorm = &dm.AbnormalFinish{
359 Status: dm.AbnormalFinish_CRASHED,
360 Reason: fmt.Sprintf("caught panic: %q", r),
361 }
362 }
363 }
364 f.Unlock()
365
366 err = tumble.RunMutation(c, &distributor.NotifyExecution{
367 CfgName: "fakeDistributor",
368 Notification: &distributor.Notification{
369 ID: tsk.Auth.Id,
370 Attrs: map[string]string{"token": string(tok)}},
371 })
372 }()
373
374 err = cb(usrTsk)
375 if err != nil {
376 err = nil
377 abnorm = &dm.AbnormalFinish{
378 Status: dm.AbnormalFinish_FAILED,
379 Reason: fmt.Sprintf("cb error: %q", err),
380 }
381 }
382 return
383 }
384
385 func panicIf(err error) {
386 if err != nil {
387 panic(err)
388 }
389 }
390
391 var _ distributor.D = (*Distributor)(nil)
392
393 // QuestDesc generates a normalized generic QuestDesc of the form:
394 // Quest_Desc{
395 // DistributorConfigName: "fakeDistributor",
396 // JsonPayload: `{"name":"$name"}`,
397 // }
398 func QuestDesc(name string) *dm.Quest_Desc {
399 desc := &dm.Quest_Desc{
400 DistributorConfigName: "fakeDistributor",
401 JsonPayload: fmt.Sprintf(`{"name": "%s"}`, name),
dnj (Google) 2016/06/09 18:00:55 This will have weird escaping issues if "name" has
iannucci 2016/06/15 00:45:59 Done.
402 }
403 panicIf(desc.Normalize())
404 return desc
405 }
406
407 // WalkShouldReturn is a convey-style assertion factory to assert that a given
408 // WalkGraph request object results in the provided GraphData.
409 //
410 // If keepTimestamps (a singular, optional boolean) is provided and true,
411 // WalkShouldReturn will not remove timestamps from the compared GraphData. If
412 // it is absent or false, GraphData.PurgeTimestamps will be called on the
413 // returned GraphData before comparing it to the expected value.
414 //
415 // Use this function like:
416 // req := &dm.WalkGraphReq{...}
417 // So(req, WalkShouldReturn(c, s), &dm.GraphData{
418 // ...
419 // })
420 func WalkShouldReturn(c context.Context, s dm.DepsServer, keepTimestamps ...bool ) func(request interface{}, expect ...interface{}) string {
421 kt := len(keepTimestamps) > 0 && keepTimestamps[0]
422 if len(keepTimestamps) > 1 {
423 panic("may only specify 0 or 1 keepTimestamps values")
424 }
425
426 normalize := func(gd *dm.GraphData) *dm.GraphData {
427 data, err := proto.Marshal(gd)
428 panicIf(err)
429 ret := &dm.GraphData{}
430
431 panicIf(proto.Unmarshal(data, ret))
432
433 if !kt {
434 ret.PurgeTimestamps()
435 }
436 return ret
437 }
438
439 return func(request interface{}, expect ...interface{}) string {
440 r := request.(*dm.WalkGraphReq)
dnj (Google) 2016/06/09 18:00:55 I guess you should check "expect" length here.
iannucci 2016/06/15 00:45:59 Done.
441 e := expect[0].(*dm.GraphData)
442 ret, err := s.WalkGraph(c, r)
443 if nilExpect := assertions.ShouldErrLike(err, nil); nilExpect != "" {
444 return nilExpect
445 }
446 return convey.ShouldResemble(normalize(ret), e)
447 }
448 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698