Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(61)

Unified Diff: scheduler/appengine/engine/dsset/dsset_test.go

Issue 2981143002: Add 'dsset' structure. (Closed)
Patch Set: rebase Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « scheduler/appengine/engine/dsset/dsset.go ('k') | scheduler/appengine/engine/internal/tq_tasks.proto » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: scheduler/appengine/engine/dsset/dsset_test.go
diff --git a/scheduler/appengine/engine/dsset/dsset_test.go b/scheduler/appengine/engine/dsset/dsset_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..95381a538e6f647ce2b5f966dfc9641d04127130
--- /dev/null
+++ b/scheduler/appengine/engine/dsset/dsset_test.go
@@ -0,0 +1,279 @@
+// Copyright 2017 The LUCI Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dsset
+
+import (
+ "fmt"
+ "math/rand"
+ "sync"
+ "testing"
+ "time"
+
+ "golang.org/x/net/context"
+
+ "github.com/luci/gae/impl/memory"
+ "github.com/luci/gae/service/datastore"
+ "github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/common/clock/testclock"
+ "github.com/luci/luci-go/common/data/rand/mathrand"
+ "github.com/luci/luci-go/common/data/stringset"
+
+ . "github.com/smartystreets/goconvey/convey"
+)
+
+func testingContext() context.Context {
+ c := memory.Use(context.Background())
+ c = clock.Set(c, testclock.New(time.Unix(1442270520, 0).UTC()))
+ c = mathrand.Set(c, rand.New(rand.NewSource(1000)))
+ return c
+}
+
+// pop pops a bunch of items from the set and returns items that were popped.
+func pop(c context.Context, s *Set, listing *Listing, ids []string) (popped []string, tombs []*Tombstone, err error) {
+ op, err := s.BeginPop(c, listing)
+ if err != nil {
+ return nil, nil, err
+ }
+ for _, id := range ids {
+ if op.Pop(id) {
+ popped = append(popped, id)
+ }
+ }
+ if tombs, err = FinishPop(c, op); err != nil {
+ return nil, nil, err
+ }
+ return popped, tombs, nil
+}
+
+func TestSet(t *testing.T) {
+ t.Parallel()
+
+ Convey("item one lifecycle", t, func() {
+ c := testingContext()
+
+ set := Set{
+ ID: "test",
+ ShardCount: 3,
+ TombstonesRoot: datastore.NewKey(c, "Root", "root", 0, nil),
+ TombstonesDelay: time.Minute,
+ }
+
+ // Add one item.
+ So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil)
+
+ // The item is returned by the listing.
+ listing, err := set.List(c)
+ So(err, ShouldBeNil)
+ So(listing.Items, ShouldResemble, []Item{{ID: "abc"}})
+ So(listing.Tombstones, ShouldBeNil)
+
+ // Pop it!
+ var cleanup []*Tombstone
+ err = datastore.RunInTransaction(c, func(c context.Context) error {
+ popped, tombs, err := pop(c, &set, listing, []string{"abc"})
+ So(err, ShouldBeNil)
+ So(popped, ShouldResemble, []string{"abc"})
+ So(len(tombs), ShouldEqual, 1)
+ So(tombs[0].id, ShouldEqual, "abc")
+ So(len(tombs[0].storage), ShouldEqual, 1)
+ cleanup = tombs
+ return nil
+ }, nil)
+ So(err, ShouldBeNil)
+
+ // The listing no longer returns it, but we have a fresh tombstone that can
+ // be cleaned up.
+ listing, err = set.List(c)
+ So(err, ShouldBeNil)
+ So(listing.Items, ShouldBeNil)
+ So(len(listing.Tombstones), ShouldEqual, 1)
+ So(listing.Tombstones[0].id, ShouldEqual, "abc")
+
+ // Cleaning up the storage using tombstones from Pop works.
+ So(CleanupStorage(c, cleanup), ShouldBeNil)
+
+ // The listing no longer returns the item, and there's no tombstones to
+ // cleanup.
+ listing, err = set.List(c)
+ So(err, ShouldBeNil)
+ So(listing.Items, ShouldBeNil)
+ So(listing.Tombstones, ShouldBeNil)
+
+ // Attempt to add it back (should be ignored). Add a bunch of times to make
+ // sure to fill in many shards (this is pseudo-random).
+ for i := 0; i < 5; i++ {
+ So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil)
+ }
+
+ // The listing still doesn't returns it, but we now have a tombstone to
+ // cleanup (again).
+ listing, err = set.List(c)
+ So(err, ShouldBeNil)
+ So(listing.Items, ShouldBeNil)
+ So(len(listing.Tombstones), ShouldEqual, 1)
+ So(listing.Tombstones[0].old, ShouldBeFalse)
+ So(len(listing.Tombstones[0].storage), ShouldEqual, 3) // all shards
+
+ // Popping it again doesn't work either.
+ err = datastore.RunInTransaction(c, func(c context.Context) error {
+ popped, tombs, err := pop(c, &set, listing, []string{"abc"})
+ So(err, ShouldBeNil)
+ So(popped, ShouldBeNil)
+ So(tombs, ShouldBeNil)
+ return nil
+ }, nil)
+ So(err, ShouldBeNil)
+
+ // Cleaning up the storage, again. This should make List stop returning
+ // the tombstone (since it has no storage items associated with it and it's
+ // not ready to be evicted yet).
+ So(CleanupStorage(c, listing.Tombstones), ShouldBeNil)
+ listing, err = set.List(c)
+ So(err, ShouldBeNil)
+ So(listing.Items, ShouldBeNil)
+ So(listing.Tombstones, ShouldBeNil)
+
+ // Time passes, tombstone expires.
+ clock.Get(c).(testclock.TestClock).Add(2 * time.Minute)
+
+ // Listing now returns expired tombstone.
+ listing, err = set.List(c)
+ So(err, ShouldBeNil)
+ So(listing.Items, ShouldBeNil)
+ So(len(listing.Tombstones), ShouldEqual, 1)
+ So(len(listing.Tombstones[0].storage), ShouldEqual, 0) // cleaned already
+
+ // Cleanup storage keys.
+ So(CleanupStorage(c, listing.Tombstones), ShouldBeNil)
+
+ // Cleanup the tombstones themselves.
+ err = datastore.RunInTransaction(c, func(c context.Context) error {
+ popped, tombs, err := pop(c, &set, listing, nil)
+ So(err, ShouldBeNil)
+ So(popped, ShouldBeNil)
+ So(tombs, ShouldBeNil)
+ return nil
+ }, nil)
+ So(err, ShouldBeNil)
+
+ // No tombstones returned any longer.
+ listing, err = set.List(c)
+ So(err, ShouldBeNil)
+ So(listing.Items, ShouldBeNil)
+ So(listing.Tombstones, ShouldBeNil)
+
+ // And the item can be added back now, since no trace of it is left.
+ So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil)
+
+ // Yep, it is there.
+ listing, err = set.List(c)
+ So(err, ShouldBeNil)
+ So(listing.Items, ShouldResemble, []Item{{ID: "abc"}})
+ So(listing.Tombstones, ShouldBeNil)
+ })
+
+ Convey("stress", t, func() {
+ // Add 1000 items in parallel from N goroutines, and (also in parallel),
+ // run N instances of "List and pop all", collecting the result in single
+ // list. There should be no duplicates in the final list!
+ c := testingContext()
+
+ set := Set{
+ ID: "test",
+ ShardCount: 3,
+ TombstonesRoot: datastore.NewKey(c, "Root", "root", 0, nil),
+ TombstonesDelay: time.Minute,
+ }
+
+ producers := 3
+ consumers := 5
+ items := 100
+
+ wakeups := make(chan string)
+
+ lock := sync.Mutex{}
+ consumed := []string{}
+
+ for i := 0; i < producers; i++ {
+ go func() {
+ for j := 0; j < items; j++ {
+ set.Add(c, []Item{{ID: fmt.Sprintf("%d", j)}})
+ // Wake up 3 consumers, so they "fight".
+ wakeups <- "wake"
+ wakeups <- "wake"
+ wakeups <- "wake"
+ }
+ for i := 0; i < consumers; i++ {
+ wakeups <- "done"
+ }
+ }()
+ }
+
+ consume := func() {
+ listing, err := set.List(c)
+ if err != nil || len(listing.Items) == 0 {
+ return
+ }
+
+ keys := []string{}
+ for _, itm := range listing.Items {
+ keys = append(keys, itm.ID)
+ }
+
+ // Try to pop all.
+ var popped []string
+ var tombs []*Tombstone
+ err = datastore.RunInTransaction(c, func(c context.Context) error {
+ var err error
+ popped, tombs, err = pop(c, &set, listing, keys)
+ return err
+ }, nil)
+ // Best-effort storage cleanup on success.
+ if err == nil {
+ CleanupStorage(c, tombs)
+ }
+
+ // Consider items consumed only if transaction has landed.
+ if err == nil && len(popped) != 0 {
+ lock.Lock()
+ consumed = append(consumed, popped...)
+ lock.Unlock()
+ }
+ }
+
+ wg := sync.WaitGroup{}
+ wg.Add(consumers)
+ for i := 0; i < consumers; i++ {
+ go func() {
+ defer wg.Done()
+ done := false
+ for !done {
+ done = (<-wakeups) == "done"
+ consume()
+ }
+ }()
+ }
+
+ wg.Wait() // this waits for completion of the entire pipeline
+
+ // Make sure 'consumed' is the initially produced set.
+ dedup := stringset.New(len(consumed))
+ for _, itm := range consumed {
+ dedup.Add(itm)
+ }
+ So(dedup.Len(), ShouldEqual, len(consumed)) // no dups
+ So(len(consumed), ShouldEqual, items) // all are accounted for
+ })
+}
« no previous file with comments | « scheduler/appengine/engine/dsset/dsset.go ('k') | scheduler/appengine/engine/internal/tq_tasks.proto » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698