| Index: scheduler/appengine/engine/tq/tq_test.go
|
| diff --git a/scheduler/appengine/engine/tq/tq_test.go b/scheduler/appengine/engine/tq/tq_test.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..e3176d07a3c001f861b6c414fd49aff5f7115153
|
| --- /dev/null
|
| +++ b/scheduler/appengine/engine/tq/tq_test.go
|
| @@ -0,0 +1,234 @@
|
| +// Copyright 2017 The LUCI Authors.
|
| +//
|
| +// Licensed under the Apache License, Version 2.0 (the "License");
|
| +// you may not use this file except in compliance with the License.
|
| +// You may obtain a copy of the License at
|
| +//
|
| +// http://www.apache.org/licenses/LICENSE-2.0
|
| +//
|
| +// Unless required by applicable law or agreed to in writing, software
|
| +// distributed under the License is distributed on an "AS IS" BASIS,
|
| +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| +// See the License for the specific language governing permissions and
|
| +// limitations under the License.
|
| +
|
| +package tq
|
| +
|
| +import (
|
| + "bytes"
|
| + "fmt"
|
| + "net/http/httptest"
|
| + "testing"
|
| + "time"
|
| +
|
| + "golang.org/x/net/context"
|
| +
|
| + "github.com/golang/protobuf/proto"
|
| + "github.com/golang/protobuf/ptypes/duration"
|
| + "github.com/golang/protobuf/ptypes/empty"
|
| +
|
| + "github.com/luci/gae/impl/memory"
|
| + "github.com/luci/gae/service/taskqueue"
|
| +
|
| + "github.com/luci/luci-go/common/clock"
|
| + "github.com/luci/luci-go/common/clock/testclock"
|
| + "github.com/luci/luci-go/common/retry/transient"
|
| + "github.com/luci/luci-go/server/router"
|
| +
|
| + . "github.com/smartystreets/goconvey/convey"
|
| +)
|
| +
|
| +var epoch = time.Unix(1500000000, 0).UTC()
|
| +
|
| +func TestDispatcher(t *testing.T) {
|
| + t.Parallel()
|
| +
|
| + Convey("With dispatcher", t, func() {
|
| + ctx := memory.Use(context.Background())
|
| + ctx = clock.Set(ctx, testclock.New(epoch))
|
| + taskqueue.GetTestable(ctx).CreateQueue("another-q")
|
| +
|
| + d := Dispatcher{}
|
| + r := router.New()
|
| +
|
| + installRoutes := func() {
|
| + d.InstallRoutes(r, router.NewMiddlewareChain(func(c *router.Context, next router.Handler) {
|
| + c.Context = ctx
|
| + next(c)
|
| + }))
|
| + }
|
| +
|
| + Convey("Single task", func() {
|
| + var calls []proto.Message
|
| + handler := func(c context.Context, payload proto.Message, execCount int) error {
|
| + calls = append(calls, payload)
|
| + return nil
|
| + }
|
| +
|
| + // Abuse some well-known proto type to simplify the test. It's doesn't
|
| + // matter what proto type we use here as long as it is registered in
|
| + // protobuf type registry.
|
| + d.RegisterTask(&duration.Duration{}, handler, "", nil)
|
| + installRoutes()
|
| +
|
| + err := d.AddTask(ctx, &Task{
|
| + Payload: &duration.Duration{Seconds: 123},
|
| + DeduplicationKey: "abc",
|
| + Title: "abc-def",
|
| + Delay: 30 * time.Second,
|
| + })
|
| + So(err, ShouldBeNil)
|
| +
|
| + // Added the task.
|
| + expectedPath := "/internal/tasks/default/abc-def"
|
| + expectedName := "afc6f8271b8598ee04e359916e6c584a9bc3c520a11dd5244e3399346ac0d3a7"
|
| + expectedBody := []byte(`{"type":"google.protobuf.Duration","body":"123.000s"}`)
|
| + tasks := taskqueue.GetTestable(ctx).GetScheduledTasks()
|
| + So(tasks, ShouldResemble, taskqueue.QueueData{
|
| + "default": map[string]*taskqueue.Task{
|
| + expectedName: {
|
| + Path: expectedPath,
|
| + Payload: expectedBody,
|
| + Name: expectedName,
|
| + Method: "POST",
|
| + ETA: epoch.Add(30 * time.Second),
|
| + },
|
| + },
|
| + "another-q": {},
|
| + })
|
| +
|
| + // Readd a task with same dedup key. Should be silently ignored.
|
| + err = d.AddTask(ctx, &Task{
|
| + Payload: &duration.Duration{Seconds: 123},
|
| + DeduplicationKey: "abc",
|
| + })
|
| + So(err, ShouldBeNil)
|
| +
|
| + // No new tasks.
|
| + tasks = taskqueue.GetTestable(ctx).GetScheduledTasks()
|
| + So(len(tasks["default"]), ShouldResemble, 1)
|
| +
|
| + // Execute the task.
|
| + req := httptest.NewRequest("POST", "http://example.com"+expectedPath, bytes.NewReader(expectedBody))
|
| + rw := httptest.NewRecorder()
|
| + r.ServeHTTP(rw, req)
|
| +
|
| + // Executed.
|
| + So(calls, ShouldResemble, []proto.Message{
|
| + &duration.Duration{Seconds: 123},
|
| + })
|
| + So(rw.Code, ShouldEqual, 200)
|
| + })
|
| +
|
| + Convey("Many tasks", func() {
|
| + handler := func(c context.Context, payload proto.Message, execCount int) error { return nil }
|
| + d.RegisterTask(&duration.Duration{}, handler, "default", nil)
|
| + d.RegisterTask(&empty.Empty{}, handler, "another-q", nil)
|
| + installRoutes()
|
| +
|
| + t := []*Task{}
|
| + for i := 0; i < 200; i++ {
|
| + var task *Task
|
| +
|
| + if i%2 == 0 {
|
| + task = &Task{
|
| + DeduplicationKey: fmt.Sprintf("%d", i/2),
|
| + Payload: &duration.Duration{},
|
| + Delay: time.Duration(i) * time.Second,
|
| + }
|
| + } else {
|
| + task = &Task{
|
| + DeduplicationKey: fmt.Sprintf("%d", (i-1)/2),
|
| + Payload: &empty.Empty{},
|
| + Delay: time.Duration(i) * time.Second,
|
| + }
|
| + }
|
| +
|
| + t = append(t, task)
|
| +
|
| + // Mix in some duplicates.
|
| + if i > 0 && i%100 == 0 {
|
| + t = append(t, task)
|
| + }
|
| + }
|
| + err := d.AddTask(ctx, t...)
|
| + So(err, ShouldBeNil)
|
| +
|
| + // Added all the tasks.
|
| + allTasks := taskqueue.GetTestable(ctx).GetScheduledTasks()
|
| + delaysDefault := map[time.Duration]struct{}{}
|
| + for _, task := range allTasks["default"] {
|
| + delaysDefault[task.ETA.Sub(epoch)/time.Second] = struct{}{}
|
| + }
|
| + delaysAnotherQ := map[time.Duration]struct{}{}
|
| + for _, task := range allTasks["another-q"] {
|
| + delaysAnotherQ[task.ETA.Sub(epoch)/time.Second] = struct{}{}
|
| + }
|
| + So(len(delaysDefault), ShouldEqual, 100)
|
| + So(len(delaysAnotherQ), ShouldEqual, 100)
|
| + })
|
| +
|
| + Convey("Execution errors", func() {
|
| + var returnErr error
|
| + panicNow := false
|
| + handler := func(c context.Context, payload proto.Message, execCount int) error {
|
| + if panicNow {
|
| + panic("must not be called")
|
| + }
|
| + return returnErr
|
| + }
|
| +
|
| + d.RegisterTask(&duration.Duration{}, handler, "", nil)
|
| + installRoutes()
|
| +
|
| + goodBody := `{"type":"google.protobuf.Duration","body":"123.000s"}`
|
| +
|
| + execute := func(body string) *httptest.ResponseRecorder {
|
| + req := httptest.NewRequest(
|
| + "POST",
|
| + "http://example.com/internal/tasks/default/abc-def",
|
| + bytes.NewReader([]byte(body)))
|
| + rw := httptest.NewRecorder()
|
| + r.ServeHTTP(rw, req)
|
| + return rw
|
| + }
|
| +
|
| + // Error conditions inside the task body.
|
| +
|
| + returnErr = nil
|
| + rw := execute(goodBody)
|
| + So(rw.Code, ShouldEqual, 200)
|
| + So(rw.Body.String(), ShouldEqual, "OK\n")
|
| +
|
| + returnErr = fmt.Errorf("fatal err")
|
| + rw = execute(goodBody)
|
| + So(rw.Code, ShouldEqual, 202) // no retry!
|
| + So(rw.Body.String(), ShouldEqual, "Fatal error: fatal err\n")
|
| +
|
| + returnErr = transient.Tag.Apply(fmt.Errorf("transient err"))
|
| + rw = execute(goodBody)
|
| + So(rw.Code, ShouldEqual, 500) // 500 for retry
|
| + So(rw.Body.String(), ShouldEqual, "Transient error: transient err\n")
|
| +
|
| + // Error conditions when routing the task.
|
| +
|
| + panicNow = true
|
| +
|
| + rw = execute("not a json")
|
| + So(rw.Code, ShouldEqual, 202) // no retry!
|
| + So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize")
|
| +
|
| + rw = execute(`{"type":"google.protobuf.Duration"}`)
|
| + So(rw.Code, ShouldEqual, 202) // no retry!
|
| + So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize")
|
| +
|
| + rw = execute(`{"type":"google.protobuf.Duration","body":"blah"}`)
|
| + So(rw.Code, ShouldEqual, 202) // no retry!
|
| + So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize")
|
| +
|
| + rw = execute(`{"type":"unknown.proto.type","body":"{}"}`)
|
| + So(rw.Code, ShouldEqual, 202) // no retry!
|
| + So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize")
|
| + })
|
| + })
|
| +}
|
|
|