Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(591)

Side by Side Diff: impl/memory/datastore_data.go

Issue 1302813003: impl/memory: Implement Queries (Closed) Base URL: https://github.com/luci/gae.git@add_multi_iterator
Patch Set: Baby's first query! Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 "sync" 10 "sync"
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after
91 91
92 func rootIDsKey(kind string) []byte { 92 func rootIDsKey(kind string) []byte {
93 return keyBytes(serialize.WithoutContext, 93 return keyBytes(serialize.WithoutContext,
94 dskey.New("", "", "__entity_root_ids__", kind, 0, nil)) 94 dskey.New("", "", "__entity_root_ids__", kind, 0, nil))
95 } 95 }
96 96
97 func curVersion(ents *memCollection, key []byte) int64 { 97 func curVersion(ents *memCollection, key []byte) int64 {
98 if ents != nil { 98 if ents != nil {
99 if v := ents.Get(key); v != nil { 99 if v := ents.Get(key); v != nil {
100 pm, err := rpm(v) 100 pm, err := rpm(v)
101 » » » if err != nil { 101 » » » memoryCorruption(err)
102 » » » » panic(err) // memory corruption 102
103 » » » }
104 pl, ok := pm["__version__"] 103 pl, ok := pm["__version__"]
105 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { 104 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt {
106 return pl[0].Value().(int64) 105 return pl[0].Value().(int64)
107 } 106 }
108 » » » panic(fmt.Errorf("__version__ property missing or wrong: %v", pm)) 107
108 » » » memoryCorruption(fmt.Errorf("__version__ property missin g or wrong: %v", pm))
109 } 109 }
110 } 110 }
111 return 0 111 return 0
112 } 112 }
113 113
114 func incrementLocked(ents *memCollection, key []byte) int64 { 114 func incrementLocked(ents *memCollection, key []byte) int64 {
115 ret := curVersion(ents, key) + 1 115 ret := curVersion(ents, key) + 1
116 buf := &bytes.Buffer{} 116 buf := &bytes.Buffer{}
117 serialize.WritePropertyMap(buf, serialize.WithContext, ds.PropertyMap{ 117 serialize.WritePropertyMap(buf, serialize.WithContext, ds.PropertyMap{
118 "__version__": {ds.MkPropertyNI(ret)}}) 118 "__version__": {ds.MkPropertyNI(ret)}})
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after
240 240
241 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { 241 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool {
242 // TODO(riannucci): implement with Flush/FlushRevert for persistance. 242 // TODO(riannucci): implement with Flush/FlushRevert for persistance.
243 243
244 txn := obj.(*txnDataStoreData) 244 txn := obj.(*txnDataStoreData)
245 for rk, muts := range txn.muts { 245 for rk, muts := range txn.muts {
246 if len(muts) == 0 { // read-only 246 if len(muts) == 0 { // read-only
247 continue 247 continue
248 } 248 }
249 k, err := serialize.ReadKey(bytes.NewBufferString(rk), serialize .WithContext, "", "") 249 k, err := serialize.ReadKey(bytes.NewBufferString(rk), serialize .WithContext, "", "")
250 » » if err != nil { 250 » » memoryCorruption(err)
251 » » » panic(err)
252 » » }
253 251
254 entKey := "ents:" + k.Namespace() 252 entKey := "ents:" + k.Namespace()
255 mkey := groupMetaKey(k) 253 mkey := groupMetaKey(k)
256 entsHead := d.store.GetCollection(entKey) 254 entsHead := d.store.GetCollection(entKey)
257 entsSnap := txn.snap.GetCollection(entKey) 255 entsSnap := txn.snap.GetCollection(entKey)
258 vHead := curVersion(entsHead, mkey) 256 vHead := curVersion(entsHead, mkey)
259 vSnap := curVersion(entsSnap, mkey) 257 vSnap := curVersion(entsSnap, mkey)
260 if vHead != vSnap { 258 if vHead != vSnap {
261 return false 259 return false
262 } 260 }
(...skipping 11 matching lines...) Expand all
274 for _, m := range muts { 272 for _, m := range muts {
275 err := error(nil) 273 err := error(nil)
276 k := m.key 274 k := m.key
277 if m.data == nil { 275 if m.data == nil {
278 d.delMulti([]ds.Key{k}, 276 d.delMulti([]ds.Key{k},
279 func(e error) { err = e }) 277 func(e error) { err = e })
280 } else { 278 } else {
281 d.putMulti([]ds.Key{m.key}, []ds.PropertyMap{m.d ata}, 279 d.putMulti([]ds.Key{m.key}, []ds.PropertyMap{m.d ata},
282 func(_ ds.Key, e error) { err = e }) 280 func(_ ds.Key, e error) { err = e })
283 } 281 }
284 » » » err = errors.SingleError(err) 282 » » » impossible(err)
285 » » » if err != nil {
286 » » » » panic(err)
287 » » » }
288 } 283 }
289 } 284 }
290 } 285 }
291 286
292 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj { 287 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj {
293 return &txnDataStoreData{ 288 return &txnDataStoreData{
294 // alias to the main datastore's so that testing code can have p rimitive 289 // alias to the main datastore's so that testing code can have p rimitive
295 // access to break features inside of transactions. 290 // access to break features inside of transactions.
296 parent: d, 291 parent: d,
297 isXG: o != nil && o.XG, 292 isXG: o != nil && o.XG,
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
331 const xgEGLimit = 25 326 const xgEGLimit = 25
332 327
333 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } 328 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false }
334 func (td *txnDataStoreData) endTxn() { 329 func (td *txnDataStoreData) endTxn() {
335 if atomic.LoadInt32(&td.closed) == 1 { 330 if atomic.LoadInt32(&td.closed) == 1 {
336 panic("cannot end transaction twice") 331 panic("cannot end transaction twice")
337 } 332 }
338 atomic.StoreInt32(&td.closed, 1) 333 atomic.StoreInt32(&td.closed, 1)
339 } 334 }
340 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { 335 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) {
341 » panic("txnDataStoreData cannot apply transactions") 336 » impossible(fmt.Errorf("cannot creat a recursive transaction"))
342 } 337 }
343 func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj { 338 func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj {
344 » panic("impossible") 339 » impossible(fmt.Errorf("cannot creat a recursive transaction"))
340 » return nil
345 } 341 }
346 342
347 func (td *txnDataStoreData) run(f func() error) error { 343 func (td *txnDataStoreData) run(f func() error) error {
348 // Slightly different from the SDK... datastore and taskqueue each imple ment 344 // Slightly different from the SDK... datastore and taskqueue each imple ment
349 // this here, where in the SDK only datastore.transaction.Call does. 345 // this here, where in the SDK only datastore.transaction.Call does.
350 if atomic.LoadInt32(&td.closed) == 1 { 346 if atomic.LoadInt32(&td.closed) == 1 {
351 return errors.New("datastore: transaction context has expired") 347 return errors.New("datastore: transaction context has expired")
352 } 348 }
353 return f() 349 return f()
354 } 350 }
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after
440 } 436 }
441 437
442 func rpm(data []byte) (ds.PropertyMap, error) { 438 func rpm(data []byte) (ds.PropertyMap, error) {
443 return serialize.ReadPropertyMap(bytes.NewBuffer(data), 439 return serialize.ReadPropertyMap(bytes.NewBuffer(data),
444 serialize.WithContext, "", "") 440 serialize.WithContext, "", "")
445 } 441 }
446 442
447 type keyitem interface { 443 type keyitem interface {
448 Key() ds.Key 444 Key() ds.Key
449 } 445 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698