Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package fake | 5 package fake |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "encoding/json" | 8 "encoding/json" |
| 9 "fmt" | 9 "fmt" |
| 10 "net/http" | 10 "net/http" |
| (...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 60 "acls.cfg": ` | 60 "acls.cfg": ` |
| 61 readers: "reader_group" | 61 readers: "reader_group" |
| 62 writers: "writer_group" | 62 writers: "writer_group" |
| 63 `, | 63 `, |
| 64 }, | 64 }, |
| 65 })) | 65 })) |
| 66 c = auth.WithState(c, &authtest.FakeState{ | 66 c = auth.WithState(c, &authtest.FakeState{ |
| 67 Identity: identity.AnonymousIdentity, | 67 Identity: identity.AnonymousIdentity, |
| 68 }) | 68 }) |
| 69 dist = &Distributor{} | 69 dist = &Distributor{} |
| 70 » reg := distributor.NewTestingRegistry(map[string]distributor.D{ | 70 » reg := distributor.NewTestingRegistry(distributor.TestFactoryMap{ |
| 71 » » "fakeDistributor": dist, | 71 » » "fakeDistributor": func(c context.Context, cfg *distributor.Conf ig) distributor.D { |
| 72 » » » return &BoundDistributor{dist, c, cfg} | |
| 73 » » }, | |
| 72 }, fn) | 74 }, fn) |
| 73 c = distributor.WithRegistry(c, reg) | 75 c = distributor.WithRegistry(c, reg) |
| 74 return | 76 return |
| 75 } | 77 } |
| 76 | 78 |
| 77 // DistributorData is the blob of data that the fake.Distributor keeps when DM | 79 // DistributorData is the blob of data that the fake.Distributor keeps when DM |
| 78 // calls its Run method. This is roughly equivalent to the state that | 80 // calls its Run method. This is roughly equivalent to the state that |
| 79 // a distributor (like swarming) would store in its own datastore about a job. | 81 // a distributor (like swarming) would store in its own datastore about a job. |
| 80 type DistributorData struct { | 82 type DistributorData struct { |
| 81 NotifyTopic pubsub.Topic | 83 NotifyTopic pubsub.Topic |
| (...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 221 type Distributor struct { | 223 type Distributor struct { |
| 222 // RunError can be set to make Run return this error when it's invoked. | 224 // RunError can be set to make Run return this error when it's invoked. |
| 223 RunError error | 225 RunError error |
| 224 // This can be set to turn the distributor into a polling-based distribu tor. | 226 // This can be set to turn the distributor into a polling-based distribu tor. |
| 225 PollbackTime time.Duration | 227 PollbackTime time.Duration |
| 226 | 228 |
| 227 sync.Mutex | 229 sync.Mutex |
| 228 tasks map[distributor.Token]*DistributorData | 230 tasks map[distributor.Token]*DistributorData |
| 229 } | 231 } |
| 230 | 232 |
| 233 // BoundDistributor binds the fake.Distributor to a Context and | |
| 234 // a distributor.Config. It implements distributor.D. | |
| 235 type BoundDistributor struct { | |
| 236 *Distributor | |
| 237 c context.Context | |
| 238 cfg *distributor.Config | |
| 239 } | |
| 240 | |
| 231 // MkToken makes a distributor Token out of an Execution_ID. In this | 241 // MkToken makes a distributor Token out of an Execution_ID. In this |
| 232 // implementation of a Distributor there's a 1:1 mapping between Execution_ID | 242 // implementation of a Distributor there's a 1:1 mapping between Execution_ID |
| 233 // and distributor task. This is not always the case for real distributor | 243 // and distributor task. This is not always the case for real distributor |
| 234 // implementations. | 244 // implementations. |
| 235 func MkToken(eid *dm.Execution_ID) distributor.Token { | 245 func MkToken(eid *dm.Execution_ID) distributor.Token { |
| 236 return distributor.Token(fmt.Sprintf("fakeDistributor:%s|%d|%d", eid.Que st, | 246 return distributor.Token(fmt.Sprintf("fakeDistributor:%s|%d|%d", eid.Que st, |
| 237 eid.Attempt, eid.Id)) | 247 eid.Attempt, eid.Id)) |
| 238 } | 248 } |
| 239 | 249 |
| 240 // Run implements distributor.D | 250 // Run implements distributor.D |
| 241 func (f *Distributor) Run(desc *distributor.TaskDescription) (tok distributor.To ken, pollbackTime time.Duration, err error) { | 251 func (d *BoundDistributor) Run(desc *dm.Quest_Desc, exAuth *dm.Execution_Auth, p rev *dm.JsonResult) (tok distributor.Token, pollbackTime time.Duration, err erro r) { |
| 242 » if err = f.RunError; err != nil { | 252 » if err = d.RunError; err != nil { |
| 243 return | 253 return |
| 244 } | 254 } |
| 245 » pollbackTime = f.PollbackTime | 255 » pollbackTime = d.PollbackTime |
| 246 | 256 |
| 247 exAuth := desc.ExecutionAuth() | |
| 248 tok = MkToken(exAuth.Id) | 257 tok = MkToken(exAuth.Id) |
|
Vadim Sh.
2016/09/20 00:24:26
is it really that difficult to type two more lette
iannucci
2016/09/20 00:51:28
I think it's fine for a testing library... Mk is a
| |
| 249 | 258 |
| 250 tsk := &DistributorData{ | 259 tsk := &DistributorData{ |
| 251 Auth: exAuth, | 260 Auth: exAuth, |
| 252 » » Desc: desc.Payload(), | 261 » » Desc: desc, |
| 253 » » State: desc.PreviousResult(), | 262 » » State: prev, |
| 254 } | 263 } |
| 255 » tsk.NotifyTopic, tsk.NotifyAuth, err = desc.PrepareTopic() | 264 » tsk.NotifyTopic, tsk.NotifyAuth, err = d.cfg.PrepareTopic(d.c, exAuth.Id ) |
| 256 panicIf(err) | 265 panicIf(err) |
| 257 | 266 |
| 258 » f.Lock() | 267 » d.Lock() |
| 259 » defer f.Unlock() | 268 » defer d.Unlock() |
| 260 » if f.tasks == nil { | 269 » if d.tasks == nil { |
| 261 » » f.tasks = map[distributor.Token]*DistributorData{} | 270 » » d.tasks = map[distributor.Token]*DistributorData{} |
| 262 } | 271 } |
| 263 » f.tasks[tok] = tsk | 272 » d.tasks[tok] = tsk |
| 264 return | 273 return |
| 265 } | 274 } |
| 266 | 275 |
| 267 // Cancel implements distributor.D | 276 // Cancel implements distributor.D |
| 268 func (f *Distributor) Cancel(tok distributor.Token) (err error) { | 277 func (d *BoundDistributor) Cancel(_ *dm.Quest_Desc, tok distributor.Token) (err error) { |
| 269 » f.Lock() | 278 » d.Lock() |
| 270 » defer f.Unlock() | 279 » defer d.Unlock() |
| 271 » if tsk, ok := f.tasks[tok]; ok { | 280 » if tsk, ok := d.tasks[tok]; ok { |
| 272 tsk.done = true | 281 tsk.done = true |
| 273 tsk.abnorm = &dm.AbnormalFinish{ | 282 tsk.abnorm = &dm.AbnormalFinish{ |
| 274 Status: dm.AbnormalFinish_CANCELLED, | 283 Status: dm.AbnormalFinish_CANCELLED, |
| 275 Reason: "cancelled via Cancel()"} | 284 Reason: "cancelled via Cancel()"} |
| 276 } else { | 285 } else { |
| 277 err = fmt.Errorf("MISSING task %q", tok) | 286 err = fmt.Errorf("MISSING task %q", tok) |
| 278 } | 287 } |
| 279 return | 288 return |
| 280 } | 289 } |
| 281 | 290 |
| 282 // GetStatus implements distributor.D | 291 // GetStatus implements distributor.D |
| 283 func (f *Distributor) GetStatus(tok distributor.Token) (rslt *dm.Result, err err or) { | 292 func (d *BoundDistributor) GetStatus(_ *dm.Quest_Desc, tok distributor.Token) (r slt *dm.Result, err error) { |
| 284 » f.Lock() | 293 » d.Lock() |
| 285 » defer f.Unlock() | 294 » defer d.Unlock() |
| 286 » if tsk, ok := f.tasks[tok]; ok { | 295 » if tsk, ok := d.tasks[tok]; ok { |
| 287 if tsk.done { | 296 if tsk.done { |
| 288 if tsk.abnorm != nil { | 297 if tsk.abnorm != nil { |
| 289 rslt = &dm.Result{AbnormalFinish: tsk.abnorm} | 298 rslt = &dm.Result{AbnormalFinish: tsk.abnorm} |
| 290 } else { | 299 } else { |
| 291 rslt = &dm.Result{Data: tsk.State} | 300 rslt = &dm.Result{Data: tsk.State} |
| 292 } | 301 } |
| 293 } | 302 } |
| 294 } else { | 303 } else { |
| 295 rslt = &dm.Result{ | 304 rslt = &dm.Result{ |
| 296 AbnormalFinish: &dm.AbnormalFinish{ | 305 AbnormalFinish: &dm.AbnormalFinish{ |
| 297 Status: dm.AbnormalFinish_MISSING, | 306 Status: dm.AbnormalFinish_MISSING, |
| 298 Reason: fmt.Sprintf("unknown token: %s", tok)}, | 307 Reason: fmt.Sprintf("unknown token: %s", tok)}, |
| 299 } | 308 } |
| 300 } | 309 } |
| 301 return | 310 return |
| 302 } | 311 } |
| 303 | 312 |
| 304 // FakeURLPrefix is the url that all fake InfoURLs are prefixed with. | 313 // FakeURLPrefix is the url that all fake InfoURLs are prefixed with. |
| 305 const FakeURLPrefix = "https://info.example.com/" | 314 const FakeURLPrefix = "https://info.example.com/" |
| 306 | 315 |
| 307 // InfoURL builds a fake InfoURL for the given Execution_ID | 316 // InfoURL builds a fake InfoURL for the given Execution_ID |
| 308 func InfoURL(e *dm.Execution_ID) string { | 317 func InfoURL(e *dm.Execution_ID) string { |
| 309 return FakeURLPrefix + string(MkToken(e)) | 318 return FakeURLPrefix + string(MkToken(e)) |
| 310 } | 319 } |
| 311 | 320 |
| 312 // InfoURL implements distributor.D | 321 // InfoURL implements distributor.D |
| 313 func (f *Distributor) InfoURL(tok distributor.Token) string { | 322 func (d *BoundDistributor) InfoURL(tok distributor.Token) string { |
| 314 return FakeURLPrefix + string(tok) | 323 return FakeURLPrefix + string(tok) |
| 315 } | 324 } |
| 316 | 325 |
| 317 // HandleNotification implements distributor.D | 326 // HandleNotification implements distributor.D |
| 318 func (f *Distributor) HandleNotification(n *distributor.Notification) (rslt *dm. Result, err error) { | 327 func (d *BoundDistributor) HandleNotification(q *dm.Quest_Desc, n *distributor.N otification) (rslt *dm.Result, err error) { |
| 319 » return f.GetStatus(distributor.Token(n.Attrs["token"])) | 328 » return d.GetStatus(q, distributor.Token(n.Attrs["token"])) |
| 320 } | 329 } |
| 321 | 330 |
| 322 // HandleTaskQueueTask is not implemented, and shouldn't be needed for most | 331 // HandleTaskQueueTask is not implemented, and shouldn't be needed for most |
| 323 // tests. It could be implemented if some new test required it, however. | 332 // tests. It could be implemented if some new test required it, however. |
| 324 func (f *Distributor) HandleTaskQueueTask(r *http.Request) ([]*distributor.Notif ication, error) { | 333 func (d *BoundDistributor) HandleTaskQueueTask(r *http.Request) ([]*distributor. Notification, error) { |
| 325 panic("not implemented") | 334 panic("not implemented") |
| 326 } | 335 } |
| 327 | 336 |
| 328 // Validate implements distributor.D (by returning a nil error for every | 337 // Validate implements distributor.D (by returning a nil error for every |
| 329 // payload). | 338 // payload). |
| 330 func (f *Distributor) Validate(payload string) error { | 339 func (d *BoundDistributor) Validate(payload string) error { |
| 331 return nil | 340 return nil |
| 332 } | 341 } |
| 333 | 342 |
| 334 // RunTask allows you to run the task associated with the provided execution id. | 343 // RunTask allows you to run the task associated with the provided execution id. |
| 335 // | 344 // |
| 336 // If the task corresponding to `eid` returns an error, or if the distributor | 345 // If the task corresponding to `eid` returns an error, or if the distributor |
| 337 // itself actually has an error, this method will return an error. Notably, if | 346 // itself actually has an error, this method will return an error. Notably, if |
| 338 // `cb` returns an error, it will simply mark the corresponding task as FAILED, | 347 // `cb` returns an error, it will simply mark the corresponding task as FAILED, |
| 339 // but will return nil here. | 348 // but will return nil here. |
| 340 // | 349 // |
| 341 // If the task exists and hasn't been run yet, cb will be called, and can do | 350 // If the task exists and hasn't been run yet, cb will be called, and can do |
| 342 // anything that you may want to a test to do. Think of the callback as the | 351 // anything that you may want to a test to do. Think of the callback as the |
| 343 // recipe engine; it has the opportunity to do anything it wants to, interact | 352 // recipe engine; it has the opportunity to do anything it wants to, interact |
| 344 // with the deps server (or not), succeed (or not), etc. | 353 // with the deps server (or not), succeed (or not), etc. |
| 345 // | 354 // |
| 346 // If the callback needs to maintain state between executions, Task.State is | 355 // If the callback needs to maintain state between executions, Task.State is |
| 347 // read+write; when the callback exits, the final value of Task.State will be | 356 // read+write; when the callback exits, the final value of Task.State will be |
| 348 // passed back to the DM instance under test. A re-execution of the attempt will | 357 // passed back to the DM instance under test. A re-execution of the attempt will |
| 349 // start with the new value. | 358 // start with the new value. |
| 350 func (f *Distributor) RunTask(c context.Context, eid *dm.Execution_ID, cb func(* Task) error) (err error) { | 359 func (d *Distributor) RunTask(c context.Context, eid *dm.Execution_ID, cb func(* Task) error) (err error) { |
| 351 tok := MkToken(eid) | 360 tok := MkToken(eid) |
| 352 | 361 |
| 353 » f.Lock() | 362 » d.Lock() |
| 354 » tsk := f.tasks[tok] | 363 » tsk := d.tasks[tok] |
| 355 if tsk == nil { | 364 if tsk == nil { |
| 356 err = fmt.Errorf("cannot RunTask(%q): doesn't exist", tok) | 365 err = fmt.Errorf("cannot RunTask(%q): doesn't exist", tok) |
| 357 } else { | 366 } else { |
| 358 if tsk.done { | 367 if tsk.done { |
| 359 err = fmt.Errorf("cannot RunTask(%q): running twice", to k) | 368 err = fmt.Errorf("cannot RunTask(%q): running twice", to k) |
| 360 } else { | 369 } else { |
| 361 tsk.done = true | 370 tsk.done = true |
| 362 } | 371 } |
| 363 } | 372 } |
| 364 » f.Unlock() | 373 » d.Unlock() |
| 365 | 374 |
| 366 if err != nil { | 375 if err != nil { |
| 367 return | 376 return |
| 368 } | 377 } |
| 369 | 378 |
| 370 abnorm := (*dm.AbnormalFinish)(nil) | 379 abnorm := (*dm.AbnormalFinish)(nil) |
| 371 | 380 |
| 372 usrTsk := &Task{ | 381 usrTsk := &Task{ |
| 373 tsk.Auth, | 382 tsk.Auth, |
| 374 tsk.Desc, | 383 tsk.Desc, |
| 375 tsk.State, | 384 tsk.State, |
| 376 } | 385 } |
| 377 | 386 |
| 378 defer func() { | 387 defer func() { |
| 379 » » f.Lock() | 388 » » d.Lock() |
| 380 { | 389 { |
| 381 tsk.abnorm = abnorm | 390 tsk.abnorm = abnorm |
| 382 tsk.State = usrTsk.State | 391 tsk.State = usrTsk.State |
| 383 | 392 |
| 384 if r := recover(); r != nil { | 393 if r := recover(); r != nil { |
| 385 tsk.abnorm = &dm.AbnormalFinish{ | 394 tsk.abnorm = &dm.AbnormalFinish{ |
| 386 Status: dm.AbnormalFinish_CRASHED, | 395 Status: dm.AbnormalFinish_CRASHED, |
| 387 Reason: fmt.Sprintf("caught panic: %q", r), | 396 Reason: fmt.Sprintf("caught panic: %q", r), |
| 388 } | 397 } |
| 389 } | 398 } |
| 390 } | 399 } |
| 391 » » f.Unlock() | 400 » » d.Unlock() |
| 392 | 401 |
| 393 err = tumble.RunMutation(c, &distributor.NotifyExecution{ | 402 err = tumble.RunMutation(c, &distributor.NotifyExecution{ |
| 394 CfgName: "fakeDistributor", | 403 CfgName: "fakeDistributor", |
| 395 Notification: &distributor.Notification{ | 404 Notification: &distributor.Notification{ |
| 396 ID: tsk.Auth.Id, | 405 ID: tsk.Auth.Id, |
| 397 Attrs: map[string]string{"token": string(tok)}}, | 406 Attrs: map[string]string{"token": string(tok)}}, |
| 398 }) | 407 }) |
| 399 }() | 408 }() |
| 400 | 409 |
| 401 err = cb(usrTsk) | 410 err = cb(usrTsk) |
| 402 if err != nil { | 411 if err != nil { |
| 403 err = nil | 412 err = nil |
| 404 abnorm = &dm.AbnormalFinish{ | 413 abnorm = &dm.AbnormalFinish{ |
| 405 Status: dm.AbnormalFinish_FAILED, | 414 Status: dm.AbnormalFinish_FAILED, |
| 406 Reason: fmt.Sprintf("cb error: %q", err), | 415 Reason: fmt.Sprintf("cb error: %q", err), |
| 407 } | 416 } |
| 408 } | 417 } |
| 409 return | 418 return |
| 410 } | 419 } |
| 411 | 420 |
| 412 func panicIf(err error) { | 421 func panicIf(err error) { |
| 413 if err != nil { | 422 if err != nil { |
| 414 panic(err) | 423 panic(err) |
| 415 } | 424 } |
| 416 } | 425 } |
| 417 | 426 |
| 418 var _ distributor.D = (*Distributor)(nil) | 427 var _ distributor.D = (*BoundDistributor)(nil) |
| 419 | 428 |
| 420 // QuestDesc generates a normalized generic QuestDesc of the form: | 429 // QuestDesc generates a normalized generic QuestDesc of the form: |
| 421 // Quest_Desc{ | 430 // Quest_Desc{ |
| 422 // DistributorConfigName: "fakeDistributor", | 431 // DistributorConfigName: "fakeDistributor", |
| 423 // Parameters: `{"name":"$name"}`, | 432 // Parameters: `{"name":"$name"}`, |
| 424 // DistributorParameters: "{}", | 433 // DistributorParameters: "{}", |
| 425 // } | 434 // } |
| 426 func QuestDesc(name string) *dm.Quest_Desc { | 435 func QuestDesc(name string) *dm.Quest_Desc { |
| 427 params, err := json.Marshal(struct { | 436 params, err := json.Marshal(struct { |
| 428 Name string `json:"name"` | 437 Name string `json:"name"` |
| (...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 475 panic(fmt.Errorf("expected 1 arg on rhs, got %d", len(ex pect))) | 484 panic(fmt.Errorf("expected 1 arg on rhs, got %d", len(ex pect))) |
| 476 } | 485 } |
| 477 e := expect[0].(*dm.GraphData) | 486 e := expect[0].(*dm.GraphData) |
| 478 ret, err := s.WalkGraph(c, r) | 487 ret, err := s.WalkGraph(c, r) |
| 479 if nilExpect := assertions.ShouldErrLike(err, nil); nilExpect != "" { | 488 if nilExpect := assertions.ShouldErrLike(err, nil); nilExpect != "" { |
| 480 return nilExpect | 489 return nilExpect |
| 481 } | 490 } |
| 482 return convey.ShouldResemble(normalize(ret), e) | 491 return convey.ShouldResemble(normalize(ret), e) |
| 483 } | 492 } |
| 484 } | 493 } |
| OLD | NEW |