OLD | NEW |
(Empty) | |
| 1 // Copyright 2017 The LUCI Authors. |
| 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at |
| 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 // See the License for the specific language governing permissions and |
| 13 // limitations under the License. |
| 14 |
| 15 package dsset |
| 16 |
| 17 import ( |
| 18 "fmt" |
| 19 "math/rand" |
| 20 "sync" |
| 21 "testing" |
| 22 "time" |
| 23 |
| 24 "golang.org/x/net/context" |
| 25 |
| 26 "github.com/luci/gae/impl/memory" |
| 27 "github.com/luci/gae/service/datastore" |
| 28 "github.com/luci/luci-go/common/clock" |
| 29 "github.com/luci/luci-go/common/clock/testclock" |
| 30 "github.com/luci/luci-go/common/data/rand/mathrand" |
| 31 "github.com/luci/luci-go/common/data/stringset" |
| 32 |
| 33 . "github.com/smartystreets/goconvey/convey" |
| 34 ) |
| 35 |
| 36 func testingContext() context.Context { |
| 37 c := memory.Use(context.Background()) |
| 38 c = clock.Set(c, testclock.New(time.Unix(1442270520, 0).UTC())) |
| 39 c = mathrand.Set(c, rand.New(rand.NewSource(1000))) |
| 40 return c |
| 41 } |
| 42 |
| 43 // pop pops a bunch of items from the set and returns items that were popped. |
| 44 func pop(c context.Context, s *Set, listing *Listing, ids []string) (popped []st
ring, tombs []*Tombstone, err error) { |
| 45 op, err := s.BeginPop(c, listing) |
| 46 if err != nil { |
| 47 return nil, nil, err |
| 48 } |
| 49 for _, id := range ids { |
| 50 if op.Pop(id) { |
| 51 popped = append(popped, id) |
| 52 } |
| 53 } |
| 54 if tombs, err = FinishPop(c, op); err != nil { |
| 55 return nil, nil, err |
| 56 } |
| 57 return popped, tombs, nil |
| 58 } |
| 59 |
| 60 func TestSet(t *testing.T) { |
| 61 t.Parallel() |
| 62 |
| 63 Convey("item one lifecycle", t, func() { |
| 64 c := testingContext() |
| 65 |
| 66 set := Set{ |
| 67 ID: "test", |
| 68 ShardCount: 3, |
| 69 TombstonesRoot: datastore.NewKey(c, "Root", "root", 0,
nil), |
| 70 TombstonesDelay: time.Minute, |
| 71 } |
| 72 |
| 73 // Add one item. |
| 74 So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil) |
| 75 |
| 76 // The item is returned by the listing. |
| 77 listing, err := set.List(c) |
| 78 So(err, ShouldBeNil) |
| 79 So(listing.Items, ShouldResemble, []Item{{ID: "abc"}}) |
| 80 So(listing.Tombstones, ShouldBeNil) |
| 81 |
| 82 // Pop it! |
| 83 var cleanup []*Tombstone |
| 84 err = datastore.RunInTransaction(c, func(c context.Context) erro
r { |
| 85 popped, tombs, err := pop(c, &set, listing, []string{"ab
c"}) |
| 86 So(err, ShouldBeNil) |
| 87 So(popped, ShouldResemble, []string{"abc"}) |
| 88 So(len(tombs), ShouldEqual, 1) |
| 89 So(tombs[0].id, ShouldEqual, "abc") |
| 90 So(len(tombs[0].storage), ShouldEqual, 1) |
| 91 cleanup = tombs |
| 92 return nil |
| 93 }, nil) |
| 94 So(err, ShouldBeNil) |
| 95 |
| 96 // The listing no longer returns it, but we have a fresh tombsto
ne that can |
| 97 // be cleaned up. |
| 98 listing, err = set.List(c) |
| 99 So(err, ShouldBeNil) |
| 100 So(listing.Items, ShouldBeNil) |
| 101 So(len(listing.Tombstones), ShouldEqual, 1) |
| 102 So(listing.Tombstones[0].id, ShouldEqual, "abc") |
| 103 |
| 104 // Cleaning up the storage using tombstones from Pop works. |
| 105 So(CleanupStorage(c, cleanup), ShouldBeNil) |
| 106 |
| 107 // The listing no longer returns the item, and there's no tombst
ones to |
| 108 // cleanup. |
| 109 listing, err = set.List(c) |
| 110 So(err, ShouldBeNil) |
| 111 So(listing.Items, ShouldBeNil) |
| 112 So(listing.Tombstones, ShouldBeNil) |
| 113 |
| 114 // Attempt to add it back (should be ignored). Add a bunch of ti
mes to make |
| 115 // sure to fill in many shards (this is pseudo-random). |
| 116 for i := 0; i < 5; i++ { |
| 117 So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil) |
| 118 } |
| 119 |
| 120 // The listing still doesn't returns it, but we now have a tombs
tone to |
| 121 // cleanup (again). |
| 122 listing, err = set.List(c) |
| 123 So(err, ShouldBeNil) |
| 124 So(listing.Items, ShouldBeNil) |
| 125 So(len(listing.Tombstones), ShouldEqual, 1) |
| 126 So(listing.Tombstones[0].old, ShouldBeFalse) |
| 127 So(len(listing.Tombstones[0].storage), ShouldEqual, 3) // all sh
ards |
| 128 |
| 129 // Popping it again doesn't work either. |
| 130 err = datastore.RunInTransaction(c, func(c context.Context) erro
r { |
| 131 popped, tombs, err := pop(c, &set, listing, []string{"ab
c"}) |
| 132 So(err, ShouldBeNil) |
| 133 So(popped, ShouldBeNil) |
| 134 So(tombs, ShouldBeNil) |
| 135 return nil |
| 136 }, nil) |
| 137 So(err, ShouldBeNil) |
| 138 |
| 139 // Cleaning up the storage, again. This should make List stop re
turning |
| 140 // the tombstone (since it has no storage items associated with
it and it's |
| 141 // not ready to be evicted yet). |
| 142 So(CleanupStorage(c, listing.Tombstones), ShouldBeNil) |
| 143 listing, err = set.List(c) |
| 144 So(err, ShouldBeNil) |
| 145 So(listing.Items, ShouldBeNil) |
| 146 So(listing.Tombstones, ShouldBeNil) |
| 147 |
| 148 // Time passes, tombstone expires. |
| 149 clock.Get(c).(testclock.TestClock).Add(2 * time.Minute) |
| 150 |
| 151 // Listing now returns expired tombstone. |
| 152 listing, err = set.List(c) |
| 153 So(err, ShouldBeNil) |
| 154 So(listing.Items, ShouldBeNil) |
| 155 So(len(listing.Tombstones), ShouldEqual, 1) |
| 156 So(len(listing.Tombstones[0].storage), ShouldEqual, 0) // cleane
d already |
| 157 |
| 158 // Cleanup storage keys. |
| 159 So(CleanupStorage(c, listing.Tombstones), ShouldBeNil) |
| 160 |
| 161 // Cleanup the tombstones themselves. |
| 162 err = datastore.RunInTransaction(c, func(c context.Context) erro
r { |
| 163 popped, tombs, err := pop(c, &set, listing, nil) |
| 164 So(err, ShouldBeNil) |
| 165 So(popped, ShouldBeNil) |
| 166 So(tombs, ShouldBeNil) |
| 167 return nil |
| 168 }, nil) |
| 169 So(err, ShouldBeNil) |
| 170 |
| 171 // No tombstones returned any longer. |
| 172 listing, err = set.List(c) |
| 173 So(err, ShouldBeNil) |
| 174 So(listing.Items, ShouldBeNil) |
| 175 So(listing.Tombstones, ShouldBeNil) |
| 176 |
| 177 // And the item can be added back now, since no trace of it is l
eft. |
| 178 So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil) |
| 179 |
| 180 // Yep, it is there. |
| 181 listing, err = set.List(c) |
| 182 So(err, ShouldBeNil) |
| 183 So(listing.Items, ShouldResemble, []Item{{ID: "abc"}}) |
| 184 So(listing.Tombstones, ShouldBeNil) |
| 185 }) |
| 186 |
| 187 Convey("stress", t, func() { |
| 188 // Add 1000 items in parallel from N goroutines, and (also in pa
rallel), |
| 189 // run N instances of "List and pop all", collecting the result
in single |
| 190 // list. There should be no duplicates in the final list! |
| 191 c := testingContext() |
| 192 |
| 193 set := Set{ |
| 194 ID: "test", |
| 195 ShardCount: 3, |
| 196 TombstonesRoot: datastore.NewKey(c, "Root", "root", 0,
nil), |
| 197 TombstonesDelay: time.Minute, |
| 198 } |
| 199 |
| 200 producers := 3 |
| 201 consumers := 5 |
| 202 items := 100 |
| 203 |
| 204 wakeups := make(chan string) |
| 205 |
| 206 lock := sync.Mutex{} |
| 207 consumed := []string{} |
| 208 |
| 209 for i := 0; i < producers; i++ { |
| 210 go func() { |
| 211 for j := 0; j < items; j++ { |
| 212 set.Add(c, []Item{{ID: fmt.Sprintf("%d",
j)}}) |
| 213 // Wake up 3 consumers, so they "fight". |
| 214 wakeups <- "wake" |
| 215 wakeups <- "wake" |
| 216 wakeups <- "wake" |
| 217 } |
| 218 for i := 0; i < consumers; i++ { |
| 219 wakeups <- "done" |
| 220 } |
| 221 }() |
| 222 } |
| 223 |
| 224 consume := func() { |
| 225 listing, err := set.List(c) |
| 226 if err != nil || len(listing.Items) == 0 { |
| 227 return |
| 228 } |
| 229 |
| 230 keys := []string{} |
| 231 for _, itm := range listing.Items { |
| 232 keys = append(keys, itm.ID) |
| 233 } |
| 234 |
| 235 // Try to pop all. |
| 236 var popped []string |
| 237 var tombs []*Tombstone |
| 238 err = datastore.RunInTransaction(c, func(c context.Conte
xt) error { |
| 239 var err error |
| 240 popped, tombs, err = pop(c, &set, listing, keys) |
| 241 return err |
| 242 }, nil) |
| 243 // Best-effort storage cleanup on success. |
| 244 if err == nil { |
| 245 CleanupStorage(c, tombs) |
| 246 } |
| 247 |
| 248 // Consider items consumed only if transaction has lande
d. |
| 249 if err == nil && len(popped) != 0 { |
| 250 lock.Lock() |
| 251 consumed = append(consumed, popped...) |
| 252 lock.Unlock() |
| 253 } |
| 254 } |
| 255 |
| 256 wg := sync.WaitGroup{} |
| 257 wg.Add(consumers) |
| 258 for i := 0; i < consumers; i++ { |
| 259 go func() { |
| 260 defer wg.Done() |
| 261 done := false |
| 262 for !done { |
| 263 done = (<-wakeups) == "done" |
| 264 consume() |
| 265 } |
| 266 }() |
| 267 } |
| 268 |
| 269 wg.Wait() // this waits for completion of the entire pipeline |
| 270 |
| 271 // Make sure 'consumed' is the initially produced set. |
| 272 dedup := stringset.New(len(consumed)) |
| 273 for _, itm := range consumed { |
| 274 dedup.Add(itm) |
| 275 } |
| 276 So(dedup.Len(), ShouldEqual, len(consumed)) // no dups |
| 277 So(len(consumed), ShouldEqual, items) // all are accounted
for |
| 278 }) |
| 279 } |
OLD | NEW |