OLD | NEW |
(Empty) | |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. |
| 4 |
| 5 package fake |
| 6 |
| 7 import ( |
| 8 "encoding/json" |
| 9 "fmt" |
| 10 "net/http" |
| 11 "sync" |
| 12 "time" |
| 13 |
| 14 "github.com/golang/protobuf/proto" |
| 15 "github.com/luci/luci-go/appengine/cmd/dm/distributor" |
| 16 "github.com/luci/luci-go/appengine/cmd/dm/model" |
| 17 "github.com/luci/luci-go/appengine/tumble" |
| 18 dm "github.com/luci/luci-go/common/api/dm/service/v1" |
| 19 "github.com/luci/luci-go/common/gcloud/pubsub" |
| 20 googlepb "github.com/luci/luci-go/common/proto/google" |
| 21 "github.com/luci/luci-go/common/testing/assertions" |
| 22 "github.com/luci/luci-go/server/secrets/testsecrets" |
| 23 "github.com/smartystreets/goconvey/convey" |
| 24 "golang.org/x/net/context" |
| 25 ) |
| 26 |
| 27 // Setup creates a new combination of testing and context objects: |
| 28 // * ttest - a tumble.Testing to allow you to control tumble's processing |
| 29 // state |
| 30 // * c - a context which includes a testing distributor registry, testsecrets, |
| 31 // as well as everything that tumble.Testing.Context adds (datastore, |
| 32 // memcache, etc.) |
| 33 // * dist - a fake Distributor implementation with a RunTask method that |
| 34 // allows your test to 'run' a scheduled task with the Distributor. This |
| 35 // will automatically notify the deps service (by calling `fn`). |
| 36 // * reg - a distributor Testing registry, pre-registerd with `dist` using the |
| 37 // configuration name 'fakeDistributor'. |
| 38 // |
| 39 // You should pass mutate.FinishExecutionFn for fn. It's not done automatically |
| 40 // in order to break an import cycle. You could provide your own, but YMMV. |
| 41 // |
| 42 // Usage: |
| 43 // ttest, c, dist, reg := fake.Setup(mutate.FinishExecutionFn) |
| 44 // s := deps.NewDecoratedServer(reg) |
| 45 // # your tests |
| 46 func Setup(fn distributor.FinishExecutionFn) (ttest *tumble.Testing, c context.C
ontext, dist *Distributor, reg distributor.Registry) { |
| 47 ttest = &tumble.Testing{} |
| 48 c = ttest.Context() |
| 49 c = testsecrets.Use(c) |
| 50 dist = &Distributor{} |
| 51 reg = distributor.NewTestingRegistry(map[string]distributor.D{ |
| 52 "fakeDistributor": dist, |
| 53 }, fn) |
| 54 c = distributor.WithRegistry(c, reg) |
| 55 return |
| 56 } |
| 57 |
| 58 // DistributorData is the blob of data that the fake.Distributor keeps when DM |
| 59 // calls its Run method. This is roughly equivalent to the state that |
| 60 // a distributor (like swarming) would store in its own datastore about a job. |
| 61 type DistributorData struct { |
| 62 NotifyTopic pubsub.Topic |
| 63 NotifyAuth string |
| 64 |
| 65 Auth *dm.Execution_Auth |
| 66 Desc *dm.Quest_Desc |
| 67 |
| 68 State distributor.PersistentState |
| 69 |
| 70 done bool |
| 71 abnorm *dm.AbnormalFinish |
| 72 } |
| 73 |
| 74 // Task is the detail that the distributor task would get. This is roughly |
| 75 // equivalent to the input that the swarming task/recipe engine would get. |
| 76 type Task struct { |
| 77 Auth *dm.Execution_Auth |
| 78 Desc *dm.Quest_Desc |
| 79 // State is read/writable. |
| 80 State distributor.PersistentState |
| 81 } |
| 82 |
| 83 // Activate does the activation handshake with the provided DepsServer and |
| 84 // returns an ActivatedTask. |
| 85 func (t *Task) Activate(c context.Context, s dm.DepsServer) (*ActivatedTask, err
or) { |
| 86 newTok := model.MakeRandomToken(c, 32) |
| 87 _, err := s.ActivateExecution(c, &dm.ActivateExecutionReq{ |
| 88 Auth: t.Auth, ExecutionToken: newTok}) |
| 89 if err != nil { |
| 90 return nil, err |
| 91 } |
| 92 |
| 93 return &ActivatedTask{ |
| 94 s, |
| 95 c, |
| 96 &dm.Execution_Auth{Id: t.Auth.Id, Token: newTok}, |
| 97 t.Desc, |
| 98 &t.State, |
| 99 }, nil |
| 100 } |
| 101 |
| 102 // MustActivate does the same thing as Activate, but panics if err != nil. |
| 103 func (t *Task) MustActivate(c context.Context, s dm.DepsServer) *ActivatedTask { |
| 104 ret, err := t.Activate(c, s) |
| 105 panicIf(err) |
| 106 return ret |
| 107 } |
| 108 |
| 109 // ActivatedTask is like a Task, but exists after calling Task.MustActivate, and |
| 110 // contains an activated authentication token. This may be used to either add |
| 111 // new dependencies or to provide a finished result. |
| 112 // |
| 113 // The implementation of DepsServer also automatically populates all outgoing |
| 114 // RPCs with the activated Auth value. |
| 115 type ActivatedTask struct { |
| 116 s dm.DepsServer |
| 117 c context.Context |
| 118 |
| 119 Auth *dm.Execution_Auth |
| 120 Desc *dm.Quest_Desc |
| 121 // State is read/writable. |
| 122 State *distributor.PersistentState |
| 123 } |
| 124 |
| 125 // WalkGraph calls the bound DepsServer's WalkGraph method with the activated |
| 126 // Auth field. |
| 127 func (t *ActivatedTask) WalkGraph(req *dm.WalkGraphReq) (*dm.GraphData, error) { |
| 128 newReq := *req |
| 129 newReq.Auth = t.Auth |
| 130 return t.s.WalkGraph(t.c, &newReq) |
| 131 } |
| 132 |
| 133 // EnsureGraphData calls the bound DepsServer's EnsureGraphData method with the |
| 134 // activated Auth field in ForExecution. |
| 135 func (t *ActivatedTask) EnsureGraphData(req *dm.EnsureGraphDataReq) (*dm.EnsureG
raphDataRsp, error) { |
| 136 newReq := *req |
| 137 newReq.ForExecution = t.Auth |
| 138 return t.s.EnsureGraphData(t.c, &newReq) |
| 139 } |
| 140 |
| 141 // DepOn is a shorthand for EnsureGraphData which allows you to depend on |
| 142 // multiple existing quests by attempt id. The definitions for these quests must |
| 143 // already have been added to the deps server (probably with an EnsureGraphData |
| 144 // call). |
| 145 func (t *ActivatedTask) DepOn(to ...*dm.Attempt_ID) (bool, error) { |
| 146 req := &dm.EnsureGraphDataReq{Attempts: dm.NewAttemptList(nil)} |
| 147 req.Attempts.AddAIDs(to...) |
| 148 |
| 149 rsp, err := t.EnsureGraphData(req) |
| 150 return rsp.ShouldHalt, err |
| 151 } |
| 152 |
| 153 // MustDepOn is the same as DepOn but will panic if DepOn would have returned |
| 154 // a non-nil error. |
| 155 func (t *ActivatedTask) MustDepOn(to ...*dm.Attempt_ID) (halt bool) { |
| 156 halt, err := t.DepOn(to...) |
| 157 panicIf(err) |
| 158 return |
| 159 } |
| 160 |
| 161 // Finish calls FinishAttempt with the provided JSON body and optional |
| 162 // expiration time. |
| 163 // |
| 164 // This will panic if you provide more than one expiration time (so don't do |
| 165 // that). |
| 166 func (t *ActivatedTask) Finish(resultJSON string, expire ...time.Time) { |
| 167 req := &dm.FinishAttemptReq{ |
| 168 Auth: t.Auth, |
| 169 JsonResult: resultJSON, |
| 170 } |
| 171 switch len(expire) { |
| 172 case 0: |
| 173 case 1: |
| 174 req.Expiration = googlepb.NewTimestamp(expire[0]) |
| 175 default: |
| 176 panic("may only specify 0 or 1 expire values") |
| 177 } |
| 178 |
| 179 _, err := t.s.FinishAttempt(t.c, req) |
| 180 panicIf(err) |
| 181 } |
| 182 |
| 183 // WalkShouldReturn is a shorthand for the package-level WalkShouldReturn which |
| 184 // binds the activated auth to the WalkGraph request, but otherwise behaves |
| 185 // identically. |
| 186 // |
| 187 // Use this method like: |
| 188 // req := &dm.WalkGraphReq{...} |
| 189 // So(req, activated.WalkShouldReturn, &dm.GraphData{ |
| 190 // ... |
| 191 // }) |
| 192 func (t *ActivatedTask) WalkShouldReturn(request interface{}, expect ...interfac
e{}) string { |
| 193 r := *request.(*dm.WalkGraphReq) |
| 194 r.Auth = t.Auth |
| 195 return WalkShouldReturn(t.c, t.s)(&r, expect...) |
| 196 } |
| 197 |
| 198 // Distributor implements distributor.D, and provides a method (RunTask) to |
| 199 // allow a test to actually run a task which has been scheduled on this |
| 200 // Distributor, and correctly notify the deps server that the execution is |
| 201 // complete. |
| 202 type Distributor struct { |
| 203 // RunError can be set to make Run return this error when it's invoked. |
| 204 RunError error |
| 205 // These values can be set to make Run return them when it's invoked. |
| 206 TimeToStart time.Duration |
| 207 TimeToRun time.Duration |
| 208 TimeToStop time.Duration |
| 209 |
| 210 sync.Mutex |
| 211 tasks map[distributor.Token]*DistributorData |
| 212 } |
| 213 |
| 214 // MkToken makes a distributor Token out of an Execution_ID. In this |
| 215 // implementation of a Distributor there's a 1:1 mapping between Execution_ID |
| 216 // and distributor task. This is not always the case for real distributor |
| 217 // implementations. |
| 218 func MkToken(eid *dm.Execution_ID) distributor.Token { |
| 219 return distributor.Token(fmt.Sprintf("fakeDistributor:%s|%d|%d", eid.Que
st, |
| 220 eid.Attempt, eid.Id)) |
| 221 } |
| 222 |
| 223 // Run implements distributor.D |
| 224 func (f *Distributor) Run(desc *distributor.TaskDescription) (tok distributor.To
ken, timeToStart, timeToRun, timeToStop time.Duration, err error) { |
| 225 if err = f.RunError; err != nil { |
| 226 return |
| 227 } |
| 228 timeToStart = f.TimeToStart |
| 229 timeToRun = f.TimeToRun |
| 230 timeToStop = f.TimeToStop |
| 231 |
| 232 exAuth := desc.ExecutionAuth() |
| 233 tok = MkToken(exAuth.Id) |
| 234 |
| 235 tsk := &DistributorData{ |
| 236 Auth: exAuth, |
| 237 Desc: desc.Payload(), |
| 238 State: desc.PreviousState(), |
| 239 } |
| 240 tsk.NotifyTopic, tsk.NotifyAuth, err = desc.PrepareTopic() |
| 241 panicIf(err) |
| 242 |
| 243 f.Lock() |
| 244 defer f.Unlock() |
| 245 if f.tasks == nil { |
| 246 f.tasks = map[distributor.Token]*DistributorData{} |
| 247 } |
| 248 f.tasks[tok] = tsk |
| 249 return |
| 250 } |
| 251 |
| 252 // Cancel implements distributor.D |
| 253 func (f *Distributor) Cancel(tok distributor.Token) (err error) { |
| 254 f.Lock() |
| 255 defer f.Unlock() |
| 256 if tsk, ok := f.tasks[tok]; ok { |
| 257 tsk.done = true |
| 258 tsk.abnorm = &dm.AbnormalFinish{ |
| 259 Status: dm.AbnormalFinish_CANCELLED, |
| 260 Reason: "cancelled via Cancel()"} |
| 261 } else { |
| 262 err = fmt.Errorf("MISSING task %q", tok) |
| 263 } |
| 264 return |
| 265 } |
| 266 |
| 267 // GetStatus implements distributor.D |
| 268 func (f *Distributor) GetStatus(tok distributor.Token) (rslt *distributor.TaskRe
sult, err error) { |
| 269 f.Lock() |
| 270 defer f.Unlock() |
| 271 if tsk, ok := f.tasks[tok]; ok { |
| 272 if tsk.done { |
| 273 if tsk.abnorm != nil { |
| 274 rslt = &distributor.TaskResult{AbnormalFinish: t
sk.abnorm} |
| 275 } else { |
| 276 rslt = &distributor.TaskResult{PersistentState:
tsk.State} |
| 277 } |
| 278 } |
| 279 } else { |
| 280 rslt = &distributor.TaskResult{ |
| 281 AbnormalFinish: &dm.AbnormalFinish{ |
| 282 Status: dm.AbnormalFinish_MISSING, |
| 283 Reason: fmt.Sprintf("unknown token: %s", tok)}, |
| 284 } |
| 285 } |
| 286 return |
| 287 } |
| 288 |
| 289 // InfoURL implements distributor.D |
| 290 func (f *Distributor) InfoURL(tok distributor.Token) string { |
| 291 return "https://info.example.com/" + string(tok) |
| 292 } |
| 293 |
| 294 // HandleNotification implements distributor.D |
| 295 func (f *Distributor) HandleNotification(n *distributor.Notification) (rslt *dis
tributor.TaskResult, err error) { |
| 296 return f.GetStatus(distributor.Token(n.Attrs["token"])) |
| 297 } |
| 298 |
| 299 // HandleTaskQueueTask is not implemented, and shouldn't be needed for most |
| 300 // tests. It could be implemented if some new test required it, however. |
| 301 func (f *Distributor) HandleTaskQueueTask(r *http.Request) ([]*distributor.Notif
ication, error) { |
| 302 panic("not implemented") |
| 303 } |
| 304 |
| 305 // Validate implements distributor.D (by returning a nil error for every |
| 306 // payload). |
| 307 func (f *Distributor) Validate(payload string) error { |
| 308 return nil |
| 309 } |
| 310 |
| 311 // RunTask allows you to run the task associated with the provided execution id. |
| 312 // |
| 313 // If the task corresponding to `eid` returns an error, or if the distributor |
| 314 // itself actually has an error, this method will return an error. Notably, if |
| 315 // `cb` returns an error, it will simply mark the corresponding task as FAILED, |
| 316 // but will return nil here. |
| 317 // |
| 318 // If the task exists and hasn't been run yet, cb will be called, and can do |
| 319 // anything that you may want to a test to do. Think of the callback as the |
| 320 // recipe engine; it has the opportunity to do anything it wants to, interact |
| 321 // with the deps server (or not), succeed (or not), etc. |
| 322 // |
| 323 // If the callback needs to maintain state between executions, Task.State is |
| 324 // read+write; when the callback exits, the final value of Task.State will be |
| 325 // passed back to the DM instance under test. A re-execution of the attempt will |
| 326 // start with the new value. |
| 327 func (f *Distributor) RunTask(c context.Context, eid *dm.Execution_ID, cb func(*
Task) error) (err error) { |
| 328 tok := MkToken(eid) |
| 329 |
| 330 f.Lock() |
| 331 tsk := f.tasks[tok] |
| 332 if tsk == nil { |
| 333 err = fmt.Errorf("cannot RunTask(%q): doesn't exist", tok) |
| 334 } else { |
| 335 if tsk.done { |
| 336 err = fmt.Errorf("cannot RunTask(%q): running twice", to
k) |
| 337 } else { |
| 338 tsk.done = true |
| 339 } |
| 340 } |
| 341 f.Unlock() |
| 342 |
| 343 if err != nil { |
| 344 return |
| 345 } |
| 346 |
| 347 abnorm := (*dm.AbnormalFinish)(nil) |
| 348 |
| 349 usrTsk := &Task{ |
| 350 tsk.Auth, |
| 351 tsk.Desc, |
| 352 tsk.State, |
| 353 } |
| 354 |
| 355 defer func() { |
| 356 f.Lock() |
| 357 { |
| 358 tsk.abnorm = abnorm |
| 359 tsk.State = usrTsk.State |
| 360 |
| 361 if r := recover(); r != nil { |
| 362 tsk.abnorm = &dm.AbnormalFinish{ |
| 363 Status: dm.AbnormalFinish_CRASHED, |
| 364 Reason: fmt.Sprintf("caught panic: %q",
r), |
| 365 } |
| 366 } |
| 367 } |
| 368 f.Unlock() |
| 369 |
| 370 err = tumble.RunMutation(c, &distributor.NotifyExecution{ |
| 371 CfgName: "fakeDistributor", |
| 372 Notification: &distributor.Notification{ |
| 373 ID: tsk.Auth.Id, |
| 374 Attrs: map[string]string{"token": string(tok)}}, |
| 375 }) |
| 376 }() |
| 377 |
| 378 err = cb(usrTsk) |
| 379 if err != nil { |
| 380 err = nil |
| 381 abnorm = &dm.AbnormalFinish{ |
| 382 Status: dm.AbnormalFinish_FAILED, |
| 383 Reason: fmt.Sprintf("cb error: %q", err), |
| 384 } |
| 385 } |
| 386 return |
| 387 } |
| 388 |
| 389 func panicIf(err error) { |
| 390 if err != nil { |
| 391 panic(err) |
| 392 } |
| 393 } |
| 394 |
| 395 var _ distributor.D = (*Distributor)(nil) |
| 396 |
| 397 // QuestDesc generates a normalized generic QuestDesc of the form: |
| 398 // Quest_Desc{ |
| 399 // DistributorConfigName: "fakeDistributor", |
| 400 // JsonPayload: `{"name":"$name"}`, |
| 401 // } |
| 402 func QuestDesc(name string) *dm.Quest_Desc { |
| 403 payload, err := json.Marshal(struct { |
| 404 Name string `json:"name"` |
| 405 }{name}) |
| 406 panicIf(err) |
| 407 desc := &dm.Quest_Desc{ |
| 408 DistributorConfigName: "fakeDistributor", |
| 409 JsonPayload: string(payload), |
| 410 } |
| 411 panicIf(desc.Normalize()) |
| 412 return desc |
| 413 } |
| 414 |
| 415 // WalkShouldReturn is a convey-style assertion factory to assert that a given |
| 416 // WalkGraph request object results in the provided GraphData. |
| 417 // |
| 418 // If keepTimestamps (a singular, optional boolean) is provided and true, |
| 419 // WalkShouldReturn will not remove timestamps from the compared GraphData. If |
| 420 // it is absent or false, GraphData.PurgeTimestamps will be called on the |
| 421 // returned GraphData before comparing it to the expected value. |
| 422 // |
| 423 // Use this function like: |
| 424 // req := &dm.WalkGraphReq{...} |
| 425 // So(req, WalkShouldReturn(c, s), &dm.GraphData{ |
| 426 // ... |
| 427 // }) |
| 428 func WalkShouldReturn(c context.Context, s dm.DepsServer, keepTimestamps ...bool
) func(request interface{}, expect ...interface{}) string { |
| 429 kt := len(keepTimestamps) > 0 && keepTimestamps[0] |
| 430 if len(keepTimestamps) > 1 { |
| 431 panic("may only specify 0 or 1 keepTimestamps values") |
| 432 } |
| 433 |
| 434 normalize := func(gd *dm.GraphData) *dm.GraphData { |
| 435 data, err := proto.Marshal(gd) |
| 436 panicIf(err) |
| 437 ret := &dm.GraphData{} |
| 438 |
| 439 panicIf(proto.Unmarshal(data, ret)) |
| 440 |
| 441 if !kt { |
| 442 ret.PurgeTimestamps() |
| 443 } |
| 444 return ret |
| 445 } |
| 446 |
| 447 return func(request interface{}, expect ...interface{}) string { |
| 448 r := request.(*dm.WalkGraphReq) |
| 449 if len(expect) != 1 { |
| 450 panic(fmt.Errorf("expected 1 arg on rhs, got %d", len(ex
pect))) |
| 451 } |
| 452 e := expect[0].(*dm.GraphData) |
| 453 ret, err := s.WalkGraph(c, r) |
| 454 if nilExpect := assertions.ShouldErrLike(err, nil); nilExpect !=
"" { |
| 455 return nilExpect |
| 456 } |
| 457 return convey.ShouldResemble(normalize(ret), e) |
| 458 } |
| 459 } |
OLD | NEW |