Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(7)

Side by Side Diff: impl/memory/datastore_data.go

Issue 2302743002: Interface update, per-method Contexts. (Closed)
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 "strings" 10 "strings"
11 "sync" 11 "sync"
12 "sync/atomic"
13 12
14 ds "github.com/luci/gae/service/datastore" 13 ds "github.com/luci/gae/service/datastore"
15 "github.com/luci/gae/service/datastore/serialize" 14 "github.com/luci/gae/service/datastore/serialize"
16 "github.com/luci/luci-go/common/errors" 15 "github.com/luci/luci-go/common/errors"
16
17 "golang.org/x/net/context" 17 "golang.org/x/net/context"
18 ) 18 )
19 19
20 //////////////////////////////// dataStoreData ///////////////////////////////// 20 //////////////////////////////// dataStoreData /////////////////////////////////
21 21
22 type dataStoreData struct { 22 type dataStoreData struct {
23 rwlock sync.RWMutex 23 rwlock sync.RWMutex
24 24
25 // the 'appid' of this datastore 25 // the 'appid' of this datastore
26 aid string 26 aid string
(...skipping 142 matching lines...) Expand 10 before | Expand all | Expand 10 after
169 func (d *dataStoreData) namespaces() []string { 169 func (d *dataStoreData) namespaces() []string {
170 d.rwlock.Lock() 170 d.rwlock.Lock()
171 defer d.rwlock.Unlock() 171 defer d.rwlock.Unlock()
172 172
173 return namespaces(d.head) 173 return namespaces(d.head)
174 } 174 }
175 175
176 /////////////////////////// indexes(dataStoreData) //////////////////////////// 176 /////////////////////////// indexes(dataStoreData) ////////////////////////////
177 177
178 func groupMetaKey(key *ds.Key) []byte { 178 func groupMetaKey(key *ds.Key) []byte {
179 » return keyBytes(ds.NewKey("", "", "__entity_group__", "", 1, key.Root()) ) 179 » return keyBytes(ds.KeyContext{}.NewKey("__entity_group__", "", 1, key.Ro ot()))
180 } 180 }
181 181
182 func groupIDsKey(key *ds.Key) []byte { 182 func groupIDsKey(key *ds.Key) []byte {
183 » return keyBytes(ds.NewKey("", "", "__entity_group_ids__", "", 1, key.Roo t())) 183 » return keyBytes(ds.KeyContext{}.NewKey("__entity_group_ids__", "", 1, ke y.Root()))
184 } 184 }
185 185
186 func rootIDsKey(kind string) []byte { 186 func rootIDsKey(kind string) []byte {
187 » return keyBytes(ds.NewKey("", "", "__entity_root_ids__", kind, 0, nil)) 187 » return keyBytes(ds.KeyContext{}.NewKey("__entity_root_ids__", kind, 0, n il))
188 } 188 }
189 189
190 func curVersion(ents memCollection, key []byte) int64 { 190 func curVersion(ents memCollection, key []byte) int64 {
191 if ents != nil { 191 if ents != nil {
192 if v := ents.Get(key); v != nil { 192 if v := ents.Get(key); v != nil {
193 pm, err := rpm(v) 193 pm, err := rpm(v)
194 memoryCorruption(err) 194 memoryCorruption(err)
195 195
196 pl, ok := pm["__version__"] 196 pl, ok := pm["__version__"]
197 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { 197 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt {
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after
272 } 272 }
273 return incrementLocked(ents, idKey, n), nil 273 return incrementLocked(ents, idKey, n), nil
274 } 274 }
275 275
276 func (d *dataStoreData) fixKeyLocked(ents memCollection, key *ds.Key) (*ds.Key, error) { 276 func (d *dataStoreData) fixKeyLocked(ents memCollection, key *ds.Key) (*ds.Key, error) {
277 if key.IsIncomplete() { 277 if key.IsIncomplete() {
278 id, err := d.allocateIDsLocked(ents, key, 1) 278 id, err := d.allocateIDsLocked(ents, key, 1)
279 if err != nil { 279 if err != nil {
280 return key, err 280 return key, err
281 } 281 }
282 » » key = ds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", id , key.Parent()) 282 » » key = key.KeyContext().NewKey(key.Kind(), "", id, key.Parent())
283 } 283 }
284 return key, nil 284 return key, nil
285 } 285 }
286 286
287 func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Ne wKeyCB) error { 287 func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Ne wKeyCB) error {
288 ns := keys[0].Namespace() 288 ns := keys[0].Namespace()
289 289
290 for i, k := range keys { 290 for i, k := range keys {
291 pmap, _ := vals[i].Save(false) 291 pmap, _ := vals[i].Save(false)
292 dataBytes := serialize.ToBytes(pmap) 292 dataBytes := serialize.ToBytes(pmap)
(...skipping 112 matching lines...) Expand 10 before | Expand all | Expand 10 after
405 } 405 }
406 406
407 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { 407 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool {
408 // TODO(riannucci): implement with Flush/FlushRevert for persistance. 408 // TODO(riannucci): implement with Flush/FlushRevert for persistance.
409 409
410 txn := obj.(*txnDataStoreData) 410 txn := obj.(*txnDataStoreData)
411 for rk, muts := range txn.muts { 411 for rk, muts := range txn.muts {
412 if len(muts) == 0 { // read-only 412 if len(muts) == 0 { // read-only
413 continue 413 continue
414 } 414 }
415 » » prop, err := serialize.ReadProperty(bytes.NewBufferString(rk), s erialize.WithContext, "", "") 415 » » prop, err := serialize.ReadProperty(bytes.NewBufferString(rk), s erialize.WithContext, ds.KeyContext{})
416 memoryCorruption(err) 416 memoryCorruption(err)
417 417
418 k := prop.Value().(*ds.Key) 418 k := prop.Value().(*ds.Key)
419 419
420 entKey := "ents:" + k.Namespace() 420 entKey := "ents:" + k.Namespace()
421 mkey := groupMetaKey(k) 421 mkey := groupMetaKey(k)
422 entsHead := d.head.GetCollection(entKey) 422 entsHead := d.head.GetCollection(entKey)
423 entsSnap := txn.snap.GetCollection(entKey) 423 entsSnap := txn.snap.GetCollection(entKey)
424 vHead := curVersion(entsHead, mkey) 424 vHead := curVersion(entsHead, mkey)
425 vSnap := curVersion(entsSnap, mkey) 425 vSnap := curVersion(entsSnap, mkey)
(...skipping 22 matching lines...) Expand all
448 } 448 }
449 } 449 }
450 } 450 }
451 } 451 }
452 452
453 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj { 453 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj {
454 return &txnDataStoreData{ 454 return &txnDataStoreData{
455 // alias to the main datastore's so that testing code can have p rimitive 455 // alias to the main datastore's so that testing code can have p rimitive
456 // access to break features inside of transactions. 456 // access to break features inside of transactions.
457 parent: d, 457 parent: d,
458 » » isXG: o != nil && o.XG, 458 » » txn: &transactionImpl{
459 » » snap: d.head.Snapshot(), 459 » » » isXG: o != nil && o.XG,
460 » » muts: map[string][]txnMutation{}, 460 » » },
461 » » snap: d.head.Snapshot(),
462 » » muts: map[string][]txnMutation{},
461 } 463 }
462 } 464 }
463 465
464 func (d *dataStoreData) endTxn() {} 466 func (d *dataStoreData) endTxn() {}
465 467
466 /////////////////////////////// txnDataStoreData /////////////////////////////// 468 /////////////////////////////// txnDataStoreData ///////////////////////////////
467 469
468 type txnMutation struct { 470 type txnMutation struct {
469 key *ds.Key 471 key *ds.Key
470 data ds.PropertyMap 472 data ds.PropertyMap
471 } 473 }
472 474
473 type txnDataStoreData struct { 475 type txnDataStoreData struct {
474 sync.Mutex 476 sync.Mutex
475 477
476 parent *dataStoreData 478 parent *dataStoreData
477 479
478 » // boolean 0 or 1, use atomic.*Int32 to access. 480 » txn *transactionImpl
479 » closed int32
480 » isXG bool
481 481
482 snap memStore 482 snap memStore
483 483
484 // string is the raw-bytes encoding of the entity root incl. namespace 484 // string is the raw-bytes encoding of the entity root incl. namespace
485 muts map[string][]txnMutation 485 muts map[string][]txnMutation
486 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ ing 486 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ ing
487 // length of encoded keys + values. 487 // length of encoded keys + values.
488 } 488 }
489 489
490 var _ memContextObj = (*txnDataStoreData)(nil) 490 var _ memContextObj = (*txnDataStoreData)(nil)
491 491
492 const xgEGLimit = 25 492 const xgEGLimit = 25
493 493
494 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } 494 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false }
495
495 func (td *txnDataStoreData) endTxn() { 496 func (td *txnDataStoreData) endTxn() {
496 » if atomic.LoadInt32(&td.closed) == 1 { 497 » if err := td.txn.close(); err != nil {
497 » » panic("cannot end transaction twice") 498 » » panic(err)
498 } 499 }
499 atomic.StoreInt32(&td.closed, 1)
500 } 500 }
501 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { 501 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) {
502 impossible(fmt.Errorf("cannot create a recursive transaction")) 502 impossible(fmt.Errorf("cannot create a recursive transaction"))
503 } 503 }
504 func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj { 504 func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj {
505 impossible(fmt.Errorf("cannot create a recursive transaction")) 505 impossible(fmt.Errorf("cannot create a recursive transaction"))
506 return nil 506 return nil
507 } 507 }
508 508
509 func (td *txnDataStoreData) run(f func() error) error { 509 func (td *txnDataStoreData) run(f func() error) error {
510 // Slightly different from the SDK... datastore and taskqueue each imple ment 510 // Slightly different from the SDK... datastore and taskqueue each imple ment
511 // this here, where in the SDK only datastore.transaction.Call does. 511 // this here, where in the SDK only datastore.transaction.Call does.
512 » if atomic.LoadInt32(&td.closed) == 1 { 512 » if err := td.txn.valid(); err != nil {
513 » » return errors.New("datastore: transaction context has expired") 513 » » return err
514 } 514 }
515 return f() 515 return f()
516 } 516 }
517 517
518 // writeMutation ensures that this transaction can support the given key/value 518 // writeMutation ensures that this transaction can support the given key/value
519 // mutation. 519 // mutation.
520 // 520 //
521 // if getOnly is true, don't record the actual mutation data, just ensure that 521 // if getOnly is true, don't record the actual mutation data, just ensure that
522 // the key is in an included entity group (or add an empty entry for tha t 522 // the key is in an included entity group (or add an empty entry for tha t
523 // group). 523 // group).
524 // 524 //
525 // if !getOnly && data == nil, this counts as a deletion instead of a Put. 525 // if !getOnly && data == nil, this counts as a deletion instead of a Put.
526 // 526 //
527 // Returns an error if this key causes the transaction to cross too many entity 527 // Returns an error if this key causes the transaction to cross too many entity
528 // groups. 528 // groups.
529 func (td *txnDataStoreData) writeMutation(getOnly bool, key *ds.Key, data ds.Pro pertyMap) error { 529 func (td *txnDataStoreData) writeMutation(getOnly bool, key *ds.Key, data ds.Pro pertyMap) error {
530 rk := string(keyBytes(key.Root())) 530 rk := string(keyBytes(key.Root()))
531 531
532 td.Lock() 532 td.Lock()
533 defer td.Unlock() 533 defer td.Unlock()
534 534
535 if _, ok := td.muts[rk]; !ok { 535 if _, ok := td.muts[rk]; !ok {
536 limit := 1 536 limit := 1
537 » » if td.isXG { 537 » » if td.txn.isXG {
538 limit = xgEGLimit 538 limit = xgEGLimit
539 } 539 }
540 if len(td.muts)+1 > limit { 540 if len(td.muts)+1 > limit {
541 msg := "cross-group transaction need to be explicitly sp ecified (xg=True)" 541 msg := "cross-group transaction need to be explicitly sp ecified (xg=True)"
542 » » » if td.isXG { 542 » » » if td.txn.isXG {
543 msg = "operating on too many entity groups in a single transaction" 543 msg = "operating on too many entity groups in a single transaction"
544 } 544 }
545 return errors.New(msg) 545 return errors.New(msg)
546 } 546 }
547 td.muts[rk] = []txnMutation{} 547 td.muts[rk] = []txnMutation{}
548 } 548 }
549 if !getOnly { 549 if !getOnly {
550 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) 550 td.muts[rk] = append(td.muts[rk], txnMutation{key, data})
551 } 551 }
552 552
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
596 } 596 }
597 return nil 597 return nil
598 } 598 }
599 599
600 func keyBytes(key *ds.Key) []byte { 600 func keyBytes(key *ds.Key) []byte {
601 return serialize.ToBytes(ds.MkProperty(key)) 601 return serialize.ToBytes(ds.MkProperty(key))
602 } 602 }
603 603
604 func rpm(data []byte) (ds.PropertyMap, error) { 604 func rpm(data []byte) (ds.PropertyMap, error) {
605 return serialize.ReadPropertyMap(bytes.NewBuffer(data), 605 return serialize.ReadPropertyMap(bytes.NewBuffer(data),
606 » » serialize.WithContext, "", "") 606 » » serialize.WithContext, ds.KeyContext{})
607 } 607 }
608 608
609 func namespaces(store memStore) []string { 609 func namespaces(store memStore) []string {
610 var namespaces []string 610 var namespaces []string
611 for _, c := range store.GetCollectionNames() { 611 for _, c := range store.GetCollectionNames() {
612 ns, has := trimPrefix(c, "ents:") 612 ns, has := trimPrefix(c, "ents:")
613 if !has { 613 if !has {
614 if len(namespaces) > 0 { 614 if len(namespaces) > 0 {
615 break 615 break
616 } 616 }
617 continue 617 continue
618 } 618 }
619 namespaces = append(namespaces, ns) 619 namespaces = append(namespaces, ns)
620 } 620 }
621 return namespaces 621 return namespaces
622 } 622 }
623 623
624 func trimPrefix(v, p string) (string, bool) { 624 func trimPrefix(v, p string) (string, bool) {
625 if strings.HasPrefix(v, p) { 625 if strings.HasPrefix(v, p) {
626 return v[len(p):], true 626 return v[len(p):], true
627 } 627 }
628 return v, false 628 return v, false
629 } 629 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698