Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(350)

Side by Side Diff: filter/txnBuf/state.go

Issue 1309803004: Add transaction buffer filter. (Closed) Base URL: https://github.com/luci/gae.git@add_query_support
Patch Set: rebase Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package txnBuf
6
7 import (
8 "bytes"
9 "sync"
10
11 "github.com/luci/gae/impl/memory"
12 "github.com/luci/gae/service/datastore"
13 "github.com/luci/gae/service/datastore/serialize"
14 "github.com/luci/gae/service/info"
15 "github.com/luci/luci-go/common/errors"
16 "github.com/luci/luci-go/common/stringset"
17 "golang.org/x/net/context"
18 )
19
20 // DefaultSizeBudget is the size budget for the root transaction.
21 //
22 // Because our estimation algorithm isn't entirely correct, we take 5% off
23 // the limit for encoding and estimate inaccuracies.
24 //
25 // 10MB taken on 2015/09/24:
26 // https://cloud.google.com/appengine/docs/go/datastore/#Go_Quotas_and_limits
27 const DefaultSizeBudget = int64((10 * 1000 * 1000) * 0.95)
28
29 // DefaultSizeThreshold prevents the root transaction from getting too close
30 // to the budget. If the code attempts to begin a transaction which would have
31 // less than this threshold for its budget, the transaction will immediately
32 // return ErrTransactionTooLarge.
33 const DefaultSizeThreshold = int64(10 * 1000)
34
35 // XGTransactionGroupLimit is the number of transaction groups to allow in an
36 // XG transaction.
37 //
38 // 25 taken on 2015/09/24:
39 // https://cloud.google.com/appengine/docs/go/datastore/transactions#Go_What_can _be_done_in_a_transaction
40 const XGTransactionGroupLimit = 25
41
42 // sizeTracker tracks the size of a buffered transaction. The rules are simple:
43 // * deletes count for the size of their key, but 0 data
44 // * puts count for the size of their key plus the 'EstimateSize' for their
45 // data.
46 type sizeTracker struct {
47 keyToSize map[string]int64
48 total int64
49 }
50
51 // set states that the given key is being set to an entity with the size `val`.
52 // A val of 0 means "I'm deleting this key"
53 func (s *sizeTracker) set(key string, val int64) {
54 prev, existed := s.keyToSize[key]
55 if s.keyToSize == nil {
Vadim Sh. 2015/09/30 01:11:28 err... you should probably swap this 'if' with pre
iannucci 2015/09/30 02:00:10 done
56 s.keyToSize = make(map[string]int64)
57 }
58 s.keyToSize[key] = val
59 s.total += val - prev
60 if !existed {
61 s.total += int64(len(key))
62 }
63 }
64
65 // get returns the currently tracked size for key, and wheter or not the key
66 // has any tracked value.
67 func (s *sizeTracker) get(key string) (int64, bool) {
68 size, has := s.keyToSize[key]
69 return size, has
70 }
71
72 // has returns true iff key has a tracked value.
73 func (s *sizeTracker) has(key string) bool {
74 _, has := s.keyToSize[key]
75 return has
76 }
77
78 // dup returns a duplicate sizeTracker.
79 func (s *sizeTracker) dup() *sizeTracker {
80 if len(s.keyToSize) == 0 {
81 return &sizeTracker{}
82 }
83 k2s := make(map[string]int64, len(s.keyToSize))
84 for k, v := range s.keyToSize {
85 k2s[k] = v
86 }
87 return &sizeTracker{k2s, s.total}
88 }
89
90 type txnBufState struct {
91 sync.Mutex
92
93 // encoded key -> size of entity. A size of 0 means that the entity is
94 // deleted.
95 entState *sizeTracker
96 memDS datastore.RawInterface
97
98 roots stringset.Set
99 rootLimit int
100
101 aid string
102 ns string
103 parentDS datastore.RawInterface
104 parentState *txnBufState
105
106 // sizeBudget is the number of bytes that this transaction has to operat e
107 // within. It's only used when attempting to apply() the transaction, an d
108 // it is the threshold for the delta of applying this transaction to the
109 // parent transaction. Note that a buffered transaction could actually h ave
110 // a negative delta if the parent transaction had many large entities wh ich
111 // the inner transaction deleted.
112 sizeBudget int64
113
114 siblingLock sync.Mutex
Vadim Sh. 2015/09/30 01:11:29 hm....
iannucci 2015/09/30 02:00:10 added comment
115 }
116
117 func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas tore.TransactionOptions) error {
118 inf := info.Get(ctx)
119 ns := inf.GetNamespace()
120
121 parentState, _ := ctx.Value(dsTxnBufParent).(*txnBufState)
122 roots := stringset.New(0)
123 rootLimit := 1
124 if opts != nil && opts.XG {
125 rootLimit = XGTransactionGroupLimit
126 }
127 sizeBudget := DefaultSizeBudget
128 if parentState != nil {
129 parentState.siblingLock.Lock()
130 defer parentState.siblingLock.Unlock()
Vadim Sh. 2015/09/30 01:11:29 this is released at the very end of this function,
iannucci 2015/09/30 02:00:10 yeah. It's to prevent two transactions on the same
131
132 // TODO(riannucci): this is a bit wonky since it means that a ch ild
133 // transaction declaring XG=true will only get to modify 25 grou ps IF
134 // they're same groups affected by the parent transactions. So i nstead of
135 // respecting opts.XG for inner transactions, we just dup everyt hing from
136 // the parent transaction.
137 roots = parentState.roots.Dup()
138 rootLimit = parentState.rootLimit
139
140 sizeBudget = parentState.sizeBudget - parentState.entState.total
141 if sizeBudget < DefaultSizeThreshold {
142 return ErrTransactionTooLarge
143 }
144 }
145
146 memDS, err := memory.NewDatastore(inf.FullyQualifiedAppID(), ns)
147 if err != nil {
148 return err
149 }
150
151 state := &txnBufState{
152 entState: &sizeTracker{},
153 memDS: memDS.Raw(),
154 roots: roots,
155 rootLimit: rootLimit,
156 ns: ns,
157 aid: inf.AppID(),
158 parentDS: datastore.Get(ctx).Raw(),
159 parentState: parentState,
160 sizeBudget: sizeBudget,
161 }
162 err = cb(context.WithValue(ctx, dsTxnBufParent, state))
163 if err != nil {
164 return err
165 }
166 return state.apply()
167 }
168
169 // item is a temporary object for representing key/entity pairs and their cache
170 // state (e.g. if they exist in the in-memory datastore buffer or not).
171 // Additionally item memoizes some common comparison strings. item objects
172 // should never be persisted outside of a single function/query context.
173 type item struct {
174 key *datastore.Key
175 data datastore.PropertyMap
176 buffered bool
177
178 encKey string
179
180 // cmpRow is used to hold the toComparableString value for this item dur ing
181 // a query.
182 cmpRow string
183
184 // err is a bit of a hack for passing back synchronized errors from
185 // queryToIter.
186 err error
187 }
188
189 func (i *item) getEncKey() string {
190 if i.encKey == "" {
191 i.encKey = string(serialize.ToBytes(i.key))
192 }
193 return i.encKey
194 }
195
196 func (i *item) getCmpRow(lower, upper []byte, order []datastore.IndexColumn) str ing {
197 if i.cmpRow == "" {
198 row, key := toComparableString(lower, upper, order, i.key, i.dat a)
199 i.cmpRow = string(row)
200 if i.encKey == "" {
201 i.encKey = string(key)
202 }
203 }
204 return i.cmpRow
205 }
206
207 func (t *txnBufState) updateRootsLocked(roots stringset.Set) error {
208 curRootLen := t.roots.Len()
209 proposedRoots := stringset.New(1)
210 roots.Iter(func(root string) bool {
211 if !t.roots.Has(root) {
212 proposedRoots.Add(root)
213 }
214 return proposedRoots.Len()+curRootLen <= t.rootLimit
215 })
216 if proposedRoots.Len()+curRootLen > t.rootLimit {
217 return errors.New("operating on too many entity groups in nested transaction")
218 }
219 // only need to update the roots if they did something that required upd ating
220 if proposedRoots.Len() > 0 {
221 proposedRoots.Iter(func(root string) bool {
222 t.roots.Add(root)
223 return true
224 })
225 }
226 return nil
227 }
228
229 func (t *txnBufState) getMulti(keys []*datastore.Key) ([]item, error) {
230 encKeys, roots := toEncoded(keys)
231 ret := make([]item, len(keys))
232
233 idxMap := []int(nil)
234 toGetKeys := []*datastore.Key(nil)
235
236 t.Lock()
237 defer t.Unlock()
238
239 if err := t.updateRootsLocked(roots); err != nil {
240 return nil, err
241 }
242
243 for i, key := range keys {
244 ret[i].key = key
245 ret[i].encKey = encKeys[i]
246 if size, ok := t.entState.get(ret[i].getEncKey()); ok {
247 ret[i].buffered = true
248 if size > 0 {
249 idxMap = append(idxMap, i)
250 toGetKeys = append(toGetKeys, key)
251 }
252 }
253 }
254
255 if len(toGetKeys) > 0 {
256 j := 0
257 t.memDS.GetMulti(toGetKeys, nil, func(pm datastore.PropertyMap, err error) {
258 impossible(err)
Vadim Sh. 2015/09/30 01:11:29 I have a feeling someday we will start seeing this
iannucci 2015/09/30 02:00:10 probably! that's why I don't ignore it :) But I do
259 ret[idxMap[j]].data = pm
260 j++
261 })
262 }
263
264 return ret, nil
265 }
266
267 func (t *txnBufState) deleteMulti(keys []*datastore.Key) error {
268 encKeys, roots := toEncoded(keys)
269
270 t.Lock()
271 defer t.Unlock()
272
273 if err := t.updateRootsLocked(roots); err != nil {
274 return err
275 }
276
277 i := 0
278 err := t.memDS.DeleteMulti(keys, func(err error) {
279 impossible(err)
280 t.entState.set(encKeys[i], 0)
281 i++
282 })
283 impossible(err)
284 return nil
285 }
286
287 func (t *txnBufState) putMulti(keys []*datastore.Key, vals []datastore.PropertyM ap) error {
288 encKeys, roots := toEncoded(keys)
289
290 t.Lock()
291 defer t.Unlock()
292
293 if err := t.updateRootsLocked(roots); err != nil {
294 return err
295 }
296
297 i := 0
298 err := t.memDS.PutMulti(keys, vals, func(k *datastore.Key, err error) {
299 impossible(err)
300 t.entState.set(encKeys[i], vals[i].EstimateSize())
301 i++
302 })
303 impossible(err)
304 return nil
305 }
306
307 // apply actually takes the buffered transaction and applies it to the parent
308 // transaction. It will only return an error if the underlying 'real' datastore
309 // returns an error on PutMulti or DeleteMulti.
310 func (t *txnBufState) apply() error {
311 t.Lock()
312 defer t.Unlock()
313
314 // if parentState is nil... just try to commit this anyway. The estimate s
315 // we're using here are just educated guesses. If it fits for real, then
316 // hooray. If not, then the underlying datastore will error.
317 if t.parentState != nil {
318 proposedState := t.parentState.entState.dup()
319 for k, v := range t.entState.keyToSize {
320 proposedState.set(k, v)
321 }
322 if proposedState.total > t.sizeBudget {
323 return ErrTransactionTooLarge
324 }
325 }
326
327 toPutKeys := []*datastore.Key(nil)
328 toPut := []datastore.PropertyMap(nil)
329 toDel := []*datastore.Key(nil)
330
331 // need to pull all items out of the in-memory datastore. Fortunately we have
332 // kindless queries, and we disabled all the special entities, so just
333 // run a kindless query without any filters and it will return all data
334 // currently in memDS :).
335 fq, err := datastore.NewQuery("").Finalize()
336 impossible(err)
337
338 err = t.memDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMa p, _ datastore.CursorCB) bool {
339 toPutKeys = append(toPutKeys, key)
340 toPut = append(toPut, data)
341 return true
342 })
343 memoryCorruption(err)
344
345 for keyStr, size := range t.entState.keyToSize {
346 if size == 0 {
347 k, err := serialize.ReadKey(bytes.NewBufferString(keyStr ), serialize.WithoutContext, t.aid, t.ns)
348 memoryCorruption(err)
349 toDel = append(toDel, k)
350 }
351 }
352
353 wg := sync.WaitGroup{}
354
355 pErr := error(nil)
356 dErr := error(nil)
357
358 ds := t.parentDS
359 if toPut != nil {
360 wg.Add(1)
361 go func() {
362 defer wg.Done()
363 mErr := errors.NewLazyMultiError(len(toPut))
364 i := 0
365 pErr = ds.PutMulti(toPutKeys, toPut, func(_ *datastore.K ey, err error) {
366 i++
367 mErr.Assign(i, err)
368 })
369 pErr = mErr.Get()
370 }()
371 }
372
373 if toDel != nil {
374 wg.Add(1)
375 go func() {
376 defer wg.Done()
377 mErr := errors.NewLazyMultiError(len(toDel))
378 i := 0
379 dErr = ds.DeleteMulti(toDel, func(err error) {
380 mErr.Assign(i, err)
381 i++
382 })
383 dErr = mErr.Get()
384 }()
385 }
386 wg.Wait()
387
388 if pErr != nil {
389 return pErr
390 }
391 return dErr
392 }
393
394 // toEncoded returns a list of all of the serialized versions of these keys,
395 // plus a stringset of all the encoded root keys that.
396 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) {
397 roots = stringset.New(len(keys))
398 full = make([]string, len(keys))
399 for i, k := range keys {
400 roots.Add(string(serialize.ToBytes(k.Root())))
401 full[i] = string(serialize.ToBytes(k))
402 }
403 return
404 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698