| Index: impl/memory/taskqueue_test.go
|
| diff --git a/impl/memory/taskqueue_test.go b/impl/memory/taskqueue_test.go
|
| index b9b261adae8c70f6e3723f76cf387429bf3c1f04..1b9e354800db8cab52a02727f88fd6f655f32242 100644
|
| --- a/impl/memory/taskqueue_test.go
|
| +++ b/impl/memory/taskqueue_test.go
|
| @@ -11,15 +11,18 @@ import (
|
| "testing"
|
| "time"
|
|
|
| - dsS "github.com/luci/gae/service/datastore"
|
| + ds "github.com/luci/gae/service/datastore"
|
| "github.com/luci/gae/service/info"
|
| - tqS "github.com/luci/gae/service/taskqueue"
|
| + tq "github.com/luci/gae/service/taskqueue"
|
| +
|
| "github.com/luci/luci-go/common/clock"
|
| "github.com/luci/luci-go/common/clock/testclock"
|
| "github.com/luci/luci-go/common/data/rand/mathrand"
|
| +
|
| + "golang.org/x/net/context"
|
| +
|
| . "github.com/luci/luci-go/common/testing/assertions"
|
| . "github.com/smartystreets/goconvey/convey"
|
| - "golang.org/x/net/context"
|
| )
|
|
|
| func TestTaskQueue(t *testing.T) {
|
| @@ -31,43 +34,41 @@ func TestTaskQueue(t *testing.T) {
|
| c = mathrand.Set(c, rand.New(rand.NewSource(clock.Now(c).UnixNano())))
|
| c = Use(c)
|
|
|
| - tq := tqS.Get(c)
|
| - tqt := tq.Testable()
|
| + tqt := tq.GetTestable(c)
|
| So(tqt, ShouldNotBeNil)
|
|
|
| - So(tq, ShouldNotBeNil)
|
| + So(tq.Raw(c), ShouldNotBeNil)
|
|
|
| Convey("implements TQMultiReadWriter", func() {
|
| Convey("Add", func() {
|
| - t := &tqS.Task{Path: "/hello/world"}
|
| + t := &tq.Task{Path: "/hello/world"}
|
|
|
| Convey("works", func() {
|
| t.Delay = 4 * time.Second
|
| t.Header = http.Header{}
|
| t.Header.Add("Cat", "tabby")
|
| t.Payload = []byte("watwatwat")
|
| - t.RetryOptions = &tqS.RetryOptions{AgeLimit: 7 * time.Second}
|
| - So(tq.Add(t, ""), ShouldBeNil)
|
| + t.RetryOptions = &tq.RetryOptions{AgeLimit: 7 * time.Second}
|
| + So(tq.Add(c, "", t), ShouldBeNil)
|
|
|
| name := "Z_UjshxM9ecyMQfGbZmUGOEcgxWU0_5CGLl_-RntudwAw2DqQ5-58bzJiWQN4OKzeuUb9O4JrPkUw2rOvk2Ax46THojnQ6avBQgZdrKcJmrwQ6o4qKfJdiyUbGXvy691yRfzLeQhs6cBhWrgf3wH-VPMcA4SC-zlbJ2U8An7I0zJQA5nBFnMNoMgT-2peGoay3rCSbj4z9VFFm9kS_i6JCaQH518ujLDSNCYdjTq6B6lcWrZAh0U_q3a1S2nXEwrKiw_t9MTNQFgAQZWyGBbvZQPmeRYtu8SPaWzTfd25v_YWgBuVL2rRSPSMvlDwE04nNdtvVzE8vNNiA1zRimmdzKeqATQF9_ReUvj4D7U8dcS703DZWfKMBLgBffY9jqCassOOOw77V72Oq5EVauUw3Qw0L6bBsfM9FtahTKUdabzRZjXUoze3EK4KXPt3-wdidau-8JrVf2XFocjjZbwHoxcGvbtT3b4nGLDlgwdC00bwaFBZWff"
|
| - So(tqt.GetScheduledTasks()["default"][name], ShouldResemble, &tqS.Task{
|
| + So(tqt.GetScheduledTasks()["default"][name], ShouldResemble, &tq.Task{
|
| ETA: now.Add(4 * time.Second),
|
| Header: http.Header{"Cat": []string{"tabby"}},
|
| Method: "POST",
|
| Name: name,
|
| Path: "/hello/world",
|
| Payload: []byte("watwatwat"),
|
| - RetryOptions: &tqS.RetryOptions{AgeLimit: 7 * time.Second},
|
| + RetryOptions: &tq.RetryOptions{AgeLimit: 7 * time.Second},
|
| })
|
| })
|
|
|
| Convey("picks up namespace", func() {
|
| - c, err := info.Get(c).Namespace("coolNamespace")
|
| + c, err := info.Namespace(c, "coolNamespace")
|
| So(err, ShouldBeNil)
|
| - tq = tqS.Get(c)
|
|
|
| - t := &tqS.Task{}
|
| - So(tq.Add(t, ""), ShouldBeNil)
|
| + t := &tq.Task{}
|
| + So(tq.Add(c, "", t), ShouldBeNil)
|
| So(t.Header, ShouldResemble, http.Header{
|
| "X-Appengine-Current-Namespace": {"coolNamespace"},
|
| })
|
| @@ -75,11 +76,11 @@ func TestTaskQueue(t *testing.T) {
|
| })
|
|
|
| Convey("cannot add to bad queues", func() {
|
| - So(tq.Add(nil, "waaat").Error(), ShouldContainSubstring, "UNKNOWN_QUEUE")
|
| + So(tq.Add(c, "waaat", nil).Error(), ShouldContainSubstring, "UNKNOWN_QUEUE")
|
|
|
| Convey("but you can add Queues when testing", func() {
|
| tqt.CreateQueue("waaat")
|
| - So(tq.Add(t, "waaat"), ShouldBeNil)
|
| + So(tq.Add(c, "waaat", t), ShouldBeNil)
|
|
|
| Convey("you just can't add them twice", func() {
|
| So(func() { tqt.CreateQueue("waaat") }, ShouldPanic)
|
| @@ -89,26 +90,26 @@ func TestTaskQueue(t *testing.T) {
|
|
|
| Convey("supplies a URL if it's missing", func() {
|
| t.Path = ""
|
| - So(tq.Add(t, ""), ShouldBeNil)
|
| + So(tq.Add(c, "", t), ShouldBeNil)
|
| So(t.Path, ShouldEqual, "/_ah/queue/default")
|
| })
|
|
|
| Convey("cannot add twice", func() {
|
| t.Name = "bob"
|
| - So(tq.Add(t, ""), ShouldBeNil)
|
| + So(tq.Add(c, "", t), ShouldBeNil)
|
|
|
| // can't add the same one twice!
|
| - So(tq.Add(t, ""), ShouldEqual, tqS.ErrTaskAlreadyAdded)
|
| + So(tq.Add(c, "", t), ShouldEqual, tq.ErrTaskAlreadyAdded)
|
| })
|
|
|
| Convey("cannot add deleted task", func() {
|
| t.Name = "bob"
|
| - So(tq.Add(t, ""), ShouldBeNil)
|
| + So(tq.Add(c, "", t), ShouldBeNil)
|
|
|
| - So(tq.Delete(t, ""), ShouldBeNil)
|
| + So(tq.Delete(c, "", t), ShouldBeNil)
|
|
|
| // can't add a deleted task!
|
| - So(tq.Add(t, ""), ShouldEqual, tqS.ErrTaskAlreadyAdded)
|
| + So(tq.Add(c, "", t), ShouldEqual, tq.ErrTaskAlreadyAdded)
|
| })
|
|
|
| Convey("cannot set ETA+Delay", func() {
|
| @@ -116,34 +117,34 @@ func TestTaskQueue(t *testing.T) {
|
| tc.Add(time.Second)
|
| t.Delay = time.Hour
|
| So(func() {
|
| - So(tq.Add(t, ""), ShouldBeNil)
|
| + So(tq.Add(c, "", t), ShouldBeNil)
|
| }, ShouldPanic)
|
| })
|
|
|
| Convey("must use a reasonable method", func() {
|
| t.Method = "Crystal"
|
| - So(tq.Add(t, "").Error(), ShouldContainSubstring, "bad method")
|
| + So(tq.Add(c, "", t).Error(), ShouldContainSubstring, "bad method")
|
| })
|
|
|
| Convey("payload gets dumped for non POST/PUT methods", func() {
|
| t.Method = "HEAD"
|
| t.Payload = []byte("coool")
|
| - So(tq.Add(t, ""), ShouldBeNil)
|
| + So(tq.Add(c, "", t), ShouldBeNil)
|
| So(t.Payload, ShouldBeNil)
|
| })
|
|
|
| Convey("invalid names are rejected", func() {
|
| t.Name = "happy times"
|
| - So(tq.Add(t, "").Error(), ShouldContainSubstring, "INVALID_TASK_NAME")
|
| + So(tq.Add(c, "", t).Error(), ShouldContainSubstring, "INVALID_TASK_NAME")
|
| })
|
|
|
| Convey("AddMulti also works", func() {
|
| t2 := t.Duplicate()
|
| t2.Path = "/hi/city"
|
|
|
| - expect := []*tqS.Task{t, t2}
|
| + expect := []*tq.Task{t, t2}
|
|
|
| - So(tq.AddMulti(expect, "default"), ShouldBeNil)
|
| + So(tq.Add(c, "default", expect...), ShouldBeNil)
|
| So(len(expect), ShouldEqual, 2)
|
| So(len(tqt.GetScheduledTasks()["default"]), ShouldEqual, 2)
|
|
|
| @@ -158,29 +159,29 @@ func TestTaskQueue(t *testing.T) {
|
| Convey("stats work too", func() {
|
| delay := -time.Second * 400
|
|
|
| - t := &tqS.Task{Path: "/somewhere"}
|
| + t := &tq.Task{Path: "/somewhere"}
|
| t.Delay = delay
|
| - So(tq.Add(t, ""), ShouldBeNil)
|
| + So(tq.Add(c, "", t), ShouldBeNil)
|
|
|
| - stats, err := tq.Stats("")
|
| + stats, err := tq.Stats(c, "")
|
| So(err, ShouldBeNil)
|
| So(stats[0].Tasks, ShouldEqual, 3)
|
| So(stats[0].OldestETA, ShouldHappenOnOrBefore, clock.Now(c).Add(delay))
|
|
|
| - _, err = tq.Stats("noexist")
|
| + _, err = tq.Stats(c, "noexist")
|
| So(err.Error(), ShouldContainSubstring, "UNKNOWN_QUEUE")
|
| })
|
|
|
| Convey("can purge all tasks", func() {
|
| - So(tq.Add(&tqS.Task{Path: "/wut/nerbs"}, ""), ShouldBeNil)
|
| - So(tq.Purge(""), ShouldBeNil)
|
| + So(tq.Add(c, "", &tq.Task{Path: "/wut/nerbs"}), ShouldBeNil)
|
| + So(tq.Purge(c, ""), ShouldBeNil)
|
|
|
| So(len(tqt.GetScheduledTasks()["default"]), ShouldEqual, 0)
|
| So(len(tqt.GetTombstonedTasks()["default"]), ShouldEqual, 0)
|
| So(len(tqt.GetTransactionTasks()["default"]), ShouldEqual, 0)
|
|
|
| Convey("purging a queue which DNE fails", func() {
|
| - So(tq.Purge("noexist").Error(), ShouldContainSubstring, "UNKNOWN_QUEUE")
|
| + So(tq.Purge(c, "noexist").Error(), ShouldContainSubstring, "UNKNOWN_QUEUE")
|
| })
|
| })
|
|
|
| @@ -188,11 +189,11 @@ func TestTaskQueue(t *testing.T) {
|
| })
|
|
|
| Convey("Delete", func() {
|
| - t := &tqS.Task{Path: "/hello/world"}
|
| - So(tq.Add(t, ""), ShouldBeNil)
|
| + t := &tq.Task{Path: "/hello/world"}
|
| + So(tq.Add(c, "", t), ShouldBeNil)
|
|
|
| Convey("works", func() {
|
| - err := tq.Delete(t, "")
|
| + err := tq.Delete(c, "", t)
|
| So(err, ShouldBeNil)
|
| So(len(tqt.GetScheduledTasks()["default"]), ShouldEqual, 0)
|
| So(len(tqt.GetTombstonedTasks()["default"]), ShouldEqual, 1)
|
| @@ -200,26 +201,26 @@ func TestTaskQueue(t *testing.T) {
|
| })
|
|
|
| Convey("cannot delete a task twice", func() {
|
| - So(tq.Delete(t, ""), ShouldBeNil)
|
| + So(tq.Delete(c, "", t), ShouldBeNil)
|
|
|
| - So(tq.Delete(t, "").Error(), ShouldContainSubstring, "TOMBSTONED_TASK")
|
| + So(tq.Delete(c, "", t).Error(), ShouldContainSubstring, "TOMBSTONED_TASK")
|
|
|
| Convey("but you can if you do a reset", func() {
|
| tqt.ResetTasks()
|
|
|
| - So(tq.Add(t, ""), ShouldBeNil)
|
| - So(tq.Delete(t, ""), ShouldBeNil)
|
| + So(tq.Add(c, "", t), ShouldBeNil)
|
| + So(tq.Delete(c, "", t), ShouldBeNil)
|
| })
|
| })
|
|
|
| Convey("cannot delete from bogus queues", func() {
|
| - err := tq.Delete(t, "wat")
|
| + err := tq.Delete(c, "wat", t)
|
| So(err.Error(), ShouldContainSubstring, "UNKNOWN_QUEUE")
|
| })
|
|
|
| Convey("cannot delete a missing task", func() {
|
| t.Name = "tarntioarenstyw"
|
| - err := tq.Delete(t, "")
|
| + err := tq.Delete(c, "", t)
|
| So(err.Error(), ShouldContainSubstring, "UNKNOWN_TASK")
|
| })
|
|
|
| @@ -227,10 +228,10 @@ func TestTaskQueue(t *testing.T) {
|
| t2 := t.Duplicate()
|
| t2.Name = ""
|
| t2.Path = "/hi/city"
|
| - So(tq.Add(t2, ""), ShouldBeNil)
|
| + So(tq.Add(c, "", t2), ShouldBeNil)
|
|
|
| Convey("usually works", func() {
|
| - So(tq.DeleteMulti([]*tqS.Task{t, t2}, ""), ShouldBeNil)
|
| + So(tq.Delete(c, "", t, t2), ShouldBeNil)
|
| So(len(tqt.GetScheduledTasks()["default"]), ShouldEqual, 0)
|
| So(len(tqt.GetTombstonedTasks()["default"]), ShouldEqual, 2)
|
| })
|
| @@ -239,17 +240,17 @@ func TestTaskQueue(t *testing.T) {
|
| })
|
|
|
| Convey("works with transactions", func() {
|
| - t := &tqS.Task{Path: "/hello/world"}
|
| - So(tq.Add(t, ""), ShouldBeNil)
|
| + t := &tq.Task{Path: "/hello/world"}
|
| + So(tq.Add(c, "", t), ShouldBeNil)
|
|
|
| - t2 := &tqS.Task{Path: "/hi/city"}
|
| - So(tq.Add(t2, ""), ShouldBeNil)
|
| + t2 := &tq.Task{Path: "/hi/city"}
|
| + So(tq.Add(c, "", t2), ShouldBeNil)
|
|
|
| - So(tq.Delete(t2, ""), ShouldBeNil)
|
| + So(tq.Delete(c, "", t2), ShouldBeNil)
|
|
|
| Convey("can view regular tasks", func() {
|
| - So(dsS.Get(c).RunInTransaction(func(c context.Context) error {
|
| - tqt := tqS.GetRaw(c).Testable()
|
| + So(ds.RunInTransaction(c, func(c context.Context) error {
|
| + tqt := tq.Raw(c).GetTestable()
|
|
|
| So(tqt.GetScheduledTasks()["default"][t.Name], ShouldResemble, t)
|
| So(tqt.GetTombstonedTasks()["default"][t2.Name], ShouldResemble, t2)
|
| @@ -259,13 +260,12 @@ func TestTaskQueue(t *testing.T) {
|
| })
|
|
|
| Convey("can add a new task", func() {
|
| - t3 := &tqS.Task{Path: "/sandwitch/victory"}
|
| + t3 := &tq.Task{Path: "/sandwitch/victory"}
|
|
|
| - err := dsS.Get(c).RunInTransaction(func(c context.Context) error {
|
| - tq := tqS.Get(c)
|
| - tqt := tq.Testable()
|
| + err := ds.RunInTransaction(c, func(c context.Context) error {
|
| + tqt := tq.GetTestable(c)
|
|
|
| - So(tq.Add(t3, ""), ShouldBeNil)
|
| + So(tq.Add(c, "", t3), ShouldBeNil)
|
| So(t3.Name, ShouldEqual, "")
|
|
|
| So(tqt.GetScheduledTasks()["default"][t.Name], ShouldResemble, t)
|
| @@ -289,15 +289,14 @@ func TestTaskQueue(t *testing.T) {
|
| })
|
|
|
| Convey("can add a new task (but reset the state in a test)", func() {
|
| - t3 := &tqS.Task{Path: "/sandwitch/victory"}
|
| -
|
| - ttq := tqS.Interface(nil)
|
| + t3 := &tq.Task{Path: "/sandwitch/victory"}
|
|
|
| - So(dsS.Get(c).RunInTransaction(func(c context.Context) error {
|
| - ttq = tqS.Get(c)
|
| - tqt := ttq.Testable()
|
| + var txnCtx context.Context
|
| + So(ds.RunInTransaction(c, func(c context.Context) error {
|
| + txnCtx = c
|
| + tqt := tq.GetTestable(c)
|
|
|
| - So(ttq.Add(t3, ""), ShouldBeNil)
|
| + So(tq.Add(c, "", t3), ShouldBeNil)
|
|
|
| So(tqt.GetScheduledTasks()["default"][t.Name], ShouldResemble, t)
|
| So(tqt.GetTombstonedTasks()["default"][t2.Name], ShouldResemble, t2)
|
| @@ -317,18 +316,17 @@ func TestTaskQueue(t *testing.T) {
|
| So(len(tqt.GetTransactionTasks()["default"]), ShouldEqual, 0)
|
|
|
| Convey("and reusing a closed context is bad times", func() {
|
| - So(ttq.Add(nil, "").Error(), ShouldContainSubstring, "expired")
|
| + So(tq.Add(txnCtx, "", nil).Error(), ShouldContainSubstring, "expired")
|
| })
|
| })
|
|
|
| Convey("you can AddMulti as well", func() {
|
| - So(dsS.Get(c).RunInTransaction(func(c context.Context) error {
|
| - tq := tqS.Get(c)
|
| - tqt := tq.Testable()
|
| + So(ds.RunInTransaction(c, func(c context.Context) error {
|
| + tqt := tq.GetTestable(c)
|
|
|
| t.Name = ""
|
| - tasks := []*tqS.Task{t.Duplicate(), t.Duplicate(), t.Duplicate()}
|
| - So(tq.AddMulti(tasks, ""), ShouldBeNil)
|
| + tasks := []*tq.Task{t.Duplicate(), t.Duplicate(), t.Duplicate()}
|
| + So(tq.Add(c, "", tasks...), ShouldBeNil)
|
| So(len(tqt.GetScheduledTasks()["default"]), ShouldEqual, 1)
|
| So(len(tqt.GetTransactionTasks()["default"]), ShouldEqual, 3)
|
| return nil
|
| @@ -338,22 +336,22 @@ func TestTaskQueue(t *testing.T) {
|
| })
|
|
|
| Convey("unless you add too many things", func() {
|
| - So(dsS.Get(c).RunInTransaction(func(c context.Context) error {
|
| + So(ds.RunInTransaction(c, func(c context.Context) error {
|
| for i := 0; i < 5; i++ {
|
| - So(tqS.Get(c).Add(t.Duplicate(), ""), ShouldBeNil)
|
| + So(tq.Add(c, "", t.Duplicate()), ShouldBeNil)
|
| }
|
| - So(tqS.Get(c).Add(t, "").Error(), ShouldContainSubstring, "BAD_REQUEST")
|
| + So(tq.Add(c, "", t).Error(), ShouldContainSubstring, "BAD_REQUEST")
|
| return nil
|
| }, nil), ShouldBeNil)
|
| })
|
|
|
| Convey("unless you Add to a bad queue", func() {
|
| - So(dsS.Get(c).RunInTransaction(func(c context.Context) error {
|
| - So(tqS.Get(c).Add(t, "meat").Error(), ShouldContainSubstring, "UNKNOWN_QUEUE")
|
| + So(ds.RunInTransaction(c, func(c context.Context) error {
|
| + So(tq.Add(c, "meat", t).Error(), ShouldContainSubstring, "UNKNOWN_QUEUE")
|
|
|
| Convey("unless you add it!", func() {
|
| - tqS.GetRaw(c).Testable().CreateQueue("meat")
|
| - So(tqS.Get(c).Add(t, "meat"), ShouldBeNil)
|
| + tq.Raw(c).GetTestable().CreateQueue("meat")
|
| + So(tq.Add(c, "meat", t), ShouldBeNil)
|
| })
|
|
|
| return nil
|
| @@ -361,29 +359,30 @@ func TestTaskQueue(t *testing.T) {
|
| })
|
|
|
| Convey("No other features are available, however", func() {
|
| - So(dsS.Get(c).RunInTransaction(func(c context.Context) error {
|
| - So(tqS.Get(c).Delete(t, "").Error(), ShouldContainSubstring, "cannot DeleteMulti from a transaction")
|
| - So(tqS.Get(c).Purge("").Error(), ShouldContainSubstring, "cannot Purge from a transaction")
|
| - _, err := tqS.Get(c).Stats("")
|
| + So(ds.RunInTransaction(c, func(c context.Context) error {
|
| + So(tq.Delete(c, "", t).Error(), ShouldContainSubstring, "cannot DeleteMulti from a transaction")
|
| + So(tq.Purge(c, "").Error(), ShouldContainSubstring, "cannot Purge from a transaction")
|
| + _, err := tq.Stats(c, "")
|
| So(err.Error(), ShouldContainSubstring, "cannot Stats from a transaction")
|
| return nil
|
| }, nil), ShouldBeNil)
|
| })
|
|
|
| Convey("can get the non-transactional taskqueue context though", func() {
|
| - So(dsS.Get(c).RunInTransaction(func(c context.Context) error {
|
| - So(tqS.GetNoTxn(c).Delete(t, ""), ShouldBeNil)
|
| - So(tqS.GetNoTxn(c).Purge(""), ShouldBeNil)
|
| - _, err := tqS.GetNoTxn(c).Stats("")
|
| + So(ds.RunInTransaction(c, func(c context.Context) error {
|
| + noTxn := ds.WithoutTransaction(c)
|
| + So(tq.Delete(noTxn, "", t), ShouldBeNil)
|
| + So(tq.Purge(noTxn, ""), ShouldBeNil)
|
| + _, err := tq.Stats(noTxn, "")
|
| So(err, ShouldBeNil)
|
| return nil
|
| }, nil), ShouldBeNil)
|
| })
|
|
|
| Convey("adding a new task only happens if we don't errout", func() {
|
| - So(dsS.Get(c).RunInTransaction(func(c context.Context) error {
|
| - t3 := &tqS.Task{Path: "/sandwitch/victory"}
|
| - So(tqS.Get(c).Add(t3, ""), ShouldBeNil)
|
| + So(ds.RunInTransaction(c, func(c context.Context) error {
|
| + t3 := &tq.Task{Path: "/sandwitch/victory"}
|
| + So(tq.Add(c, "", t3), ShouldBeNil)
|
| return fmt.Errorf("nooooo")
|
| }, nil), ShouldErrLike, "nooooo")
|
|
|
| @@ -395,10 +394,8 @@ func TestTaskQueue(t *testing.T) {
|
| Convey("likewise, a panic doesn't schedule anything", func() {
|
| func() {
|
| defer func() { _ = recover() }()
|
| - So(dsS.Get(c).RunInTransaction(func(c context.Context) error {
|
| - tq := tqS.Get(c)
|
| -
|
| - So(tq.Add(&tqS.Task{Path: "/sandwitch/victory"}, ""), ShouldBeNil)
|
| + So(ds.RunInTransaction(c, func(c context.Context) error {
|
| + So(tq.Add(c, "", &tq.Task{Path: "/sandwitch/victory"}), ShouldBeNil)
|
|
|
| panic(fmt.Errorf("nooooo"))
|
| }, nil), ShouldBeNil)
|
|
|