Index: impl/memory/taskqueue_test.go |
diff --git a/impl/memory/taskqueue_test.go b/impl/memory/taskqueue_test.go |
index b9b261adae8c70f6e3723f76cf387429bf3c1f04..1b9e354800db8cab52a02727f88fd6f655f32242 100644 |
--- a/impl/memory/taskqueue_test.go |
+++ b/impl/memory/taskqueue_test.go |
@@ -11,15 +11,18 @@ import ( |
"testing" |
"time" |
- dsS "github.com/luci/gae/service/datastore" |
+ ds "github.com/luci/gae/service/datastore" |
"github.com/luci/gae/service/info" |
- tqS "github.com/luci/gae/service/taskqueue" |
+ tq "github.com/luci/gae/service/taskqueue" |
+ |
"github.com/luci/luci-go/common/clock" |
"github.com/luci/luci-go/common/clock/testclock" |
"github.com/luci/luci-go/common/data/rand/mathrand" |
+ |
+ "golang.org/x/net/context" |
+ |
. "github.com/luci/luci-go/common/testing/assertions" |
. "github.com/smartystreets/goconvey/convey" |
- "golang.org/x/net/context" |
) |
func TestTaskQueue(t *testing.T) { |
@@ -31,43 +34,41 @@ func TestTaskQueue(t *testing.T) { |
c = mathrand.Set(c, rand.New(rand.NewSource(clock.Now(c).UnixNano()))) |
c = Use(c) |
- tq := tqS.Get(c) |
- tqt := tq.Testable() |
+ tqt := tq.GetTestable(c) |
So(tqt, ShouldNotBeNil) |
- So(tq, ShouldNotBeNil) |
+ So(tq.Raw(c), ShouldNotBeNil) |
Convey("implements TQMultiReadWriter", func() { |
Convey("Add", func() { |
- t := &tqS.Task{Path: "/hello/world"} |
+ t := &tq.Task{Path: "/hello/world"} |
Convey("works", func() { |
t.Delay = 4 * time.Second |
t.Header = http.Header{} |
t.Header.Add("Cat", "tabby") |
t.Payload = []byte("watwatwat") |
- t.RetryOptions = &tqS.RetryOptions{AgeLimit: 7 * time.Second} |
- So(tq.Add(t, ""), ShouldBeNil) |
+ t.RetryOptions = &tq.RetryOptions{AgeLimit: 7 * time.Second} |
+ So(tq.Add(c, "", t), ShouldBeNil) |
name := "Z_UjshxM9ecyMQfGbZmUGOEcgxWU0_5CGLl_-RntudwAw2DqQ5-58bzJiWQN4OKzeuUb9O4JrPkUw2rOvk2Ax46THojnQ6avBQgZdrKcJmrwQ6o4qKfJdiyUbGXvy691yRfzLeQhs6cBhWrgf3wH-VPMcA4SC-zlbJ2U8An7I0zJQA5nBFnMNoMgT-2peGoay3rCSbj4z9VFFm9kS_i6JCaQH518ujLDSNCYdjTq6B6lcWrZAh0U_q3a1S2nXEwrKiw_t9MTNQFgAQZWyGBbvZQPmeRYtu8SPaWzTfd25v_YWgBuVL2rRSPSMvlDwE04nNdtvVzE8vNNiA1zRimmdzKeqATQF9_ReUvj4D7U8dcS703DZWfKMBLgBffY9jqCassOOOw77V72Oq5EVauUw3Qw0L6bBsfM9FtahTKUdabzRZjXUoze3EK4KXPt3-wdidau-8JrVf2XFocjjZbwHoxcGvbtT3b4nGLDlgwdC00bwaFBZWff" |
- So(tqt.GetScheduledTasks()["default"][name], ShouldResemble, &tqS.Task{ |
+ So(tqt.GetScheduledTasks()["default"][name], ShouldResemble, &tq.Task{ |
ETA: now.Add(4 * time.Second), |
Header: http.Header{"Cat": []string{"tabby"}}, |
Method: "POST", |
Name: name, |
Path: "/hello/world", |
Payload: []byte("watwatwat"), |
- RetryOptions: &tqS.RetryOptions{AgeLimit: 7 * time.Second}, |
+ RetryOptions: &tq.RetryOptions{AgeLimit: 7 * time.Second}, |
}) |
}) |
Convey("picks up namespace", func() { |
- c, err := info.Get(c).Namespace("coolNamespace") |
+ c, err := info.Namespace(c, "coolNamespace") |
So(err, ShouldBeNil) |
- tq = tqS.Get(c) |
- t := &tqS.Task{} |
- So(tq.Add(t, ""), ShouldBeNil) |
+ t := &tq.Task{} |
+ So(tq.Add(c, "", t), ShouldBeNil) |
So(t.Header, ShouldResemble, http.Header{ |
"X-Appengine-Current-Namespace": {"coolNamespace"}, |
}) |
@@ -75,11 +76,11 @@ func TestTaskQueue(t *testing.T) { |
}) |
Convey("cannot add to bad queues", func() { |
- So(tq.Add(nil, "waaat").Error(), ShouldContainSubstring, "UNKNOWN_QUEUE") |
+ So(tq.Add(c, "waaat", nil).Error(), ShouldContainSubstring, "UNKNOWN_QUEUE") |
Convey("but you can add Queues when testing", func() { |
tqt.CreateQueue("waaat") |
- So(tq.Add(t, "waaat"), ShouldBeNil) |
+ So(tq.Add(c, "waaat", t), ShouldBeNil) |
Convey("you just can't add them twice", func() { |
So(func() { tqt.CreateQueue("waaat") }, ShouldPanic) |
@@ -89,26 +90,26 @@ func TestTaskQueue(t *testing.T) { |
Convey("supplies a URL if it's missing", func() { |
t.Path = "" |
- So(tq.Add(t, ""), ShouldBeNil) |
+ So(tq.Add(c, "", t), ShouldBeNil) |
So(t.Path, ShouldEqual, "/_ah/queue/default") |
}) |
Convey("cannot add twice", func() { |
t.Name = "bob" |
- So(tq.Add(t, ""), ShouldBeNil) |
+ So(tq.Add(c, "", t), ShouldBeNil) |
// can't add the same one twice! |
- So(tq.Add(t, ""), ShouldEqual, tqS.ErrTaskAlreadyAdded) |
+ So(tq.Add(c, "", t), ShouldEqual, tq.ErrTaskAlreadyAdded) |
}) |
Convey("cannot add deleted task", func() { |
t.Name = "bob" |
- So(tq.Add(t, ""), ShouldBeNil) |
+ So(tq.Add(c, "", t), ShouldBeNil) |
- So(tq.Delete(t, ""), ShouldBeNil) |
+ So(tq.Delete(c, "", t), ShouldBeNil) |
// can't add a deleted task! |
- So(tq.Add(t, ""), ShouldEqual, tqS.ErrTaskAlreadyAdded) |
+ So(tq.Add(c, "", t), ShouldEqual, tq.ErrTaskAlreadyAdded) |
}) |
Convey("cannot set ETA+Delay", func() { |
@@ -116,34 +117,34 @@ func TestTaskQueue(t *testing.T) { |
tc.Add(time.Second) |
t.Delay = time.Hour |
So(func() { |
- So(tq.Add(t, ""), ShouldBeNil) |
+ So(tq.Add(c, "", t), ShouldBeNil) |
}, ShouldPanic) |
}) |
Convey("must use a reasonable method", func() { |
t.Method = "Crystal" |
- So(tq.Add(t, "").Error(), ShouldContainSubstring, "bad method") |
+ So(tq.Add(c, "", t).Error(), ShouldContainSubstring, "bad method") |
}) |
Convey("payload gets dumped for non POST/PUT methods", func() { |
t.Method = "HEAD" |
t.Payload = []byte("coool") |
- So(tq.Add(t, ""), ShouldBeNil) |
+ So(tq.Add(c, "", t), ShouldBeNil) |
So(t.Payload, ShouldBeNil) |
}) |
Convey("invalid names are rejected", func() { |
t.Name = "happy times" |
- So(tq.Add(t, "").Error(), ShouldContainSubstring, "INVALID_TASK_NAME") |
+ So(tq.Add(c, "", t).Error(), ShouldContainSubstring, "INVALID_TASK_NAME") |
}) |
Convey("AddMulti also works", func() { |
t2 := t.Duplicate() |
t2.Path = "/hi/city" |
- expect := []*tqS.Task{t, t2} |
+ expect := []*tq.Task{t, t2} |
- So(tq.AddMulti(expect, "default"), ShouldBeNil) |
+ So(tq.Add(c, "default", expect...), ShouldBeNil) |
So(len(expect), ShouldEqual, 2) |
So(len(tqt.GetScheduledTasks()["default"]), ShouldEqual, 2) |
@@ -158,29 +159,29 @@ func TestTaskQueue(t *testing.T) { |
Convey("stats work too", func() { |
delay := -time.Second * 400 |
- t := &tqS.Task{Path: "/somewhere"} |
+ t := &tq.Task{Path: "/somewhere"} |
t.Delay = delay |
- So(tq.Add(t, ""), ShouldBeNil) |
+ So(tq.Add(c, "", t), ShouldBeNil) |
- stats, err := tq.Stats("") |
+ stats, err := tq.Stats(c, "") |
So(err, ShouldBeNil) |
So(stats[0].Tasks, ShouldEqual, 3) |
So(stats[0].OldestETA, ShouldHappenOnOrBefore, clock.Now(c).Add(delay)) |
- _, err = tq.Stats("noexist") |
+ _, err = tq.Stats(c, "noexist") |
So(err.Error(), ShouldContainSubstring, "UNKNOWN_QUEUE") |
}) |
Convey("can purge all tasks", func() { |
- So(tq.Add(&tqS.Task{Path: "/wut/nerbs"}, ""), ShouldBeNil) |
- So(tq.Purge(""), ShouldBeNil) |
+ So(tq.Add(c, "", &tq.Task{Path: "/wut/nerbs"}), ShouldBeNil) |
+ So(tq.Purge(c, ""), ShouldBeNil) |
So(len(tqt.GetScheduledTasks()["default"]), ShouldEqual, 0) |
So(len(tqt.GetTombstonedTasks()["default"]), ShouldEqual, 0) |
So(len(tqt.GetTransactionTasks()["default"]), ShouldEqual, 0) |
Convey("purging a queue which DNE fails", func() { |
- So(tq.Purge("noexist").Error(), ShouldContainSubstring, "UNKNOWN_QUEUE") |
+ So(tq.Purge(c, "noexist").Error(), ShouldContainSubstring, "UNKNOWN_QUEUE") |
}) |
}) |
@@ -188,11 +189,11 @@ func TestTaskQueue(t *testing.T) { |
}) |
Convey("Delete", func() { |
- t := &tqS.Task{Path: "/hello/world"} |
- So(tq.Add(t, ""), ShouldBeNil) |
+ t := &tq.Task{Path: "/hello/world"} |
+ So(tq.Add(c, "", t), ShouldBeNil) |
Convey("works", func() { |
- err := tq.Delete(t, "") |
+ err := tq.Delete(c, "", t) |
So(err, ShouldBeNil) |
So(len(tqt.GetScheduledTasks()["default"]), ShouldEqual, 0) |
So(len(tqt.GetTombstonedTasks()["default"]), ShouldEqual, 1) |
@@ -200,26 +201,26 @@ func TestTaskQueue(t *testing.T) { |
}) |
Convey("cannot delete a task twice", func() { |
- So(tq.Delete(t, ""), ShouldBeNil) |
+ So(tq.Delete(c, "", t), ShouldBeNil) |
- So(tq.Delete(t, "").Error(), ShouldContainSubstring, "TOMBSTONED_TASK") |
+ So(tq.Delete(c, "", t).Error(), ShouldContainSubstring, "TOMBSTONED_TASK") |
Convey("but you can if you do a reset", func() { |
tqt.ResetTasks() |
- So(tq.Add(t, ""), ShouldBeNil) |
- So(tq.Delete(t, ""), ShouldBeNil) |
+ So(tq.Add(c, "", t), ShouldBeNil) |
+ So(tq.Delete(c, "", t), ShouldBeNil) |
}) |
}) |
Convey("cannot delete from bogus queues", func() { |
- err := tq.Delete(t, "wat") |
+ err := tq.Delete(c, "wat", t) |
So(err.Error(), ShouldContainSubstring, "UNKNOWN_QUEUE") |
}) |
Convey("cannot delete a missing task", func() { |
t.Name = "tarntioarenstyw" |
- err := tq.Delete(t, "") |
+ err := tq.Delete(c, "", t) |
So(err.Error(), ShouldContainSubstring, "UNKNOWN_TASK") |
}) |
@@ -227,10 +228,10 @@ func TestTaskQueue(t *testing.T) { |
t2 := t.Duplicate() |
t2.Name = "" |
t2.Path = "/hi/city" |
- So(tq.Add(t2, ""), ShouldBeNil) |
+ So(tq.Add(c, "", t2), ShouldBeNil) |
Convey("usually works", func() { |
- So(tq.DeleteMulti([]*tqS.Task{t, t2}, ""), ShouldBeNil) |
+ So(tq.Delete(c, "", t, t2), ShouldBeNil) |
So(len(tqt.GetScheduledTasks()["default"]), ShouldEqual, 0) |
So(len(tqt.GetTombstonedTasks()["default"]), ShouldEqual, 2) |
}) |
@@ -239,17 +240,17 @@ func TestTaskQueue(t *testing.T) { |
}) |
Convey("works with transactions", func() { |
- t := &tqS.Task{Path: "/hello/world"} |
- So(tq.Add(t, ""), ShouldBeNil) |
+ t := &tq.Task{Path: "/hello/world"} |
+ So(tq.Add(c, "", t), ShouldBeNil) |
- t2 := &tqS.Task{Path: "/hi/city"} |
- So(tq.Add(t2, ""), ShouldBeNil) |
+ t2 := &tq.Task{Path: "/hi/city"} |
+ So(tq.Add(c, "", t2), ShouldBeNil) |
- So(tq.Delete(t2, ""), ShouldBeNil) |
+ So(tq.Delete(c, "", t2), ShouldBeNil) |
Convey("can view regular tasks", func() { |
- So(dsS.Get(c).RunInTransaction(func(c context.Context) error { |
- tqt := tqS.GetRaw(c).Testable() |
+ So(ds.RunInTransaction(c, func(c context.Context) error { |
+ tqt := tq.Raw(c).GetTestable() |
So(tqt.GetScheduledTasks()["default"][t.Name], ShouldResemble, t) |
So(tqt.GetTombstonedTasks()["default"][t2.Name], ShouldResemble, t2) |
@@ -259,13 +260,12 @@ func TestTaskQueue(t *testing.T) { |
}) |
Convey("can add a new task", func() { |
- t3 := &tqS.Task{Path: "/sandwitch/victory"} |
+ t3 := &tq.Task{Path: "/sandwitch/victory"} |
- err := dsS.Get(c).RunInTransaction(func(c context.Context) error { |
- tq := tqS.Get(c) |
- tqt := tq.Testable() |
+ err := ds.RunInTransaction(c, func(c context.Context) error { |
+ tqt := tq.GetTestable(c) |
- So(tq.Add(t3, ""), ShouldBeNil) |
+ So(tq.Add(c, "", t3), ShouldBeNil) |
So(t3.Name, ShouldEqual, "") |
So(tqt.GetScheduledTasks()["default"][t.Name], ShouldResemble, t) |
@@ -289,15 +289,14 @@ func TestTaskQueue(t *testing.T) { |
}) |
Convey("can add a new task (but reset the state in a test)", func() { |
- t3 := &tqS.Task{Path: "/sandwitch/victory"} |
- |
- ttq := tqS.Interface(nil) |
+ t3 := &tq.Task{Path: "/sandwitch/victory"} |
- So(dsS.Get(c).RunInTransaction(func(c context.Context) error { |
- ttq = tqS.Get(c) |
- tqt := ttq.Testable() |
+ var txnCtx context.Context |
+ So(ds.RunInTransaction(c, func(c context.Context) error { |
+ txnCtx = c |
+ tqt := tq.GetTestable(c) |
- So(ttq.Add(t3, ""), ShouldBeNil) |
+ So(tq.Add(c, "", t3), ShouldBeNil) |
So(tqt.GetScheduledTasks()["default"][t.Name], ShouldResemble, t) |
So(tqt.GetTombstonedTasks()["default"][t2.Name], ShouldResemble, t2) |
@@ -317,18 +316,17 @@ func TestTaskQueue(t *testing.T) { |
So(len(tqt.GetTransactionTasks()["default"]), ShouldEqual, 0) |
Convey("and reusing a closed context is bad times", func() { |
- So(ttq.Add(nil, "").Error(), ShouldContainSubstring, "expired") |
+ So(tq.Add(txnCtx, "", nil).Error(), ShouldContainSubstring, "expired") |
}) |
}) |
Convey("you can AddMulti as well", func() { |
- So(dsS.Get(c).RunInTransaction(func(c context.Context) error { |
- tq := tqS.Get(c) |
- tqt := tq.Testable() |
+ So(ds.RunInTransaction(c, func(c context.Context) error { |
+ tqt := tq.GetTestable(c) |
t.Name = "" |
- tasks := []*tqS.Task{t.Duplicate(), t.Duplicate(), t.Duplicate()} |
- So(tq.AddMulti(tasks, ""), ShouldBeNil) |
+ tasks := []*tq.Task{t.Duplicate(), t.Duplicate(), t.Duplicate()} |
+ So(tq.Add(c, "", tasks...), ShouldBeNil) |
So(len(tqt.GetScheduledTasks()["default"]), ShouldEqual, 1) |
So(len(tqt.GetTransactionTasks()["default"]), ShouldEqual, 3) |
return nil |
@@ -338,22 +336,22 @@ func TestTaskQueue(t *testing.T) { |
}) |
Convey("unless you add too many things", func() { |
- So(dsS.Get(c).RunInTransaction(func(c context.Context) error { |
+ So(ds.RunInTransaction(c, func(c context.Context) error { |
for i := 0; i < 5; i++ { |
- So(tqS.Get(c).Add(t.Duplicate(), ""), ShouldBeNil) |
+ So(tq.Add(c, "", t.Duplicate()), ShouldBeNil) |
} |
- So(tqS.Get(c).Add(t, "").Error(), ShouldContainSubstring, "BAD_REQUEST") |
+ So(tq.Add(c, "", t).Error(), ShouldContainSubstring, "BAD_REQUEST") |
return nil |
}, nil), ShouldBeNil) |
}) |
Convey("unless you Add to a bad queue", func() { |
- So(dsS.Get(c).RunInTransaction(func(c context.Context) error { |
- So(tqS.Get(c).Add(t, "meat").Error(), ShouldContainSubstring, "UNKNOWN_QUEUE") |
+ So(ds.RunInTransaction(c, func(c context.Context) error { |
+ So(tq.Add(c, "meat", t).Error(), ShouldContainSubstring, "UNKNOWN_QUEUE") |
Convey("unless you add it!", func() { |
- tqS.GetRaw(c).Testable().CreateQueue("meat") |
- So(tqS.Get(c).Add(t, "meat"), ShouldBeNil) |
+ tq.Raw(c).GetTestable().CreateQueue("meat") |
+ So(tq.Add(c, "meat", t), ShouldBeNil) |
}) |
return nil |
@@ -361,29 +359,30 @@ func TestTaskQueue(t *testing.T) { |
}) |
Convey("No other features are available, however", func() { |
- So(dsS.Get(c).RunInTransaction(func(c context.Context) error { |
- So(tqS.Get(c).Delete(t, "").Error(), ShouldContainSubstring, "cannot DeleteMulti from a transaction") |
- So(tqS.Get(c).Purge("").Error(), ShouldContainSubstring, "cannot Purge from a transaction") |
- _, err := tqS.Get(c).Stats("") |
+ So(ds.RunInTransaction(c, func(c context.Context) error { |
+ So(tq.Delete(c, "", t).Error(), ShouldContainSubstring, "cannot DeleteMulti from a transaction") |
+ So(tq.Purge(c, "").Error(), ShouldContainSubstring, "cannot Purge from a transaction") |
+ _, err := tq.Stats(c, "") |
So(err.Error(), ShouldContainSubstring, "cannot Stats from a transaction") |
return nil |
}, nil), ShouldBeNil) |
}) |
Convey("can get the non-transactional taskqueue context though", func() { |
- So(dsS.Get(c).RunInTransaction(func(c context.Context) error { |
- So(tqS.GetNoTxn(c).Delete(t, ""), ShouldBeNil) |
- So(tqS.GetNoTxn(c).Purge(""), ShouldBeNil) |
- _, err := tqS.GetNoTxn(c).Stats("") |
+ So(ds.RunInTransaction(c, func(c context.Context) error { |
+ noTxn := ds.WithoutTransaction(c) |
+ So(tq.Delete(noTxn, "", t), ShouldBeNil) |
+ So(tq.Purge(noTxn, ""), ShouldBeNil) |
+ _, err := tq.Stats(noTxn, "") |
So(err, ShouldBeNil) |
return nil |
}, nil), ShouldBeNil) |
}) |
Convey("adding a new task only happens if we don't errout", func() { |
- So(dsS.Get(c).RunInTransaction(func(c context.Context) error { |
- t3 := &tqS.Task{Path: "/sandwitch/victory"} |
- So(tqS.Get(c).Add(t3, ""), ShouldBeNil) |
+ So(ds.RunInTransaction(c, func(c context.Context) error { |
+ t3 := &tq.Task{Path: "/sandwitch/victory"} |
+ So(tq.Add(c, "", t3), ShouldBeNil) |
return fmt.Errorf("nooooo") |
}, nil), ShouldErrLike, "nooooo") |
@@ -395,10 +394,8 @@ func TestTaskQueue(t *testing.T) { |
Convey("likewise, a panic doesn't schedule anything", func() { |
func() { |
defer func() { _ = recover() }() |
- So(dsS.Get(c).RunInTransaction(func(c context.Context) error { |
- tq := tqS.Get(c) |
- |
- So(tq.Add(&tqS.Task{Path: "/sandwitch/victory"}, ""), ShouldBeNil) |
+ So(ds.RunInTransaction(c, func(c context.Context) error { |
+ So(tq.Add(c, "", &tq.Task{Path: "/sandwitch/victory"}), ShouldBeNil) |
panic(fmt.Errorf("nooooo")) |
}, nil), ShouldBeNil) |