Index: scheduler/appengine/engine/dsset/dsset_test.go |
diff --git a/scheduler/appengine/engine/dsset/dsset_test.go b/scheduler/appengine/engine/dsset/dsset_test.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..95381a538e6f647ce2b5f966dfc9641d04127130 |
--- /dev/null |
+++ b/scheduler/appengine/engine/dsset/dsset_test.go |
@@ -0,0 +1,279 @@ |
+// Copyright 2017 The LUCI Authors. |
+// |
+// Licensed under the Apache License, Version 2.0 (the "License"); |
+// you may not use this file except in compliance with the License. |
+// You may obtain a copy of the License at |
+// |
+// http://www.apache.org/licenses/LICENSE-2.0 |
+// |
+// Unless required by applicable law or agreed to in writing, software |
+// distributed under the License is distributed on an "AS IS" BASIS, |
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
+// See the License for the specific language governing permissions and |
+// limitations under the License. |
+ |
+package dsset |
+ |
+import ( |
+ "fmt" |
+ "math/rand" |
+ "sync" |
+ "testing" |
+ "time" |
+ |
+ "golang.org/x/net/context" |
+ |
+ "github.com/luci/gae/impl/memory" |
+ "github.com/luci/gae/service/datastore" |
+ "github.com/luci/luci-go/common/clock" |
+ "github.com/luci/luci-go/common/clock/testclock" |
+ "github.com/luci/luci-go/common/data/rand/mathrand" |
+ "github.com/luci/luci-go/common/data/stringset" |
+ |
+ . "github.com/smartystreets/goconvey/convey" |
+) |
+ |
+func testingContext() context.Context { |
+ c := memory.Use(context.Background()) |
+ c = clock.Set(c, testclock.New(time.Unix(1442270520, 0).UTC())) |
+ c = mathrand.Set(c, rand.New(rand.NewSource(1000))) |
+ return c |
+} |
+ |
+// pop pops a bunch of items from the set and returns items that were popped. |
+func pop(c context.Context, s *Set, listing *Listing, ids []string) (popped []string, tombs []*Tombstone, err error) { |
+ op, err := s.BeginPop(c, listing) |
+ if err != nil { |
+ return nil, nil, err |
+ } |
+ for _, id := range ids { |
+ if op.Pop(id) { |
+ popped = append(popped, id) |
+ } |
+ } |
+ if tombs, err = FinishPop(c, op); err != nil { |
+ return nil, nil, err |
+ } |
+ return popped, tombs, nil |
+} |
+ |
+func TestSet(t *testing.T) { |
+ t.Parallel() |
+ |
+ Convey("item one lifecycle", t, func() { |
+ c := testingContext() |
+ |
+ set := Set{ |
+ ID: "test", |
+ ShardCount: 3, |
+ TombstonesRoot: datastore.NewKey(c, "Root", "root", 0, nil), |
+ TombstonesDelay: time.Minute, |
+ } |
+ |
+ // Add one item. |
+ So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil) |
+ |
+ // The item is returned by the listing. |
+ listing, err := set.List(c) |
+ So(err, ShouldBeNil) |
+ So(listing.Items, ShouldResemble, []Item{{ID: "abc"}}) |
+ So(listing.Tombstones, ShouldBeNil) |
+ |
+ // Pop it! |
+ var cleanup []*Tombstone |
+ err = datastore.RunInTransaction(c, func(c context.Context) error { |
+ popped, tombs, err := pop(c, &set, listing, []string{"abc"}) |
+ So(err, ShouldBeNil) |
+ So(popped, ShouldResemble, []string{"abc"}) |
+ So(len(tombs), ShouldEqual, 1) |
+ So(tombs[0].id, ShouldEqual, "abc") |
+ So(len(tombs[0].storage), ShouldEqual, 1) |
+ cleanup = tombs |
+ return nil |
+ }, nil) |
+ So(err, ShouldBeNil) |
+ |
+ // The listing no longer returns it, but we have a fresh tombstone that can |
+ // be cleaned up. |
+ listing, err = set.List(c) |
+ So(err, ShouldBeNil) |
+ So(listing.Items, ShouldBeNil) |
+ So(len(listing.Tombstones), ShouldEqual, 1) |
+ So(listing.Tombstones[0].id, ShouldEqual, "abc") |
+ |
+ // Cleaning up the storage using tombstones from Pop works. |
+ So(CleanupStorage(c, cleanup), ShouldBeNil) |
+ |
+ // The listing no longer returns the item, and there's no tombstones to |
+ // cleanup. |
+ listing, err = set.List(c) |
+ So(err, ShouldBeNil) |
+ So(listing.Items, ShouldBeNil) |
+ So(listing.Tombstones, ShouldBeNil) |
+ |
+ // Attempt to add it back (should be ignored). Add a bunch of times to make |
+ // sure to fill in many shards (this is pseudo-random). |
+ for i := 0; i < 5; i++ { |
+ So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil) |
+ } |
+ |
+ // The listing still doesn't returns it, but we now have a tombstone to |
+ // cleanup (again). |
+ listing, err = set.List(c) |
+ So(err, ShouldBeNil) |
+ So(listing.Items, ShouldBeNil) |
+ So(len(listing.Tombstones), ShouldEqual, 1) |
+ So(listing.Tombstones[0].old, ShouldBeFalse) |
+ So(len(listing.Tombstones[0].storage), ShouldEqual, 3) // all shards |
+ |
+ // Popping it again doesn't work either. |
+ err = datastore.RunInTransaction(c, func(c context.Context) error { |
+ popped, tombs, err := pop(c, &set, listing, []string{"abc"}) |
+ So(err, ShouldBeNil) |
+ So(popped, ShouldBeNil) |
+ So(tombs, ShouldBeNil) |
+ return nil |
+ }, nil) |
+ So(err, ShouldBeNil) |
+ |
+ // Cleaning up the storage, again. This should make List stop returning |
+ // the tombstone (since it has no storage items associated with it and it's |
+ // not ready to be evicted yet). |
+ So(CleanupStorage(c, listing.Tombstones), ShouldBeNil) |
+ listing, err = set.List(c) |
+ So(err, ShouldBeNil) |
+ So(listing.Items, ShouldBeNil) |
+ So(listing.Tombstones, ShouldBeNil) |
+ |
+ // Time passes, tombstone expires. |
+ clock.Get(c).(testclock.TestClock).Add(2 * time.Minute) |
+ |
+ // Listing now returns expired tombstone. |
+ listing, err = set.List(c) |
+ So(err, ShouldBeNil) |
+ So(listing.Items, ShouldBeNil) |
+ So(len(listing.Tombstones), ShouldEqual, 1) |
+ So(len(listing.Tombstones[0].storage), ShouldEqual, 0) // cleaned already |
+ |
+ // Cleanup storage keys. |
+ So(CleanupStorage(c, listing.Tombstones), ShouldBeNil) |
+ |
+ // Cleanup the tombstones themselves. |
+ err = datastore.RunInTransaction(c, func(c context.Context) error { |
+ popped, tombs, err := pop(c, &set, listing, nil) |
+ So(err, ShouldBeNil) |
+ So(popped, ShouldBeNil) |
+ So(tombs, ShouldBeNil) |
+ return nil |
+ }, nil) |
+ So(err, ShouldBeNil) |
+ |
+ // No tombstones returned any longer. |
+ listing, err = set.List(c) |
+ So(err, ShouldBeNil) |
+ So(listing.Items, ShouldBeNil) |
+ So(listing.Tombstones, ShouldBeNil) |
+ |
+ // And the item can be added back now, since no trace of it is left. |
+ So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil) |
+ |
+ // Yep, it is there. |
+ listing, err = set.List(c) |
+ So(err, ShouldBeNil) |
+ So(listing.Items, ShouldResemble, []Item{{ID: "abc"}}) |
+ So(listing.Tombstones, ShouldBeNil) |
+ }) |
+ |
+ Convey("stress", t, func() { |
+ // Add 1000 items in parallel from N goroutines, and (also in parallel), |
+ // run N instances of "List and pop all", collecting the result in single |
+ // list. There should be no duplicates in the final list! |
+ c := testingContext() |
+ |
+ set := Set{ |
+ ID: "test", |
+ ShardCount: 3, |
+ TombstonesRoot: datastore.NewKey(c, "Root", "root", 0, nil), |
+ TombstonesDelay: time.Minute, |
+ } |
+ |
+ producers := 3 |
+ consumers := 5 |
+ items := 100 |
+ |
+ wakeups := make(chan string) |
+ |
+ lock := sync.Mutex{} |
+ consumed := []string{} |
+ |
+ for i := 0; i < producers; i++ { |
+ go func() { |
+ for j := 0; j < items; j++ { |
+ set.Add(c, []Item{{ID: fmt.Sprintf("%d", j)}}) |
+ // Wake up 3 consumers, so they "fight". |
+ wakeups <- "wake" |
+ wakeups <- "wake" |
+ wakeups <- "wake" |
+ } |
+ for i := 0; i < consumers; i++ { |
+ wakeups <- "done" |
+ } |
+ }() |
+ } |
+ |
+ consume := func() { |
+ listing, err := set.List(c) |
+ if err != nil || len(listing.Items) == 0 { |
+ return |
+ } |
+ |
+ keys := []string{} |
+ for _, itm := range listing.Items { |
+ keys = append(keys, itm.ID) |
+ } |
+ |
+ // Try to pop all. |
+ var popped []string |
+ var tombs []*Tombstone |
+ err = datastore.RunInTransaction(c, func(c context.Context) error { |
+ var err error |
+ popped, tombs, err = pop(c, &set, listing, keys) |
+ return err |
+ }, nil) |
+ // Best-effort storage cleanup on success. |
+ if err == nil { |
+ CleanupStorage(c, tombs) |
+ } |
+ |
+ // Consider items consumed only if transaction has landed. |
+ if err == nil && len(popped) != 0 { |
+ lock.Lock() |
+ consumed = append(consumed, popped...) |
+ lock.Unlock() |
+ } |
+ } |
+ |
+ wg := sync.WaitGroup{} |
+ wg.Add(consumers) |
+ for i := 0; i < consumers; i++ { |
+ go func() { |
+ defer wg.Done() |
+ done := false |
+ for !done { |
+ done = (<-wakeups) == "done" |
+ consume() |
+ } |
+ }() |
+ } |
+ |
+ wg.Wait() // this waits for completion of the entire pipeline |
+ |
+ // Make sure 'consumed' is the initially produced set. |
+ dedup := stringset.New(len(consumed)) |
+ for _, itm := range consumed { |
+ dedup.Add(itm) |
+ } |
+ So(dedup.Len(), ShouldEqual, len(consumed)) // no dups |
+ So(len(consumed), ShouldEqual, items) // all are accounted for |
+ }) |
+} |