OLD | NEW |
(Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package txnBuf |
| 6 |
| 7 import ( |
| 8 "bytes" |
| 9 "sync" |
| 10 |
| 11 "github.com/luci/gae/impl/memory" |
| 12 "github.com/luci/gae/service/datastore" |
| 13 "github.com/luci/gae/service/datastore/serialize" |
| 14 "github.com/luci/gae/service/info" |
| 15 "github.com/luci/luci-go/common/errors" |
| 16 "github.com/luci/luci-go/common/parallel" |
| 17 "github.com/luci/luci-go/common/stringset" |
| 18 "golang.org/x/net/context" |
| 19 ) |
| 20 |
| 21 // DefaultSizeBudget is the size budget for the root transaction. |
| 22 // |
| 23 // Because our estimation algorithm isn't entirely correct, we take 5% off |
| 24 // the limit for encoding and estimate inaccuracies. |
| 25 // |
| 26 // 10MB taken on 2015/09/24: |
| 27 // https://cloud.google.com/appengine/docs/go/datastore/#Go_Quotas_and_limits |
| 28 const DefaultSizeBudget = int64((10 * 1000 * 1000) * 0.95) |
| 29 |
| 30 // DefaultSizeThreshold prevents the root transaction from getting too close |
| 31 // to the budget. If the code attempts to begin a transaction which would have |
| 32 // less than this threshold for its budget, the transaction will immediately |
| 33 // return ErrTransactionTooLarge. |
| 34 const DefaultSizeThreshold = int64(10 * 1000) |
| 35 |
| 36 // XGTransactionGroupLimit is the number of transaction groups to allow in an |
| 37 // XG transaction. |
| 38 // |
| 39 // 25 taken on 2015/09/24: |
| 40 // https://cloud.google.com/appengine/docs/go/datastore/transactions#Go_What_can
_be_done_in_a_transaction |
| 41 const XGTransactionGroupLimit = 25 |
| 42 |
| 43 // sizeTracker tracks the size of a buffered transaction. The rules are simple: |
| 44 // * deletes count for the size of their key, but 0 data |
| 45 // * puts count for the size of their key plus the 'EstimateSize' for their |
| 46 // data. |
| 47 type sizeTracker struct { |
| 48 keyToSize map[string]int64 |
| 49 total int64 |
| 50 } |
| 51 |
| 52 // set states that the given key is being set to an entity with the size `val`. |
| 53 // A val of 0 means "I'm deleting this key" |
| 54 func (s *sizeTracker) set(key string, val int64) { |
| 55 if s.keyToSize == nil { |
| 56 s.keyToSize = make(map[string]int64) |
| 57 } |
| 58 prev, existed := s.keyToSize[key] |
| 59 s.keyToSize[key] = val |
| 60 s.total += val - prev |
| 61 if !existed { |
| 62 s.total += int64(len(key)) |
| 63 } |
| 64 } |
| 65 |
| 66 // get returns the currently tracked size for key, and wheter or not the key |
| 67 // has any tracked value. |
| 68 func (s *sizeTracker) get(key string) (int64, bool) { |
| 69 size, has := s.keyToSize[key] |
| 70 return size, has |
| 71 } |
| 72 |
| 73 // has returns true iff key has a tracked value. |
| 74 func (s *sizeTracker) has(key string) bool { |
| 75 _, has := s.keyToSize[key] |
| 76 return has |
| 77 } |
| 78 |
| 79 // dup returns a duplicate sizeTracker. |
| 80 func (s *sizeTracker) dup() *sizeTracker { |
| 81 if len(s.keyToSize) == 0 { |
| 82 return &sizeTracker{} |
| 83 } |
| 84 k2s := make(map[string]int64, len(s.keyToSize)) |
| 85 for k, v := range s.keyToSize { |
| 86 k2s[k] = v |
| 87 } |
| 88 return &sizeTracker{k2s, s.total} |
| 89 } |
| 90 |
| 91 type txnBufState struct { |
| 92 sync.Mutex |
| 93 |
| 94 // encoded key -> size of entity. A size of 0 means that the entity is |
| 95 // deleted. |
| 96 entState *sizeTracker |
| 97 memDS datastore.RawInterface |
| 98 |
| 99 roots stringset.Set |
| 100 rootLimit int |
| 101 |
| 102 aid string |
| 103 ns string |
| 104 parentDS datastore.RawInterface |
| 105 parentState *txnBufState |
| 106 |
| 107 // sizeBudget is the number of bytes that this transaction has to operat
e |
| 108 // within. It's only used when attempting to apply() the transaction, an
d |
| 109 // it is the threshold for the delta of applying this transaction to the |
| 110 // parent transaction. Note that a buffered transaction could actually h
ave |
| 111 // a negative delta if the parent transaction had many large entities wh
ich |
| 112 // the inner transaction deleted. |
| 113 sizeBudget int64 |
| 114 |
| 115 // siblingLock is to prevent two nested transactions from running at the
same |
| 116 // time. |
| 117 // |
| 118 // Example: |
| 119 // RunInTransaction() { // root |
| 120 // RunInTransaction() // A |
| 121 // RunInTransaction() // B |
| 122 // } |
| 123 // |
| 124 // This will prevent A and B from running simulatneously. |
| 125 siblingLock sync.Mutex |
| 126 } |
| 127 |
| 128 func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas
tore.TransactionOptions) error { |
| 129 inf := info.Get(ctx) |
| 130 ns := inf.GetNamespace() |
| 131 |
| 132 parentState, _ := ctx.Value(dsTxnBufParent).(*txnBufState) |
| 133 roots := stringset.New(0) |
| 134 rootLimit := 1 |
| 135 if opts != nil && opts.XG { |
| 136 rootLimit = XGTransactionGroupLimit |
| 137 } |
| 138 sizeBudget := DefaultSizeBudget |
| 139 if parentState != nil { |
| 140 parentState.siblingLock.Lock() |
| 141 defer parentState.siblingLock.Unlock() |
| 142 |
| 143 // TODO(riannucci): this is a bit wonky since it means that a ch
ild |
| 144 // transaction declaring XG=true will only get to modify 25 grou
ps IF |
| 145 // they're same groups affected by the parent transactions. So i
nstead of |
| 146 // respecting opts.XG for inner transactions, we just dup everyt
hing from |
| 147 // the parent transaction. |
| 148 roots = parentState.roots.Dup() |
| 149 rootLimit = parentState.rootLimit |
| 150 |
| 151 sizeBudget = parentState.sizeBudget - parentState.entState.total |
| 152 if sizeBudget < DefaultSizeThreshold { |
| 153 return ErrTransactionTooLarge |
| 154 } |
| 155 } |
| 156 |
| 157 memDS, err := memory.NewDatastore(inf.FullyQualifiedAppID(), ns) |
| 158 if err != nil { |
| 159 return err |
| 160 } |
| 161 |
| 162 state := &txnBufState{ |
| 163 entState: &sizeTracker{}, |
| 164 memDS: memDS.Raw(), |
| 165 roots: roots, |
| 166 rootLimit: rootLimit, |
| 167 ns: ns, |
| 168 aid: inf.AppID(), |
| 169 parentDS: datastore.Get(ctx).Raw(), |
| 170 parentState: parentState, |
| 171 sizeBudget: sizeBudget, |
| 172 } |
| 173 if err = cb(context.WithValue(ctx, dsTxnBufParent, state)); err != nil { |
| 174 return err |
| 175 } |
| 176 return state.apply() |
| 177 } |
| 178 |
| 179 // item is a temporary object for representing key/entity pairs and their cache |
| 180 // state (e.g. if they exist in the in-memory datastore buffer or not). |
| 181 // Additionally item memoizes some common comparison strings. item objects |
| 182 // must never be persisted outside of a single function/query context. |
| 183 type item struct { |
| 184 key *datastore.Key |
| 185 data datastore.PropertyMap |
| 186 buffered bool |
| 187 |
| 188 encKey string |
| 189 |
| 190 // cmpRow is used to hold the toComparableString value for this item dur
ing |
| 191 // a query. |
| 192 cmpRow string |
| 193 |
| 194 // err is a bit of a hack for passing back synchronized errors from |
| 195 // queryToIter. |
| 196 err error |
| 197 } |
| 198 |
| 199 func (i *item) getEncKey() string { |
| 200 if i.encKey == "" { |
| 201 i.encKey = string(serialize.ToBytes(i.key)) |
| 202 } |
| 203 return i.encKey |
| 204 } |
| 205 |
| 206 func (i *item) getCmpRow(lower, upper []byte, order []datastore.IndexColumn) str
ing { |
| 207 if i.cmpRow == "" { |
| 208 row, key := toComparableString(lower, upper, order, i.key, i.dat
a) |
| 209 i.cmpRow = string(row) |
| 210 if i.encKey == "" { |
| 211 i.encKey = string(key) |
| 212 } |
| 213 } |
| 214 return i.cmpRow |
| 215 } |
| 216 |
| 217 func (t *txnBufState) updateRootsLocked(roots stringset.Set) error { |
| 218 curRootLen := t.roots.Len() |
| 219 proposedRoots := stringset.New(1) |
| 220 roots.Iter(func(root string) bool { |
| 221 if !t.roots.Has(root) { |
| 222 proposedRoots.Add(root) |
| 223 } |
| 224 return proposedRoots.Len()+curRootLen <= t.rootLimit |
| 225 }) |
| 226 if proposedRoots.Len()+curRootLen > t.rootLimit { |
| 227 return ErrTooManyRoots |
| 228 } |
| 229 // only need to update the roots if they did something that required upd
ating |
| 230 if proposedRoots.Len() > 0 { |
| 231 proposedRoots.Iter(func(root string) bool { |
| 232 t.roots.Add(root) |
| 233 return true |
| 234 }) |
| 235 } |
| 236 return nil |
| 237 } |
| 238 |
| 239 func (t *txnBufState) getMulti(keys []*datastore.Key) ([]item, error) { |
| 240 encKeys, roots := toEncoded(keys) |
| 241 ret := make([]item, len(keys)) |
| 242 |
| 243 idxMap := []int(nil) |
| 244 toGetKeys := []*datastore.Key(nil) |
| 245 |
| 246 t.Lock() |
| 247 defer t.Unlock() |
| 248 |
| 249 if err := t.updateRootsLocked(roots); err != nil { |
| 250 return nil, err |
| 251 } |
| 252 |
| 253 for i, key := range keys { |
| 254 ret[i].key = key |
| 255 ret[i].encKey = encKeys[i] |
| 256 if size, ok := t.entState.get(ret[i].getEncKey()); ok { |
| 257 ret[i].buffered = true |
| 258 if size > 0 { |
| 259 idxMap = append(idxMap, i) |
| 260 toGetKeys = append(toGetKeys, key) |
| 261 } |
| 262 } |
| 263 } |
| 264 |
| 265 if len(toGetKeys) > 0 { |
| 266 j := 0 |
| 267 t.memDS.GetMulti(toGetKeys, nil, func(pm datastore.PropertyMap,
err error) { |
| 268 impossible(err) |
| 269 ret[idxMap[j]].data = pm |
| 270 j++ |
| 271 }) |
| 272 } |
| 273 |
| 274 return ret, nil |
| 275 } |
| 276 |
| 277 func (t *txnBufState) deleteMulti(keys []*datastore.Key) error { |
| 278 encKeys, roots := toEncoded(keys) |
| 279 |
| 280 t.Lock() |
| 281 defer t.Unlock() |
| 282 |
| 283 if err := t.updateRootsLocked(roots); err != nil { |
| 284 return err |
| 285 } |
| 286 |
| 287 i := 0 |
| 288 err := t.memDS.DeleteMulti(keys, func(err error) { |
| 289 impossible(err) |
| 290 t.entState.set(encKeys[i], 0) |
| 291 i++ |
| 292 }) |
| 293 impossible(err) |
| 294 return nil |
| 295 } |
| 296 |
| 297 func (t *txnBufState) putMulti(keys []*datastore.Key, vals []datastore.PropertyM
ap) error { |
| 298 encKeys, roots := toEncoded(keys) |
| 299 |
| 300 t.Lock() |
| 301 defer t.Unlock() |
| 302 |
| 303 if err := t.updateRootsLocked(roots); err != nil { |
| 304 return err |
| 305 } |
| 306 |
| 307 i := 0 |
| 308 err := t.memDS.PutMulti(keys, vals, func(k *datastore.Key, err error) { |
| 309 impossible(err) |
| 310 t.entState.set(encKeys[i], vals[i].EstimateSize()) |
| 311 i++ |
| 312 }) |
| 313 impossible(err) |
| 314 return nil |
| 315 } |
| 316 |
| 317 // apply actually takes the buffered transaction and applies it to the parent |
| 318 // transaction. It will only return an error if the underlying 'real' datastore |
| 319 // returns an error on PutMulti or DeleteMulti. |
| 320 func (t *txnBufState) apply() error { |
| 321 t.Lock() |
| 322 defer t.Unlock() |
| 323 |
| 324 // if parentState is nil... just try to commit this anyway. The estimate
s |
| 325 // we're using here are just educated guesses. If it fits for real, then |
| 326 // hooray. If not, then the underlying datastore will error. |
| 327 if t.parentState != nil { |
| 328 t.parentState.Lock() |
| 329 proposedState := t.parentState.entState.dup() |
| 330 t.parentState.Unlock() |
| 331 for k, v := range t.entState.keyToSize { |
| 332 proposedState.set(k, v) |
| 333 } |
| 334 if proposedState.total > t.sizeBudget { |
| 335 return ErrTransactionTooLarge |
| 336 } |
| 337 } |
| 338 |
| 339 toPutKeys := []*datastore.Key(nil) |
| 340 toPut := []datastore.PropertyMap(nil) |
| 341 toDel := []*datastore.Key(nil) |
| 342 |
| 343 // need to pull all items out of the in-memory datastore. Fortunately we
have |
| 344 // kindless queries, and we disabled all the special entities, so just |
| 345 // run a kindless query without any filters and it will return all data |
| 346 // currently in memDS :). |
| 347 fq, err := datastore.NewQuery("").Finalize() |
| 348 impossible(err) |
| 349 |
| 350 err = t.memDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMa
p, _ datastore.CursorCB) bool { |
| 351 toPutKeys = append(toPutKeys, key) |
| 352 toPut = append(toPut, data) |
| 353 return true |
| 354 }) |
| 355 memoryCorruption(err) |
| 356 |
| 357 for keyStr, size := range t.entState.keyToSize { |
| 358 if size == 0 { |
| 359 k, err := serialize.ReadKey(bytes.NewBufferString(keyStr
), serialize.WithoutContext, t.aid, t.ns) |
| 360 memoryCorruption(err) |
| 361 toDel = append(toDel, k) |
| 362 } |
| 363 } |
| 364 |
| 365 ds := t.parentDS |
| 366 |
| 367 return parallel.FanOutIn(func(ch chan<- func() error) { |
| 368 if len(toPut) > 0 { |
| 369 ch <- func() error { |
| 370 mErr := errors.NewLazyMultiError(len(toPut)) |
| 371 i := 0 |
| 372 err := ds.PutMulti(toPutKeys, toPut, func(_ *dat
astore.Key, err error) { |
| 373 mErr.Assign(i, err) |
| 374 i++ |
| 375 }) |
| 376 if err == nil { |
| 377 err = mErr.Get() |
| 378 } |
| 379 return err |
| 380 } |
| 381 } |
| 382 if len(toDel) > 0 { |
| 383 ch <- func() error { |
| 384 mErr := errors.NewLazyMultiError(len(toDel)) |
| 385 i := 0 |
| 386 err := ds.DeleteMulti(toDel, func(err error) { |
| 387 mErr.Assign(i, err) |
| 388 i++ |
| 389 }) |
| 390 if err == nil { |
| 391 err = mErr.Get() |
| 392 } |
| 393 return err |
| 394 } |
| 395 } |
| 396 }) |
| 397 } |
| 398 |
| 399 // toEncoded returns a list of all of the serialized versions of these keys, |
| 400 // plus a stringset of all the encoded root keys that `keys` represents. |
| 401 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) { |
| 402 roots = stringset.New(len(keys)) |
| 403 full = make([]string, len(keys)) |
| 404 for i, k := range keys { |
| 405 roots.Add(string(serialize.ToBytes(k.Root()))) |
| 406 full[i] = string(serialize.ToBytes(k)) |
| 407 } |
| 408 return |
| 409 } |
OLD | NEW |