OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "fmt" | 9 "fmt" |
10 "sync" | 10 "sync" |
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
91 | 91 |
92 func rootIDsKey(kind string) []byte { | 92 func rootIDsKey(kind string) []byte { |
93 return keyBytes(serialize.WithoutContext, | 93 return keyBytes(serialize.WithoutContext, |
94 dskey.New("", "", "__entity_root_ids__", kind, 0, nil)) | 94 dskey.New("", "", "__entity_root_ids__", kind, 0, nil)) |
95 } | 95 } |
96 | 96 |
97 func curVersion(ents *memCollection, key []byte) int64 { | 97 func curVersion(ents *memCollection, key []byte) int64 { |
98 if ents != nil { | 98 if ents != nil { |
99 if v := ents.Get(key); v != nil { | 99 if v := ents.Get(key); v != nil { |
100 pm, err := rpm(v) | 100 pm, err := rpm(v) |
101 » » » if err != nil { | 101 » » » memoryCorruption(err) |
102 » » » » panic(err) // memory corruption | 102 |
103 » » » } | |
104 pl, ok := pm["__version__"] | 103 pl, ok := pm["__version__"] |
105 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { | 104 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { |
106 return pl[0].Value().(int64) | 105 return pl[0].Value().(int64) |
107 } | 106 } |
108 » » » panic(fmt.Errorf("__version__ property missing or wrong:
%v", pm)) | 107 |
| 108 » » » memoryCorruption(fmt.Errorf("__version__ property missin
g or wrong: %v", pm)) |
109 } | 109 } |
110 } | 110 } |
111 return 0 | 111 return 0 |
112 } | 112 } |
113 | 113 |
114 func incrementLocked(ents *memCollection, key []byte) int64 { | 114 func incrementLocked(ents *memCollection, key []byte) int64 { |
115 ret := curVersion(ents, key) + 1 | 115 ret := curVersion(ents, key) + 1 |
116 buf := &bytes.Buffer{} | 116 buf := &bytes.Buffer{} |
117 serialize.WritePropertyMap(buf, serialize.WithContext, ds.PropertyMap{ | 117 serialize.WritePropertyMap(buf, serialize.WithContext, ds.PropertyMap{ |
118 "__version__": {ds.MkPropertyNI(ret)}}) | 118 "__version__": {ds.MkPropertyNI(ret)}}) |
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
240 | 240 |
241 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { | 241 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { |
242 // TODO(riannucci): implement with Flush/FlushRevert for persistance. | 242 // TODO(riannucci): implement with Flush/FlushRevert for persistance. |
243 | 243 |
244 txn := obj.(*txnDataStoreData) | 244 txn := obj.(*txnDataStoreData) |
245 for rk, muts := range txn.muts { | 245 for rk, muts := range txn.muts { |
246 if len(muts) == 0 { // read-only | 246 if len(muts) == 0 { // read-only |
247 continue | 247 continue |
248 } | 248 } |
249 k, err := serialize.ReadKey(bytes.NewBufferString(rk), serialize
.WithContext, "", "") | 249 k, err := serialize.ReadKey(bytes.NewBufferString(rk), serialize
.WithContext, "", "") |
250 » » if err != nil { | 250 » » memoryCorruption(err) |
251 » » » panic(err) | |
252 » » } | |
253 | 251 |
254 entKey := "ents:" + k.Namespace() | 252 entKey := "ents:" + k.Namespace() |
255 mkey := groupMetaKey(k) | 253 mkey := groupMetaKey(k) |
256 entsHead := d.store.GetCollection(entKey) | 254 entsHead := d.store.GetCollection(entKey) |
257 entsSnap := txn.snap.GetCollection(entKey) | 255 entsSnap := txn.snap.GetCollection(entKey) |
258 vHead := curVersion(entsHead, mkey) | 256 vHead := curVersion(entsHead, mkey) |
259 vSnap := curVersion(entsSnap, mkey) | 257 vSnap := curVersion(entsSnap, mkey) |
260 if vHead != vSnap { | 258 if vHead != vSnap { |
261 return false | 259 return false |
262 } | 260 } |
(...skipping 11 matching lines...) Expand all Loading... |
274 for _, m := range muts { | 272 for _, m := range muts { |
275 err := error(nil) | 273 err := error(nil) |
276 k := m.key | 274 k := m.key |
277 if m.data == nil { | 275 if m.data == nil { |
278 d.delMulti([]ds.Key{k}, | 276 d.delMulti([]ds.Key{k}, |
279 func(e error) { err = e }) | 277 func(e error) { err = e }) |
280 } else { | 278 } else { |
281 d.putMulti([]ds.Key{m.key}, []ds.PropertyMap{m.d
ata}, | 279 d.putMulti([]ds.Key{m.key}, []ds.PropertyMap{m.d
ata}, |
282 func(_ ds.Key, e error) { err = e }) | 280 func(_ ds.Key, e error) { err = e }) |
283 } | 281 } |
284 » » » err = errors.SingleError(err) | 282 » » » impossible(err) |
285 » » » if err != nil { | |
286 » » » » panic(err) | |
287 » » » } | |
288 } | 283 } |
289 } | 284 } |
290 } | 285 } |
291 | 286 |
292 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj { | 287 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj { |
293 return &txnDataStoreData{ | 288 return &txnDataStoreData{ |
294 // alias to the main datastore's so that testing code can have p
rimitive | 289 // alias to the main datastore's so that testing code can have p
rimitive |
295 // access to break features inside of transactions. | 290 // access to break features inside of transactions. |
296 parent: d, | 291 parent: d, |
297 isXG: o != nil && o.XG, | 292 isXG: o != nil && o.XG, |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
331 const xgEGLimit = 25 | 326 const xgEGLimit = 25 |
332 | 327 |
333 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } | 328 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } |
334 func (td *txnDataStoreData) endTxn() { | 329 func (td *txnDataStoreData) endTxn() { |
335 if atomic.LoadInt32(&td.closed) == 1 { | 330 if atomic.LoadInt32(&td.closed) == 1 { |
336 panic("cannot end transaction twice") | 331 panic("cannot end transaction twice") |
337 } | 332 } |
338 atomic.StoreInt32(&td.closed, 1) | 333 atomic.StoreInt32(&td.closed, 1) |
339 } | 334 } |
340 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { | 335 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { |
341 » panic("txnDataStoreData cannot apply transactions") | 336 » impossible(fmt.Errorf("cannot creat a recursive transaction")) |
342 } | 337 } |
343 func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj { | 338 func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj { |
344 » panic("impossible") | 339 » impossible(fmt.Errorf("cannot creat a recursive transaction")) |
| 340 » return nil |
345 } | 341 } |
346 | 342 |
347 func (td *txnDataStoreData) run(f func() error) error { | 343 func (td *txnDataStoreData) run(f func() error) error { |
348 // Slightly different from the SDK... datastore and taskqueue each imple
ment | 344 // Slightly different from the SDK... datastore and taskqueue each imple
ment |
349 // this here, where in the SDK only datastore.transaction.Call does. | 345 // this here, where in the SDK only datastore.transaction.Call does. |
350 if atomic.LoadInt32(&td.closed) == 1 { | 346 if atomic.LoadInt32(&td.closed) == 1 { |
351 return errors.New("datastore: transaction context has expired") | 347 return errors.New("datastore: transaction context has expired") |
352 } | 348 } |
353 return f() | 349 return f() |
354 } | 350 } |
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
440 } | 436 } |
441 | 437 |
442 func rpm(data []byte) (ds.PropertyMap, error) { | 438 func rpm(data []byte) (ds.PropertyMap, error) { |
443 return serialize.ReadPropertyMap(bytes.NewBuffer(data), | 439 return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
444 serialize.WithContext, "", "") | 440 serialize.WithContext, "", "") |
445 } | 441 } |
446 | 442 |
447 type keyitem interface { | 443 type keyitem interface { |
448 Key() ds.Key | 444 Key() ds.Key |
449 } | 445 } |
OLD | NEW |