Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(591)

Side by Side Diff: dm/appengine/distributor/fake/fake.go

Issue 2347973003: Refactor distributor API so that methods always get the Quest_Desc too. (Closed)
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package fake 5 package fake
6 6
7 import ( 7 import (
8 "encoding/json" 8 "encoding/json"
9 "fmt" 9 "fmt"
10 "net/http" 10 "net/http"
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
60 "acls.cfg": ` 60 "acls.cfg": `
61 readers: "reader_group" 61 readers: "reader_group"
62 writers: "writer_group" 62 writers: "writer_group"
63 `, 63 `,
64 }, 64 },
65 })) 65 }))
66 c = auth.WithState(c, &authtest.FakeState{ 66 c = auth.WithState(c, &authtest.FakeState{
67 Identity: identity.AnonymousIdentity, 67 Identity: identity.AnonymousIdentity,
68 }) 68 })
69 dist = &Distributor{} 69 dist = &Distributor{}
70 » reg := distributor.NewTestingRegistry(map[string]distributor.D{ 70 » reg := distributor.NewTestingRegistry(distributor.TestFactoryMap{
71 » » "fakeDistributor": dist, 71 » » "fakeDistributor": func(c context.Context, cfg *distributor.Conf ig) distributor.D {
72 » » » return &BoundDistributor{dist, c, cfg}
73 » » },
72 }, fn) 74 }, fn)
73 c = distributor.WithRegistry(c, reg) 75 c = distributor.WithRegistry(c, reg)
74 return 76 return
75 } 77 }
76 78
77 // DistributorData is the blob of data that the fake.Distributor keeps when DM 79 // DistributorData is the blob of data that the fake.Distributor keeps when DM
78 // calls its Run method. This is roughly equivalent to the state that 80 // calls its Run method. This is roughly equivalent to the state that
79 // a distributor (like swarming) would store in its own datastore about a job. 81 // a distributor (like swarming) would store in its own datastore about a job.
80 type DistributorData struct { 82 type DistributorData struct {
81 NotifyTopic pubsub.Topic 83 NotifyTopic pubsub.Topic
(...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after
221 type Distributor struct { 223 type Distributor struct {
222 // RunError can be set to make Run return this error when it's invoked. 224 // RunError can be set to make Run return this error when it's invoked.
223 RunError error 225 RunError error
224 // This can be set to turn the distributor into a polling-based distribu tor. 226 // This can be set to turn the distributor into a polling-based distribu tor.
225 PollbackTime time.Duration 227 PollbackTime time.Duration
226 228
227 sync.Mutex 229 sync.Mutex
228 tasks map[distributor.Token]*DistributorData 230 tasks map[distributor.Token]*DistributorData
229 } 231 }
230 232
233 // BoundDistributor binds the fake.Distributor to a Context and
234 // a distributor.Config. It implements distributor.D.
235 type BoundDistributor struct {
236 *Distributor
237 c context.Context
238 cfg *distributor.Config
239 }
240
231 // MkToken makes a distributor Token out of an Execution_ID. In this 241 // MkToken makes a distributor Token out of an Execution_ID. In this
232 // implementation of a Distributor there's a 1:1 mapping between Execution_ID 242 // implementation of a Distributor there's a 1:1 mapping between Execution_ID
233 // and distributor task. This is not always the case for real distributor 243 // and distributor task. This is not always the case for real distributor
234 // implementations. 244 // implementations.
235 func MkToken(eid *dm.Execution_ID) distributor.Token { 245 func MkToken(eid *dm.Execution_ID) distributor.Token {
236 return distributor.Token(fmt.Sprintf("fakeDistributor:%s|%d|%d", eid.Que st, 246 return distributor.Token(fmt.Sprintf("fakeDistributor:%s|%d|%d", eid.Que st,
237 eid.Attempt, eid.Id)) 247 eid.Attempt, eid.Id))
238 } 248 }
239 249
240 // Run implements distributor.D 250 // Run implements distributor.D
241 func (f *Distributor) Run(desc *distributor.TaskDescription) (tok distributor.To ken, pollbackTime time.Duration, err error) { 251 func (d *BoundDistributor) Run(desc *dm.Quest_Desc, exAuth *dm.Execution_Auth, p rev *dm.JsonResult) (tok distributor.Token, pollbackTime time.Duration, err erro r) {
242 » if err = f.RunError; err != nil { 252 » if err = d.RunError; err != nil {
243 return 253 return
244 } 254 }
245 » pollbackTime = f.PollbackTime 255 » pollbackTime = d.PollbackTime
246 256
247 exAuth := desc.ExecutionAuth()
248 tok = MkToken(exAuth.Id) 257 tok = MkToken(exAuth.Id)
Vadim Sh. 2016/09/20 00:24:26 is it really that difficult to type two more lette
iannucci 2016/09/20 00:51:28 I think it's fine for a testing library... Mk is a
249 258
250 tsk := &DistributorData{ 259 tsk := &DistributorData{
251 Auth: exAuth, 260 Auth: exAuth,
252 » » Desc: desc.Payload(), 261 » » Desc: desc,
253 » » State: desc.PreviousResult(), 262 » » State: prev,
254 } 263 }
255 » tsk.NotifyTopic, tsk.NotifyAuth, err = desc.PrepareTopic() 264 » tsk.NotifyTopic, tsk.NotifyAuth, err = d.cfg.PrepareTopic(d.c, exAuth.Id )
256 panicIf(err) 265 panicIf(err)
257 266
258 » f.Lock() 267 » d.Lock()
259 » defer f.Unlock() 268 » defer d.Unlock()
260 » if f.tasks == nil { 269 » if d.tasks == nil {
261 » » f.tasks = map[distributor.Token]*DistributorData{} 270 » » d.tasks = map[distributor.Token]*DistributorData{}
262 } 271 }
263 » f.tasks[tok] = tsk 272 » d.tasks[tok] = tsk
264 return 273 return
265 } 274 }
266 275
267 // Cancel implements distributor.D 276 // Cancel implements distributor.D
268 func (f *Distributor) Cancel(tok distributor.Token) (err error) { 277 func (d *BoundDistributor) Cancel(_ *dm.Quest_Desc, tok distributor.Token) (err error) {
269 » f.Lock() 278 » d.Lock()
270 » defer f.Unlock() 279 » defer d.Unlock()
271 » if tsk, ok := f.tasks[tok]; ok { 280 » if tsk, ok := d.tasks[tok]; ok {
272 tsk.done = true 281 tsk.done = true
273 tsk.abnorm = &dm.AbnormalFinish{ 282 tsk.abnorm = &dm.AbnormalFinish{
274 Status: dm.AbnormalFinish_CANCELLED, 283 Status: dm.AbnormalFinish_CANCELLED,
275 Reason: "cancelled via Cancel()"} 284 Reason: "cancelled via Cancel()"}
276 } else { 285 } else {
277 err = fmt.Errorf("MISSING task %q", tok) 286 err = fmt.Errorf("MISSING task %q", tok)
278 } 287 }
279 return 288 return
280 } 289 }
281 290
282 // GetStatus implements distributor.D 291 // GetStatus implements distributor.D
283 func (f *Distributor) GetStatus(tok distributor.Token) (rslt *dm.Result, err err or) { 292 func (d *BoundDistributor) GetStatus(_ *dm.Quest_Desc, tok distributor.Token) (r slt *dm.Result, err error) {
284 » f.Lock() 293 » d.Lock()
285 » defer f.Unlock() 294 » defer d.Unlock()
286 » if tsk, ok := f.tasks[tok]; ok { 295 » if tsk, ok := d.tasks[tok]; ok {
287 if tsk.done { 296 if tsk.done {
288 if tsk.abnorm != nil { 297 if tsk.abnorm != nil {
289 rslt = &dm.Result{AbnormalFinish: tsk.abnorm} 298 rslt = &dm.Result{AbnormalFinish: tsk.abnorm}
290 } else { 299 } else {
291 rslt = &dm.Result{Data: tsk.State} 300 rslt = &dm.Result{Data: tsk.State}
292 } 301 }
293 } 302 }
294 } else { 303 } else {
295 rslt = &dm.Result{ 304 rslt = &dm.Result{
296 AbnormalFinish: &dm.AbnormalFinish{ 305 AbnormalFinish: &dm.AbnormalFinish{
297 Status: dm.AbnormalFinish_MISSING, 306 Status: dm.AbnormalFinish_MISSING,
298 Reason: fmt.Sprintf("unknown token: %s", tok)}, 307 Reason: fmt.Sprintf("unknown token: %s", tok)},
299 } 308 }
300 } 309 }
301 return 310 return
302 } 311 }
303 312
304 // FakeURLPrefix is the url that all fake InfoURLs are prefixed with. 313 // FakeURLPrefix is the url that all fake InfoURLs are prefixed with.
305 const FakeURLPrefix = "https://info.example.com/" 314 const FakeURLPrefix = "https://info.example.com/"
306 315
307 // InfoURL builds a fake InfoURL for the given Execution_ID 316 // InfoURL builds a fake InfoURL for the given Execution_ID
308 func InfoURL(e *dm.Execution_ID) string { 317 func InfoURL(e *dm.Execution_ID) string {
309 return FakeURLPrefix + string(MkToken(e)) 318 return FakeURLPrefix + string(MkToken(e))
310 } 319 }
311 320
312 // InfoURL implements distributor.D 321 // InfoURL implements distributor.D
313 func (f *Distributor) InfoURL(tok distributor.Token) string { 322 func (d *BoundDistributor) InfoURL(tok distributor.Token) string {
314 return FakeURLPrefix + string(tok) 323 return FakeURLPrefix + string(tok)
315 } 324 }
316 325
317 // HandleNotification implements distributor.D 326 // HandleNotification implements distributor.D
318 func (f *Distributor) HandleNotification(n *distributor.Notification) (rslt *dm. Result, err error) { 327 func (d *BoundDistributor) HandleNotification(q *dm.Quest_Desc, n *distributor.N otification) (rslt *dm.Result, err error) {
319 » return f.GetStatus(distributor.Token(n.Attrs["token"])) 328 » return d.GetStatus(q, distributor.Token(n.Attrs["token"]))
320 } 329 }
321 330
322 // HandleTaskQueueTask is not implemented, and shouldn't be needed for most 331 // HandleTaskQueueTask is not implemented, and shouldn't be needed for most
323 // tests. It could be implemented if some new test required it, however. 332 // tests. It could be implemented if some new test required it, however.
324 func (f *Distributor) HandleTaskQueueTask(r *http.Request) ([]*distributor.Notif ication, error) { 333 func (d *BoundDistributor) HandleTaskQueueTask(r *http.Request) ([]*distributor. Notification, error) {
325 panic("not implemented") 334 panic("not implemented")
326 } 335 }
327 336
328 // Validate implements distributor.D (by returning a nil error for every 337 // Validate implements distributor.D (by returning a nil error for every
329 // payload). 338 // payload).
330 func (f *Distributor) Validate(payload string) error { 339 func (d *BoundDistributor) Validate(payload string) error {
331 return nil 340 return nil
332 } 341 }
333 342
334 // RunTask allows you to run the task associated with the provided execution id. 343 // RunTask allows you to run the task associated with the provided execution id.
335 // 344 //
336 // If the task corresponding to `eid` returns an error, or if the distributor 345 // If the task corresponding to `eid` returns an error, or if the distributor
337 // itself actually has an error, this method will return an error. Notably, if 346 // itself actually has an error, this method will return an error. Notably, if
338 // `cb` returns an error, it will simply mark the corresponding task as FAILED, 347 // `cb` returns an error, it will simply mark the corresponding task as FAILED,
339 // but will return nil here. 348 // but will return nil here.
340 // 349 //
341 // If the task exists and hasn't been run yet, cb will be called, and can do 350 // If the task exists and hasn't been run yet, cb will be called, and can do
342 // anything that you may want to a test to do. Think of the callback as the 351 // anything that you may want to a test to do. Think of the callback as the
343 // recipe engine; it has the opportunity to do anything it wants to, interact 352 // recipe engine; it has the opportunity to do anything it wants to, interact
344 // with the deps server (or not), succeed (or not), etc. 353 // with the deps server (or not), succeed (or not), etc.
345 // 354 //
346 // If the callback needs to maintain state between executions, Task.State is 355 // If the callback needs to maintain state between executions, Task.State is
347 // read+write; when the callback exits, the final value of Task.State will be 356 // read+write; when the callback exits, the final value of Task.State will be
348 // passed back to the DM instance under test. A re-execution of the attempt will 357 // passed back to the DM instance under test. A re-execution of the attempt will
349 // start with the new value. 358 // start with the new value.
350 func (f *Distributor) RunTask(c context.Context, eid *dm.Execution_ID, cb func(* Task) error) (err error) { 359 func (d *Distributor) RunTask(c context.Context, eid *dm.Execution_ID, cb func(* Task) error) (err error) {
351 tok := MkToken(eid) 360 tok := MkToken(eid)
352 361
353 » f.Lock() 362 » d.Lock()
354 » tsk := f.tasks[tok] 363 » tsk := d.tasks[tok]
355 if tsk == nil { 364 if tsk == nil {
356 err = fmt.Errorf("cannot RunTask(%q): doesn't exist", tok) 365 err = fmt.Errorf("cannot RunTask(%q): doesn't exist", tok)
357 } else { 366 } else {
358 if tsk.done { 367 if tsk.done {
359 err = fmt.Errorf("cannot RunTask(%q): running twice", to k) 368 err = fmt.Errorf("cannot RunTask(%q): running twice", to k)
360 } else { 369 } else {
361 tsk.done = true 370 tsk.done = true
362 } 371 }
363 } 372 }
364 » f.Unlock() 373 » d.Unlock()
365 374
366 if err != nil { 375 if err != nil {
367 return 376 return
368 } 377 }
369 378
370 abnorm := (*dm.AbnormalFinish)(nil) 379 abnorm := (*dm.AbnormalFinish)(nil)
371 380
372 usrTsk := &Task{ 381 usrTsk := &Task{
373 tsk.Auth, 382 tsk.Auth,
374 tsk.Desc, 383 tsk.Desc,
375 tsk.State, 384 tsk.State,
376 } 385 }
377 386
378 defer func() { 387 defer func() {
379 » » f.Lock() 388 » » d.Lock()
380 { 389 {
381 tsk.abnorm = abnorm 390 tsk.abnorm = abnorm
382 tsk.State = usrTsk.State 391 tsk.State = usrTsk.State
383 392
384 if r := recover(); r != nil { 393 if r := recover(); r != nil {
385 tsk.abnorm = &dm.AbnormalFinish{ 394 tsk.abnorm = &dm.AbnormalFinish{
386 Status: dm.AbnormalFinish_CRASHED, 395 Status: dm.AbnormalFinish_CRASHED,
387 Reason: fmt.Sprintf("caught panic: %q", r), 396 Reason: fmt.Sprintf("caught panic: %q", r),
388 } 397 }
389 } 398 }
390 } 399 }
391 » » f.Unlock() 400 » » d.Unlock()
392 401
393 err = tumble.RunMutation(c, &distributor.NotifyExecution{ 402 err = tumble.RunMutation(c, &distributor.NotifyExecution{
394 CfgName: "fakeDistributor", 403 CfgName: "fakeDistributor",
395 Notification: &distributor.Notification{ 404 Notification: &distributor.Notification{
396 ID: tsk.Auth.Id, 405 ID: tsk.Auth.Id,
397 Attrs: map[string]string{"token": string(tok)}}, 406 Attrs: map[string]string{"token": string(tok)}},
398 }) 407 })
399 }() 408 }()
400 409
401 err = cb(usrTsk) 410 err = cb(usrTsk)
402 if err != nil { 411 if err != nil {
403 err = nil 412 err = nil
404 abnorm = &dm.AbnormalFinish{ 413 abnorm = &dm.AbnormalFinish{
405 Status: dm.AbnormalFinish_FAILED, 414 Status: dm.AbnormalFinish_FAILED,
406 Reason: fmt.Sprintf("cb error: %q", err), 415 Reason: fmt.Sprintf("cb error: %q", err),
407 } 416 }
408 } 417 }
409 return 418 return
410 } 419 }
411 420
412 func panicIf(err error) { 421 func panicIf(err error) {
413 if err != nil { 422 if err != nil {
414 panic(err) 423 panic(err)
415 } 424 }
416 } 425 }
417 426
418 var _ distributor.D = (*Distributor)(nil) 427 var _ distributor.D = (*BoundDistributor)(nil)
419 428
420 // QuestDesc generates a normalized generic QuestDesc of the form: 429 // QuestDesc generates a normalized generic QuestDesc of the form:
421 // Quest_Desc{ 430 // Quest_Desc{
422 // DistributorConfigName: "fakeDistributor", 431 // DistributorConfigName: "fakeDistributor",
423 // Parameters: `{"name":"$name"}`, 432 // Parameters: `{"name":"$name"}`,
424 // DistributorParameters: "{}", 433 // DistributorParameters: "{}",
425 // } 434 // }
426 func QuestDesc(name string) *dm.Quest_Desc { 435 func QuestDesc(name string) *dm.Quest_Desc {
427 params, err := json.Marshal(struct { 436 params, err := json.Marshal(struct {
428 Name string `json:"name"` 437 Name string `json:"name"`
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
475 panic(fmt.Errorf("expected 1 arg on rhs, got %d", len(ex pect))) 484 panic(fmt.Errorf("expected 1 arg on rhs, got %d", len(ex pect)))
476 } 485 }
477 e := expect[0].(*dm.GraphData) 486 e := expect[0].(*dm.GraphData)
478 ret, err := s.WalkGraph(c, r) 487 ret, err := s.WalkGraph(c, r)
479 if nilExpect := assertions.ShouldErrLike(err, nil); nilExpect != "" { 488 if nilExpect := assertions.ShouldErrLike(err, nil); nilExpect != "" {
480 return nilExpect 489 return nilExpect
481 } 490 }
482 return convey.ShouldResemble(normalize(ret), e) 491 return convey.ShouldResemble(normalize(ret), e)
483 } 492 }
484 } 493 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698