Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(309)

Unified Diff: appengine/cmd/dm/distributor/fake/fake.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: fix imports and make dummy.go a real file Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « appengine/cmd/dm/distributor/distributor.go ('k') | appengine/cmd/dm/distributor/handlers.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: appengine/cmd/dm/distributor/fake/fake.go
diff --git a/appengine/cmd/dm/distributor/fake/fake.go b/appengine/cmd/dm/distributor/fake/fake.go
new file mode 100644
index 0000000000000000000000000000000000000000..8c867b5076a34e3883107e2b227c1b54e034ea13
--- /dev/null
+++ b/appengine/cmd/dm/distributor/fake/fake.go
@@ -0,0 +1,459 @@
+// Copyright 2015 The LUCI Authors. All rights reserved.
+// Use of this source code is governed under the Apache License, Version 2.0
+// that can be found in the LICENSE file.
+
+package fake
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "sync"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/luci/luci-go/appengine/cmd/dm/distributor"
+ "github.com/luci/luci-go/appengine/cmd/dm/model"
+ "github.com/luci/luci-go/appengine/tumble"
+ dm "github.com/luci/luci-go/common/api/dm/service/v1"
+ "github.com/luci/luci-go/common/gcloud/pubsub"
+ googlepb "github.com/luci/luci-go/common/proto/google"
+ "github.com/luci/luci-go/common/testing/assertions"
+ "github.com/luci/luci-go/server/secrets/testsecrets"
+ "github.com/smartystreets/goconvey/convey"
+ "golang.org/x/net/context"
+)
+
+// Setup creates a new combination of testing and context objects:
+// * ttest - a tumble.Testing to allow you to control tumble's processing
+// state
+// * c - a context which includes a testing distributor registry, testsecrets,
+// as well as everything that tumble.Testing.Context adds (datastore,
+// memcache, etc.)
+// * dist - a fake Distributor implementation with a RunTask method that
+// allows your test to 'run' a scheduled task with the Distributor. This
+// will automatically notify the deps service (by calling `fn`).
+// * reg - a distributor Testing registry, pre-registerd with `dist` using the
+// configuration name 'fakeDistributor'.
+//
+// You should pass mutate.FinishExecutionFn for fn. It's not done automatically
+// in order to break an import cycle. You could provide your own, but YMMV.
+//
+// Usage:
+// ttest, c, dist, reg := fake.Setup(mutate.FinishExecutionFn)
+// s := deps.NewDecoratedServer(reg)
+// # your tests
+func Setup(fn distributor.FinishExecutionFn) (ttest *tumble.Testing, c context.Context, dist *Distributor, reg distributor.Registry) {
+ ttest = &tumble.Testing{}
+ c = ttest.Context()
+ c = testsecrets.Use(c)
+ dist = &Distributor{}
+ reg = distributor.NewTestingRegistry(map[string]distributor.D{
+ "fakeDistributor": dist,
+ }, fn)
+ c = distributor.WithRegistry(c, reg)
+ return
+}
+
+// DistributorData is the blob of data that the fake.Distributor keeps when DM
+// calls its Run method. This is roughly equivalent to the state that
+// a distributor (like swarming) would store in its own datastore about a job.
+type DistributorData struct {
+ NotifyTopic pubsub.Topic
+ NotifyAuth string
+
+ Auth *dm.Execution_Auth
+ Desc *dm.Quest_Desc
+
+ State distributor.PersistentState
+
+ done bool
+ abnorm *dm.AbnormalFinish
+}
+
+// Task is the detail that the distributor task would get. This is roughly
+// equivalent to the input that the swarming task/recipe engine would get.
+type Task struct {
+ Auth *dm.Execution_Auth
+ Desc *dm.Quest_Desc
+ // State is read/writable.
+ State distributor.PersistentState
+}
+
+// Activate does the activation handshake with the provided DepsServer and
+// returns an ActivatedTask.
+func (t *Task) Activate(c context.Context, s dm.DepsServer) (*ActivatedTask, error) {
+ newTok := model.MakeRandomToken(c, 32)
+ _, err := s.ActivateExecution(c, &dm.ActivateExecutionReq{
+ Auth: t.Auth, ExecutionToken: newTok})
+ if err != nil {
+ return nil, err
+ }
+
+ return &ActivatedTask{
+ s,
+ c,
+ &dm.Execution_Auth{Id: t.Auth.Id, Token: newTok},
+ t.Desc,
+ &t.State,
+ }, nil
+}
+
+// MustActivate does the same thing as Activate, but panics if err != nil.
+func (t *Task) MustActivate(c context.Context, s dm.DepsServer) *ActivatedTask {
+ ret, err := t.Activate(c, s)
+ panicIf(err)
+ return ret
+}
+
+// ActivatedTask is like a Task, but exists after calling Task.MustActivate, and
+// contains an activated authentication token. This may be used to either add
+// new dependencies or to provide a finished result.
+//
+// The implementation of DepsServer also automatically populates all outgoing
+// RPCs with the activated Auth value.
+type ActivatedTask struct {
+ s dm.DepsServer
+ c context.Context
+
+ Auth *dm.Execution_Auth
+ Desc *dm.Quest_Desc
+ // State is read/writable.
+ State *distributor.PersistentState
+}
+
+// WalkGraph calls the bound DepsServer's WalkGraph method with the activated
+// Auth field.
+func (t *ActivatedTask) WalkGraph(req *dm.WalkGraphReq) (*dm.GraphData, error) {
+ newReq := *req
+ newReq.Auth = t.Auth
+ return t.s.WalkGraph(t.c, &newReq)
+}
+
+// EnsureGraphData calls the bound DepsServer's EnsureGraphData method with the
+// activated Auth field in ForExecution.
+func (t *ActivatedTask) EnsureGraphData(req *dm.EnsureGraphDataReq) (*dm.EnsureGraphDataRsp, error) {
+ newReq := *req
+ newReq.ForExecution = t.Auth
+ return t.s.EnsureGraphData(t.c, &newReq)
+}
+
+// DepOn is a shorthand for EnsureGraphData which allows you to depend on
+// multiple existing quests by attempt id. The definitions for these quests must
+// already have been added to the deps server (probably with an EnsureGraphData
+// call).
+func (t *ActivatedTask) DepOn(to ...*dm.Attempt_ID) (bool, error) {
+ req := &dm.EnsureGraphDataReq{Attempts: dm.NewAttemptList(nil)}
+ req.Attempts.AddAIDs(to...)
+
+ rsp, err := t.EnsureGraphData(req)
+ return rsp.ShouldHalt, err
+}
+
+// MustDepOn is the same as DepOn but will panic if DepOn would have returned
+// a non-nil error.
+func (t *ActivatedTask) MustDepOn(to ...*dm.Attempt_ID) (halt bool) {
+ halt, err := t.DepOn(to...)
+ panicIf(err)
+ return
+}
+
+// Finish calls FinishAttempt with the provided JSON body and optional
+// expiration time.
+//
+// This will panic if you provide more than one expiration time (so don't do
+// that).
+func (t *ActivatedTask) Finish(resultJSON string, expire ...time.Time) {
+ req := &dm.FinishAttemptReq{
+ Auth: t.Auth,
+ JsonResult: resultJSON,
+ }
+ switch len(expire) {
+ case 0:
+ case 1:
+ req.Expiration = googlepb.NewTimestamp(expire[0])
+ default:
+ panic("may only specify 0 or 1 expire values")
+ }
+
+ _, err := t.s.FinishAttempt(t.c, req)
+ panicIf(err)
+}
+
+// WalkShouldReturn is a shorthand for the package-level WalkShouldReturn which
+// binds the activated auth to the WalkGraph request, but otherwise behaves
+// identically.
+//
+// Use this method like:
+// req := &dm.WalkGraphReq{...}
+// So(req, activated.WalkShouldReturn, &dm.GraphData{
+// ...
+// })
+func (t *ActivatedTask) WalkShouldReturn(request interface{}, expect ...interface{}) string {
+ r := *request.(*dm.WalkGraphReq)
+ r.Auth = t.Auth
+ return WalkShouldReturn(t.c, t.s)(&r, expect...)
+}
+
+// Distributor implements distributor.D, and provides a method (RunTask) to
+// allow a test to actually run a task which has been scheduled on this
+// Distributor, and correctly notify the deps server that the execution is
+// complete.
+type Distributor struct {
+ // RunError can be set to make Run return this error when it's invoked.
+ RunError error
+ // These values can be set to make Run return them when it's invoked.
+ TimeToStart time.Duration
+ TimeToRun time.Duration
+ TimeToStop time.Duration
+
+ sync.Mutex
+ tasks map[distributor.Token]*DistributorData
+}
+
+// MkToken makes a distributor Token out of an Execution_ID. In this
+// implementation of a Distributor there's a 1:1 mapping between Execution_ID
+// and distributor task. This is not always the case for real distributor
+// implementations.
+func MkToken(eid *dm.Execution_ID) distributor.Token {
+ return distributor.Token(fmt.Sprintf("fakeDistributor:%s|%d|%d", eid.Quest,
+ eid.Attempt, eid.Id))
+}
+
+// Run implements distributor.D
+func (f *Distributor) Run(desc *distributor.TaskDescription) (tok distributor.Token, timeToStart, timeToRun, timeToStop time.Duration, err error) {
+ if err = f.RunError; err != nil {
+ return
+ }
+ timeToStart = f.TimeToStart
+ timeToRun = f.TimeToRun
+ timeToStop = f.TimeToStop
+
+ exAuth := desc.ExecutionAuth()
+ tok = MkToken(exAuth.Id)
+
+ tsk := &DistributorData{
+ Auth: exAuth,
+ Desc: desc.Payload(),
+ State: desc.PreviousState(),
+ }
+ tsk.NotifyTopic, tsk.NotifyAuth, err = desc.PrepareTopic()
+ panicIf(err)
+
+ f.Lock()
+ defer f.Unlock()
+ if f.tasks == nil {
+ f.tasks = map[distributor.Token]*DistributorData{}
+ }
+ f.tasks[tok] = tsk
+ return
+}
+
+// Cancel implements distributor.D
+func (f *Distributor) Cancel(tok distributor.Token) (err error) {
+ f.Lock()
+ defer f.Unlock()
+ if tsk, ok := f.tasks[tok]; ok {
+ tsk.done = true
+ tsk.abnorm = &dm.AbnormalFinish{
+ Status: dm.AbnormalFinish_CANCELLED,
+ Reason: "cancelled via Cancel()"}
+ } else {
+ err = fmt.Errorf("MISSING task %q", tok)
+ }
+ return
+}
+
+// GetStatus implements distributor.D
+func (f *Distributor) GetStatus(tok distributor.Token) (rslt *distributor.TaskResult, err error) {
+ f.Lock()
+ defer f.Unlock()
+ if tsk, ok := f.tasks[tok]; ok {
+ if tsk.done {
+ if tsk.abnorm != nil {
+ rslt = &distributor.TaskResult{AbnormalFinish: tsk.abnorm}
+ } else {
+ rslt = &distributor.TaskResult{PersistentState: tsk.State}
+ }
+ }
+ } else {
+ rslt = &distributor.TaskResult{
+ AbnormalFinish: &dm.AbnormalFinish{
+ Status: dm.AbnormalFinish_MISSING,
+ Reason: fmt.Sprintf("unknown token: %s", tok)},
+ }
+ }
+ return
+}
+
+// InfoURL implements distributor.D
+func (f *Distributor) InfoURL(tok distributor.Token) string {
+ return "https://info.example.com/" + string(tok)
+}
+
+// HandleNotification implements distributor.D
+func (f *Distributor) HandleNotification(n *distributor.Notification) (rslt *distributor.TaskResult, err error) {
+ return f.GetStatus(distributor.Token(n.Attrs["token"]))
+}
+
+// HandleTaskQueueTask is not implemented, and shouldn't be needed for most
+// tests. It could be implemented if some new test required it, however.
+func (f *Distributor) HandleTaskQueueTask(r *http.Request) ([]*distributor.Notification, error) {
+ panic("not implemented")
+}
+
+// Validate implements distributor.D (by returning a nil error for every
+// payload).
+func (f *Distributor) Validate(payload string) error {
+ return nil
+}
+
+// RunTask allows you to run the task associated with the provided execution id.
+//
+// If the task corresponding to `eid` returns an error, or if the distributor
+// itself actually has an error, this method will return an error. Notably, if
+// `cb` returns an error, it will simply mark the corresponding task as FAILED,
+// but will return nil here.
+//
+// If the task exists and hasn't been run yet, cb will be called, and can do
+// anything that you may want to a test to do. Think of the callback as the
+// recipe engine; it has the opportunity to do anything it wants to, interact
+// with the deps server (or not), succeed (or not), etc.
+//
+// If the callback needs to maintain state between executions, Task.State is
+// read+write; when the callback exits, the final value of Task.State will be
+// passed back to the DM instance under test. A re-execution of the attempt will
+// start with the new value.
+func (f *Distributor) RunTask(c context.Context, eid *dm.Execution_ID, cb func(*Task) error) (err error) {
+ tok := MkToken(eid)
+
+ f.Lock()
+ tsk := f.tasks[tok]
+ if tsk == nil {
+ err = fmt.Errorf("cannot RunTask(%q): doesn't exist", tok)
+ } else {
+ if tsk.done {
+ err = fmt.Errorf("cannot RunTask(%q): running twice", tok)
+ } else {
+ tsk.done = true
+ }
+ }
+ f.Unlock()
+
+ if err != nil {
+ return
+ }
+
+ abnorm := (*dm.AbnormalFinish)(nil)
+
+ usrTsk := &Task{
+ tsk.Auth,
+ tsk.Desc,
+ tsk.State,
+ }
+
+ defer func() {
+ f.Lock()
+ {
+ tsk.abnorm = abnorm
+ tsk.State = usrTsk.State
+
+ if r := recover(); r != nil {
+ tsk.abnorm = &dm.AbnormalFinish{
+ Status: dm.AbnormalFinish_CRASHED,
+ Reason: fmt.Sprintf("caught panic: %q", r),
+ }
+ }
+ }
+ f.Unlock()
+
+ err = tumble.RunMutation(c, &distributor.NotifyExecution{
+ CfgName: "fakeDistributor",
+ Notification: &distributor.Notification{
+ ID: tsk.Auth.Id,
+ Attrs: map[string]string{"token": string(tok)}},
+ })
+ }()
+
+ err = cb(usrTsk)
+ if err != nil {
+ err = nil
+ abnorm = &dm.AbnormalFinish{
+ Status: dm.AbnormalFinish_FAILED,
+ Reason: fmt.Sprintf("cb error: %q", err),
+ }
+ }
+ return
+}
+
+func panicIf(err error) {
+ if err != nil {
+ panic(err)
+ }
+}
+
+var _ distributor.D = (*Distributor)(nil)
+
+// QuestDesc generates a normalized generic QuestDesc of the form:
+// Quest_Desc{
+// DistributorConfigName: "fakeDistributor",
+// JsonPayload: `{"name":"$name"}`,
+// }
+func QuestDesc(name string) *dm.Quest_Desc {
+ payload, err := json.Marshal(struct {
+ Name string `json:"name"`
+ }{name})
+ panicIf(err)
+ desc := &dm.Quest_Desc{
+ DistributorConfigName: "fakeDistributor",
+ JsonPayload: string(payload),
+ }
+ panicIf(desc.Normalize())
+ return desc
+}
+
+// WalkShouldReturn is a convey-style assertion factory to assert that a given
+// WalkGraph request object results in the provided GraphData.
+//
+// If keepTimestamps (a singular, optional boolean) is provided and true,
+// WalkShouldReturn will not remove timestamps from the compared GraphData. If
+// it is absent or false, GraphData.PurgeTimestamps will be called on the
+// returned GraphData before comparing it to the expected value.
+//
+// Use this function like:
+// req := &dm.WalkGraphReq{...}
+// So(req, WalkShouldReturn(c, s), &dm.GraphData{
+// ...
+// })
+func WalkShouldReturn(c context.Context, s dm.DepsServer, keepTimestamps ...bool) func(request interface{}, expect ...interface{}) string {
+ kt := len(keepTimestamps) > 0 && keepTimestamps[0]
+ if len(keepTimestamps) > 1 {
+ panic("may only specify 0 or 1 keepTimestamps values")
+ }
+
+ normalize := func(gd *dm.GraphData) *dm.GraphData {
+ data, err := proto.Marshal(gd)
+ panicIf(err)
+ ret := &dm.GraphData{}
+
+ panicIf(proto.Unmarshal(data, ret))
+
+ if !kt {
+ ret.PurgeTimestamps()
+ }
+ return ret
+ }
+
+ return func(request interface{}, expect ...interface{}) string {
+ r := request.(*dm.WalkGraphReq)
+ if len(expect) != 1 {
+ panic(fmt.Errorf("expected 1 arg on rhs, got %d", len(expect)))
+ }
+ e := expect[0].(*dm.GraphData)
+ ret, err := s.WalkGraph(c, r)
+ if nilExpect := assertions.ShouldErrLike(err, nil); nilExpect != "" {
+ return nilExpect
+ }
+ return convey.ShouldResemble(normalize(ret), e)
+ }
+}
« no previous file with comments | « appengine/cmd/dm/distributor/distributor.go ('k') | appengine/cmd/dm/distributor/handlers.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698