Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | |
| 3 // that can be found in the LICENSE file. | |
| 4 | |
| 5 package fake | |
| 6 | |
| 7 import ( | |
| 8 "fmt" | |
| 9 "net/http" | |
| 10 "sync" | |
| 11 "time" | |
| 12 | |
| 13 "github.com/golang/protobuf/proto" | |
| 14 "github.com/luci/luci-go/appengine/cmd/dm/distributor" | |
| 15 "github.com/luci/luci-go/appengine/cmd/dm/model" | |
| 16 "github.com/luci/luci-go/appengine/tumble" | |
| 17 dm "github.com/luci/luci-go/common/api/dm/service/v1" | |
| 18 googlepb "github.com/luci/luci-go/common/proto/google" | |
| 19 "github.com/luci/luci-go/common/testing/assertions" | |
| 20 "github.com/luci/luci-go/server/secrets/testsecrets" | |
| 21 "github.com/smartystreets/goconvey/convey" | |
| 22 "golang.org/x/net/context" | |
| 23 ) | |
| 24 | |
| 25 // Setup creates a new combination of testing and context objects: | |
| 26 // * ttest - a tumble.Testing to allow you to control tumble's processing | |
| 27 // state | |
| 28 // * c - a context which includes a testing distributor registry, testsecrets, | |
| 29 // as well as everything that tumble.Testing.Context adds (datastore, | |
| 30 // memcache, etc.) | |
| 31 // * dist - a fake Distributor implementation with a RunTask method that | |
| 32 // allows your test to 'run' a scheduled task with the Distributor. This | |
| 33 // will automatically notify the deps service (by calling `fn`). | |
| 34 // * reg - a distributor Testing registry, pre-registerd with `dist` using the | |
| 35 // configuration name 'fakeDistributor'. | |
| 36 // | |
| 37 // You should pass mutate.FinishExecutionFn for fn. It's not done automatically | |
| 38 // in order to break an import cycle. You could provide your own, but YMMV. | |
| 39 // | |
| 40 // Usage: | |
| 41 // ttest, c, dist, reg := fake.Setup(mutate.FinishExecutionFn) | |
| 42 // s := deps.NewDecoratedServer(reg) | |
| 43 // # your tests | |
| 44 func Setup(fn distributor.FinishExecutionFn) (ttest *tumble.Testing, c context.C ontext, dist *Distributor, reg distributor.Registry) { | |
| 45 ttest = &tumble.Testing{} | |
| 46 c = ttest.Context() | |
| 47 c = testsecrets.Use(c) | |
| 48 dist = &Distributor{} | |
| 49 reg = distributor.NewTestingRegistry(map[string]distributor.D{ | |
| 50 "fakeDistributor": dist, | |
| 51 }, fn) | |
| 52 c = distributor.WithRegistry(c, reg) | |
| 53 return | |
| 54 } | |
| 55 | |
| 56 // DistributorData is the blob of data that the fake.Distributor keeps when DM | |
| 57 // calls its Run method. This is roughly equivalent to the state that | |
| 58 // a distributor (like swarming) would store in its own datastore about a job. | |
| 59 type DistributorData struct { | |
| 60 NotifyTopic string | |
| 61 NotifyAuth string | |
| 62 | |
| 63 Auth *dm.Execution_Auth | |
| 64 Desc *dm.Quest_Desc | |
| 65 | |
| 66 State distributor.PersistentState | |
| 67 | |
| 68 done bool | |
| 69 abnorm *dm.AbnormalFinish | |
| 70 } | |
| 71 | |
| 72 // Task is the detail that the distributor task would get. This is roughly | |
| 73 // equivalent to the input that the swarming task/recipe engine would get. | |
| 74 type Task struct { | |
| 75 Auth *dm.Execution_Auth | |
| 76 Desc *dm.Quest_Desc | |
| 77 // State is read/writable. | |
| 78 State distributor.PersistentState | |
| 79 } | |
| 80 | |
| 81 // Activate does the activation handshake with the provided DepsServer and | |
| 82 // returns an ActivatedTask. | |
| 83 func (t *Task) Activate(c context.Context, s dm.DepsServer) (*ActivatedTask, err or) { | |
| 84 newTok := model.MakeRandomToken(c, 32) | |
| 85 _, err := s.ActivateExecution(c, &dm.ActivateExecutionReq{ | |
| 86 Auth: t.Auth, ExecutionToken: newTok}) | |
| 87 if err != nil { | |
| 88 return nil, err | |
| 89 } | |
| 90 | |
| 91 return &ActivatedTask{ | |
| 92 s, | |
| 93 c, | |
| 94 &dm.Execution_Auth{Id: t.Auth.Id, Token: newTok}, | |
| 95 t.Desc, | |
| 96 &t.State, | |
| 97 }, nil | |
| 98 } | |
| 99 | |
| 100 // MustActivate does the same thing as Activate, but panics if err != nil. | |
| 101 func (t *Task) MustActivate(c context.Context, s dm.DepsServer) *ActivatedTask { | |
| 102 ret, err := t.Activate(c, s) | |
| 103 panicIf(err) | |
| 104 return ret | |
| 105 } | |
| 106 | |
| 107 // ActivatedTask is like a Task, but exists after calling Task.MustActivate, and | |
| 108 // contains an activated authentication token. This may be used to either add | |
| 109 // new dependencies or to provide a finished result. | |
| 110 // | |
| 111 // The implementation of DepsServer also automatically populates all outgoing | |
| 112 // RPCs with the activated Auth value. | |
| 113 type ActivatedTask struct { | |
| 114 s dm.DepsServer | |
| 115 c context.Context | |
| 116 | |
| 117 Auth *dm.Execution_Auth | |
| 118 Desc *dm.Quest_Desc | |
| 119 // State is read/writable. | |
| 120 State *distributor.PersistentState | |
| 121 } | |
| 122 | |
| 123 // WalkGraph calls the bound DepsServer's WalkGraph method with the activated | |
| 124 // Auth field. | |
| 125 func (t *ActivatedTask) WalkGraph(req *dm.WalkGraphReq) (*dm.GraphData, error) { | |
| 126 newReq := *req | |
| 127 newReq.Auth = t.Auth | |
| 128 return t.s.WalkGraph(t.c, &newReq) | |
| 129 } | |
| 130 | |
| 131 // EnsureGraphData calls the bound DepsServer's EnsureGraphData method with the | |
| 132 // activated Auth field in ForExecution. | |
| 133 func (t *ActivatedTask) EnsureGraphData(req *dm.EnsureGraphDataReq) (*dm.EnsureG raphDataRsp, error) { | |
| 134 newReq := *req | |
| 135 newReq.ForExecution = t.Auth | |
| 136 return t.s.EnsureGraphData(t.c, &newReq) | |
| 137 } | |
| 138 | |
| 139 // DepOn is a shorthand for EnsureGraphData which allows you to depend on | |
| 140 // multiple existing quests by attempt id. The definitions for these quests must | |
| 141 // already have been added to the deps server (probably with an EnsureGraphData | |
| 142 // call). | |
| 143 func (t *ActivatedTask) DepOn(to ...*dm.Attempt_ID) (halt bool, err error) { | |
|
dnj (Google)
2016/06/09 18:00:55
nit: don't use named return parameters here if you
iannucci
2016/06/15 00:45:59
Done.
| |
| 144 req := &dm.EnsureGraphDataReq{Attempts: dm.NewAttemptList(nil)} | |
| 145 req.Attempts.AddAIDs(to...) | |
| 146 | |
| 147 rsp, err := t.EnsureGraphData(req) | |
| 148 return rsp.ShouldHalt, err | |
| 149 } | |
| 150 | |
| 151 // MustDepOn is the same as DepOn but will panic if DepOn would have returned | |
| 152 // a non-nil error. | |
| 153 func (t *ActivatedTask) MustDepOn(to ...*dm.Attempt_ID) (halt bool) { | |
| 154 halt, err := t.DepOn(to...) | |
| 155 panicIf(err) | |
| 156 return | |
| 157 } | |
| 158 | |
| 159 // Finish calls FinishAttempt with the provided json body and optional | |
|
dnj (Google)
2016/06/09 18:00:54
nit: JSON
iannucci
2016/06/15 00:45:59
Done.
| |
| 160 // expiration time. | |
| 161 // | |
| 162 // This will panic if you provide more than one expiration time (so don't do | |
| 163 // that). | |
| 164 func (t *ActivatedTask) Finish(resultJSON string, expire ...time.Time) { | |
| 165 req := &dm.FinishAttemptReq{ | |
| 166 Auth: t.Auth, | |
| 167 JsonResult: resultJSON, | |
| 168 } | |
| 169 if len(expire) > 0 { | |
|
dnj (Google)
2016/06/09 18:00:55
Suggest: switch len(expire) { case 0: case 1: defa
iannucci
2016/06/15 00:45:59
Done.
| |
| 170 if len(expire) > 1 { | |
| 171 panic("may only specify 0 or 1 expire values") | |
| 172 } | |
| 173 req.Expiration = googlepb.NewTimestamp(expire[0]) | |
| 174 } | |
| 175 | |
| 176 _, err := t.s.FinishAttempt(t.c, req) | |
| 177 panicIf(err) | |
| 178 } | |
| 179 | |
| 180 // WalkShouldReturn is a shorthand for the package-level WalkShouldReturn which | |
| 181 // binds the activated auth to the WalkGraph request, but otherwise behaves | |
| 182 // identically. | |
| 183 // | |
| 184 // Use this method like: | |
| 185 // req := &dm.WalkGraphReq{...} | |
| 186 // So(req, activated.WalkShouldReturn, &dm.GraphData{ | |
| 187 // ... | |
| 188 // }) | |
| 189 func (t *ActivatedTask) WalkShouldReturn(request interface{}, expect ...interfac e{}) string { | |
| 190 r := *request.(*dm.WalkGraphReq) | |
| 191 r.Auth = t.Auth | |
| 192 return WalkShouldReturn(t.c, t.s)(&r, expect...) | |
| 193 } | |
| 194 | |
| 195 // Distributor implements distributor.D, and provides a method (RunTask) to | |
| 196 // allow a test to actually run a task which has been scheduled on this | |
| 197 // Distributor, and correctly notify the deps server that the execution is | |
| 198 // complete. | |
| 199 type Distributor struct { | |
| 200 // RunError can be set to make Run return this error when it's invoked. | |
| 201 RunError error | |
| 202 // These values can be set to make Run return them when it's invoked. | |
| 203 TimeToStart time.Duration | |
| 204 TimeToRun time.Duration | |
| 205 TimeToStop time.Duration | |
| 206 | |
| 207 sync.Mutex | |
| 208 tasks map[distributor.Token]*DistributorData | |
| 209 } | |
| 210 | |
| 211 // MkToken makes a distributor Token out of an Execution_ID. In this | |
| 212 // implementation of a Distributor there's a 1:1 mapping between Execution_ID | |
| 213 // and distributor task. This is not always the case for real distributor | |
| 214 // implementations. | |
| 215 func MkToken(eid *dm.Execution_ID) distributor.Token { | |
| 216 return distributor.Token(fmt.Sprintf("fakeDistributor:%s|%d|%d", eid.Que st, | |
| 217 eid.Attempt, eid.Id)) | |
| 218 } | |
| 219 | |
| 220 // Run implements distributor.D | |
| 221 func (f *Distributor) Run(desc *distributor.TaskDescription) (tok distributor.To ken, timeToStart, timeToRun, timeToStop time.Duration, err error) { | |
| 222 if f.RunError != nil { | |
|
dnj (Google)
2016/06/09 18:00:55
Suggest:
if err = f.RunError; err != nil { return
iannucci
2016/06/15 00:45:59
Done.
| |
| 223 err = f.RunError | |
| 224 return | |
| 225 } | |
| 226 timeToStart = f.TimeToStart | |
| 227 timeToRun = f.TimeToRun | |
| 228 timeToStop = f.TimeToStop | |
| 229 | |
| 230 f.Lock() | |
|
dnj (Google)
2016/06/09 18:00:55
nit: do all task creation outside of Lock, save Lo
iannucci
2016/06/15 00:45:59
done. this testing code changed a bunch
| |
| 231 exAuth := desc.ExecutionAuth() | |
| 232 tok = MkToken(exAuth.Id) | |
| 233 if f.tasks == nil { | |
| 234 f.tasks = map[distributor.Token]*DistributorData{} | |
| 235 } | |
| 236 tsk := &DistributorData{ | |
| 237 Auth: exAuth, | |
| 238 Desc: desc.Payload(), | |
| 239 State: desc.PreviousState(), | |
| 240 } | |
| 241 tsk.NotifyTopic, tsk.NotifyAuth, err = desc.PrepareTopic() | |
| 242 panicIf(err) | |
| 243 f.tasks[tok] = tsk | |
| 244 f.Unlock() | |
| 245 return | |
| 246 } | |
| 247 | |
| 248 // Cancel implements distributor.D | |
| 249 func (f *Distributor) Cancel(tok distributor.Token) (err error) { | |
| 250 f.Lock() | |
| 251 if tsk, ok := f.tasks[tok]; ok { | |
|
dnj (Google)
2016/06/09 18:00:54
nit: defer f.Unlock()
iannucci
2016/06/15 00:45:59
Done.
| |
| 252 tsk.done = true | |
| 253 tsk.abnorm = &dm.AbnormalFinish{ | |
| 254 Status: dm.AbnormalFinish_CANCELLED, | |
| 255 Reason: "cancelled via Cancel()"} | |
| 256 } else { | |
| 257 err = fmt.Errorf("MISSING task %s", tok) | |
|
dnj (Google)
2016/06/09 18:00:55
nit: %q
iannucci
2016/06/15 00:45:59
Done.
| |
| 258 } | |
| 259 f.Unlock() | |
| 260 return | |
| 261 } | |
| 262 | |
| 263 // GetStatus implements distributor.D | |
| 264 func (f *Distributor) GetStatus(tok distributor.Token) (rslt *distributor.TaskRe sult, err error) { | |
| 265 f.Lock() | |
|
dnj (Google)
2016/06/09 18:00:55
nit: defer f.Unlock()
iannucci
2016/06/15 00:45:59
Done.
| |
| 266 if tsk, ok := f.tasks[tok]; ok { | |
| 267 if tsk.done { | |
| 268 if tsk.abnorm != nil { | |
| 269 rslt = &distributor.TaskResult{AbnormalFinish: t sk.abnorm} | |
| 270 } else { | |
| 271 rslt = &distributor.TaskResult{PersistentState: tsk.State} | |
| 272 } | |
| 273 } | |
| 274 } else { | |
| 275 rslt = &distributor.TaskResult{ | |
| 276 AbnormalFinish: &dm.AbnormalFinish{ | |
| 277 Status: dm.AbnormalFinish_MISSING, | |
| 278 Reason: fmt.Sprintf("unknown token: %s", tok)}, | |
| 279 } | |
| 280 } | |
| 281 f.Unlock() | |
| 282 return | |
| 283 } | |
| 284 | |
| 285 // InfoURL implements distributor.D | |
| 286 func (f *Distributor) InfoURL(tok distributor.Token) string { | |
| 287 return "https://info.example.com/" + string(tok) | |
| 288 } | |
| 289 | |
| 290 // HandleNotification implements distributor.D | |
| 291 func (f *Distributor) HandleNotification(n *distributor.Notification) (rslt *dis tributor.TaskResult, err error) { | |
| 292 return f.GetStatus(distributor.Token(n.Attrs["token"])) | |
| 293 } | |
| 294 | |
| 295 // HandleTaskQueueTask is not implemented, and shouldn't be needed for most | |
| 296 // tests. It could be implemented if some new test required it, however. | |
| 297 func (f *Distributor) HandleTaskQueueTask(r *http.Request) ([]*distributor.Notif ication, error) { | |
| 298 panic("not implemented") | |
| 299 } | |
| 300 | |
| 301 // Validate implements distributor.D (by returning a nil error for every | |
| 302 // payload). | |
| 303 func (f *Distributor) Validate(payload string) error { | |
| 304 return nil | |
| 305 } | |
| 306 | |
| 307 // RunTask allows you to run the task associated with the provided execution id. | |
| 308 // | |
| 309 // If the task corresponding to `eid` returns an error, or if the distributor | |
| 310 // itself actually has an error, this method will return an error. Notably, if | |
| 311 // `cb` returns an error, it will simply mark the corresponding task as FAILED, | |
| 312 // but will return nil here. | |
| 313 // | |
| 314 // If the task exists and hasn't been run yet, cb will be called, and can do | |
| 315 // anything that you may want to a test to do. Think of the callback as the | |
| 316 // recipe engine; it has the opportunity to do anything it wants to, interact | |
| 317 // with the deps server (or not), succeed (or not), etc. | |
| 318 // | |
| 319 // If the callback needs to maintain state between executions, Task.State is | |
| 320 // read+write; when the callback exits, the final value of Task.State will be | |
| 321 // passed back to the DM instance under test. A re-execution of the attempt will | |
| 322 // start with the new value. | |
| 323 func (f *Distributor) RunTask(c context.Context, eid *dm.Execution_ID, cb func(* Task) error) (err error) { | |
| 324 tok := MkToken(eid) | |
| 325 | |
| 326 f.Lock() | |
| 327 tsk := f.tasks[tok] | |
| 328 if tsk == nil { | |
| 329 err = fmt.Errorf("cannot RunTask(%q): doesn't exist", tok) | |
| 330 } else { | |
| 331 if tsk.done { | |
| 332 err = fmt.Errorf("cannot RunTask(%q): running twice", to k) | |
| 333 } else { | |
| 334 tsk.done = true | |
| 335 } | |
| 336 } | |
| 337 f.Unlock() | |
| 338 | |
| 339 if err != nil { | |
| 340 return | |
| 341 } | |
| 342 | |
| 343 abnorm := (*dm.AbnormalFinish)(nil) | |
| 344 | |
| 345 usrTsk := &Task{ | |
| 346 tsk.Auth, | |
| 347 tsk.Desc, | |
| 348 tsk.State, | |
| 349 } | |
| 350 | |
| 351 defer func() { | |
| 352 f.Lock() | |
| 353 { | |
| 354 tsk.abnorm = abnorm | |
| 355 tsk.State = usrTsk.State | |
| 356 | |
| 357 if r := recover(); r != nil { | |
| 358 tsk.abnorm = &dm.AbnormalFinish{ | |
| 359 Status: dm.AbnormalFinish_CRASHED, | |
| 360 Reason: fmt.Sprintf("caught panic: %q", r), | |
| 361 } | |
| 362 } | |
| 363 } | |
| 364 f.Unlock() | |
| 365 | |
| 366 err = tumble.RunMutation(c, &distributor.NotifyExecution{ | |
| 367 CfgName: "fakeDistributor", | |
| 368 Notification: &distributor.Notification{ | |
| 369 ID: tsk.Auth.Id, | |
| 370 Attrs: map[string]string{"token": string(tok)}}, | |
| 371 }) | |
| 372 }() | |
| 373 | |
| 374 err = cb(usrTsk) | |
| 375 if err != nil { | |
| 376 err = nil | |
| 377 abnorm = &dm.AbnormalFinish{ | |
| 378 Status: dm.AbnormalFinish_FAILED, | |
| 379 Reason: fmt.Sprintf("cb error: %q", err), | |
| 380 } | |
| 381 } | |
| 382 return | |
| 383 } | |
| 384 | |
| 385 func panicIf(err error) { | |
| 386 if err != nil { | |
| 387 panic(err) | |
| 388 } | |
| 389 } | |
| 390 | |
| 391 var _ distributor.D = (*Distributor)(nil) | |
| 392 | |
| 393 // QuestDesc generates a normalized generic QuestDesc of the form: | |
| 394 // Quest_Desc{ | |
| 395 // DistributorConfigName: "fakeDistributor", | |
| 396 // JsonPayload: `{"name":"$name"}`, | |
| 397 // } | |
| 398 func QuestDesc(name string) *dm.Quest_Desc { | |
| 399 desc := &dm.Quest_Desc{ | |
| 400 DistributorConfigName: "fakeDistributor", | |
| 401 JsonPayload: fmt.Sprintf(`{"name": "%s"}`, name), | |
|
dnj (Google)
2016/06/09 18:00:55
This will have weird escaping issues if "name" has
iannucci
2016/06/15 00:45:59
Done.
| |
| 402 } | |
| 403 panicIf(desc.Normalize()) | |
| 404 return desc | |
| 405 } | |
| 406 | |
| 407 // WalkShouldReturn is a convey-style assertion factory to assert that a given | |
| 408 // WalkGraph request object results in the provided GraphData. | |
| 409 // | |
| 410 // If keepTimestamps (a singular, optional boolean) is provided and true, | |
| 411 // WalkShouldReturn will not remove timestamps from the compared GraphData. If | |
| 412 // it is absent or false, GraphData.PurgeTimestamps will be called on the | |
| 413 // returned GraphData before comparing it to the expected value. | |
| 414 // | |
| 415 // Use this function like: | |
| 416 // req := &dm.WalkGraphReq{...} | |
| 417 // So(req, WalkShouldReturn(c, s), &dm.GraphData{ | |
| 418 // ... | |
| 419 // }) | |
| 420 func WalkShouldReturn(c context.Context, s dm.DepsServer, keepTimestamps ...bool ) func(request interface{}, expect ...interface{}) string { | |
| 421 kt := len(keepTimestamps) > 0 && keepTimestamps[0] | |
| 422 if len(keepTimestamps) > 1 { | |
| 423 panic("may only specify 0 or 1 keepTimestamps values") | |
| 424 } | |
| 425 | |
| 426 normalize := func(gd *dm.GraphData) *dm.GraphData { | |
| 427 data, err := proto.Marshal(gd) | |
| 428 panicIf(err) | |
| 429 ret := &dm.GraphData{} | |
| 430 | |
| 431 panicIf(proto.Unmarshal(data, ret)) | |
| 432 | |
| 433 if !kt { | |
| 434 ret.PurgeTimestamps() | |
| 435 } | |
| 436 return ret | |
| 437 } | |
| 438 | |
| 439 return func(request interface{}, expect ...interface{}) string { | |
| 440 r := request.(*dm.WalkGraphReq) | |
|
dnj (Google)
2016/06/09 18:00:55
I guess you should check "expect" length here.
iannucci
2016/06/15 00:45:59
Done.
| |
| 441 e := expect[0].(*dm.GraphData) | |
| 442 ret, err := s.WalkGraph(c, r) | |
| 443 if nilExpect := assertions.ShouldErrLike(err, nil); nilExpect != "" { | |
| 444 return nilExpect | |
| 445 } | |
| 446 return convey.ShouldResemble(normalize(ret), e) | |
| 447 } | |
| 448 } | |
| OLD | NEW |