| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "fmt" | 9 "fmt" |
| 10 "sync" | 10 "sync" |
| (...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 91 | 91 |
| 92 func rootIDsKey(kind string) []byte { | 92 func rootIDsKey(kind string) []byte { |
| 93 return keyBytes(serialize.WithoutContext, | 93 return keyBytes(serialize.WithoutContext, |
| 94 dskey.New("", "", "__entity_root_ids__", kind, 0, nil)) | 94 dskey.New("", "", "__entity_root_ids__", kind, 0, nil)) |
| 95 } | 95 } |
| 96 | 96 |
| 97 func curVersion(ents *memCollection, key []byte) int64 { | 97 func curVersion(ents *memCollection, key []byte) int64 { |
| 98 if ents != nil { | 98 if ents != nil { |
| 99 if v := ents.Get(key); v != nil { | 99 if v := ents.Get(key); v != nil { |
| 100 pm, err := rpm(v) | 100 pm, err := rpm(v) |
| 101 » » » if err != nil { | 101 » » » memoryCorruption(err) |
| 102 » » » » panic(err) // memory corruption | 102 |
| 103 » » » } | |
| 104 pl, ok := pm["__version__"] | 103 pl, ok := pm["__version__"] |
| 105 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { | 104 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { |
| 106 return pl[0].Value().(int64) | 105 return pl[0].Value().(int64) |
| 107 } | 106 } |
| 108 » » » panic(fmt.Errorf("__version__ property missing or wrong:
%v", pm)) | 107 |
| 108 » » » memoryCorruption(fmt.Errorf("__version__ property missin
g or wrong: %v", pm)) |
| 109 } | 109 } |
| 110 } | 110 } |
| 111 return 0 | 111 return 0 |
| 112 } | 112 } |
| 113 | 113 |
| 114 func incrementLocked(ents *memCollection, key []byte) int64 { | 114 func incrementLocked(ents *memCollection, key []byte) int64 { |
| 115 ret := curVersion(ents, key) + 1 | 115 ret := curVersion(ents, key) + 1 |
| 116 buf := &bytes.Buffer{} | 116 buf := &bytes.Buffer{} |
| 117 serialize.WritePropertyMap(buf, serialize.WithContext, ds.PropertyMap{ | 117 serialize.WritePropertyMap(buf, serialize.WithContext, ds.PropertyMap{ |
| 118 "__version__": {ds.MkPropertyNI(ret)}}) | 118 "__version__": {ds.MkPropertyNI(ret)}}) |
| (...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 240 | 240 |
| 241 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { | 241 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { |
| 242 // TODO(riannucci): implement with Flush/FlushRevert for persistance. | 242 // TODO(riannucci): implement with Flush/FlushRevert for persistance. |
| 243 | 243 |
| 244 txn := obj.(*txnDataStoreData) | 244 txn := obj.(*txnDataStoreData) |
| 245 for rk, muts := range txn.muts { | 245 for rk, muts := range txn.muts { |
| 246 if len(muts) == 0 { // read-only | 246 if len(muts) == 0 { // read-only |
| 247 continue | 247 continue |
| 248 } | 248 } |
| 249 k, err := serialize.ReadKey(bytes.NewBufferString(rk), serialize
.WithContext, "", "") | 249 k, err := serialize.ReadKey(bytes.NewBufferString(rk), serialize
.WithContext, "", "") |
| 250 » » if err != nil { | 250 » » memoryCorruption(err) |
| 251 » » » panic(err) | |
| 252 » » } | |
| 253 | 251 |
| 254 entKey := "ents:" + k.Namespace() | 252 entKey := "ents:" + k.Namespace() |
| 255 mkey := groupMetaKey(k) | 253 mkey := groupMetaKey(k) |
| 256 entsHead := d.store.GetCollection(entKey) | 254 entsHead := d.store.GetCollection(entKey) |
| 257 entsSnap := txn.snap.GetCollection(entKey) | 255 entsSnap := txn.snap.GetCollection(entKey) |
| 258 vHead := curVersion(entsHead, mkey) | 256 vHead := curVersion(entsHead, mkey) |
| 259 vSnap := curVersion(entsSnap, mkey) | 257 vSnap := curVersion(entsSnap, mkey) |
| 260 if vHead != vSnap { | 258 if vHead != vSnap { |
| 261 return false | 259 return false |
| 262 } | 260 } |
| (...skipping 11 matching lines...) Expand all Loading... |
| 274 for _, m := range muts { | 272 for _, m := range muts { |
| 275 err := error(nil) | 273 err := error(nil) |
| 276 k := m.key | 274 k := m.key |
| 277 if m.data == nil { | 275 if m.data == nil { |
| 278 d.delMulti([]ds.Key{k}, | 276 d.delMulti([]ds.Key{k}, |
| 279 func(e error) { err = e }) | 277 func(e error) { err = e }) |
| 280 } else { | 278 } else { |
| 281 d.putMulti([]ds.Key{m.key}, []ds.PropertyMap{m.d
ata}, | 279 d.putMulti([]ds.Key{m.key}, []ds.PropertyMap{m.d
ata}, |
| 282 func(_ ds.Key, e error) { err = e }) | 280 func(_ ds.Key, e error) { err = e }) |
| 283 } | 281 } |
| 284 » » » err = errors.SingleError(err) | 282 » » » impossible(err) |
| 285 » » » if err != nil { | |
| 286 » » » » panic(err) | |
| 287 » » » } | |
| 288 } | 283 } |
| 289 } | 284 } |
| 290 } | 285 } |
| 291 | 286 |
| 292 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj { | 287 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj { |
| 293 return &txnDataStoreData{ | 288 return &txnDataStoreData{ |
| 294 // alias to the main datastore's so that testing code can have p
rimitive | 289 // alias to the main datastore's so that testing code can have p
rimitive |
| 295 // access to break features inside of transactions. | 290 // access to break features inside of transactions. |
| 296 parent: d, | 291 parent: d, |
| 297 isXG: o != nil && o.XG, | 292 isXG: o != nil && o.XG, |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 331 const xgEGLimit = 25 | 326 const xgEGLimit = 25 |
| 332 | 327 |
| 333 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } | 328 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } |
| 334 func (td *txnDataStoreData) endTxn() { | 329 func (td *txnDataStoreData) endTxn() { |
| 335 if atomic.LoadInt32(&td.closed) == 1 { | 330 if atomic.LoadInt32(&td.closed) == 1 { |
| 336 panic("cannot end transaction twice") | 331 panic("cannot end transaction twice") |
| 337 } | 332 } |
| 338 atomic.StoreInt32(&td.closed, 1) | 333 atomic.StoreInt32(&td.closed, 1) |
| 339 } | 334 } |
| 340 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { | 335 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { |
| 341 » panic("txnDataStoreData cannot apply transactions") | 336 » impossible(fmt.Errorf("cannot creat a recursive transaction")) |
| 342 } | 337 } |
| 343 func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj { | 338 func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj { |
| 344 » panic("impossible") | 339 » impossible(fmt.Errorf("cannot creat a recursive transaction")) |
| 340 » return nil |
| 345 } | 341 } |
| 346 | 342 |
| 347 func (td *txnDataStoreData) run(f func() error) error { | 343 func (td *txnDataStoreData) run(f func() error) error { |
| 348 // Slightly different from the SDK... datastore and taskqueue each imple
ment | 344 // Slightly different from the SDK... datastore and taskqueue each imple
ment |
| 349 // this here, where in the SDK only datastore.transaction.Call does. | 345 // this here, where in the SDK only datastore.transaction.Call does. |
| 350 if atomic.LoadInt32(&td.closed) == 1 { | 346 if atomic.LoadInt32(&td.closed) == 1 { |
| 351 return errors.New("datastore: transaction context has expired") | 347 return errors.New("datastore: transaction context has expired") |
| 352 } | 348 } |
| 353 return f() | 349 return f() |
| 354 } | 350 } |
| (...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 440 } | 436 } |
| 441 | 437 |
| 442 func rpm(data []byte) (ds.PropertyMap, error) { | 438 func rpm(data []byte) (ds.PropertyMap, error) { |
| 443 return serialize.ReadPropertyMap(bytes.NewBuffer(data), | 439 return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
| 444 serialize.WithContext, "", "") | 440 serialize.WithContext, "", "") |
| 445 } | 441 } |
| 446 | 442 |
| 447 type keyitem interface { | 443 type keyitem interface { |
| 448 Key() ds.Key | 444 Key() ds.Key |
| 449 } | 445 } |
| OLD | NEW |