Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package txnBuf | |
| 6 | |
| 7 import ( | |
| 8 "bytes" | |
| 9 "sync" | |
| 10 | |
| 11 "github.com/luci/gae/impl/memory" | |
| 12 "github.com/luci/gae/service/datastore" | |
| 13 "github.com/luci/gae/service/datastore/serialize" | |
| 14 "github.com/luci/gae/service/info" | |
| 15 "github.com/luci/luci-go/common/errors" | |
| 16 "github.com/luci/luci-go/common/stringset" | |
| 17 "golang.org/x/net/context" | |
| 18 ) | |
| 19 | |
const (
	// DefaultSizeBudget is the size budget, in bytes, for the root transaction.
	//
	// The size estimation algorithm isn't entirely correct, so 5% is shaved off
	// the documented limit to absorb encoding overhead and estimate
	// inaccuracies.
	//
	// 10MB limit taken on 2015/09/24 from:
	//   https://cloud.google.com/appengine/docs/go/datastore/#Go_Quotas_and_limits
	DefaultSizeBudget = int64(10 * 1000 * 1000 * 95 / 100)

	// DefaultSizeThreshold prevents the root transaction from getting too close
	// to the budget. If the code attempts to begin a transaction which would
	// have less than this threshold for its budget, the transaction will
	// immediately return ErrTransactionTooLarge.
	DefaultSizeThreshold = int64(10 * 1000)

	// XGTransactionGroupLimit is the number of transaction groups to allow in
	// an XG transaction.
	//
	// 25 taken on 2015/09/24 from:
	//   https://cloud.google.com/appengine/docs/go/datastore/transactions#Go_What_can_be_done_in_a_transaction
	XGTransactionGroupLimit = 25
)
| 41 | |
// sizeTracker tracks the estimated size of a buffered transaction. The
// accounting rules are simple:
//   * every tracked key contributes the length of its encoded form once;
//   * a delete contributes 0 bytes of data on top of its key;
//   * a put contributes the 'EstimateSize' of its data on top of its key.
//
// The zero value is an empty tracker that is ready to use.
type sizeTracker struct {
	// keyToSize maps an encoded key to the data size recorded for it; it is
	// allocated lazily by set.
	keyToSize map[string]int64
	// total is the sum of every tracked key's length and recorded data size.
	total int64
}

// set records that key now refers to an entity of size val, adjusting the
// running total by the difference from the previously recorded size. A val of
// 0 means "I'm deleting this key". The key's own length is charged only the
// first time the key is seen.
func (s *sizeTracker) set(key string, val int64) {
	if s.keyToSize == nil {
		s.keyToSize = make(map[string]int64)
	}

	old, tracked := s.keyToSize[key]
	delta := val - old
	if !tracked {
		delta += int64(len(key))
	}

	s.keyToSize[key] = val
	s.total += delta
}

// get returns the currently tracked size for key, and whether or not the key
// has any tracked value.
func (s *sizeTracker) get(key string) (size int64, ok bool) {
	size, ok = s.keyToSize[key]
	return size, ok
}

// has returns true iff key has a tracked value.
func (s *sizeTracker) has(key string) bool {
	_, tracked := s.get(key)
	return tracked
}

