Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(45)

Side by Side Diff: scheduler/appengine/engine/dsset/dsset_test.go

Issue 2981143002: Add 'dsset' structure. (Closed)
Patch Set: rebase Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The LUCI Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package dsset
16
17 import (
18 "fmt"
19 "math/rand"
20 "sync"
21 "testing"
22 "time"
23
24 "golang.org/x/net/context"
25
26 "github.com/luci/gae/impl/memory"
27 "github.com/luci/gae/service/datastore"
28 "github.com/luci/luci-go/common/clock"
29 "github.com/luci/luci-go/common/clock/testclock"
30 "github.com/luci/luci-go/common/data/rand/mathrand"
31 "github.com/luci/luci-go/common/data/stringset"
32
33 . "github.com/smartystreets/goconvey/convey"
34 )
35
36 func testingContext() context.Context {
37 c := memory.Use(context.Background())
38 c = clock.Set(c, testclock.New(time.Unix(1442270520, 0).UTC()))
39 c = mathrand.Set(c, rand.New(rand.NewSource(1000)))
40 return c
41 }
42
43 // pop pops a bunch of items from the set and returns items that were popped.
44 func pop(c context.Context, s *Set, listing *Listing, ids []string) (popped []st ring, tombs []*Tombstone, err error) {
45 op, err := s.BeginPop(c, listing)
46 if err != nil {
47 return nil, nil, err
48 }
49 for _, id := range ids {
50 if op.Pop(id) {
51 popped = append(popped, id)
52 }
53 }
54 if tombs, err = FinishPop(c, op); err != nil {
55 return nil, nil, err
56 }
57 return popped, tombs, nil
58 }
59
60 func TestSet(t *testing.T) {
61 t.Parallel()
62
63 Convey("item one lifecycle", t, func() {
64 c := testingContext()
65
66 set := Set{
67 ID: "test",
68 ShardCount: 3,
69 TombstonesRoot: datastore.NewKey(c, "Root", "root", 0, nil),
70 TombstonesDelay: time.Minute,
71 }
72
73 // Add one item.
74 So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil)
75
76 // The item is returned by the listing.
77 listing, err := set.List(c)
78 So(err, ShouldBeNil)
79 So(listing.Items, ShouldResemble, []Item{{ID: "abc"}})
80 So(listing.Tombstones, ShouldBeNil)
81
82 // Pop it!
83 var cleanup []*Tombstone
84 err = datastore.RunInTransaction(c, func(c context.Context) erro r {
85 popped, tombs, err := pop(c, &set, listing, []string{"ab c"})
86 So(err, ShouldBeNil)
87 So(popped, ShouldResemble, []string{"abc"})
88 So(len(tombs), ShouldEqual, 1)
89 So(tombs[0].id, ShouldEqual, "abc")
90 So(len(tombs[0].storage), ShouldEqual, 1)
91 cleanup = tombs
92 return nil
93 }, nil)
94 So(err, ShouldBeNil)
95
96 // The listing no longer returns it, but we have a fresh tombsto ne that can
97 // be cleaned up.
98 listing, err = set.List(c)
99 So(err, ShouldBeNil)
100 So(listing.Items, ShouldBeNil)
101 So(len(listing.Tombstones), ShouldEqual, 1)
102 So(listing.Tombstones[0].id, ShouldEqual, "abc")
103
104 // Cleaning up the storage using tombstones from Pop works.
105 So(CleanupStorage(c, cleanup), ShouldBeNil)
106
107 // The listing no longer returns the item, and there's no tombst ones to
108 // cleanup.
109 listing, err = set.List(c)
110 So(err, ShouldBeNil)
111 So(listing.Items, ShouldBeNil)
112 So(listing.Tombstones, ShouldBeNil)
113
114 // Attempt to add it back (should be ignored). Add a bunch of ti mes to make
115 // sure to fill in many shards (this is pseudo-random).
116 for i := 0; i < 5; i++ {
117 So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil)
118 }
119
120 // The listing still doesn't returns it, but we now have a tombs tone to
121 // cleanup (again).
122 listing, err = set.List(c)
123 So(err, ShouldBeNil)
124 So(listing.Items, ShouldBeNil)
125 So(len(listing.Tombstones), ShouldEqual, 1)
126 So(listing.Tombstones[0].old, ShouldBeFalse)
127 So(len(listing.Tombstones[0].storage), ShouldEqual, 3) // all sh ards
128
129 // Popping it again doesn't work either.
130 err = datastore.RunInTransaction(c, func(c context.Context) erro r {
131 popped, tombs, err := pop(c, &set, listing, []string{"ab c"})
132 So(err, ShouldBeNil)
133 So(popped, ShouldBeNil)
134 So(tombs, ShouldBeNil)
135 return nil
136 }, nil)
137 So(err, ShouldBeNil)
138
139 // Cleaning up the storage, again. This should make List stop re turning
140 // the tombstone (since it has no storage items associated with it and it's
141 // not ready to be evicted yet).
142 So(CleanupStorage(c, listing.Tombstones), ShouldBeNil)
143 listing, err = set.List(c)
144 So(err, ShouldBeNil)
145 So(listing.Items, ShouldBeNil)
146 So(listing.Tombstones, ShouldBeNil)
147
148 // Time passes, tombstone expires.
149 clock.Get(c).(testclock.TestClock).Add(2 * time.Minute)
150
151 // Listing now returns expired tombstone.
152 listing, err = set.List(c)
153 So(err, ShouldBeNil)
154 So(listing.Items, ShouldBeNil)
155 So(len(listing.Tombstones), ShouldEqual, 1)
156 So(len(listing.Tombstones[0].storage), ShouldEqual, 0) // cleane d already
157
158 // Cleanup storage keys.
159 So(CleanupStorage(c, listing.Tombstones), ShouldBeNil)
160
161 // Cleanup the tombstones themselves.
162 err = datastore.RunInTransaction(c, func(c context.Context) erro r {
163 popped, tombs, err := pop(c, &set, listing, nil)
164 So(err, ShouldBeNil)
165 So(popped, ShouldBeNil)
166 So(tombs, ShouldBeNil)
167 return nil
168 }, nil)
169 So(err, ShouldBeNil)
170
171 // No tombstones returned any longer.
172 listing, err = set.List(c)
173 So(err, ShouldBeNil)
174 So(listing.Items, ShouldBeNil)
175 So(listing.Tombstones, ShouldBeNil)
176
177 // And the item can be added back now, since no trace of it is l eft.
178 So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil)
179
180 // Yep, it is there.
181 listing, err = set.List(c)
182 So(err, ShouldBeNil)
183 So(listing.Items, ShouldResemble, []Item{{ID: "abc"}})
184 So(listing.Tombstones, ShouldBeNil)
185 })
186
187 Convey("stress", t, func() {
188 // Add 1000 items in parallel from N goroutines, and (also in pa rallel),
189 // run N instances of "List and pop all", collecting the result in single
190 // list. There should be no duplicates in the final list!
191 c := testingContext()
192
193 set := Set{
194 ID: "test",
195 ShardCount: 3,
196 TombstonesRoot: datastore.NewKey(c, "Root", "root", 0, nil),
197 TombstonesDelay: time.Minute,
198 }
199
200 producers := 3
201 consumers := 5
202 items := 100
203
204 wakeups := make(chan string)
205
206 lock := sync.Mutex{}
207 consumed := []string{}
208
209 for i := 0; i < producers; i++ {
210 go func() {
211 for j := 0; j < items; j++ {
212 set.Add(c, []Item{{ID: fmt.Sprintf("%d", j)}})
213 // Wake up 3 consumers, so they "fight".
214 wakeups <- "wake"
215 wakeups <- "wake"
216 wakeups <- "wake"
217 }
218 for i := 0; i < consumers; i++ {
219 wakeups <- "done"
220 }
221 }()
222 }
223
224 consume := func() {
225 listing, err := set.List(c)
226 if err != nil || len(listing.Items) == 0 {
227 return
228 }
229
230 keys := []string{}
231 for _, itm := range listing.Items {
232 keys = append(keys, itm.ID)
233 }
234
235 // Try to pop all.
236 var popped []string
237 var tombs []*Tombstone
238 err = datastore.RunInTransaction(c, func(c context.Conte xt) error {
239 var err error
240 popped, tombs, err = pop(c, &set, listing, keys)
241 return err
242 }, nil)
243 // Best-effort storage cleanup on success.
244 if err == nil {
245 CleanupStorage(c, tombs)
246 }
247
248 // Consider items consumed only if transaction has lande d.
249 if err == nil && len(popped) != 0 {
250 lock.Lock()
251 consumed = append(consumed, popped...)
252 lock.Unlock()
253 }
254 }
255
256 wg := sync.WaitGroup{}
257 wg.Add(consumers)
258 for i := 0; i < consumers; i++ {
259 go func() {
260 defer wg.Done()
261 done := false
262 for !done {
263 done = (<-wakeups) == "done"
264 consume()
265 }
266 }()
267 }
268
269 wg.Wait() // this waits for completion of the entire pipeline
270
271 // Make sure 'consumed' is the initially produced set.
272 dedup := stringset.New(len(consumed))
273 for _, itm := range consumed {
274 dedup.Add(itm)
275 }
276 So(dedup.Len(), ShouldEqual, len(consumed)) // no dups
277 So(len(consumed), ShouldEqual, items) // all are accounted for
278 })
279 }
OLDNEW
« no previous file with comments | « scheduler/appengine/engine/dsset/dsset.go ('k') | scheduler/appengine/engine/internal/tq_tasks.proto » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698