OLD | NEW |
(Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package dscache |
| 6 |
| 7 import ( |
| 8 "time" |
| 9 |
| 10 ds "github.com/luci/gae/service/datastore" |
| 11 "github.com/luci/gae/service/memcache" |
| 12 log "github.com/luci/luci-go/common/logging" |
| 13 "golang.org/x/net/context" |
| 14 ) |
| 15 |
| 16 type dsCache struct { |
| 17 ds.RawInterface |
| 18 |
| 19 *supportContext |
| 20 } |
| 21 |
| 22 var _ ds.RawInterface = (*dsCache)(nil) |
| 23 |
| 24 func (d *dsCache) DeleteMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { |
| 25 return d.mutation(keys, func() error { |
| 26 return d.RawInterface.DeleteMulti(keys, cb) |
| 27 }) |
| 28 } |
| 29 |
| 30 func (d *dsCache) PutMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.PutMultiC
B) error { |
| 31 return d.mutation(keys, func() error { |
| 32 return d.RawInterface.PutMulti(keys, vals, cb) |
| 33 }) |
| 34 } |
| 35 |
| 36 func (d *dsCache) GetMulti(keys []ds.Key, metas ds.MultiMetaGetter, cb ds.GetMul
tiCB) error { |
| 37 lockItems, nonce := d.mkRandLockItems(keys, metas) |
| 38 if len(lockItems) == 0 { |
| 39 return d.RawInterface.GetMulti(keys, metas, cb) |
| 40 } |
| 41 |
| 42 if err := d.mc.AddMulti(lockItems); err != nil { |
| 43 (log.Fields{log.ErrorKey: err}).Warningf( |
| 44 d.c, "dscache: GetMulti: memcache.AddMulti") |
| 45 |
| 46 } |
| 47 if err := d.mc.GetMulti(lockItems); err != nil { |
| 48 (log.Fields{log.ErrorKey: err}).Warningf( |
| 49 d.c, "dscache: GetMulti: memcache.GetMulti") |
| 50 } |
| 51 |
| 52 p := makeFetchPlan(d.c, d.aid, d.ns, &facts{keys, metas, lockItems, nonc
e}) |
| 53 |
| 54 if !p.empty() { |
| 55 // looks like we have something to pull from datastore, and mayb
e some work |
| 56 // to save stuff back to memcache. |
| 57 |
| 58 toCas := []memcache.Item{} |
| 59 j := 0 |
| 60 err := d.RawInterface.GetMulti(p.toGet, p.toGetMeta, func(pm ds.
PropertyMap, err error) { |
| 61 i := p.idxMap[j] |
| 62 toSave := p.toSave[j] |
| 63 j++ |
| 64 |
| 65 data := []byte(nil) |
| 66 |
| 67 // true: save entity to memcache |
| 68 // false: lock entity in memcache forever |
| 69 shouldSave := true |
| 70 if err == nil { |
| 71 p.decoded[i] = pm |
| 72 if toSave != nil { |
| 73 data = encodeItemValue(pm) |
| 74 if len(data) > internalValueSizeLimit { |
| 75 shouldSave = false |
| 76 log.Warningf( |
| 77 d.c, "dscache: encoded e
ntity too big (%d/%d)!", |
| 78 len(data), internalValue
SizeLimit) |
| 79 } |
| 80 } |
| 81 } else { |
| 82 p.lme.Assign(i, err) |
| 83 if err != ds.ErrNoSuchEntity { |
| 84 return // aka continue to the next entry |
| 85 } |
| 86 } |
| 87 |
| 88 if toSave != nil { |
| 89 if shouldSave { // save |
| 90 expSecs := metas.GetMetaDefault(i, Cache
ExpirationMeta, CacheTimeSeconds).(int64) |
| 91 toSave.SetFlags(uint32(ItemHasData)) |
| 92 toSave.SetExpiration(time.Duration(expSe
cs) * time.Second) |
| 93 toSave.SetValue(data) |
| 94 } else { |
| 95 // Set a lock with an infinite timeout.
No one else should try to |
| 96 // serialize this item to memcache until
something Put/Delete's it. |
| 97 toSave.SetFlags(uint32(ItemHasLock)) |
| 98 toSave.SetExpiration(0) |
| 99 toSave.SetValue(nil) |
| 100 } |
| 101 toCas = append(toCas, toSave) |
| 102 } |
| 103 }) |
| 104 if err != nil { |
| 105 return err |
| 106 } |
| 107 if len(toCas) > 0 { |
| 108 // we have entries to save back to memcache. |
| 109 if err := d.mc.CompareAndSwapMulti(toCas); err != nil { |
| 110 (log.Fields{log.ErrorKey: err}).Warningf( |
| 111 d.c, "dscache: GetMulti: memcache.Compar
eAndSwapMulti") |
| 112 } |
| 113 } |
| 114 } |
| 115 |
| 116 // finally, run the callback for all of the decoded items and the errors
, |
| 117 // if any. |
| 118 for i, dec := range p.decoded { |
| 119 cb(dec, p.lme.GetOne(i)) |
| 120 } |
| 121 |
| 122 return nil |
| 123 } |
| 124 |
| 125 func (d *dsCache) RunInTransaction(f func(context.Context) error, opts *ds.Trans
actionOptions) error { |
| 126 txnState := dsTxnState{} |
| 127 err := d.RawInterface.RunInTransaction(func(ctx context.Context) error { |
| 128 txnState.reset() |
| 129 err := f(context.WithValue(ctx, dsTxnCacheKey, &txnState)) |
| 130 if err == nil { |
| 131 err = txnState.apply(d.supportContext) |
| 132 } |
| 133 return err |
| 134 }, opts) |
| 135 if err == nil { |
| 136 txnState.release(d.supportContext) |
| 137 } |
| 138 return err |
| 139 } |
OLD | NEW |