OLD | NEW |
---|---|
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "fmt" | 9 "fmt" |
10 "sync" | 10 "sync" |
11 "sync/atomic" | 11 "sync/atomic" |
12 | 12 |
13 ds "github.com/luci/gae/service/datastore" | 13 ds "github.com/luci/gae/service/datastore" |
14 "github.com/luci/gae/service/datastore/dskey" | 14 "github.com/luci/gae/service/datastore/dskey" |
15 "github.com/luci/gae/service/datastore/serialize" | 15 "github.com/luci/gae/service/datastore/serialize" |
16 "github.com/luci/luci-go/common/errors" | 16 "github.com/luci/luci-go/common/errors" |
17 "golang.org/x/net/context" | 17 "golang.org/x/net/context" |
18 ) | 18 ) |
19 | 19 |
20 //////////////////////////////// dataStoreData ///////////////////////////////// | 20 //////////////////////////////// dataStoreData ///////////////////////////////// |
21 | 21 |
22 type dataStoreData struct { | 22 type dataStoreData struct { |
23 rwlock sync.RWMutex | 23 rwlock sync.RWMutex |
24 » // See README.md for store schema. | 24 » // See README.md for head schema. |
25 » store *memStore | 25 » head *memStore |
26 » snap *memStore | 26 » snap *memStore |
27 } | 27 } |
28 | 28 |
29 var ( | 29 var ( |
30 _ = memContextObj((*dataStoreData)(nil)) | 30 _ = memContextObj((*dataStoreData)(nil)) |
31 _ = sync.Locker((*dataStoreData)(nil)) | 31 _ = sync.Locker((*dataStoreData)(nil)) |
32 ) | 32 ) |
33 | 33 |
34 func newDataStoreData() *dataStoreData { | 34 func newDataStoreData() *dataStoreData { |
35 » store := newMemStore() | 35 » head := newMemStore() |
36 return &dataStoreData{ | 36 return &dataStoreData{ |
37 » » store: store, | 37 » » head: head, |
38 » » snap: store.Snapshot(), // empty but better than a nil pointer. | 38 » » snap: head.Snapshot(), // empty but better than a nil pointer. |
39 } | 39 } |
40 } | 40 } |
41 | 41 |
42 func (d *dataStoreData) Lock() { | 42 func (d *dataStoreData) Lock() { |
43 d.rwlock.Lock() | 43 d.rwlock.Lock() |
44 } | 44 } |
45 | 45 |
46 func (d *dataStoreData) Unlock() { | 46 func (d *dataStoreData) Unlock() { |
47 d.rwlock.Unlock() | 47 d.rwlock.Unlock() |
48 } | 48 } |
49 | 49 |
50 func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) { | 50 func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) { |
51 d.rwlock.RLock() | 51 d.rwlock.RLock() |
52 defer d.rwlock.RUnlock() | 52 defer d.rwlock.RUnlock() |
53 » head = d.store.Snapshot() | 53 » head = d.head.Snapshot() |
54 if consistent { | 54 if consistent { |
55 idx = head | 55 idx = head |
56 } else { | 56 } else { |
57 idx = d.snap | 57 idx = d.snap |
58 } | 58 } |
59 return | 59 return |
60 } | 60 } |
61 | 61 |
62 func (d *dataStoreData) takeSnapshot() *memStore { | 62 func (d *dataStoreData) takeSnapshot() *memStore { |
63 d.rwlock.RLock() | 63 d.rwlock.RLock() |
64 defer d.rwlock.RUnlock() | 64 defer d.rwlock.RUnlock() |
65 » return d.store.Snapshot() | 65 » return d.head.Snapshot() |
66 } | 66 } |
67 | 67 |
68 func (d *dataStoreData) setSnapshot(snap *memStore) { | 68 func (d *dataStoreData) setSnapshot(snap *memStore) { |
69 d.rwlock.Lock() | 69 d.rwlock.Lock() |
70 defer d.rwlock.Unlock() | 70 defer d.rwlock.Unlock() |
71 d.snap = snap | 71 d.snap = snap |
72 } | 72 } |
73 | 73 |
74 func (d *dataStoreData) catchupIndexes() { | 74 func (d *dataStoreData) catchupIndexes() { |
75 d.rwlock.Lock() | 75 d.rwlock.Lock() |
76 defer d.rwlock.Unlock() | 76 defer d.rwlock.Unlock() |
77 » d.snap = d.store.Snapshot() | 77 » d.snap = d.head.Snapshot() |
78 } | 78 } |
79 | 79 |
80 /////////////////////////// indicies(dataStoreData) //////////////////////////// | 80 /////////////////////////// indicies(dataStoreData) //////////////////////////// |
81 | 81 |
82 func groupMetaKey(key ds.Key) []byte { | 82 func groupMetaKey(key ds.Key) []byte { |
83 » return keyBytes(serialize.WithoutContext, | 83 » return keyBytes(dskey.New("", "", "__entity_group__", "", 1, dskey.Root( key))) |
84 » » dskey.New("", "", "__entity_group__", "", 1, dskey.Root(key))) | |
85 } | 84 } |
86 | 85 |
87 func groupIDsKey(key ds.Key) []byte { | 86 func groupIDsKey(key ds.Key) []byte { |
88 » return keyBytes(serialize.WithoutContext, | 87 » return keyBytes(dskey.New("", "", "__entity_group_ids__", "", 1, dskey.R oot(key))) |
89 » » dskey.New("", "", "__entity_group_ids__", "", 1, dskey.Root(key) )) | |
90 } | 88 } |
91 | 89 |
92 func rootIDsKey(kind string) []byte { | 90 func rootIDsKey(kind string) []byte { |
93 » return keyBytes(serialize.WithoutContext, | 91 » return keyBytes(dskey.New("", "", "__entity_root_ids__", kind, 0, nil)) |
94 » » dskey.New("", "", "__entity_root_ids__", kind, 0, nil)) | |
95 } | 92 } |
96 | 93 |
97 func curVersion(ents *memCollection, key []byte) int64 { | 94 func curVersion(ents *memCollection, key []byte) int64 { |
98 if ents != nil { | 95 if ents != nil { |
99 if v := ents.Get(key); v != nil { | 96 if v := ents.Get(key); v != nil { |
100 pm, err := rpm(v) | 97 pm, err := rpm(v) |
101 » » » if err != nil { | 98 » » » memoryCorruption(err) |
102 » » » » panic(err) // memory corruption | 99 |
103 » » » } | |
104 pl, ok := pm["__version__"] | 100 pl, ok := pm["__version__"] |
105 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { | 101 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { |
106 return pl[0].Value().(int64) | 102 return pl[0].Value().(int64) |
107 } | 103 } |
108 » » » panic(fmt.Errorf("__version__ property missing or wrong: %v", pm)) | 104 |
105 » » » memoryCorruption(fmt.Errorf("__version__ property missin g or wrong: %v", pm)) | |
109 } | 106 } |
110 } | 107 } |
111 return 0 | 108 return 0 |
112 } | 109 } |
113 | 110 |
114 func incrementLocked(ents *memCollection, key []byte) int64 { | 111 func incrementLocked(ents *memCollection, key []byte) int64 { |
115 ret := curVersion(ents, key) + 1 | 112 ret := curVersion(ents, key) + 1 |
116 » buf := &bytes.Buffer{} | 113 » ents.Set(key, serialize.ToBytes(ds.PropertyMap{ |
117 » serialize.WritePropertyMap(buf, serialize.WithContext, ds.PropertyMap{ | 114 » » "__version__": {ds.MkPropertyNI(ret)}, |
118 » » "__version__": {ds.MkPropertyNI(ret)}}) | 115 » })) |
119 » ents.Set(key, buf.Bytes()) | |
120 return ret | 116 return ret |
121 } | 117 } |
122 | 118 |
123 func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) { | 119 func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) { |
124 coll := "ents:" + key.Namespace() | 120 coll := "ents:" + key.Namespace() |
125 » ents := d.store.GetCollection(coll) | 121 » ents := d.head.GetCollection(coll) |
126 if ents == nil { | 122 if ents == nil { |
127 » » ents = d.store.SetCollection(coll, nil) | 123 » » ents = d.head.SetCollection(coll, nil) |
128 } | 124 } |
129 | 125 |
130 if dskey.Incomplete(key) { | 126 if dskey.Incomplete(key) { |
131 idKey := []byte(nil) | 127 idKey := []byte(nil) |
132 if key.Parent() == nil { | 128 if key.Parent() == nil { |
133 idKey = rootIDsKey(key.Kind()) | 129 idKey = rootIDsKey(key.Kind()) |
134 } else { | 130 } else { |
135 idKey = groupIDsKey(key) | 131 idKey = groupIDsKey(key) |
136 } | 132 } |
137 id := incrementLocked(ents, idKey) | 133 id := incrementLocked(ents, idKey) |
138 key = dskey.New(key.AppID(), key.Namespace(), key.Kind(), "", id , key.Parent()) | 134 key = dskey.New(key.AppID(), key.Namespace(), key.Kind(), "", id , key.Parent()) |
139 } | 135 } |
140 | 136 |
141 return ents, key | 137 return ents, key |
142 } | 138 } |
143 | 139 |
144 func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.Put MultiCB) { | 140 func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.Put MultiCB) { |
145 for i, k := range keys { | 141 for i, k := range keys { |
146 buf := &bytes.Buffer{} | |
147 pmap, _ := vals[i].Save(false) | 142 pmap, _ := vals[i].Save(false) |
148 » » serialize.WritePropertyMap(buf, serialize.WithoutContext, pmap) | 143 » » dataBytes := serialize.ToBytes(pmap) |
149 » » dataBytes := buf.Bytes() | |
150 | 144 |
151 k, err := func() (ret ds.Key, err error) { | 145 k, err := func() (ret ds.Key, err error) { |
152 d.rwlock.Lock() | 146 d.rwlock.Lock() |
153 defer d.rwlock.Unlock() | 147 defer d.rwlock.Unlock() |
154 | 148 |
155 ents, ret := d.entsKeyLocked(k) | 149 ents, ret := d.entsKeyLocked(k) |
156 incrementLocked(ents, groupMetaKey(ret)) | 150 incrementLocked(ents, groupMetaKey(ret)) |
157 | 151 |
158 » » » old := ents.Get(keyBytes(serialize.WithoutContext, ret)) | 152 » » » old := ents.Get(keyBytes(ret)) |
159 oldPM := ds.PropertyMap(nil) | 153 oldPM := ds.PropertyMap(nil) |
160 if old != nil { | 154 if old != nil { |
161 if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil { | 155 if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil { |
162 return | 156 return |
163 } | 157 } |
164 } | 158 } |
165 » » » updateIndicies(d.store, ret, oldPM, pmap) | 159 » » » updateIndicies(d.head, ret, oldPM, pmap) |
166 » » » ents.Set(keyBytes(serialize.WithoutContext, ret), dataBy tes) | 160 » » » ents.Set(keyBytes(ret), dataBytes) |
167 return | 161 return |
168 }() | 162 }() |
169 if cb != nil { | 163 if cb != nil { |
170 cb(k, err) | 164 cb(k, err) |
171 } | 165 } |
172 } | 166 } |
173 } | 167 } |
174 | 168 |
175 func getMultiInner(keys []ds.Key, cb ds.GetMultiCB, getColl func() (*memCollecti on, error)) error { | 169 func getMultiInner(keys []ds.Key, cb ds.GetMultiCB, getColl func() (*memCollecti on, error)) error { |
176 ents, err := getColl() | 170 ents, err := getColl() |
177 if err != nil { | 171 if err != nil { |
178 return err | 172 return err |
179 } | 173 } |
180 if ents == nil { | 174 if ents == nil { |
181 for range keys { | 175 for range keys { |
182 cb(nil, ds.ErrNoSuchEntity) | 176 cb(nil, ds.ErrNoSuchEntity) |
183 } | 177 } |
184 return nil | 178 return nil |
185 } | 179 } |
186 | 180 |
187 for _, k := range keys { | 181 for _, k := range keys { |
188 » » pdata := ents.Get(keyBytes(serialize.WithoutContext, k)) | 182 » » pdata := ents.Get(keyBytes(k)) |
189 if pdata == nil { | 183 if pdata == nil { |
190 cb(nil, ds.ErrNoSuchEntity) | 184 cb(nil, ds.ErrNoSuchEntity) |
191 continue | 185 continue |
192 } | 186 } |
193 cb(rpmWoCtx(pdata, k.Namespace())) | 187 cb(rpmWoCtx(pdata, k.Namespace())) |
194 } | 188 } |
195 return nil | 189 return nil |
196 } | 190 } |
197 | 191 |
198 func (d *dataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { | 192 func (d *dataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { |
199 getMultiInner(keys, cb, func() (*memCollection, error) { | 193 getMultiInner(keys, cb, func() (*memCollection, error) { |
200 s := d.takeSnapshot() | 194 s := d.takeSnapshot() |
201 | 195 |
202 return s.GetCollection("ents:" + keys[0].Namespace()), nil | 196 return s.GetCollection("ents:" + keys[0].Namespace()), nil |
203 }) | 197 }) |
204 return nil | 198 return nil |
205 } | 199 } |
206 | 200 |
207 func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) { | 201 func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) { |
208 toDel := make([][]byte, 0, len(keys)) | 202 toDel := make([][]byte, 0, len(keys)) |
209 for _, k := range keys { | 203 for _, k := range keys { |
210 » » toDel = append(toDel, keyBytes(serialize.WithoutContext, k)) | 204 » » toDel = append(toDel, keyBytes(k)) |
211 } | 205 } |
212 ns := keys[0].Namespace() | 206 ns := keys[0].Namespace() |
213 | 207 |
214 d.rwlock.Lock() | 208 d.rwlock.Lock() |
215 defer d.rwlock.Unlock() | 209 defer d.rwlock.Unlock() |
216 | 210 |
217 » ents := d.store.GetCollection("ents:" + ns) | 211 » ents := d.head.GetCollection("ents:" + ns) |
218 | 212 |
219 for i, k := range keys { | 213 for i, k := range keys { |
220 if ents != nil { | 214 if ents != nil { |
221 incrementLocked(ents, groupMetaKey(k)) | 215 incrementLocked(ents, groupMetaKey(k)) |
222 kb := toDel[i] | 216 kb := toDel[i] |
223 if old := ents.Get(kb); old != nil { | 217 if old := ents.Get(kb); old != nil { |
224 oldPM, err := rpmWoCtx(old, ns) | 218 oldPM, err := rpmWoCtx(old, ns) |
225 if err != nil { | 219 if err != nil { |
226 if cb != nil { | 220 if cb != nil { |
227 cb(err) | 221 cb(err) |
228 } | 222 } |
229 continue | 223 continue |
230 } | 224 } |
231 » » » » updateIndicies(d.store, k, oldPM, nil) | 225 » » » » updateIndicies(d.head, k, oldPM, nil) |
232 ents.Delete(kb) | 226 ents.Delete(kb) |
233 } | 227 } |
234 } | 228 } |
235 if cb != nil { | 229 if cb != nil { |
236 cb(nil) | 230 cb(nil) |
237 } | 231 } |
238 } | 232 } |
239 } | 233 } |
240 | 234 |
241 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { | 235 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { |
242 // TODO(riannucci): implement with Flush/FlushRevert for persistance. | 236 // TODO(riannucci): implement with Flush/FlushRevert for persistance. |
243 | 237 |
244 txn := obj.(*txnDataStoreData) | 238 txn := obj.(*txnDataStoreData) |
245 for rk, muts := range txn.muts { | 239 for rk, muts := range txn.muts { |
246 if len(muts) == 0 { // read-only | 240 if len(muts) == 0 { // read-only |
247 continue | 241 continue |
248 } | 242 } |
249 » » k, err := serialize.ReadKey(bytes.NewBufferString(rk), serialize .WithContext, "", "") | 243 » » prop, err := serialize.ReadProperty(bytes.NewBufferString(rk), s erialize.WithContext, "", "") |
250 » » if err != nil { | 244 » » memoryCorruption(err) |
251 » » » panic(err) | 245 |
252 » » } | 246 » » k := prop.Value().(ds.Key) |
253 | 247 |
254 entKey := "ents:" + k.Namespace() | 248 entKey := "ents:" + k.Namespace() |
255 mkey := groupMetaKey(k) | 249 mkey := groupMetaKey(k) |
256 » » entsHead := d.store.GetCollection(entKey) | 250 » » entsHead := d.head.GetCollection(entKey) |
257 entsSnap := txn.snap.GetCollection(entKey) | 251 entsSnap := txn.snap.GetCollection(entKey) |
258 vHead := curVersion(entsHead, mkey) | 252 vHead := curVersion(entsHead, mkey) |
259 vSnap := curVersion(entsSnap, mkey) | 253 vSnap := curVersion(entsSnap, mkey) |
260 if vHead != vSnap { | 254 if vHead != vSnap { |
261 return false | 255 return false |
262 } | 256 } |
263 } | 257 } |
264 return true | 258 return true |
265 } | 259 } |
266 | 260 |
267 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) { | 261 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) { |
268 txn := obj.(*txnDataStoreData) | 262 txn := obj.(*txnDataStoreData) |
269 for _, muts := range txn.muts { | 263 for _, muts := range txn.muts { |
270 if len(muts) == 0 { // read-only | 264 if len(muts) == 0 { // read-only |
271 continue | 265 continue |
272 } | 266 } |
273 // TODO(riannucci): refactor to do just 1 putMulti, and 1 delMul ti | 267 // TODO(riannucci): refactor to do just 1 putMulti, and 1 delMul ti |
274 for _, m := range muts { | 268 for _, m := range muts { |
275 err := error(nil) | 269 err := error(nil) |
276 k := m.key | 270 k := m.key |
277 if m.data == nil { | 271 if m.data == nil { |
278 d.delMulti([]ds.Key{k}, | 272 d.delMulti([]ds.Key{k}, |
279 func(e error) { err = e }) | 273 func(e error) { err = e }) |
280 } else { | 274 } else { |
281 d.putMulti([]ds.Key{m.key}, []ds.PropertyMap{m.d ata}, | 275 d.putMulti([]ds.Key{m.key}, []ds.PropertyMap{m.d ata}, |
282 func(_ ds.Key, e error) { err = e }) | 276 func(_ ds.Key, e error) { err = e }) |
283 } | 277 } |
284 » » » err = errors.SingleError(err) | 278 » » » impossible(err) |
285 » » » if err != nil { | |
286 » » » » panic(err) | |
287 » » » } | |
288 } | 279 } |
289 } | 280 } |
290 } | 281 } |
291 | 282 |
292 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj { | 283 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj { |
293 return &txnDataStoreData{ | 284 return &txnDataStoreData{ |
294 // alias to the main datastore's so that testing code can have p rimitive | 285 // alias to the main datastore's so that testing code can have p rimitive |
295 // access to break features inside of transactions. | 286 // access to break features inside of transactions. |
296 parent: d, | 287 parent: d, |
297 isXG: o != nil && o.XG, | 288 isXG: o != nil && o.XG, |
298 » » snap: d.store.Snapshot(), | 289 » » snap: d.head.Snapshot(), |
299 muts: map[string][]txnMutation{}, | 290 muts: map[string][]txnMutation{}, |
300 } | 291 } |
301 } | 292 } |
302 | 293 |
303 func (d *dataStoreData) endTxn() {} | 294 func (d *dataStoreData) endTxn() {} |
304 | 295 |
305 /////////////////////////////// txnDataStoreData /////////////////////////////// | 296 /////////////////////////////// txnDataStoreData /////////////////////////////// |
306 | 297 |
307 type txnMutation struct { | 298 type txnMutation struct { |
308 key ds.Key | 299 key ds.Key |
(...skipping 22 matching lines...) Expand all Loading... | |
331 const xgEGLimit = 25 | 322 const xgEGLimit = 25 |
332 | 323 |
333 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } | 324 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } |
334 func (td *txnDataStoreData) endTxn() { | 325 func (td *txnDataStoreData) endTxn() { |
335 if atomic.LoadInt32(&td.closed) == 1 { | 326 if atomic.LoadInt32(&td.closed) == 1 { |
336 panic("cannot end transaction twice") | 327 panic("cannot end transaction twice") |
337 } | 328 } |
338 atomic.StoreInt32(&td.closed, 1) | 329 atomic.StoreInt32(&td.closed, 1) |
339 } | 330 } |
340 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { | 331 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { |
341 » panic("txnDataStoreData cannot apply transactions") | 332 » impossible(fmt.Errorf("cannot creat a recursive transaction")) |
dnj (Google)
2015/08/28 17:54:21
"create"
iannucci
2015/08/28 19:48:55
done
| |
342 } | 333 } |
343 func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj { | 334 func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj { |
344 » panic("impossible") | 335 » impossible(fmt.Errorf("cannot creat a recursive transaction")) |
dnj (Google)
2015/08/28 17:54:21
"create"
iannucci
2015/08/28 19:48:55
done
| |
336 » return nil | |
345 } | 337 } |
346 | 338 |
347 func (td *txnDataStoreData) run(f func() error) error { | 339 func (td *txnDataStoreData) run(f func() error) error { |
348 // Slightly different from the SDK... datastore and taskqueue each imple ment | 340 // Slightly different from the SDK... datastore and taskqueue each imple ment |
349 // this here, where in the SDK only datastore.transaction.Call does. | 341 // this here, where in the SDK only datastore.transaction.Call does. |
350 if atomic.LoadInt32(&td.closed) == 1 { | 342 if atomic.LoadInt32(&td.closed) == 1 { |
351 return errors.New("datastore: transaction context has expired") | 343 return errors.New("datastore: transaction context has expired") |
352 } | 344 } |
353 return f() | 345 return f() |
354 } | 346 } |
355 | 347 |
356 // writeMutation ensures that this transaction can support the given key/value | 348 // writeMutation ensures that this transaction can support the given key/value |
357 // mutation. | 349 // mutation. |
358 // | 350 // |
359 // if getOnly is true, don't record the actual mutation data, just ensure that | 351 // if getOnly is true, don't record the actual mutation data, just ensure that |
360 // the key is in an included entity group (or add an empty entry for tha t | 352 // the key is in an included entity group (or add an empty entry for tha t |
361 // group). | 353 // group). |
362 // | 354 // |
363 // if !getOnly && data == nil, this counts as a deletion instead of a Put. | 355 // if !getOnly && data == nil, this counts as a deletion instead of a Put. |
364 // | 356 // |
365 // Returns an error if this key causes the transaction to cross too many entity | 357 // Returns an error if this key causes the transaction to cross too many entity |
366 // groups. | 358 // groups. |
367 func (td *txnDataStoreData) writeMutation(getOnly bool, key ds.Key, data ds.Prop ertyMap) error { | 359 func (td *txnDataStoreData) writeMutation(getOnly bool, key ds.Key, data ds.Prop ertyMap) error { |
368 » rk := string(keyBytes(serialize.WithContext, dskey.Root(key))) | 360 » rk := string(keyBytes(dskey.Root(key))) |
369 | 361 |
370 td.Lock() | 362 td.Lock() |
371 defer td.Unlock() | 363 defer td.Unlock() |
372 | 364 |
373 if _, ok := td.muts[rk]; !ok { | 365 if _, ok := td.muts[rk]; !ok { |
374 limit := 1 | 366 limit := 1 |
375 if td.isXG { | 367 if td.isXG { |
376 limit = xgEGLimit | 368 limit = xgEGLimit |
377 } | 369 } |
378 if len(td.muts)+1 > limit { | 370 if len(td.muts)+1 > limit { |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
421 func (td *txnDataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { | 413 func (td *txnDataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { |
422 for _, k := range keys { | 414 for _, k := range keys { |
423 err := td.writeMutation(false, k, nil) | 415 err := td.writeMutation(false, k, nil) |
424 if cb != nil { | 416 if cb != nil { |
425 cb(err) | 417 cb(err) |
426 } | 418 } |
427 } | 419 } |
428 return nil | 420 return nil |
429 } | 421 } |
430 | 422 |
431 func keyBytes(ctx serialize.KeyContext, key ds.Key) []byte { | 423 func keyBytes(key ds.Key) []byte { |
432 » buf := &bytes.Buffer{} | 424 » return serialize.ToBytes(ds.MkProperty(key)) |
433 » serialize.WriteKey(buf, ctx, key) | |
434 » return buf.Bytes() | |
435 } | 425 } |
436 | 426 |
437 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { | 427 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { |
438 return serialize.ReadPropertyMap(bytes.NewBuffer(data), | 428 return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
439 serialize.WithoutContext, globalAppID, ns) | 429 serialize.WithoutContext, globalAppID, ns) |
440 } | 430 } |
441 | 431 |
442 func rpm(data []byte) (ds.PropertyMap, error) { | 432 func rpm(data []byte) (ds.PropertyMap, error) { |
443 return serialize.ReadPropertyMap(bytes.NewBuffer(data), | 433 return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
444 serialize.WithContext, "", "") | 434 serialize.WithContext, "", "") |
445 } | 435 } |
446 | 436 |
447 type keyitem interface { | 437 type keyitem interface { |
448 Key() ds.Key | 438 Key() ds.Key |
449 } | 439 } |
OLD | NEW |