Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(189)

Side by Side Diff: scheduler/appengine/engine/dsset/dsset.go

Issue 2981143002: Add 'dsset' structure. (Closed)
Patch Set: rebase Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The LUCI Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 // Package dsset implements a particular flavor of datastore-backed set.
16 //
17 // Due to its internal structure, it requires some maintenance on the part of the
18 // caller, who must periodically clean up removed items (aka tombstones).
19 //
20 // Items added to the set should have unique IDs, at least for the duration of
21 // some configurable time interval, as defined by TombstonesDelay property.
22 // It means removed items can't be added back to the set right away (the set
23 // will think they are already there). This is required to make 'Add' operation
24 // idempotent.
25 //
26 // TombstonesDelay is assumed to be much larger than time scale of all "fast"
27 // processes in the system, in particular all List+Pop processes. For example,
28 // if List+Pop is expected to take 1 min, TombstonesDelay should be >> 1 min
29 // (e.g. 5 min). Setting TombstonesDelay to very large value is harmful though,
30 // since it may slow down 'List' and 'Pop' (by allowing more garbage that will
31 // have to be filtered out).
32 //
33 // Properties (where N is current size of the set):
34 // * Batch 'Add' with configurable QPS limit, O(1) performance.
35 // * Transactional consistent 'Pop' (1 QPS limit), O(N) performance.
36 // * Non-transactional consistent 'List' (1 QPS limit), O(N) performance.
37 // * Popped items can't be re-added until their tombstones expire.
38 //
39 // These properties make dsset suitable for multiple producers, single consumer
40 // queues, where order of items is not important, each item has a unique
41 // identifier, and the queue size is small.
42 //
43 // Structurally dsset consists of N+1 entity groups:
44 // * N separate entity groups that contain N shards of the set.
45 // * 1 entity group (with a configurable root) that holds tombstones.
46 //
47 // It is safe to increase number of shards at any time. Decreasing number of
48 // shards is dangerous (but can be done with some more coding).
49 //
50 // Having more shards:
51 // * Makes Add() less contentious (so it can support more QPS).
52 // * Makes List() and CleanupStorage() slower and more expensive.
53 // * Does not affect Pop().
54 package dsset
55
56 import (
57 "fmt"
58 "sync"
59 "time"
60
61 "golang.org/x/net/context"
62
63 "github.com/luci/gae/service/datastore"
64 "github.com/luci/luci-go/common/clock"
65 "github.com/luci/luci-go/common/data/rand/mathrand"
66 "github.com/luci/luci-go/common/data/stringset"
67 "github.com/luci/luci-go/common/errors"
68 "github.com/luci/luci-go/common/retry/transient"
69 )
70
// batchSize is the maximum number of items handed to a single datastore Put or
// Delete call. batchOp splits larger workloads into chunks of at most this
// size.
const batchSize = 500
73
// Set holds a set of Items and uses Tombstones to achieve idempotency of Add.
//
// Producers just call Add(...).
//
// The consumer must run a more elaborate algorithm that ensures atomicity of
// 'Pop' and takes care of cleaning up the garbage. This requires a mix of
// transactional and non-transactional actions:
//
//	listing, err := set.List(ctx)
//	if err != nil || listing.Empty() {
//	  return err
//	}
//
//	if err := dsset.CleanupStorage(ctx, listing.Tombstones); err != nil {
//	  return err
//	}
//
//	... Fetch any additional info associated with 'listing.Items' ...
//
//	var tombstones []*dsset.Tombstone
//	err = datastore.RunInTransaction(ctx, func(ctx context.Context) error {
//	  op, err := set.BeginPop(ctx, listing)
//	  if err != nil {
//	    return err
//	  }
//	  for _, itm := range listing.Items {
//	    if op.Pop(itm.ID) {
//	      // The item was indeed in the set and we've just removed it!
//	    } else {
//	      // Some other transaction has popped it already.
//	    }
//	  }
//	  tombstones, err = dsset.FinishPop(ctx, op)
//	  return err
//	}, nil)
//	if err == nil {
//	  dsset.CleanupStorage(ctx, tombstones) // best-effort cleanup
//	}
//	return err
type Set struct {
	ID              string         // global ID, used to construct datastore keys
	ShardCount      int            // number of entity groups to use for storage
	TombstonesRoot  *datastore.Key // tombstones entity parent key
	TombstonesDelay time.Duration  // how long to keep tombstones in the set
}
119
// Item is what's stored in the set.
//
// Add copies both fields into a datastore entity, and List copies them back
// out.
type Item struct {
	ID    string // unique in time identifier of the item
	Value []byte // arbitrary value (<1 MB, but preferably much smaller)
}
125
// Listing is returned by 'List' call.
//
// It contains actual listing of items in the set, as well as a bunch of service
// information used by other operations ('CleanupStorage' and 'Pop') to keep
// the set in a garbage-free and consistent state.
//
// The only way to construct a correct Listing is to call 'List' method.
//
// See comments for Set struct and List method for more info.
type Listing struct {
	Items      []Item       // all items in the set, in arbitrary order
	Tombstones []*Tombstone // tombstones that can be cleaned up now

	set        string                      // parent set ID, verified by BeginPop
	producedAt time.Time                   // when 'List' call was initiated
	idToKeys   map[string][]*datastore.Key // ID -> datastore keys to cleanup
}
143
144 // Empty is true if both 'Items' and 'Tombstones' are empty
145 func (l *Listing) Empty() bool {
146 return len(l.Items) == 0 && len(l.Tombstones) == 0
147 }
148
// Tombstone is a reference to a deleted item that still lingers in the set.
//
// Tombstones exist to make sure recently popped items do not reappear in the
// set if producers attempt to re-add them.
//
// Its fields are intentionally private to force correct usage of Set's methods.
type Tombstone struct {
	id        string           // deleted item ID
	storage   []*datastore.Key // itemEntity's to delete in 'CleanupStorage'
	old       bool             // true if tombstone should be popped in 'Pop'
	cleanedUp bool             // true if 'CleanupStorage' processed the tombstone
}
161
162 // Add idempotently adds a bunch of items to the set.
163 //
164 // If items with given keys are already in the set, or have been deleted
165 // recently, they won't be re-added. No error is returned in this case. When
166 // retrying the call like that, the caller is responsible to pass exact same
167 // Item.Value, otherwise 'List' may return random variant of the added item.
168 //
169 // Writes to some single entity group (not known in advance). If called outside
170 // of a transaction and the call fails, may add only some subset of items.
171 // Running inside a transaction makes this operation atomic.
172 //
173 // Returns only transient errors.
174 func (s *Set) Add(c context.Context, items []Item) error {
175 // Pick a random shard and add all new items there. If this is a retry, they
176 // may exist in some other shard already. We don't care, they'll be
177 // deduplicated in 'List'. If added items have been popped already (they have
178 // tombstones), 'List' will omit them as well.
179 shardRoot := s.shardRoot(c, mathrand.Intn(c, s.ShardCount))
180 entities := make([]itemEntity, len(items))
181 for i, itm := range items {
182 entities[i] = itemEntity{
183 ID: itm.ID,
184 Parent: shardRoot,
185 Value: itm.Value,
186 }
187 }
188 return transient.Tag.Apply(batchOp(len(entities), func(start, end int) e rror {
189 return datastore.Put(c, entities[start:end])
190 }))
191 }
192
193 // List returns all items that are currently in the set (in arbitrary order),
194 // as well as a set of tombstones that points to items that were previously
195 // popped and can be cleaned up now.
196 //
197 // Must be called outside of transactions (panics otherwise). Reads many entity
198 // groups, including TombstonesRoot one.
199 //
200 // The set of tombstones to cleanup can be passed to 'CleanupStorage', and
201 // later to 'BeginPop' (as party of the listing), in that order. Not doing
202 // so will lead to accumulation of garbage in the set that will slow down 'List'
203 // and 'Pop'.
204 //
205 // Returns only transient errors.
206 func (s *Set) List(c context.Context) (*Listing, error) {
207 if datastore.CurrentTransaction(c) != nil {
208 panic("dsset.Set.List must be called outside of a transaction")
209 }
210 now := clock.Now(c).UTC()
211
212 // Fetch all shards (via consistent ancestor queries) and all tombstones .
213
214 shards := make([][]*itemEntity, s.ShardCount)
215 tombsEntity := tombstonesEntity{ID: s.ID, Parent: s.TombstonesRoot}
216
217 wg := sync.WaitGroup{}
218 wg.Add(1 + s.ShardCount)
219 errs := errors.NewLazyMultiError(s.ShardCount + 1)
220
221 go func() {
222 defer wg.Done()
223 if err := datastore.Get(c, &tombsEntity); err != nil && err != d atastore.ErrNoSuchEntity {
224 errs.Assign(0, err)
225 }
226 }()
227
228 for i := 0; i < s.ShardCount; i++ {
229 go func(i int) {
230 defer wg.Done()
231 q := datastore.NewQuery("dsset.Item").Ancestor(s.shardRo ot(c, i))
232 errs.Assign(i+1, datastore.GetAll(c, q, &shards[i]))
233 }(i)
234 }
235
236 wg.Wait()
237 if err := errs.Get(); err != nil {
238 return nil, transient.Tag.Apply(err)
239 }
240
241 // Mapping "item ID" => "list of entities to delete to remove it". This is
242 // eventually used by 'CleanupStorage'. Under normal circumstances, the list
243 // has only one item, but there can be more if 'Add' call was retried (s o the
244 // item ends up in multiple different shards).
245 idToKeys := map[string][]*datastore.Key{}
246 for _, shard := range shards {
247 for _, e := range shard {
248 idToKeys[e.ID] = append(idToKeys[e.ID], datastore.KeyFor Obj(c, e))
249 }
250 }
251
252 // A set of items we pretend not to see. Initially all tombstoned ones.
253 //
254 // Since we are iterating over tombstone list anyway, find all sufficien tly
255 // old tombstones or tombstones that still have storage associated with them.
256 // We return them to the caller, so they can be cleaned up:
257 // * 'CleanupStorage' makes sure 'storage' entities are deleted.
258 // * 'BeginPop' completely erases old tombstones.
259 var tombs []*Tombstone
260 ignore := stringset.New(len(tombsEntity.Tombstones))
261 for _, t := range tombsEntity.Tombstones {
262 ignore.Add(t.ID)
263 old := now.Sub(t.Tombstoned) > s.TombstonesDelay
264 if storage := idToKeys[t.ID]; len(storage) > 0 || old {
265 tombs = append(tombs, &Tombstone{
266 id: t.ID,
267 storage: storage,
268 old: old, // if true, BeginPop will delete t his tombstone
269 })
270 }
271 }
272
273 // Join all shards, throwing away tombstoned and duplicated items.
274 var items []Item
275 for _, shard := range shards {
276 for _, e := range shard {
277 if !ignore.Has(e.ID) {
278 items = append(items, Item{
279 ID: e.ID,
280 Value: e.Value,
281 })
282 ignore.Add(e.ID)
283 }
284 }
285 }
286
287 return &Listing{
288 Items: items,
289 Tombstones: tombs,
290 set: s.ID,
291 producedAt: now,
292 idToKeys: idToKeys,
293 }, nil
294 }
295
// PopOp is an in-progress 'Pop' operation.
//
// It is created by BeginPop, mutated by Pop and submitted by FinishPop.
//
// See BeginPop.
type PopOp struct {
	ctx      context.Context             // datastore context to use for this op
	txn      datastore.Transaction       // a transaction that started BeginPop
	now      time.Time                   // popping time for all popped items
	dirty    bool                        // true if the tombstone map was modified
	finished bool                        // true if finished already
	entity   *tombstonesEntity           // entity with tombstones
	tombs    map[string]tombstone        // entity.Tombstones in a map form
	idToKeys map[string][]*datastore.Key // ID -> datastore keys to cleanup
	popped   []*Tombstone                // new tombstones for popped items
}
310
311 // BeginPop initiates 'Pop' operation.
312 //
313 // Pop operation is used to transactionally remove items from the set, as well
314 // as cleanup old tombstones. It must be finished with 'dsset.FinishPop', even
315 // if no items have been popped: the internal state still can change in this
316 // case, since 'BeginPop' cleans up old tombstones. Even more, it is necessary
317 // to do 'Pop' if listing contains non-empty set of tombstones (regardless of
318 // whether the caller wants to actually pop any items from the set). This is
319 // part of the required set maintenance.
320 //
321 // Requires a transaction. Modifies TombstonesRoot entity group (and only it).
322 //
323 // Returns only transient errors. Such errors usually mean that the entire pop
324 // sequence ('List' + 'Pop') should be retried.
325 func (s *Set) BeginPop(c context.Context, listing *Listing) (*PopOp, error) {
326 if listing.set != s.ID {
327 panic("passed Listing from another set")
328 }
329 txn := datastore.CurrentTransaction(c)
330 if txn == nil {
331 panic("dsset.Set.BeginPop must be called inside a transaction")
332 }
333
334 now := clock.Now(c).UTC()
335 if age := now.Sub(listing.producedAt); age > s.TombstonesDelay {
336 return nil, transient.Tag.Apply(fmt.Errorf("the listing is stale (%s > %s)", age, s.TombstonesDelay))
337 }
338
339 entity := &tombstonesEntity{ID: s.ID, Parent: s.TombstonesRoot}
340 if err := datastore.Get(c, entity); err != nil && err != datastore.ErrNo SuchEntity {
341 return nil, transient.Tag.Apply(err)
342 }
343
344 // The data in tombstonesEntity, in map form.
345 tombs := make(map[string]tombstone, len(entity.Tombstones))
346 for _, t := range entity.Tombstones {
347 tombs[t.ID] = t
348 }
349
350 // Throw away old tombstones right away.
351 dirty := false
352 for _, tomb := range listing.Tombstones {
353 if tomb.old {
354 if !tomb.cleanedUp {
355 panic("trying to remove Tombstone that wasn't cl eaned up")
356 }
357 if _, hasTomb := tombs[tomb.id]; hasTomb {
358 delete(tombs, tomb.id)
359 dirty = true
360 }
361 }
362 }
363
364 return &PopOp{
365 ctx: c,
366 txn: txn,
367 now: now,
368 dirty: dirty,
369 entity: entity,
370 tombs: tombs,
371 idToKeys: listing.idToKeys,
372 }, nil
373 }
374
375 // CanPop returns true if the given item can be popped from the set.
376 //
377 // Returns false if this item has been popped before (perhaps in another
378 // transaction), or it's not in the the listing passed to BeginPop.
379 func (p *PopOp) CanPop(id string) bool {
380 if _, hasTomb := p.tombs[id]; hasTomb {
381 return false // already popped by someone else
382 }
383 if _, present := p.idToKeys[id]; present {
384 return true // listed in the set
385 }
386 return false
387 }
388
389 // Pop removed the item from the set and returns true if it was there.
390 //
391 // Returns false if this item has been popped before (perhaps in another
392 // transaction), or it's not in the the listing passed to BeginPop.
393 func (p *PopOp) Pop(id string) bool {
394 if p.finished {
395 panic("the operation has already been finished")
396 }
397 if !p.CanPop(id) {
398 return false
399 }
400 p.tombs[id] = tombstone{ID: id, Tombstoned: p.now}
401 p.popped = append(p.popped, &Tombstone{
402 id: id,
403 storage: p.idToKeys[id],
404 old: false, // BeingPop will ignore this fresh tombstone
405 })
406 p.dirty = true
407 return true
408 }
409
410 // makeTombstonesEntity is used internally by FinishPop.
411 func (p *PopOp) makeTombstonesEntity() *tombstonesEntity {
412 p.entity.Tombstones = p.entity.Tombstones[:0]
413 for _, tomb := range p.tombs {
414 p.entity.Tombstones = append(p.entity.Tombstones, tomb)
415 }
416 return p.entity
417 }
418
419 ////////////////////////////////////////////////////////////////////////////////
420
421 // FinishPop completes one or more pop operations (for different sets) by
422 // submitting changes to datastore.
423 //
424 // Must be called within same transaction that called BeginPop.
425 //
426 // It returns a list of tombstones for popped items. The storage used by the
427 // items can be reclaimed right away by calling 'CleanupStorage'. It is fine
428 // not to do so, 'List' will eventually return all tombstones that need cleaning
429 // anyway. Calling 'CleanupStorage' as best effort is still beneficial though,
430 // since it will reduce the amount of garbage in the set.
431 //
432 // Returns only transient errors.
433 func FinishPop(ctx context.Context, ops ...*PopOp) (tombs []*Tombstone, err erro r) {
434 txn := datastore.CurrentTransaction(ctx)
435
436 entities := []*tombstonesEntity{}
437 tombsCount := 0
438 for _, op := range ops {
439 if op.finished {
440 panic("the operation has already been finished")
441 }
442 if op.txn != txn {
443 panic("wrong transaction")
444 }
445 if op.dirty {
446 entities = append(entities, op.makeTombstonesEntity())
447 tombsCount += len(op.popped)
448 }
449 }
450
451 if err := datastore.Put(ctx, entities); err != nil {
452 return nil, transient.Tag.Apply(err)
453 }
454
455 if tombsCount != 0 {
456 tombs = make([]*Tombstone, 0, tombsCount)
457 }
458 for _, op := range ops {
459 tombs = append(tombs, op.popped...)
460 op.finished = true
461 }
462
463 return tombs, nil
464 }
465
466 // CleanupStorage deletes entities used to store items under given tombstones.
467 //
468 // This is datastore's MultiDelete RPC in disguise. Touches many entity groups.
469 // Must be called outside of transactions. Idempotent.
470 //
471 // Can handle tombstones from multiple different sets at once. This is preferred
472 // over calling 'CleanupStorage' multiple times (once per set), since it
473 // collapses multiple datastore RPCs into one.
474 //
475 // This MUST be called before tombstones returned by 'List' are removed in
476 // 'Pop'. Failure to do so will make items reappear in the set.
477 //
478 // Returns only transient errors. There's no way to know which items were
479 // removed and which weren't in case of an error.
480 func CleanupStorage(c context.Context, cleanup ...[]*Tombstone) error {
481 if datastore.CurrentTransaction(c) != nil {
482 panic("dsset.CleanupStorage must be called outside of a transact ion")
483 }
484
485 keys := []*datastore.Key{}
486 for _, tombs := range cleanup {
487 for _, tomb := range tombs {
488 keys = append(keys, tomb.storage...)
489 }
490 }
491
492 err := batchOp(len(keys), func(start, end int) error {
493 return datastore.Delete(c, keys[start:end])
494 })
495 if err != nil {
496 return transient.Tag.Apply(err)
497 }
498
499 for _, tombs := range cleanup {
500 for _, tomb := range tombs {
501 tomb.cleanedUp = true
502 tomb.storage = nil
503 }
504 }
505 return nil
506 }
507
508 ////////////////////////////////////////////////////////////////////////////////
509
// itemEntity is the datastore representation of a single Item.
//
// It is stored under kind "dsset.Item". Add sets Parent to the root key of the
// shard the item is written to.
type itemEntity struct {
	_kind string `gae:"$kind,dsset.Item"`

	ID     string         `gae:"$id"`      // same as Item.ID
	Parent *datastore.Key `gae:"$parent"`  // shard root key
	Value  []byte         `gae:",noindex"` // same as Item.Value
}
517
// tombstonesEntity is the datastore entity that holds all tombstones of a set.
//
// It is stored under kind "dsset.Tombstones". List and BeginPop address it by
// the set's ID with Set.TombstonesRoot as the parent key.
type tombstonesEntity struct {
	_kind string `gae:"$kind,dsset.Tombstones"`

	ID         string         `gae:"$id"`      // ID of the parent set
	Parent     *datastore.Key `gae:"$parent"`  // Set.TombstonesRoot
	Tombstones []tombstone    `gae:",noindex"` // all known tombstones
}
525
// tombstone is a single record inside tombstonesEntity: it marks an item as
// popped and remembers when that happened.
type tombstone struct {
	ID         string    // ID of tombstoned item
	Tombstoned time.Time // when it was popped
}
530
531 // shardRoot returns entity group key to use for a given shard.
532 func (s *Set) shardRoot(c context.Context, n int) *datastore.Key {
533 return datastore.NewKey(c, "dsset.Shard", fmt.Sprintf("%s:%d", s.ID, n), 0, nil)
534 }
535
536 // batchOp splits 'total' into batches and calls 'op' in parallel.
537 //
538 // Doesn't preserve order of returned errors! Don't try to deconstruct the
539 // returned multi error, the position of individual errors there does not
540 // correlate with the original array.
541 func batchOp(total int, op func(start, end int) error) error {
542 switch {
543 case total == 0:
544 return nil
545 case total <= batchSize:
546 return op(0, total)
547 }
548
549 errs := make(chan error)
550 ops := 0
551 offset := 0
552 for total > 0 {
553 count := batchSize
554 if count > total {
555 count = total
556 }
557 go func(start, end int) {
558 errs <- op(start, end)
559 }(offset, offset+count)
560 offset += count
561 total -= count
562 ops++
563 }
564
565 var all errors.MultiError
566 for i := 0; i < ops; i++ {
567 err := <-errs
568 if merr, yep := err.(errors.MultiError); yep {
569 for _, e := range merr {
570 if e != nil {
571 all = append(all, e)
572 }
573 }
574 } else if err != nil {
575 all = append(all, err)
576 }
577 }
578
579 if len(all) == 0 {
580 return nil
581 }
582 return all
583 }
OLDNEW
« no previous file with comments | « scheduler/appengine/engine/cron/demo/queue.yaml ('k') | scheduler/appengine/engine/dsset/dsset_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698