Index: appengine/cmd/dm/distributor/fake/fake.go |
diff --git a/appengine/cmd/dm/distributor/fake/fake.go b/appengine/cmd/dm/distributor/fake/fake.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..8c867b5076a34e3883107e2b227c1b54e034ea13 |
--- /dev/null |
+++ b/appengine/cmd/dm/distributor/fake/fake.go |
@@ -0,0 +1,459 @@ |
+// Copyright 2015 The LUCI Authors. All rights reserved. |
+// Use of this source code is governed under the Apache License, Version 2.0 |
+// that can be found in the LICENSE file. |
+ |
+package fake |
+ |
+import ( |
+ "encoding/json" |
+ "fmt" |
+ "net/http" |
+ "sync" |
+ "time" |
+ |
+ "github.com/golang/protobuf/proto" |
+ "github.com/luci/luci-go/appengine/cmd/dm/distributor" |
+ "github.com/luci/luci-go/appengine/cmd/dm/model" |
+ "github.com/luci/luci-go/appengine/tumble" |
+ dm "github.com/luci/luci-go/common/api/dm/service/v1" |
+ "github.com/luci/luci-go/common/gcloud/pubsub" |
+ googlepb "github.com/luci/luci-go/common/proto/google" |
+ "github.com/luci/luci-go/common/testing/assertions" |
+ "github.com/luci/luci-go/server/secrets/testsecrets" |
+ "github.com/smartystreets/goconvey/convey" |
+ "golang.org/x/net/context" |
+) |
+ |
+// Setup creates a new combination of testing and context objects: |
+// * ttest - a tumble.Testing to allow you to control tumble's processing |
+// state |
+// * c - a context which includes a testing distributor registry, testsecrets, |
+// as well as everything that tumble.Testing.Context adds (datastore, |
+// memcache, etc.) |
+// * dist - a fake Distributor implementation with a RunTask method that |
+// allows your test to 'run' a scheduled task with the Distributor. This |
+// will automatically notify the deps service (by calling `fn`). |
+// * reg - a distributor Testing registry, pre-registerd with `dist` using the |
+// configuration name 'fakeDistributor'. |
+// |
+// You should pass mutate.FinishExecutionFn for fn. It's not done automatically |
+// in order to break an import cycle. You could provide your own, but YMMV. |
+// |
+// Usage: |
+// ttest, c, dist, reg := fake.Setup(mutate.FinishExecutionFn) |
+// s := deps.NewDecoratedServer(reg) |
+// # your tests |
+func Setup(fn distributor.FinishExecutionFn) (ttest *tumble.Testing, c context.Context, dist *Distributor, reg distributor.Registry) { |
+ ttest = &tumble.Testing{} |
+ c = ttest.Context() |
+ c = testsecrets.Use(c) |
+ dist = &Distributor{} |
+ reg = distributor.NewTestingRegistry(map[string]distributor.D{ |
+ "fakeDistributor": dist, |
+ }, fn) |
+ c = distributor.WithRegistry(c, reg) |
+ return |
+} |
+ |
+// DistributorData is the blob of data that the fake.Distributor keeps when DM |
+// calls its Run method. This is roughly equivalent to the state that |
+// a distributor (like swarming) would store in its own datastore about a job. |
+type DistributorData struct { |
+ NotifyTopic pubsub.Topic |
+ NotifyAuth string |
+ |
+ Auth *dm.Execution_Auth |
+ Desc *dm.Quest_Desc |
+ |
+ State distributor.PersistentState |
+ |
+ done bool |
+ abnorm *dm.AbnormalFinish |
+} |
+ |
+// Task is the detail that the distributor task would get. This is roughly |
+// equivalent to the input that the swarming task/recipe engine would get. |
+type Task struct { |
+ Auth *dm.Execution_Auth |
+ Desc *dm.Quest_Desc |
+ // State is read/writable. |
+ State distributor.PersistentState |
+} |
+ |
+// Activate does the activation handshake with the provided DepsServer and |
+// returns an ActivatedTask. |
+func (t *Task) Activate(c context.Context, s dm.DepsServer) (*ActivatedTask, error) { |
+ newTok := model.MakeRandomToken(c, 32) |
+ _, err := s.ActivateExecution(c, &dm.ActivateExecutionReq{ |
+ Auth: t.Auth, ExecutionToken: newTok}) |
+ if err != nil { |
+ return nil, err |
+ } |
+ |
+ return &ActivatedTask{ |
+ s, |
+ c, |
+ &dm.Execution_Auth{Id: t.Auth.Id, Token: newTok}, |
+ t.Desc, |
+ &t.State, |
+ }, nil |
+} |
+ |
+// MustActivate does the same thing as Activate, but panics if err != nil. |
+func (t *Task) MustActivate(c context.Context, s dm.DepsServer) *ActivatedTask { |
+ ret, err := t.Activate(c, s) |
+ panicIf(err) |
+ return ret |
+} |
+ |
+// ActivatedTask is like a Task, but exists after calling Task.MustActivate, and |
+// contains an activated authentication token. This may be used to either add |
+// new dependencies or to provide a finished result. |
+// |
+// The implementation of DepsServer also automatically populates all outgoing |
+// RPCs with the activated Auth value. |
+type ActivatedTask struct { |
+ s dm.DepsServer |
+ c context.Context |
+ |
+ Auth *dm.Execution_Auth |
+ Desc *dm.Quest_Desc |
+ // State is read/writable. |
+ State *distributor.PersistentState |
+} |
+ |
+// WalkGraph calls the bound DepsServer's WalkGraph method with the activated |
+// Auth field. |
+func (t *ActivatedTask) WalkGraph(req *dm.WalkGraphReq) (*dm.GraphData, error) { |
+ newReq := *req |
+ newReq.Auth = t.Auth |
+ return t.s.WalkGraph(t.c, &newReq) |
+} |
+ |
+// EnsureGraphData calls the bound DepsServer's EnsureGraphData method with the |
+// activated Auth field in ForExecution. |
+func (t *ActivatedTask) EnsureGraphData(req *dm.EnsureGraphDataReq) (*dm.EnsureGraphDataRsp, error) { |
+ newReq := *req |
+ newReq.ForExecution = t.Auth |
+ return t.s.EnsureGraphData(t.c, &newReq) |
+} |
+ |
+// DepOn is a shorthand for EnsureGraphData which allows you to depend on |
+// multiple existing quests by attempt id. The definitions for these quests must |
+// already have been added to the deps server (probably with an EnsureGraphData |
+// call). |
+func (t *ActivatedTask) DepOn(to ...*dm.Attempt_ID) (bool, error) { |
+ req := &dm.EnsureGraphDataReq{Attempts: dm.NewAttemptList(nil)} |
+ req.Attempts.AddAIDs(to...) |
+ |
+ rsp, err := t.EnsureGraphData(req) |
+ return rsp.ShouldHalt, err |
+} |
+ |
+// MustDepOn is the same as DepOn but will panic if DepOn would have returned |
+// a non-nil error. |
+func (t *ActivatedTask) MustDepOn(to ...*dm.Attempt_ID) (halt bool) { |
+ halt, err := t.DepOn(to...) |
+ panicIf(err) |
+ return |
+} |
+ |
+// Finish calls FinishAttempt with the provided JSON body and optional |
+// expiration time. |
+// |
+// This will panic if you provide more than one expiration time (so don't do |
+// that). |
+func (t *ActivatedTask) Finish(resultJSON string, expire ...time.Time) { |
+ req := &dm.FinishAttemptReq{ |
+ Auth: t.Auth, |
+ JsonResult: resultJSON, |
+ } |
+ switch len(expire) { |
+ case 0: |
+ case 1: |
+ req.Expiration = googlepb.NewTimestamp(expire[0]) |
+ default: |
+ panic("may only specify 0 or 1 expire values") |
+ } |
+ |
+ _, err := t.s.FinishAttempt(t.c, req) |
+ panicIf(err) |
+} |
+ |
+// WalkShouldReturn is a shorthand for the package-level WalkShouldReturn which |
+// binds the activated auth to the WalkGraph request, but otherwise behaves |
+// identically. |
+// |
+// Use this method like: |
+// req := &dm.WalkGraphReq{...} |
+// So(req, activated.WalkShouldReturn, &dm.GraphData{ |
+// ... |
+// }) |
+func (t *ActivatedTask) WalkShouldReturn(request interface{}, expect ...interface{}) string { |
+ r := *request.(*dm.WalkGraphReq) |
+ r.Auth = t.Auth |
+ return WalkShouldReturn(t.c, t.s)(&r, expect...) |
+} |
+ |
+// Distributor implements distributor.D, and provides a method (RunTask) to |
+// allow a test to actually run a task which has been scheduled on this |
+// Distributor, and correctly notify the deps server that the execution is |
+// complete. |
+type Distributor struct { |
+ // RunError can be set to make Run return this error when it's invoked. |
+ RunError error |
+ // These values can be set to make Run return them when it's invoked. |
+ TimeToStart time.Duration |
+ TimeToRun time.Duration |
+ TimeToStop time.Duration |
+ |
+ sync.Mutex |
+ tasks map[distributor.Token]*DistributorData |
+} |
+ |
+// MkToken makes a distributor Token out of an Execution_ID. In this |
+// implementation of a Distributor there's a 1:1 mapping between Execution_ID |
+// and distributor task. This is not always the case for real distributor |
+// implementations. |
+func MkToken(eid *dm.Execution_ID) distributor.Token { |
+ return distributor.Token(fmt.Sprintf("fakeDistributor:%s|%d|%d", eid.Quest, |
+ eid.Attempt, eid.Id)) |
+} |
+ |
+// Run implements distributor.D |
+func (f *Distributor) Run(desc *distributor.TaskDescription) (tok distributor.Token, timeToStart, timeToRun, timeToStop time.Duration, err error) { |
+ if err = f.RunError; err != nil { |
+ return |
+ } |
+ timeToStart = f.TimeToStart |
+ timeToRun = f.TimeToRun |
+ timeToStop = f.TimeToStop |
+ |
+ exAuth := desc.ExecutionAuth() |
+ tok = MkToken(exAuth.Id) |
+ |
+ tsk := &DistributorData{ |
+ Auth: exAuth, |
+ Desc: desc.Payload(), |
+ State: desc.PreviousState(), |
+ } |
+ tsk.NotifyTopic, tsk.NotifyAuth, err = desc.PrepareTopic() |
+ panicIf(err) |
+ |
+ f.Lock() |
+ defer f.Unlock() |
+ if f.tasks == nil { |
+ f.tasks = map[distributor.Token]*DistributorData{} |
+ } |
+ f.tasks[tok] = tsk |
+ return |
+} |
+ |
+// Cancel implements distributor.D |
+func (f *Distributor) Cancel(tok distributor.Token) (err error) { |
+ f.Lock() |
+ defer f.Unlock() |
+ if tsk, ok := f.tasks[tok]; ok { |
+ tsk.done = true |
+ tsk.abnorm = &dm.AbnormalFinish{ |
+ Status: dm.AbnormalFinish_CANCELLED, |
+ Reason: "cancelled via Cancel()"} |
+ } else { |
+ err = fmt.Errorf("MISSING task %q", tok) |
+ } |
+ return |
+} |
+ |
+// GetStatus implements distributor.D |
+func (f *Distributor) GetStatus(tok distributor.Token) (rslt *distributor.TaskResult, err error) { |
+ f.Lock() |
+ defer f.Unlock() |
+ if tsk, ok := f.tasks[tok]; ok { |
+ if tsk.done { |
+ if tsk.abnorm != nil { |
+ rslt = &distributor.TaskResult{AbnormalFinish: tsk.abnorm} |
+ } else { |
+ rslt = &distributor.TaskResult{PersistentState: tsk.State} |
+ } |
+ } |
+ } else { |
+ rslt = &distributor.TaskResult{ |
+ AbnormalFinish: &dm.AbnormalFinish{ |
+ Status: dm.AbnormalFinish_MISSING, |
+ Reason: fmt.Sprintf("unknown token: %s", tok)}, |
+ } |
+ } |
+ return |
+} |
+ |
+// InfoURL implements distributor.D |
+func (f *Distributor) InfoURL(tok distributor.Token) string { |
+ return "https://info.example.com/" + string(tok) |
+} |
+ |
+// HandleNotification implements distributor.D |
+func (f *Distributor) HandleNotification(n *distributor.Notification) (rslt *distributor.TaskResult, err error) { |
+ return f.GetStatus(distributor.Token(n.Attrs["token"])) |
+} |
+ |
+// HandleTaskQueueTask is not implemented, and shouldn't be needed for most |
+// tests. It could be implemented if some new test required it, however. |
+func (f *Distributor) HandleTaskQueueTask(r *http.Request) ([]*distributor.Notification, error) { |
+ panic("not implemented") |
+} |
+ |
+// Validate implements distributor.D (by returning a nil error for every |
+// payload). |
+func (f *Distributor) Validate(payload string) error { |
+ return nil |
+} |
+ |
+// RunTask allows you to run the task associated with the provided execution id. |
+// |
+// If the task corresponding to `eid` returns an error, or if the distributor |
+// itself actually has an error, this method will return an error. Notably, if |
+// `cb` returns an error, it will simply mark the corresponding task as FAILED, |
+// but will return nil here. |
+// |
+// If the task exists and hasn't been run yet, cb will be called, and can do |
+// anything that you may want to a test to do. Think of the callback as the |
+// recipe engine; it has the opportunity to do anything it wants to, interact |
+// with the deps server (or not), succeed (or not), etc. |
+// |
+// If the callback needs to maintain state between executions, Task.State is |
+// read+write; when the callback exits, the final value of Task.State will be |
+// passed back to the DM instance under test. A re-execution of the attempt will |
+// start with the new value. |
+func (f *Distributor) RunTask(c context.Context, eid *dm.Execution_ID, cb func(*Task) error) (err error) { |
+ tok := MkToken(eid) |
+ |
+ f.Lock() |
+ tsk := f.tasks[tok] |
+ if tsk == nil { |
+ err = fmt.Errorf("cannot RunTask(%q): doesn't exist", tok) |
+ } else { |
+ if tsk.done { |
+ err = fmt.Errorf("cannot RunTask(%q): running twice", tok) |
+ } else { |
+ tsk.done = true |
+ } |
+ } |
+ f.Unlock() |
+ |
+ if err != nil { |
+ return |
+ } |
+ |
+ abnorm := (*dm.AbnormalFinish)(nil) |
+ |
+ usrTsk := &Task{ |
+ tsk.Auth, |
+ tsk.Desc, |
+ tsk.State, |
+ } |
+ |
+ defer func() { |
+ f.Lock() |
+ { |
+ tsk.abnorm = abnorm |
+ tsk.State = usrTsk.State |
+ |
+ if r := recover(); r != nil { |
+ tsk.abnorm = &dm.AbnormalFinish{ |
+ Status: dm.AbnormalFinish_CRASHED, |
+ Reason: fmt.Sprintf("caught panic: %q", r), |
+ } |
+ } |
+ } |
+ f.Unlock() |
+ |
+ err = tumble.RunMutation(c, &distributor.NotifyExecution{ |
+ CfgName: "fakeDistributor", |
+ Notification: &distributor.Notification{ |
+ ID: tsk.Auth.Id, |
+ Attrs: map[string]string{"token": string(tok)}}, |
+ }) |
+ }() |
+ |
+ err = cb(usrTsk) |
+ if err != nil { |
+ err = nil |
+ abnorm = &dm.AbnormalFinish{ |
+ Status: dm.AbnormalFinish_FAILED, |
+ Reason: fmt.Sprintf("cb error: %q", err), |
+ } |
+ } |
+ return |
+} |
+ |
+func panicIf(err error) { |
+ if err != nil { |
+ panic(err) |
+ } |
+} |
+ |
+var _ distributor.D = (*Distributor)(nil) |
+ |
+// QuestDesc generates a normalized generic QuestDesc of the form: |
+// Quest_Desc{ |
+// DistributorConfigName: "fakeDistributor", |
+// JsonPayload: `{"name":"$name"}`, |
+// } |
+func QuestDesc(name string) *dm.Quest_Desc { |
+ payload, err := json.Marshal(struct { |
+ Name string `json:"name"` |
+ }{name}) |
+ panicIf(err) |
+ desc := &dm.Quest_Desc{ |
+ DistributorConfigName: "fakeDistributor", |
+ JsonPayload: string(payload), |
+ } |
+ panicIf(desc.Normalize()) |
+ return desc |
+} |
+ |
+// WalkShouldReturn is a convey-style assertion factory to assert that a given |
+// WalkGraph request object results in the provided GraphData. |
+// |
+// If keepTimestamps (a singular, optional boolean) is provided and true, |
+// WalkShouldReturn will not remove timestamps from the compared GraphData. If |
+// it is absent or false, GraphData.PurgeTimestamps will be called on the |
+// returned GraphData before comparing it to the expected value. |
+// |
+// Use this function like: |
+// req := &dm.WalkGraphReq{...} |
+// So(req, WalkShouldReturn(c, s), &dm.GraphData{ |
+// ... |
+// }) |
+func WalkShouldReturn(c context.Context, s dm.DepsServer, keepTimestamps ...bool) func(request interface{}, expect ...interface{}) string { |
+ kt := len(keepTimestamps) > 0 && keepTimestamps[0] |
+ if len(keepTimestamps) > 1 { |
+ panic("may only specify 0 or 1 keepTimestamps values") |
+ } |
+ |
+ normalize := func(gd *dm.GraphData) *dm.GraphData { |
+ data, err := proto.Marshal(gd) |
+ panicIf(err) |
+ ret := &dm.GraphData{} |
+ |
+ panicIf(proto.Unmarshal(data, ret)) |
+ |
+ if !kt { |
+ ret.PurgeTimestamps() |
+ } |
+ return ret |
+ } |
+ |
+ return func(request interface{}, expect ...interface{}) string { |
+ r := request.(*dm.WalkGraphReq) |
+ if len(expect) != 1 { |
+ panic(fmt.Errorf("expected 1 arg on rhs, got %d", len(expect))) |
+ } |
+ e := expect[0].(*dm.GraphData) |
+ ret, err := s.WalkGraph(c, r) |
+ if nilExpect := assertions.ShouldErrLike(err, nil); nilExpect != "" { |
+ return nilExpect |
+ } |
+ return convey.ShouldResemble(normalize(ret), e) |
+ } |
+} |