OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package txnBuf | 5 package txnBuf |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "sync" | 9 "sync" |
10 | 10 |
(...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
98 sync.Mutex | 98 sync.Mutex |
99 | 99 |
100 // encoded key -> size of entity. A size of 0 means that the entity is | 100 // encoded key -> size of entity. A size of 0 means that the entity is |
101 // deleted. | 101 // deleted. |
102 entState *sizeTracker | 102 entState *sizeTracker |
103 bufDS datastore.RawInterface | 103 bufDS datastore.RawInterface |
104 | 104 |
105 roots stringset.Set | 105 roots stringset.Set |
106 rootLimit int | 106 rootLimit int |
107 | 107 |
108 » aid string | 108 » kc datastore.KeyContext |
109 » ns string | |
110 parentDS datastore.RawInterface | 109 parentDS datastore.RawInterface |
111 | 110 |
112 // sizeBudget is the number of bytes that this transaction has to operat
e | 111 // sizeBudget is the number of bytes that this transaction has to operat
e |
113 // within. It's only used when attempting to apply() the transaction, an
d | 112 // within. It's only used when attempting to apply() the transaction, an
d |
114 // it is the threshold for the delta of applying this transaction to the | 113 // it is the threshold for the delta of applying this transaction to the |
115 // parent transaction. Note that a buffered transaction could actually h
ave | 114 // parent transaction. Note that a buffered transaction could actually h
ave |
116 // a negative delta if the parent transaction had many large entities wh
ich | 115 // a negative delta if the parent transaction had many large entities wh
ich |
117 // the inner transaction deleted. | 116 // the inner transaction deleted. |
118 sizeBudget int64 | 117 sizeBudget int64 |
119 // countBudget is the number of entity writes that this transaction has
to | 118 // countBudget is the number of entity writes that this transaction has
to |
120 // operate in. | 119 // operate in. |
121 writeCountBudget int | 120 writeCountBudget int |
122 } | 121 } |
123 | 122 |
124 func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas
tore.TransactionOptions) error { | 123 func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas
tore.TransactionOptions) error { |
125 inf := info.Get(ctx) | |
126 ns, _ := inf.GetNamespace() | |
127 | |
128 parentState, _ := ctx.Value(dsTxnBufParent).(*txnBufState) | 124 parentState, _ := ctx.Value(dsTxnBufParent).(*txnBufState) |
129 roots := stringset.New(0) | 125 roots := stringset.New(0) |
130 rootLimit := 1 | 126 rootLimit := 1 |
131 if opts != nil && opts.XG { | 127 if opts != nil && opts.XG { |
132 rootLimit = XGTransactionGroupLimit | 128 rootLimit = XGTransactionGroupLimit |
133 } | 129 } |
134 sizeBudget, writeCountBudget := DefaultSizeBudget, DefaultWriteCountBudg
et | 130 sizeBudget, writeCountBudget := DefaultSizeBudget, DefaultWriteCountBudg
et |
135 if parentState != nil { | 131 if parentState != nil { |
136 // TODO(riannucci): this is a bit wonky since it means that a ch
ild | 132 // TODO(riannucci): this is a bit wonky since it means that a ch
ild |
137 // transaction declaring XG=true will only get to modify 25 grou
ps IF | 133 // transaction declaring XG=true will only get to modify 25 grou
ps IF |
138 // they're same groups affected by the parent transactions. So i
nstead of | 134 // they're same groups affected by the parent transactions. So i
nstead of |
139 // respecting opts.XG for inner transactions, we just dup everyt
hing from | 135 // respecting opts.XG for inner transactions, we just dup everyt
hing from |
140 // the parent transaction. | 136 // the parent transaction. |
141 roots = parentState.roots.Dup() | 137 roots = parentState.roots.Dup() |
142 rootLimit = parentState.rootLimit | 138 rootLimit = parentState.rootLimit |
143 | 139 |
144 sizeBudget = parentState.sizeBudget - parentState.entState.total | 140 sizeBudget = parentState.sizeBudget - parentState.entState.total |
145 writeCountBudget = parentState.writeCountBudget - parentState.en
tState.numWrites() | 141 writeCountBudget = parentState.writeCountBudget - parentState.en
tState.numWrites() |
146 } | 142 } |
147 | 143 |
148 state := &txnBufState{ | 144 state := &txnBufState{ |
149 entState: &sizeTracker{}, | 145 entState: &sizeTracker{}, |
150 » » bufDS: memory.NewDatastore(inf).Raw(), | 146 » » bufDS: memory.NewDatastore(ctx, info.Raw(ctx)), |
151 roots: roots, | 147 roots: roots, |
152 rootLimit: rootLimit, | 148 rootLimit: rootLimit, |
153 » » ns: ns, | 149 » » kc: datastore.GetKeyContext(ctx), |
154 » » aid: inf.FullyQualifiedAppID(), | 150 » » parentDS: datastore.Raw(context.WithValue(ctx, dsTxnBufH
aveLock, true)), |
155 » » parentDS: datastore.Get(context.WithValue(ctx, dsTxnBufH
aveLock, true)).Raw(), | |
156 sizeBudget: sizeBudget, | 151 sizeBudget: sizeBudget, |
157 writeCountBudget: writeCountBudget, | 152 writeCountBudget: writeCountBudget, |
158 } | 153 } |
159 if err := cb(context.WithValue(ctx, dsTxnBufParent, state)); err != nil
{ | 154 if err := cb(context.WithValue(ctx, dsTxnBufParent, state)); err != nil
{ |
160 return err | 155 return err |
161 } | 156 } |
162 | 157 |
163 // no reason to unlock this ever. At this point it's toast. | 158 // no reason to unlock this ever. At this point it's toast. |
164 state.Lock() | 159 state.Lock() |
165 | 160 |
(...skipping 339 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
505 | 500 |
506 err = t.bufDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMa
p, _ datastore.CursorCB) error { | 501 err = t.bufDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMa
p, _ datastore.CursorCB) error { |
507 toPutKeys = append(toPutKeys, key) | 502 toPutKeys = append(toPutKeys, key) |
508 toPut = append(toPut, data) | 503 toPut = append(toPut, data) |
509 return nil | 504 return nil |
510 }) | 505 }) |
511 memoryCorruption(err) | 506 memoryCorruption(err) |
512 | 507 |
513 for keyStr, size := range t.entState.keyToSize { | 508 for keyStr, size := range t.entState.keyToSize { |
514 if size == 0 { | 509 if size == 0 { |
515 » » » k, err := serialize.ReadKey(bytes.NewBufferString(keyStr
), serialize.WithoutContext, t.aid, t.ns) | 510 » » » k, err := serialize.ReadKey(bytes.NewBufferString(keyStr
), serialize.WithoutContext, t.kc) |
516 memoryCorruption(err) | 511 memoryCorruption(err) |
517 toDel = append(toDel, k) | 512 toDel = append(toDel, k) |
518 } | 513 } |
519 } | 514 } |
520 | 515 |
521 return | 516 return |
522 } | 517 } |
523 | 518 |
524 func (t *txnBufState) canApplyLocked(s *txnBufState) error { | 519 func (t *txnBufState) canApplyLocked(s *txnBufState) error { |
525 proposedState := t.entState.dup() | 520 proposedState := t.entState.dup() |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
564 // plus a stringset of all the encoded root keys that `keys` represents. | 559 // plus a stringset of all the encoded root keys that `keys` represents. |
565 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) { | 560 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) { |
566 roots = stringset.New(len(keys)) | 561 roots = stringset.New(len(keys)) |
567 full = make([]string, len(keys)) | 562 full = make([]string, len(keys)) |
568 for i, k := range keys { | 563 for i, k := range keys { |
569 roots.Add(string(serialize.ToBytes(k.Root()))) | 564 roots.Add(string(serialize.ToBytes(k.Root()))) |
570 full[i] = string(serialize.ToBytes(k)) | 565 full[i] = string(serialize.ToBytes(k)) |
571 } | 566 } |
572 return | 567 return |
573 } | 568 } |
OLD | NEW |