Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(394)

Side by Side Diff: appengine/cmd/dm/distributor/fake/fake.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: fix imports and make dummy.go a real file Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/cmd/dm/distributor/distributor.go ('k') | appengine/cmd/dm/distributor/handlers.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 package fake
6
7 import (
8 "encoding/json"
9 "fmt"
10 "net/http"
11 "sync"
12 "time"
13
14 "github.com/golang/protobuf/proto"
15 "github.com/luci/luci-go/appengine/cmd/dm/distributor"
16 "github.com/luci/luci-go/appengine/cmd/dm/model"
17 "github.com/luci/luci-go/appengine/tumble"
18 dm "github.com/luci/luci-go/common/api/dm/service/v1"
19 "github.com/luci/luci-go/common/gcloud/pubsub"
20 googlepb "github.com/luci/luci-go/common/proto/google"
21 "github.com/luci/luci-go/common/testing/assertions"
22 "github.com/luci/luci-go/server/secrets/testsecrets"
23 "github.com/smartystreets/goconvey/convey"
24 "golang.org/x/net/context"
25 )
26
27 // Setup creates a new combination of testing and context objects:
28 // * ttest - a tumble.Testing to allow you to control tumble's processing
29 // state
30 // * c - a context which includes a testing distributor registry, testsecrets,
31 // as well as everything that tumble.Testing.Context adds (datastore,
32 // memcache, etc.)
33 // * dist - a fake Distributor implementation with a RunTask method that
34 // allows your test to 'run' a scheduled task with the Distributor. This
35 // will automatically notify the deps service (by calling `fn`).
36 // * reg - a distributor Testing registry, pre-registerd with `dist` using the
37 // configuration name 'fakeDistributor'.
38 //
39 // You should pass mutate.FinishExecutionFn for fn. It's not done automatically
40 // in order to break an import cycle. You could provide your own, but YMMV.
41 //
42 // Usage:
43 // ttest, c, dist, reg := fake.Setup(mutate.FinishExecutionFn)
44 // s := deps.NewDecoratedServer(reg)
45 // # your tests
46 func Setup(fn distributor.FinishExecutionFn) (ttest *tumble.Testing, c context.C ontext, dist *Distributor, reg distributor.Registry) {
47 ttest = &tumble.Testing{}
48 c = ttest.Context()
49 c = testsecrets.Use(c)
50 dist = &Distributor{}
51 reg = distributor.NewTestingRegistry(map[string]distributor.D{
52 "fakeDistributor": dist,
53 }, fn)
54 c = distributor.WithRegistry(c, reg)
55 return
56 }
57
58 // DistributorData is the blob of data that the fake.Distributor keeps when DM
59 // calls its Run method. This is roughly equivalent to the state that
60 // a distributor (like swarming) would store in its own datastore about a job.
61 type DistributorData struct {
62 NotifyTopic pubsub.Topic
63 NotifyAuth string
64
65 Auth *dm.Execution_Auth
66 Desc *dm.Quest_Desc
67
68 State distributor.PersistentState
69
70 done bool
71 abnorm *dm.AbnormalFinish
72 }
73
74 // Task is the detail that the distributor task would get. This is roughly
75 // equivalent to the input that the swarming task/recipe engine would get.
76 type Task struct {
77 Auth *dm.Execution_Auth
78 Desc *dm.Quest_Desc
79 // State is read/writable.
80 State distributor.PersistentState
81 }
82
83 // Activate does the activation handshake with the provided DepsServer and
84 // returns an ActivatedTask.
85 func (t *Task) Activate(c context.Context, s dm.DepsServer) (*ActivatedTask, err or) {
86 newTok := model.MakeRandomToken(c, 32)
87 _, err := s.ActivateExecution(c, &dm.ActivateExecutionReq{
88 Auth: t.Auth, ExecutionToken: newTok})
89 if err != nil {
90 return nil, err
91 }
92
93 return &ActivatedTask{
94 s,
95 c,
96 &dm.Execution_Auth{Id: t.Auth.Id, Token: newTok},
97 t.Desc,
98 &t.State,
99 }, nil
100 }
101
102 // MustActivate does the same thing as Activate, but panics if err != nil.
103 func (t *Task) MustActivate(c context.Context, s dm.DepsServer) *ActivatedTask {
104 ret, err := t.Activate(c, s)
105 panicIf(err)
106 return ret
107 }
108
109 // ActivatedTask is like a Task, but exists after calling Task.MustActivate, and
110 // contains an activated authentication token. This may be used to either add
111 // new dependencies or to provide a finished result.
112 //
113 // The implementation of DepsServer also automatically populates all outgoing
114 // RPCs with the activated Auth value.
115 type ActivatedTask struct {
116 s dm.DepsServer
117 c context.Context
118
119 Auth *dm.Execution_Auth
120 Desc *dm.Quest_Desc
121 // State is read/writable.
122 State *distributor.PersistentState
123 }
124
125 // WalkGraph calls the bound DepsServer's WalkGraph method with the activated
126 // Auth field.
127 func (t *ActivatedTask) WalkGraph(req *dm.WalkGraphReq) (*dm.GraphData, error) {
128 newReq := *req
129 newReq.Auth = t.Auth
130 return t.s.WalkGraph(t.c, &newReq)
131 }
132
133 // EnsureGraphData calls the bound DepsServer's EnsureGraphData method with the
134 // activated Auth field in ForExecution.
135 func (t *ActivatedTask) EnsureGraphData(req *dm.EnsureGraphDataReq) (*dm.EnsureG raphDataRsp, error) {
136 newReq := *req
137 newReq.ForExecution = t.Auth
138 return t.s.EnsureGraphData(t.c, &newReq)
139 }
140
141 // DepOn is a shorthand for EnsureGraphData which allows you to depend on
142 // multiple existing quests by attempt id. The definitions for these quests must
143 // already have been added to the deps server (probably with an EnsureGraphData
144 // call).
145 func (t *ActivatedTask) DepOn(to ...*dm.Attempt_ID) (bool, error) {
146 req := &dm.EnsureGraphDataReq{Attempts: dm.NewAttemptList(nil)}
147 req.Attempts.AddAIDs(to...)
148
149 rsp, err := t.EnsureGraphData(req)
150 return rsp.ShouldHalt, err
151 }
152
153 // MustDepOn is the same as DepOn but will panic if DepOn would have returned
154 // a non-nil error.
155 func (t *ActivatedTask) MustDepOn(to ...*dm.Attempt_ID) (halt bool) {
156 halt, err := t.DepOn(to...)
157 panicIf(err)
158 return
159 }
160
161 // Finish calls FinishAttempt with the provided JSON body and optional
162 // expiration time.
163 //
164 // This will panic if you provide more than one expiration time (so don't do
165 // that).
166 func (t *ActivatedTask) Finish(resultJSON string, expire ...time.Time) {
167 req := &dm.FinishAttemptReq{
168 Auth: t.Auth,
169 JsonResult: resultJSON,
170 }
171 switch len(expire) {
172 case 0:
173 case 1:
174 req.Expiration = googlepb.NewTimestamp(expire[0])
175 default:
176 panic("may only specify 0 or 1 expire values")
177 }
178
179 _, err := t.s.FinishAttempt(t.c, req)
180 panicIf(err)
181 }
182
183 // WalkShouldReturn is a shorthand for the package-level WalkShouldReturn which
184 // binds the activated auth to the WalkGraph request, but otherwise behaves
185 // identically.
186 //
187 // Use this method like:
188 // req := &dm.WalkGraphReq{...}
189 // So(req, activated.WalkShouldReturn, &dm.GraphData{
190 // ...
191 // })
192 func (t *ActivatedTask) WalkShouldReturn(request interface{}, expect ...interfac e{}) string {
193 r := *request.(*dm.WalkGraphReq)
194 r.Auth = t.Auth
195 return WalkShouldReturn(t.c, t.s)(&r, expect...)
196 }
197
198 // Distributor implements distributor.D, and provides a method (RunTask) to
199 // allow a test to actually run a task which has been scheduled on this
200 // Distributor, and correctly notify the deps server that the execution is
201 // complete.
202 type Distributor struct {
203 // RunError can be set to make Run return this error when it's invoked.
204 RunError error
205 // These values can be set to make Run return them when it's invoked.
206 TimeToStart time.Duration
207 TimeToRun time.Duration
208 TimeToStop time.Duration
209
210 sync.Mutex
211 tasks map[distributor.Token]*DistributorData
212 }
213
214 // MkToken makes a distributor Token out of an Execution_ID. In this
215 // implementation of a Distributor there's a 1:1 mapping between Execution_ID
216 // and distributor task. This is not always the case for real distributor
217 // implementations.
218 func MkToken(eid *dm.Execution_ID) distributor.Token {
219 return distributor.Token(fmt.Sprintf("fakeDistributor:%s|%d|%d", eid.Que st,
220 eid.Attempt, eid.Id))
221 }
222
223 // Run implements distributor.D
224 func (f *Distributor) Run(desc *distributor.TaskDescription) (tok distributor.To ken, timeToStart, timeToRun, timeToStop time.Duration, err error) {
225 if err = f.RunError; err != nil {
226 return
227 }
228 timeToStart = f.TimeToStart
229 timeToRun = f.TimeToRun
230 timeToStop = f.TimeToStop
231
232 exAuth := desc.ExecutionAuth()
233 tok = MkToken(exAuth.Id)
234
235 tsk := &DistributorData{
236 Auth: exAuth,
237 Desc: desc.Payload(),
238 State: desc.PreviousState(),
239 }
240 tsk.NotifyTopic, tsk.NotifyAuth, err = desc.PrepareTopic()
241 panicIf(err)
242
243 f.Lock()
244 defer f.Unlock()
245 if f.tasks == nil {
246 f.tasks = map[distributor.Token]*DistributorData{}
247 }
248 f.tasks[tok] = tsk
249 return
250 }
251
252 // Cancel implements distributor.D
253 func (f *Distributor) Cancel(tok distributor.Token) (err error) {
254 f.Lock()
255 defer f.Unlock()
256 if tsk, ok := f.tasks[tok]; ok {
257 tsk.done = true
258 tsk.abnorm = &dm.AbnormalFinish{
259 Status: dm.AbnormalFinish_CANCELLED,
260 Reason: "cancelled via Cancel()"}
261 } else {
262 err = fmt.Errorf("MISSING task %q", tok)
263 }
264 return
265 }
266
267 // GetStatus implements distributor.D
268 func (f *Distributor) GetStatus(tok distributor.Token) (rslt *distributor.TaskRe sult, err error) {
269 f.Lock()
270 defer f.Unlock()
271 if tsk, ok := f.tasks[tok]; ok {
272 if tsk.done {
273 if tsk.abnorm != nil {
274 rslt = &distributor.TaskResult{AbnormalFinish: t sk.abnorm}
275 } else {
276 rslt = &distributor.TaskResult{PersistentState: tsk.State}
277 }
278 }
279 } else {
280 rslt = &distributor.TaskResult{
281 AbnormalFinish: &dm.AbnormalFinish{
282 Status: dm.AbnormalFinish_MISSING,
283 Reason: fmt.Sprintf("unknown token: %s", tok)},
284 }
285 }
286 return
287 }
288
289 // InfoURL implements distributor.D
290 func (f *Distributor) InfoURL(tok distributor.Token) string {
291 return "https://info.example.com/" + string(tok)
292 }
293
294 // HandleNotification implements distributor.D
295 func (f *Distributor) HandleNotification(n *distributor.Notification) (rslt *dis tributor.TaskResult, err error) {
296 return f.GetStatus(distributor.Token(n.Attrs["token"]))
297 }
298
299 // HandleTaskQueueTask is not implemented, and shouldn't be needed for most
300 // tests. It could be implemented if some new test required it, however.
301 func (f *Distributor) HandleTaskQueueTask(r *http.Request) ([]*distributor.Notif ication, error) {
302 panic("not implemented")
303 }
304
305 // Validate implements distributor.D (by returning a nil error for every
306 // payload).
307 func (f *Distributor) Validate(payload string) error {
308 return nil
309 }
310
311 // RunTask allows you to run the task associated with the provided execution id.
312 //
313 // If the task corresponding to `eid` returns an error, or if the distributor
314 // itself actually has an error, this method will return an error. Notably, if
315 // `cb` returns an error, it will simply mark the corresponding task as FAILED,
316 // but will return nil here.
317 //
318 // If the task exists and hasn't been run yet, cb will be called, and can do
319 // anything that you may want to a test to do. Think of the callback as the
320 // recipe engine; it has the opportunity to do anything it wants to, interact
321 // with the deps server (or not), succeed (or not), etc.
322 //
323 // If the callback needs to maintain state between executions, Task.State is
324 // read+write; when the callback exits, the final value of Task.State will be
325 // passed back to the DM instance under test. A re-execution of the attempt will
326 // start with the new value.
327 func (f *Distributor) RunTask(c context.Context, eid *dm.Execution_ID, cb func(* Task) error) (err error) {
328 tok := MkToken(eid)
329
330 f.Lock()
331 tsk := f.tasks[tok]
332 if tsk == nil {
333 err = fmt.Errorf("cannot RunTask(%q): doesn't exist", tok)
334 } else {
335 if tsk.done {
336 err = fmt.Errorf("cannot RunTask(%q): running twice", to k)
337 } else {
338 tsk.done = true
339 }
340 }
341 f.Unlock()
342
343 if err != nil {
344 return
345 }
346
347 abnorm := (*dm.AbnormalFinish)(nil)
348
349 usrTsk := &Task{
350 tsk.Auth,
351 tsk.Desc,
352 tsk.State,
353 }
354
355 defer func() {
356 f.Lock()
357 {
358 tsk.abnorm = abnorm
359 tsk.State = usrTsk.State
360
361 if r := recover(); r != nil {
362 tsk.abnorm = &dm.AbnormalFinish{
363 Status: dm.AbnormalFinish_CRASHED,
364 Reason: fmt.Sprintf("caught panic: %q", r),
365 }
366 }
367 }
368 f.Unlock()
369
370 err = tumble.RunMutation(c, &distributor.NotifyExecution{
371 CfgName: "fakeDistributor",
372 Notification: &distributor.Notification{
373 ID: tsk.Auth.Id,
374 Attrs: map[string]string{"token": string(tok)}},
375 })
376 }()
377
378 err = cb(usrTsk)
379 if err != nil {
380 err = nil
381 abnorm = &dm.AbnormalFinish{
382 Status: dm.AbnormalFinish_FAILED,
383 Reason: fmt.Sprintf("cb error: %q", err),
384 }
385 }
386 return
387 }
388
389 func panicIf(err error) {
390 if err != nil {
391 panic(err)
392 }
393 }
394
395 var _ distributor.D = (*Distributor)(nil)
396
397 // QuestDesc generates a normalized generic QuestDesc of the form:
398 // Quest_Desc{
399 // DistributorConfigName: "fakeDistributor",
400 // JsonPayload: `{"name":"$name"}`,
401 // }
402 func QuestDesc(name string) *dm.Quest_Desc {
403 payload, err := json.Marshal(struct {
404 Name string `json:"name"`
405 }{name})
406 panicIf(err)
407 desc := &dm.Quest_Desc{
408 DistributorConfigName: "fakeDistributor",
409 JsonPayload: string(payload),
410 }
411 panicIf(desc.Normalize())
412 return desc
413 }
414
415 // WalkShouldReturn is a convey-style assertion factory to assert that a given
416 // WalkGraph request object results in the provided GraphData.
417 //
418 // If keepTimestamps (a singular, optional boolean) is provided and true,
419 // WalkShouldReturn will not remove timestamps from the compared GraphData. If
420 // it is absent or false, GraphData.PurgeTimestamps will be called on the
421 // returned GraphData before comparing it to the expected value.
422 //
423 // Use this function like:
424 // req := &dm.WalkGraphReq{...}
425 // So(req, WalkShouldReturn(c, s), &dm.GraphData{
426 // ...
427 // })
428 func WalkShouldReturn(c context.Context, s dm.DepsServer, keepTimestamps ...bool ) func(request interface{}, expect ...interface{}) string {
429 kt := len(keepTimestamps) > 0 && keepTimestamps[0]
430 if len(keepTimestamps) > 1 {
431 panic("may only specify 0 or 1 keepTimestamps values")
432 }
433
434 normalize := func(gd *dm.GraphData) *dm.GraphData {
435 data, err := proto.Marshal(gd)
436 panicIf(err)
437 ret := &dm.GraphData{}
438
439 panicIf(proto.Unmarshal(data, ret))
440
441 if !kt {
442 ret.PurgeTimestamps()
443 }
444 return ret
445 }
446
447 return func(request interface{}, expect ...interface{}) string {
448 r := request.(*dm.WalkGraphReq)
449 if len(expect) != 1 {
450 panic(fmt.Errorf("expected 1 arg on rhs, got %d", len(ex pect)))
451 }
452 e := expect[0].(*dm.GraphData)
453 ret, err := s.WalkGraph(c, r)
454 if nilExpect := assertions.ShouldErrLike(err, nil); nilExpect != "" {
455 return nilExpect
456 }
457 return convey.ShouldResemble(normalize(ret), e)
458 }
459 }
OLDNEW
« no previous file with comments | « appengine/cmd/dm/distributor/distributor.go ('k') | appengine/cmd/dm/distributor/handlers.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698