// dup returns an independent copy of the tracker. A tracker with no keys is
// copied as the zero value.
func (s *sizeTracker) dup() *sizeTracker {
	cp := &sizeTracker{}
	if n := len(s.keyToSize); n > 0 {
		cp.total = s.total
		cp.keyToSize = make(map[string]int64, n)
		for k, sz := range s.keyToSize {
			cp.keyToSize[k] = sz
		}
	}
	return cp
}
| 89 | |
// txnBufState is the state of a single buffered transaction. Buffered
// transactions may be nested; a nested transaction refers to its enclosing
// one through parentState.
type txnBufState struct {
	// The embedded mutex is held by getMulti, putMulti, deleteMulti and apply
	// while they work with the buffered state.
	sync.Mutex

	// encoded key -> size of entity. A size of 0 means that the entity is
	// deleted.
	entState *sizeTracker
	// memDS is the in-memory datastore holding the entities written by this
	// transaction.
	memDS datastore.RawInterface

	// roots is the set of encoded root keys touched so far, and rootLimit is
	// the maximum number of distinct roots that updateRootsLocked will allow.
	roots     stringset.Set
	rootLimit int

	// aid and ns are the app ID and namespace captured when the transaction
	// was started; apply uses them to decode buffered keys.
	aid string
	ns  string
	// parentDS is the datastore that apply writes the buffered changes to.
	parentDS datastore.RawInterface
	// parentState is the enclosing buffered transaction's state, or nil if
	// this is the outermost buffered transaction.
	parentState *txnBufState

	// sizeBudget is the number of bytes that this transaction has to operate
	// within. It's only used when attempting to apply() the transaction, and
	// it is the threshold for the delta of applying this transaction to the
	// parent transaction. Note that a buffered transaction could actually have
	// a negative delta if the parent transaction had many large entities which
	// the inner transaction deleted.
	sizeBudget int64

	// siblingLock is taken by withTxnBuf on the *parent* state for the entire
	// lifetime of a child transaction (callback plus apply), so at most one
	// child transaction of this state is in flight at any time.
	siblingLock sync.Mutex
}
| 116 | |
| 117 func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas tore.TransactionOptions) error { | |
| 118 inf := info.Get(ctx) | |
| 119 ns := inf.GetNamespace() | |
| 120 | |
| 121 parentState, _ := ctx.Value(dsTxnBufParent).(*txnBufState) | |
| 122 roots := stringset.New(0) | |
| 123 rootLimit := 1 | |
| 124 if opts != nil && opts.XG { | |
| 125 rootLimit = XGTransactionGroupLimit | |
| 126 } | |
| 127 sizeBudget := DefaultSizeBudget | |
| 128 if parentState != nil { | |
| 129 parentState.siblingLock.Lock() | |
| 130 defer parentState.siblingLock.Unlock() | |
|
Vadim Sh.
2015/09/30 01:11:29
this is released at the very end of this function,
iannucci
2015/09/30 02:00:10
yeah. It's to prevent two transactions on the same
| |
| 131 | |
| 132 // TODO(riannucci): this is a bit wonky since it means that a ch ild | |
| 133 // transaction declaring XG=true will only get to modify 25 grou ps IF | |
| 134 // they're same groups affected by the parent transactions. So i nstead of | |
| 135 // respecting opts.XG for inner transactions, we just dup everyt hing from | |
| 136 // the parent transaction. | |
| 137 roots = parentState.roots.Dup() | |
| 138 rootLimit = parentState.rootLimit | |
| 139 | |
| 140 sizeBudget = parentState.sizeBudget - parentState.entState.total | |
| 141 if sizeBudget < DefaultSizeThreshold { | |
| 142 return ErrTransactionTooLarge | |
| 143 } | |
| 144 } | |
| 145 | |
| 146 memDS, err := memory.NewDatastore(inf.FullyQualifiedAppID(), ns) | |
| 147 if err != nil { | |
| 148 return err | |
| 149 } | |
| 150 | |
| 151 state := &txnBufState{ | |
| 152 entState: &sizeTracker{}, | |
| 153 memDS: memDS.Raw(), | |
| 154 roots: roots, | |
| 155 rootLimit: rootLimit, | |
| 156 ns: ns, | |
| 157 aid: inf.AppID(), | |
| 158 parentDS: datastore.Get(ctx).Raw(), | |
| 159 parentState: parentState, | |
| 160 sizeBudget: sizeBudget, | |
| 161 } | |
| 162 err = cb(context.WithValue(ctx, dsTxnBufParent, state)) | |
| 163 if err != nil { | |
| 164 return err | |
| 165 } | |
| 166 return state.apply() | |
| 167 } | |
| 168 | |
| 169 // item is a temporary object for representing key/entity pairs and their cache | |
| 170 // state (e.g. if they exist in the in-memory datastore buffer or not). | |
| 171 // Additionally item memoizes some common comparison strings. item objects | |
| 172 // should never be persisted outside of a single function/query context. | |
| 173 type item struct { | |
| 174 key *datastore.Key | |
| 175 data datastore.PropertyMap | |
| 176 buffered bool | |
| 177 | |
| 178 encKey string | |
| 179 | |
| 180 // cmpRow is used to hold the toComparableString value for this item dur ing | |
| 181 // a query. | |
| 182 cmpRow string | |
| 183 | |
| 184 // err is a bit of a hack for passing back synchronized errors from | |
| 185 // queryToIter. | |
| 186 err error | |
| 187 } | |
| 188 | |
| 189 func (i *item) getEncKey() string { | |
| 190 if i.encKey == "" { | |
| 191 i.encKey = string(serialize.ToBytes(i.key)) | |
| 192 } | |
| 193 return i.encKey | |
| 194 } | |
| 195 | |
| 196 func (i *item) getCmpRow(lower, upper []byte, order []datastore.IndexColumn) str ing { | |
| 197 if i.cmpRow == "" { | |
| 198 row, key := toComparableString(lower, upper, order, i.key, i.dat a) | |
| 199 i.cmpRow = string(row) | |
| 200 if i.encKey == "" { | |
| 201 i.encKey = string(key) | |
| 202 } | |
| 203 } | |
| 204 return i.cmpRow | |
| 205 } | |
| 206 | |
| 207 func (t *txnBufState) updateRootsLocked(roots stringset.Set) error { | |
| 208 curRootLen := t.roots.Len() | |
| 209 proposedRoots := stringset.New(1) | |
| 210 roots.Iter(func(root string) bool { | |
| 211 if !t.roots.Has(root) { | |
| 212 proposedRoots.Add(root) | |
| 213 } | |
| 214 return proposedRoots.Len()+curRootLen <= t.rootLimit | |
| 215 }) | |
| 216 if proposedRoots.Len()+curRootLen > t.rootLimit { | |
| 217 return errors.New("operating on too many entity groups in nested transaction") | |
| 218 } | |
| 219 // only need to update the roots if they did something that required upd ating | |
| 220 if proposedRoots.Len() > 0 { | |
| 221 proposedRoots.Iter(func(root string) bool { | |
| 222 t.roots.Add(root) | |
| 223 return true | |
| 224 }) | |
| 225 } | |
| 226 return nil | |
| 227 } | |
| 228 | |
| 229 func (t *txnBufState) getMulti(keys []*datastore.Key) ([]item, error) { | |
| 230 encKeys, roots := toEncoded(keys) | |
| 231 ret := make([]item, len(keys)) | |
| 232 | |
| 233 idxMap := []int(nil) | |
| 234 toGetKeys := []*datastore.Key(nil) | |
| 235 | |
| 236 t.Lock() | |
| 237 defer t.Unlock() | |
| 238 | |
| 239 if err := t.updateRootsLocked(roots); err != nil { | |
| 240 return nil, err | |
| 241 } | |
| 242 | |
| 243 for i, key := range keys { | |
| 244 ret[i].key = key | |
| 245 ret[i].encKey = encKeys[i] | |
| 246 if size, ok := t.entState.get(ret[i].getEncKey()); ok { | |
| 247 ret[i].buffered = true | |
| 248 if size > 0 { | |
| 249 idxMap = append(idxMap, i) | |
| 250 toGetKeys = append(toGetKeys, key) | |
| 251 } | |
| 252 } | |
| 253 } | |
| 254 | |
| 255 if len(toGetKeys) > 0 { | |
| 256 j := 0 | |
| 257 t.memDS.GetMulti(toGetKeys, nil, func(pm datastore.PropertyMap, err error) { | |
| 258 impossible(err) | |
|
Vadim Sh.
2015/09/30 01:11:29
I have a feeling someday we will start seeing this
iannucci
2015/09/30 02:00:10
probably! that's why I don't ignore it :) But I do
| |
| 259 ret[idxMap[j]].data = pm | |
| 260 j++ | |
| 261 }) | |
| 262 } | |
| 263 | |
| 264 return ret, nil | |
| 265 } | |
| 266 | |
| 267 func (t *txnBufState) deleteMulti(keys []*datastore.Key) error { | |
| 268 encKeys, roots := toEncoded(keys) | |
| 269 | |
| 270 t.Lock() | |
| 271 defer t.Unlock() | |
| 272 | |
| 273 if err := t.updateRootsLocked(roots); err != nil { | |
| 274 return err | |
| 275 } | |
| 276 | |
| 277 i := 0 | |
| 278 err := t.memDS.DeleteMulti(keys, func(err error) { | |
| 279 impossible(err) | |
| 280 t.entState.set(encKeys[i], 0) | |
| 281 i++ | |
| 282 }) | |
| 283 impossible(err) | |
| 284 return nil | |
| 285 } | |
| 286 | |
| 287 func (t *txnBufState) putMulti(keys []*datastore.Key, vals []datastore.PropertyM ap) error { | |
| 288 encKeys, roots := toEncoded(keys) | |
| 289 | |
| 290 t.Lock() | |
| 291 defer t.Unlock() | |
| 292 | |
| 293 if err := t.updateRootsLocked(roots); err != nil { | |
| 294 return err | |
| 295 } | |
| 296 | |
| 297 i := 0 | |
| 298 err := t.memDS.PutMulti(keys, vals, func(k *datastore.Key, err error) { | |
| 299 impossible(err) | |
| 300 t.entState.set(encKeys[i], vals[i].EstimateSize()) | |
| 301 i++ | |
| 302 }) | |
| 303 impossible(err) | |
| 304 return nil | |
| 305 } | |
| 306 | |
| 307 // apply actually takes the buffered transaction and applies it to the parent | |
| 308 // transaction. It will only return an error if the underlying 'real' datastore | |
| 309 // returns an error on PutMulti or DeleteMulti. | |
| 310 func (t *txnBufState) apply() error { | |
| 311 t.Lock() | |
| 312 defer t.Unlock() | |
| 313 | |
| 314 // if parentState is nil... just try to commit this anyway. The estimate s | |
| 315 // we're using here are just educated guesses. If it fits for real, then | |
| 316 // hooray. If not, then the underlying datastore will error. | |
| 317 if t.parentState != nil { | |
| 318 proposedState := t.parentState.entState.dup() | |
| 319 for k, v := range t.entState.keyToSize { | |
| 320 proposedState.set(k, v) | |
| 321 } | |
| 322 if proposedState.total > t.sizeBudget { | |
| 323 return ErrTransactionTooLarge | |
| 324 } | |
| 325 } | |
| 326 | |
| 327 toPutKeys := []*datastore.Key(nil) | |
| 328 toPut := []datastore.PropertyMap(nil) | |
| 329 toDel := []*datastore.Key(nil) | |
| 330 | |
| 331 // need to pull all items out of the in-memory datastore. Fortunately we have | |
| 332 // kindless queries, and we disabled all the special entities, so just | |
| 333 // run a kindless query without any filters and it will return all data | |
| 334 // currently in memDS :). | |
| 335 fq, err := datastore.NewQuery("").Finalize() | |
| 336 impossible(err) | |
| 337 | |
| 338 err = t.memDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMa p, _ datastore.CursorCB) bool { | |
| 339 toPutKeys = append(toPutKeys, key) | |
| 340 toPut = append(toPut, data) | |
| 341 return true | |
| 342 }) | |
| 343 memoryCorruption(err) | |
| 344 | |
| 345 for keyStr, size := range t.entState.keyToSize { | |
| 346 if size == 0 { | |
| 347 k, err := serialize.ReadKey(bytes.NewBufferString(keyStr ), serialize.WithoutContext, t.aid, t.ns) | |
| 348 memoryCorruption(err) | |
| 349 toDel = append(toDel, k) | |
| 350 } | |
| 351 } | |
| 352 | |
| 353 wg := sync.WaitGroup{} | |
| 354 | |
| 355 pErr := error(nil) | |
| 356 dErr := error(nil) | |
| 357 | |
| 358 ds := t.parentDS | |
| 359 if toPut != nil { | |
| 360 wg.Add(1) | |
| 361 go func() { | |
| 362 defer wg.Done() | |
| 363 mErr := errors.NewLazyMultiError(len(toPut)) | |
| 364 i := 0 | |
| 365 pErr = ds.PutMulti(toPutKeys, toPut, func(_ *datastore.K ey, err error) { | |
| 366 i++ | |
| 367 mErr.Assign(i, err) | |
| 368 }) | |
| 369 pErr = mErr.Get() | |
| 370 }() | |
| 371 } | |
| 372 | |
| 373 if toDel != nil { | |
| 374 wg.Add(1) | |
| 375 go func() { | |
| 376 defer wg.Done() | |
| 377 mErr := errors.NewLazyMultiError(len(toDel)) | |
| 378 i := 0 | |
| 379 dErr = ds.DeleteMulti(toDel, func(err error) { | |
| 380 mErr.Assign(i, err) | |
| 381 i++ | |
| 382 }) | |
| 383 dErr = mErr.Get() | |
| 384 }() | |
| 385 } | |
| 386 wg.Wait() | |
| 387 | |
| 388 if pErr != nil { | |
| 389 return pErr | |
| 390 } | |
| 391 return dErr | |
| 392 } | |
| 393 | |
| 394 // toEncoded returns a list of all of the serialized versions of these keys, | |
| 395 // plus a stringset of all the encoded root keys that. | |
| 396 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) { | |
| 397 roots = stringset.New(len(keys)) | |
| 398 full = make([]string, len(keys)) | |
| 399 for i, k := range keys { | |
| 400 roots.Add(string(serialize.ToBytes(k.Root()))) | |
| 401 full[i] = string(serialize.ToBytes(k)) | |
| 402 } | |
| 403 return | |
| 404 } | |
| OLD | NEW |