| OLD | NEW | 
|    1 // Copyright 2015 The Chromium Authors. All rights reserved. |    1 // Copyright 2015 The Chromium Authors. All rights reserved. | 
|    2 // Use of this source code is governed by a BSD-style license that can be |    2 // Use of this source code is governed by a BSD-style license that can be | 
|    3 // found in the LICENSE file. |    3 // found in the LICENSE file. | 
|    4  |    4  | 
|    5 package txnBuf |    5 package txnBuf | 
|    6  |    6  | 
|    7 import ( |    7 import ( | 
|    8         "bytes" |    8         "bytes" | 
|    9         "sync" |    9         "sync" | 
|   10  |   10  | 
| (...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
|   87         } |   87         } | 
|   88         return &sizeTracker{k2s, s.total} |   88         return &sizeTracker{k2s, s.total} | 
|   89 } |   89 } | 
|   90  |   90  | 
|   91 type txnBufState struct { |   91 type txnBufState struct { | 
|   92         sync.Mutex |   92         sync.Mutex | 
|   93  |   93  | 
|   94         // encoded key -> size of entity. A size of 0 means that the entity is |   94         // encoded key -> size of entity. A size of 0 means that the entity is | 
|   95         // deleted. |   95         // deleted. | 
|   96         entState *sizeTracker |   96         entState *sizeTracker | 
|   97 »       memDS    datastore.RawInterface |   97 »       bufDS    datastore.RawInterface | 
|   98  |   98  | 
|   99         roots     stringset.Set |   99         roots     stringset.Set | 
|  100         rootLimit int |  100         rootLimit int | 
|  101  |  101  | 
|  102 »       aid         string |  102 »       aid      string | 
|  103 »       ns          string |  103 »       ns       string | 
|  104 »       parentDS    datastore.RawInterface |  104 »       parentDS datastore.RawInterface | 
|  105 »       parentState *txnBufState |  | 
|  106  |  105  | 
|  107         // sizeBudget is the number of bytes that this transaction has to operat
     e |  106         // sizeBudget is the number of bytes that this transaction has to operat
     e | 
|  108         // within. It's only used when attempting to apply() the transaction, an
     d |  107         // within. It's only used when attempting to apply() the transaction, an
     d | 
|  109         // it is the threshold for the delta of applying this transaction to the |  108         // it is the threshold for the delta of applying this transaction to the | 
|  110         // parent transaction. Note that a buffered transaction could actually h
     ave |  109         // parent transaction. Note that a buffered transaction could actually h
     ave | 
|  111         // a negative delta if the parent transaction had many large entities wh
     ich |  110         // a negative delta if the parent transaction had many large entities wh
     ich | 
|  112         // the inner transaction deleted. |  111         // the inner transaction deleted. | 
|  113         sizeBudget int64 |  112         sizeBudget int64 | 
|  114  |  | 
|  115         // siblingLock is to prevent two nested transactions from running at the
      same |  | 
|  116         // time. |  | 
|  117         // |  | 
|  118         // Example: |  | 
|  119         //   RunInTransaction() { // root |  | 
|  120         //     RunInTransaction() // A |  | 
|  121         //     RunInTransaction() // B |  | 
|  122         //   } |  | 
|  123         // |  | 
|  124         // This will prevent A and B from running simulatneously. |  | 
|  125         siblingLock sync.Mutex |  | 
|  126 } |  113 } | 
|  127  |  114  | 
|  128 func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas
     tore.TransactionOptions) error { |  115 func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas
     tore.TransactionOptions) error { | 
|  129         inf := info.Get(ctx) |  116         inf := info.Get(ctx) | 
|  130         ns := inf.GetNamespace() |  117         ns := inf.GetNamespace() | 
|  131  |  118  | 
|  132         parentState, _ := ctx.Value(dsTxnBufParent).(*txnBufState) |  119         parentState, _ := ctx.Value(dsTxnBufParent).(*txnBufState) | 
|  133         roots := stringset.New(0) |  120         roots := stringset.New(0) | 
|  134         rootLimit := 1 |  121         rootLimit := 1 | 
|  135         if opts != nil && opts.XG { |  122         if opts != nil && opts.XG { | 
|  136                 rootLimit = XGTransactionGroupLimit |  123                 rootLimit = XGTransactionGroupLimit | 
|  137         } |  124         } | 
|  138         sizeBudget := DefaultSizeBudget |  125         sizeBudget := DefaultSizeBudget | 
|  139         if parentState != nil { |  126         if parentState != nil { | 
|  140                 parentState.siblingLock.Lock() |  | 
|  141                 defer parentState.siblingLock.Unlock() |  | 
|  142  |  | 
|  143                 // TODO(riannucci): this is a bit wonky since it means that a ch
     ild |  127                 // TODO(riannucci): this is a bit wonky since it means that a ch
     ild | 
|  144                 // transaction declaring XG=true will only get to modify 25 grou
     ps IF |  128                 // transaction declaring XG=true will only get to modify 25 grou
     ps IF | 
|  145                 // they're same groups affected by the parent transactions. So i
     nstead of |  129                 // they're same groups affected by the parent transactions. So i
     nstead of | 
|  146                 // respecting opts.XG for inner transactions, we just dup everyt
     hing from |  130                 // respecting opts.XG for inner transactions, we just dup everyt
     hing from | 
|  147                 // the parent transaction. |  131                 // the parent transaction. | 
|  148                 roots = parentState.roots.Dup() |  132                 roots = parentState.roots.Dup() | 
|  149                 rootLimit = parentState.rootLimit |  133                 rootLimit = parentState.rootLimit | 
|  150  |  134  | 
|  151                 sizeBudget = parentState.sizeBudget - parentState.entState.total |  135                 sizeBudget = parentState.sizeBudget - parentState.entState.total | 
|  152                 if sizeBudget < DefaultSizeThreshold { |  136                 if sizeBudget < DefaultSizeThreshold { | 
|  153                         return ErrTransactionTooLarge |  137                         return ErrTransactionTooLarge | 
|  154                 } |  138                 } | 
|  155         } |  139         } | 
|  156  |  140  | 
|  157 »       memDS, err := memory.NewDatastore(inf.FullyQualifiedAppID(), ns) |  141 »       bufDS, err := memory.NewDatastore(inf.FullyQualifiedAppID(), ns) | 
|  158         if err != nil { |  142         if err != nil { | 
|  159                 return err |  143                 return err | 
|  160         } |  144         } | 
|  161  |  145  | 
|  162         state := &txnBufState{ |  146         state := &txnBufState{ | 
|  163 »       »       entState:    &sizeTracker{}, |  147 »       »       entState:   &sizeTracker{}, | 
|  164 »       »       memDS:       memDS.Raw(), |  148 »       »       bufDS:      bufDS.Raw(), | 
|  165 »       »       roots:       roots, |  149 »       »       roots:      roots, | 
|  166 »       »       rootLimit:   rootLimit, |  150 »       »       rootLimit:  rootLimit, | 
|  167 »       »       ns:          ns, |  151 »       »       ns:         ns, | 
|  168 »       »       aid:         inf.AppID(), |  152 »       »       aid:        inf.AppID(), | 
|  169 »       »       parentDS:    datastore.Get(ctx).Raw(), |  153 »       »       parentDS:   datastore.Get(context.WithValue(ctx, dsTxnBufHaveLoc
     k, true)).Raw(), | 
|  170 »       »       parentState: parentState, |  154 »       »       sizeBudget: sizeBudget, | 
|  171 »       »       sizeBudget:  sizeBudget, |  | 
|  172         } |  155         } | 
|  173         if err = cb(context.WithValue(ctx, dsTxnBufParent, state)); err != nil { |  156         if err = cb(context.WithValue(ctx, dsTxnBufParent, state)); err != nil { | 
|  174                 return err |  157                 return err | 
|  175         } |  158         } | 
|  176 »       return state.apply() |  159  | 
 |  160 »       // no reason to unlock this ever. At this point it's toast. | 
 |  161 »       state.Lock() | 
 |  162  | 
 |  163 »       if parentState == nil { | 
 |  164 »       »       return commitToReal(state) | 
 |  165 »       } | 
 |  166  | 
 |  167 »       if err = parentState.canApplyLocked(state); err != nil { | 
 |  168 »       »       return err | 
 |  169 »       } | 
 |  170  | 
 |  171 »       parentState.commitLocked(state) | 
 |  172 »       return nil | 
|  177 } |  173 } | 
|  178  |  174  | 
|  179 // item is a temporary object for representing key/entity pairs and their cache |  175 // item is a temporary object for representing key/entity pairs and their cache | 
|  180 // state (e.g. if they exist in the in-memory datastore buffer or not). |  176 // state (e.g. if they exist in the in-memory datastore buffer or not). | 
|  181 // Additionally item memoizes some common comparison strings. item objects |  177 // Additionally item memoizes some common comparison strings. item objects | 
|  182 // must never be persisted outside of a single function/query context. |  178 // must never be persisted outside of a single function/query context. | 
|  183 type item struct { |  179 type item struct { | 
|  184         key      *datastore.Key |  180         key      *datastore.Key | 
|  185         data     datastore.PropertyMap |  181         data     datastore.PropertyMap | 
|  186         buffered bool |  182         buffered bool | 
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
|  229         // only need to update the roots if they did something that required upd
     ating |  225         // only need to update the roots if they did something that required upd
     ating | 
|  230         if proposedRoots.Len() > 0 { |  226         if proposedRoots.Len() > 0 { | 
|  231                 proposedRoots.Iter(func(root string) bool { |  227                 proposedRoots.Iter(func(root string) bool { | 
|  232                         t.roots.Add(root) |  228                         t.roots.Add(root) | 
|  233                         return true |  229                         return true | 
|  234                 }) |  230                 }) | 
|  235         } |  231         } | 
|  236         return nil |  232         return nil | 
|  237 } |  233 } | 
|  238  |  234  | 
|  239 func (t *txnBufState) getMulti(keys []*datastore.Key) ([]item, error) { |  235 func (t *txnBufState) getMulti(keys []*datastore.Key, metas datastore.MultiMetaG
     etter, cb datastore.GetMultiCB, haveLock bool) error { | 
|  240         encKeys, roots := toEncoded(keys) |  236         encKeys, roots := toEncoded(keys) | 
|  241 »       ret := make([]item, len(keys)) |  237 »       data := make([]item, len(keys)) | 
|  242  |  238  | 
|  243         idxMap := []int(nil) |  239         idxMap := []int(nil) | 
|  244         toGetKeys := []*datastore.Key(nil) |  240         toGetKeys := []*datastore.Key(nil) | 
|  245  |  241  | 
|  246 »       t.Lock() |  242 »       lme := errors.NewLazyMultiError(len(keys)) | 
|  247 »       defer t.Unlock() |  243 »       err := func() error { | 
|  248  |  244 »       »       if !haveLock { | 
|  249 »       if err := t.updateRootsLocked(roots); err != nil { |  245 »       »       »       t.Lock() | 
|  250 »       »       return nil, err |  246 »       »       »       defer t.Unlock() | 
|  251 »       } |  247 »       »       } | 
|  252  |  248  | 
 |  249 »       »       if err := t.updateRootsLocked(roots); err != nil { | 
 |  250 »       »       »       return err | 
 |  251 »       »       } | 
 |  252  | 
 |  253 »       »       for i, key := range keys { | 
 |  254 »       »       »       data[i].key = key | 
 |  255 »       »       »       data[i].encKey = encKeys[i] | 
 |  256 »       »       »       if size, ok := t.entState.get(data[i].getEncKey()); ok { | 
 |  257 »       »       »       »       data[i].buffered = true | 
 |  258 »       »       »       »       if size > 0 { | 
 |  259 »       »       »       »       »       idxMap = append(idxMap, i) | 
 |  260 »       »       »       »       »       toGetKeys = append(toGetKeys, key) | 
 |  261 »       »       »       »       } | 
 |  262 »       »       »       } | 
 |  263 »       »       } | 
 |  264  | 
 |  265 »       »       if len(toGetKeys) > 0 { | 
 |  266 »       »       »       j := 0 | 
 |  267 »       »       »       t.bufDS.GetMulti(toGetKeys, nil, func(pm datastore.Prope
     rtyMap, err error) { | 
 |  268 »       »       »       »       impossible(err) | 
 |  269 »       »       »       »       data[idxMap[j]].data = pm | 
 |  270 »       »       »       »       j++ | 
 |  271 »       »       »       }) | 
 |  272 »       »       } | 
 |  273  | 
 |  274 »       »       idxMap = nil | 
 |  275 »       »       getKeys := []*datastore.Key(nil) | 
 |  276 »       »       getMetas := datastore.MultiMetaGetter(nil) | 
 |  277  | 
 |  278 »       »       for i, itm := range data { | 
 |  279 »       »       »       if !itm.buffered { | 
 |  280 »       »       »       »       idxMap = append(idxMap, i) | 
 |  281 »       »       »       »       getKeys = append(getKeys, itm.key) | 
 |  282 »       »       »       »       getMetas = append(getMetas, metas.GetSingle(i)) | 
 |  283 »       »       »       } | 
 |  284 »       »       } | 
 |  285  | 
 |  286 »       »       if len(idxMap) > 0 { | 
 |  287 »       »       »       j := 0 | 
 |  288 »       »       »       err := t.parentDS.GetMulti(getKeys, getMetas, func(pm da
     tastore.PropertyMap, err error) { | 
 |  289 »       »       »       »       if err != datastore.ErrNoSuchEntity { | 
 |  290 »       »       »       »       »       i := idxMap[j] | 
 |  291 »       »       »       »       »       if !lme.Assign(i, err) { | 
 |  292 »       »       »       »       »       »       data[i].data = pm | 
 |  293 »       »       »       »       »       } | 
 |  294 »       »       »       »       } | 
 |  295 »       »       »       »       j++ | 
 |  296 »       »       »       }) | 
 |  297 »       »       »       if err != nil { | 
 |  298 »       »       »       »       return err | 
 |  299 »       »       »       } | 
 |  300 »       »       } | 
 |  301 »       »       return nil | 
 |  302 »       }() | 
 |  303 »       if err != nil { | 
 |  304 »       »       return err | 
 |  305 »       } | 
 |  306  | 
 |  307 »       for i, itm := range data { | 
 |  308 »       »       err := lme.GetOne(i) | 
 |  309 »       »       if err != nil { | 
 |  310 »       »       »       cb(nil, err) | 
 |  311 »       »       } else if itm.data == nil { | 
 |  312 »       »       »       cb(nil, datastore.ErrNoSuchEntity) | 
 |  313 »       »       } else { | 
 |  314 »       »       »       cb(itm.data, nil) | 
 |  315 »       »       } | 
 |  316 »       } | 
 |  317 »       return nil | 
 |  318 } | 
 |  319  | 
 |  320 func (t *txnBufState) deleteMulti(keys []*datastore.Key, cb datastore.DeleteMult
     iCB, haveLock bool) error { | 
 |  321 »       encKeys, roots := toEncoded(keys) | 
 |  322  | 
 |  323 »       err := func() error { | 
 |  324 »       »       if !haveLock { | 
 |  325 »       »       »       t.Lock() | 
 |  326 »       »       »       defer t.Unlock() | 
 |  327 »       »       } | 
 |  328  | 
 |  329 »       »       if err := t.updateRootsLocked(roots); err != nil { | 
 |  330 »       »       »       return err | 
 |  331 »       »       } | 
 |  332  | 
 |  333 »       »       i := 0 | 
 |  334 »       »       err := t.bufDS.DeleteMulti(keys, func(err error) { | 
 |  335 »       »       »       impossible(err) | 
 |  336 »       »       »       t.entState.set(encKeys[i], 0) | 
 |  337 »       »       »       i++ | 
 |  338 »       »       }) | 
 |  339 »       »       impossible(err) | 
 |  340 »       »       return nil | 
 |  341 »       }() | 
 |  342 »       if err != nil { | 
 |  343 »       »       return err | 
 |  344 »       } | 
 |  345  | 
 |  346 »       for range keys { | 
 |  347 »       »       cb(nil) | 
 |  348 »       } | 
 |  349  | 
 |  350 »       return nil | 
 |  351 } | 
 |  352  | 
 |  353 func (t *txnBufState) fixKeys(keys []*datastore.Key) ([]*datastore.Key, error) { | 
 |  354 »       lme := errors.NewLazyMultiError(len(keys)) | 
 |  355 »       realKeys := []*datastore.Key(nil) | 
|  253         for i, key := range keys { |  356         for i, key := range keys { | 
|  254 »       »       ret[i].key = key |  357 »       »       if key.Incomplete() { | 
|  255 »       »       ret[i].encKey = encKeys[i] |  358 »       »       »       // intentionally call AllocateIDs without lock. | 
|  256 »       »       if size, ok := t.entState.get(ret[i].getEncKey()); ok { |  359 »       »       »       start, err := t.parentDS.AllocateIDs(key, 1) | 
|  257 »       »       »       ret[i].buffered = true |  360 »       »       »       if !lme.Assign(i, err) { | 
|  258 »       »       »       if size > 0 { |  361 »       »       »       »       if realKeys == nil { | 
|  259 »       »       »       »       idxMap = append(idxMap, i) |  362 »       »       »       »       »       realKeys = make([]*datastore.Key, len(ke
     ys)) | 
|  260 »       »       »       »       toGetKeys = append(toGetKeys, key) |  363 »       »       »       »       »       copy(realKeys, keys) | 
|  261 »       »       »       } |  364 »       »       »       »       } | 
|  262 »       »       } |  365  | 
|  263 »       } |  366 »       »       »       »       aid, ns, toks := key.Split() | 
|  264  |  367 »       »       »       »       toks[len(toks)-1].IntID = start | 
|  265 »       if len(toGetKeys) > 0 { |  368 »       »       »       »       realKeys[i] = datastore.NewKeyToks(aid, ns, toks
     ) | 
|  266 »       »       j := 0 |  369 »       »       »       } | 
|  267 »       »       t.memDS.GetMulti(toGetKeys, nil, func(pm datastore.PropertyMap, 
     err error) { |  370 »       »       } | 
 |  371 »       } | 
 |  372 »       err := lme.Get() | 
 |  373  | 
 |  374 »       if realKeys != nil { | 
 |  375 »       »       return realKeys, err | 
 |  376 »       } | 
 |  377 »       return keys, err | 
 |  378 } | 
 |  379  | 
 |  380 func (t *txnBufState) putMulti(keys []*datastore.Key, vals []datastore.PropertyM
     ap, cb datastore.PutMultiCB, haveLock bool) error { | 
 |  381 »       keys, err := t.fixKeys(keys) | 
 |  382 »       if err != nil { | 
 |  383 »       »       for _, e := range err.(errors.MultiError) { | 
 |  384 »       »       »       cb(nil, e) | 
 |  385 »       »       } | 
 |  386 »       »       return nil | 
 |  387 »       } | 
 |  388  | 
 |  389 »       encKeys, roots := toEncoded(keys) | 
 |  390  | 
 |  391 »       err = func() error { | 
 |  392 »       »       if !haveLock { | 
 |  393 »       »       »       t.Lock() | 
 |  394 »       »       »       defer t.Unlock() | 
 |  395 »       »       } | 
 |  396  | 
 |  397 »       »       if err := t.updateRootsLocked(roots); err != nil { | 
 |  398 »       »       »       return err | 
 |  399 »       »       } | 
 |  400  | 
 |  401 »       »       i := 0 | 
 |  402 »       »       err := t.bufDS.PutMulti(keys, vals, func(k *datastore.Key, err e
     rror) { | 
|  268                         impossible(err) |  403                         impossible(err) | 
|  269 »       »       »       ret[idxMap[j]].data = pm |  404 »       »       »       t.entState.set(encKeys[i], vals[i].EstimateSize()) | 
|  270 »       »       »       j++ |  405 »       »       »       i++ | 
|  271                 }) |  406                 }) | 
|  272 »       } |  407 »       »       impossible(err) | 
|  273  |  408 »       »       return nil | 
|  274 »       return ret, nil |  409 »       }() | 
|  275 } |  410 »       if err != nil { | 
|  276  |  | 
|  277 func (t *txnBufState) deleteMulti(keys []*datastore.Key) error { |  | 
|  278 »       encKeys, roots := toEncoded(keys) |  | 
|  279  |  | 
|  280 »       t.Lock() |  | 
|  281 »       defer t.Unlock() |  | 
|  282  |  | 
|  283 »       if err := t.updateRootsLocked(roots); err != nil { |  | 
|  284                 return err |  411                 return err | 
|  285         } |  412         } | 
|  286  |  413  | 
|  287 »       i := 0 |  414 »       for _, k := range keys { | 
|  288 »       err := t.memDS.DeleteMulti(keys, func(err error) { |  415 »       »       cb(k, nil) | 
|  289 »       »       impossible(err) |  416 »       } | 
|  290 »       »       t.entState.set(encKeys[i], 0) |  417 »       return nil | 
|  291 »       »       i++ |  418 } | 
|  292 »       }) |  419  | 
|  293 »       impossible(err) |  420 func commitToReal(s *txnBufState) error { | 
|  294 »       return nil |  421 »       toPut, toPutKeys, toDel := s.effect() | 
|  295 } |  | 
|  296  |  | 
|  297 func (t *txnBufState) putMulti(keys []*datastore.Key, vals []datastore.PropertyM
     ap) error { |  | 
|  298 »       encKeys, roots := toEncoded(keys) |  | 
|  299  |  | 
|  300 »       t.Lock() |  | 
|  301 »       defer t.Unlock() |  | 
|  302  |  | 
|  303 »       if err := t.updateRootsLocked(roots); err != nil { |  | 
|  304 »       »       return err |  | 
|  305 »       } |  | 
|  306  |  | 
|  307 »       i := 0 |  | 
|  308 »       err := t.memDS.PutMulti(keys, vals, func(k *datastore.Key, err error) { |  | 
|  309 »       »       impossible(err) |  | 
|  310 »       »       t.entState.set(encKeys[i], vals[i].EstimateSize()) |  | 
|  311 »       »       i++ |  | 
|  312 »       }) |  | 
|  313 »       impossible(err) |  | 
|  314 »       return nil |  | 
|  315 } |  | 
|  316  |  | 
|  317 // apply actually takes the buffered transaction and applies it to the parent |  | 
|  318 // transaction. It will only return an error if the underlying 'real' datastore |  | 
|  319 // returns an error on PutMulti or DeleteMulti. |  | 
|  320 func (t *txnBufState) apply() error { |  | 
|  321 »       t.Lock() |  | 
|  322 »       defer t.Unlock() |  | 
|  323  |  | 
|  324 »       // if parentState is nil... just try to commit this anyway. The estimate
     s |  | 
|  325 »       // we're using here are just educated guesses. If it fits for real, then |  | 
|  326 »       // hooray. If not, then the underlying datastore will error. |  | 
|  327 »       if t.parentState != nil { |  | 
|  328 »       »       t.parentState.Lock() |  | 
|  329 »       »       proposedState := t.parentState.entState.dup() |  | 
|  330 »       »       t.parentState.Unlock() |  | 
|  331 »       »       for k, v := range t.entState.keyToSize { |  | 
|  332 »       »       »       proposedState.set(k, v) |  | 
|  333 »       »       } |  | 
|  334 »       »       if proposedState.total > t.sizeBudget { |  | 
|  335 »       »       »       return ErrTransactionTooLarge |  | 
|  336 »       »       } |  | 
|  337 »       } |  | 
|  338  |  | 
|  339 »       toPutKeys := []*datastore.Key(nil) |  | 
|  340 »       toPut := []datastore.PropertyMap(nil) |  | 
|  341 »       toDel := []*datastore.Key(nil) |  | 
|  342  |  | 
|  343 »       // need to pull all items out of the in-memory datastore. Fortunately we
      have |  | 
|  344 »       // kindless queries, and we disabled all the special entities, so just |  | 
|  345 »       // run a kindless query without any filters and it will return all data |  | 
|  346 »       // currently in memDS :). |  | 
|  347 »       fq, err := datastore.NewQuery("").Finalize() |  | 
|  348 »       impossible(err) |  | 
|  349  |  | 
|  350 »       err = t.memDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMa
     p, _ datastore.CursorCB) bool { |  | 
|  351 »       »       toPutKeys = append(toPutKeys, key) |  | 
|  352 »       »       toPut = append(toPut, data) |  | 
|  353 »       »       return true |  | 
|  354 »       }) |  | 
|  355 »       memoryCorruption(err) |  | 
|  356  |  | 
|  357 »       for keyStr, size := range t.entState.keyToSize { |  | 
|  358 »       »       if size == 0 { |  | 
|  359 »       »       »       k, err := serialize.ReadKey(bytes.NewBufferString(keyStr
     ), serialize.WithoutContext, t.aid, t.ns) |  | 
|  360 »       »       »       memoryCorruption(err) |  | 
|  361 »       »       »       toDel = append(toDel, k) |  | 
|  362 »       »       } |  | 
|  363 »       } |  | 
|  364  |  | 
|  365 »       ds := t.parentDS |  | 
|  366  |  422  | 
|  367         return parallel.FanOutIn(func(ch chan<- func() error) { |  423         return parallel.FanOutIn(func(ch chan<- func() error) { | 
|  368                 if len(toPut) > 0 { |  424                 if len(toPut) > 0 { | 
|  369                         ch <- func() error { |  425                         ch <- func() error { | 
|  370                                 mErr := errors.NewLazyMultiError(len(toPut)) |  426                                 mErr := errors.NewLazyMultiError(len(toPut)) | 
|  371                                 i := 0 |  427                                 i := 0 | 
|  372 »       »       »       »       err := ds.PutMulti(toPutKeys, toPut, func(_ *dat
     astore.Key, err error) { |  428 »       »       »       »       err := s.parentDS.PutMulti(toPutKeys, toPut, fun
     c(_ *datastore.Key, err error) { | 
|  373                                         mErr.Assign(i, err) |  429                                         mErr.Assign(i, err) | 
|  374                                         i++ |  430                                         i++ | 
|  375                                 }) |  431                                 }) | 
|  376                                 if err == nil { |  432                                 if err == nil { | 
|  377                                         err = mErr.Get() |  433                                         err = mErr.Get() | 
|  378                                 } |  434                                 } | 
|  379                                 return err |  435                                 return err | 
|  380                         } |  436                         } | 
|  381                 } |  437                 } | 
|  382                 if len(toDel) > 0 { |  438                 if len(toDel) > 0 { | 
|  383                         ch <- func() error { |  439                         ch <- func() error { | 
|  384                                 mErr := errors.NewLazyMultiError(len(toDel)) |  440                                 mErr := errors.NewLazyMultiError(len(toDel)) | 
|  385                                 i := 0 |  441                                 i := 0 | 
|  386 »       »       »       »       err := ds.DeleteMulti(toDel, func(err error) { |  442 »       »       »       »       err := s.parentDS.DeleteMulti(toDel, func(err er
     ror) { | 
|  387                                         mErr.Assign(i, err) |  443                                         mErr.Assign(i, err) | 
|  388                                         i++ |  444                                         i++ | 
|  389                                 }) |  445                                 }) | 
|  390                                 if err == nil { |  446                                 if err == nil { | 
|  391                                         err = mErr.Get() |  447                                         err = mErr.Get() | 
|  392                                 } |  448                                 } | 
|  393                                 return err |  449                                 return err | 
|  394                         } |  450                         } | 
|  395                 } |  451                 } | 
|  396         }) |  452         }) | 
|  397 } |  453 } | 
|  398  |  454  | 
 |  455 func (t *txnBufState) effect() (toPut []datastore.PropertyMap, toPutKeys, toDel 
     []*datastore.Key) { | 
 |  456         // TODO(riannucci): preallocate return slices | 
 |  457  | 
 |  458         // need to pull all items out of the in-memory datastore. Fortunately we
      have | 
 |  459         // kindless queries, and we disabled all the special entities, so just | 
 |  460         // run a kindless query without any filters and it will return all data | 
 |  461         // currently in bufDS :). | 
 |  462         fq, err := datastore.NewQuery("").Finalize() | 
 |  463         impossible(err) | 
 |  464  | 
 |  465         err = t.bufDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMa
     p, _ datastore.CursorCB) bool { | 
 |  466                 toPutKeys = append(toPutKeys, key) | 
 |  467                 toPut = append(toPut, data) | 
 |  468                 return true | 
 |  469         }) | 
 |  470         memoryCorruption(err) | 
 |  471  | 
 |  472         for keyStr, size := range t.entState.keyToSize { | 
 |  473                 if size == 0 { | 
 |  474                         k, err := serialize.ReadKey(bytes.NewBufferString(keyStr
     ), serialize.WithoutContext, t.aid, t.ns) | 
 |  475                         memoryCorruption(err) | 
 |  476                         toDel = append(toDel, k) | 
 |  477                 } | 
 |  478         } | 
 |  479  | 
 |  480         return | 
 |  481 } | 
 |  482  | 
 |  483 func (t *txnBufState) canApplyLocked(s *txnBufState) error { | 
 |  484         proposedState := t.entState.dup() | 
 |  485  | 
 |  486         for k, v := range s.entState.keyToSize { | 
 |  487                 proposedState.set(k, v) | 
 |  488         } | 
 |  489         if proposedState.total > s.sizeBudget { | 
 |  490                 return ErrTransactionTooLarge | 
 |  491         } | 
 |  492         return nil | 
 |  493 } | 
 |  494  | 
 |  495 func (t *txnBufState) commitLocked(s *txnBufState) { | 
 |  496         toPut, toPutKeys, toDel := s.effect() | 
 |  497  | 
 |  498         if len(toPut) > 0 { | 
 |  499                 impossible(t.putMulti(toPutKeys, toPut, | 
 |  500                         func(_ *datastore.Key, err error) { impossible(err) }, t
     rue)) | 
 |  501         } | 
 |  502  | 
 |  503         if len(toDel) > 0 { | 
 |  504                 impossible(t.deleteMulti(toDel, impossible, true)) | 
 |  505         } | 
 |  506 } | 
 |  507  | 
|  399 // toEncoded returns a list of all of the serialized versions of these keys, |  508 // toEncoded returns a list of all of the serialized versions of these keys, | 
|  400 // plus a stringset of all the encoded root keys that `keys` represents. |  509 // plus a stringset of all the encoded root keys that `keys` represents. | 
|  401 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) { |  510 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) { | 
|  402         roots = stringset.New(len(keys)) |  511         roots = stringset.New(len(keys)) | 
|  403         full = make([]string, len(keys)) |  512         full = make([]string, len(keys)) | 
|  404         for i, k := range keys { |  513         for i, k := range keys { | 
|  405                 roots.Add(string(serialize.ToBytes(k.Root()))) |  514                 roots.Add(string(serialize.ToBytes(k.Root()))) | 
|  406                 full[i] = string(serialize.ToBytes(k)) |  515                 full[i] = string(serialize.ToBytes(k)) | 
|  407         } |  516         } | 
|  408         return |  517         return | 
|  409 } |  518 } | 
| OLD | NEW |