| Index: go/src/infra/gae/libs/gae/memory/raw_datstore_data.go
|
| diff --git a/go/src/infra/gae/libs/wrapper/memory/datastore_data.go b/go/src/infra/gae/libs/gae/memory/raw_datstore_data.go
|
| similarity index 55%
|
| rename from go/src/infra/gae/libs/wrapper/memory/datastore_data.go
|
| rename to go/src/infra/gae/libs/gae/memory/raw_datstore_data.go
|
| index d36d52ab622c09dc1e2e64da884739f1e3252460..c786835c195b207c06348874f43932d342651c4b 100644
|
| --- a/go/src/infra/gae/libs/wrapper/memory/datastore_data.go
|
| +++ b/go/src/infra/gae/libs/gae/memory/raw_datstore_data.go
|
| @@ -5,51 +5,21 @@
|
| package memory
|
|
|
| import (
|
| + "bytes"
|
| "errors"
|
| - "infra/gae/libs/wrapper"
|
| - goon_internal "infra/gae/libs/wrapper/memory/internal/goon"
|
| + "fmt"
|
| + "golang.org/x/net/context"
|
| "sync"
|
| "sync/atomic"
|
|
|
| - "github.com/mjibson/goon"
|
| -
|
| - "appengine/datastore"
|
| - pb "appengine_internal/datastore"
|
| - "golang.org/x/net/context"
|
| + "infra/gae/libs/gae"
|
| + "infra/gae/libs/gae/helper"
|
| )
|
|
|
| -////////////////////////////////// knrKeeper ///////////////////////////////////
|
| -
|
| -type knrKeeper struct {
|
| - knrLock sync.Mutex
|
| - knrFunc goon.KindNameResolver
|
| -}
|
| -
|
| -var _ = wrapper.DSKindSetter((*knrKeeper)(nil))
|
| -
|
| -func (k *knrKeeper) KindNameResolver() goon.KindNameResolver {
|
| - k.knrLock.Lock()
|
| - defer k.knrLock.Unlock()
|
| - if k.knrFunc == nil {
|
| - k.knrFunc = goon.DefaultKindName
|
| - }
|
| - return k.knrFunc
|
| -}
|
| -
|
| -func (k *knrKeeper) SetKindNameResolver(knr goon.KindNameResolver) {
|
| - k.knrLock.Lock()
|
| - defer k.knrLock.Unlock()
|
| - if knr == nil {
|
| - knr = goon.DefaultKindName
|
| - }
|
| - k.knrFunc = knr
|
| -}
|
| -
|
| //////////////////////////////// dataStoreData /////////////////////////////////
|
|
|
| type dataStoreData struct {
|
| - wrapper.BrokenFeatures
|
| - knrKeeper
|
| + gae.BrokenFeatures
|
|
|
| rwlock sync.RWMutex
|
| // See README.md for store schema.
|
| @@ -60,14 +30,13 @@ type dataStoreData struct {
|
| var (
|
| _ = memContextObj((*dataStoreData)(nil))
|
| _ = sync.Locker((*dataStoreData)(nil))
|
| - _ = wrapper.Testable((*dataStoreData)(nil))
|
| - _ = wrapper.DSKindSetter((*dataStoreData)(nil))
|
| + _ = gae.Testable((*dataStoreData)(nil))
|
| )
|
|
|
| func newDataStoreData() *dataStoreData {
|
| store := newMemStore()
|
| return &dataStoreData{
|
| - BrokenFeatures: wrapper.BrokenFeatures{DefaultError: newDSError(pb.Error_INTERNAL_ERROR)},
|
| + BrokenFeatures: gae.BrokenFeatures{DefaultError: errors.New("INTERNAL_ERROR")},
|
| store: store,
|
| snap: store.Snapshot(), // empty but better than a nil pointer.
|
| }
|
| @@ -83,59 +52,60 @@ func (d *dataStoreData) Unlock() {
|
|
|
| /////////////////////////// indicies(dataStoreData) ////////////////////////////
|
|
|
| -func groupMetaKey(key *datastore.Key) []byte {
|
| - return keyBytes(noNS, newKey("", "__entity_group__", "", 1, rootKey(key)))
|
| +func groupMetaKey(key gae.DSKey) []byte {
|
| + return keyBytes(helper.WithoutContext,
|
| + helper.NewDSKey("", "", "__entity_group__", "", 1, helper.DSKeyRoot(key)))
|
| }
|
|
|
| -func groupIDsKey(key *datastore.Key) []byte {
|
| - return keyBytes(noNS, newKey("", "__entity_group_ids__", "", 1, rootKey(key)))
|
| +func groupIDsKey(key gae.DSKey) []byte {
|
| + return keyBytes(helper.WithoutContext,
|
| + helper.NewDSKey("", "", "__entity_group_ids__", "", 1, helper.DSKeyRoot(key)))
|
| }
|
|
|
| func rootIDsKey(kind string) []byte {
|
| - return keyBytes(noNS, newKey("", "__entity_root_ids__", kind, 0, nil))
|
| + return keyBytes(helper.WithoutContext,
|
| + helper.NewDSKey("", "", "__entity_root_ids__", kind, 0, nil))
|
| }
|
|
|
| func curVersion(ents *memCollection, key []byte) (int64, error) {
|
| if v := ents.Get(key); v != nil {
|
| - numData := &propertyList{}
|
| - if err := numData.UnmarshalBinary(v); err != nil {
|
| + pm, err := rpm(v)
|
| + if err != nil {
|
| return 0, err
|
| }
|
| - return (*numData)[0].Value.(int64), nil
|
| + pl, ok := pm["__version__"]
|
| + if ok && len(pl) > 0 && pl[0].Type() == gae.DSPTInt {
|
| + return pl[0].Value().(int64), nil
|
| + }
|
| + return 0, fmt.Errorf("__version__ property missing or wrong: %v", pm)
|
| }
|
| return 0, nil
|
| }
|
|
|
| -func incrementLocked(ents *memCollection, key []byte) (int64, error) {
|
| - num := int64(0)
|
| - numData := &propertyList{}
|
| - if v := ents.Get(key); v != nil {
|
| - if err := numData.UnmarshalBinary(v); err != nil {
|
| - return 0, err
|
| - }
|
| - num = (*numData)[0].Value.(int64)
|
| - } else {
|
| - *numData = append(*numData, datastore.Property{Name: "__version__"})
|
| +func incrementLocked(ents *memCollection, key []byte) (ret int64, err error) {
|
| + if ret, err = curVersion(ents, key); err != nil {
|
| + ret = 0
|
| }
|
| - num++
|
| - (*numData)[0].Value = num
|
| - incData, err := numData.MarshalBinary()
|
| - if err != nil {
|
| - return 0, err
|
| + ret++
|
| + p := gae.DSProperty{}
|
| + if err = p.SetValue(ret, true); err != nil {
|
| + return
|
| }
|
| - ents.Set(key, incData)
|
| -
|
| - return num, nil
|
| + buf := &bytes.Buffer{}
|
| + helper.WriteDSPropertyMap(
|
| + buf, gae.DSPropertyMap{"__version__": {p}}, helper.WithContext)
|
| + ents.Set(key, buf.Bytes())
|
| + return
|
| }
|
|
|
| -func (d *dataStoreData) entsKeyLocked(key *datastore.Key) (*memCollection, *datastore.Key, error) {
|
| +func (d *dataStoreData) entsKeyLocked(key gae.DSKey) (*memCollection, gae.DSKey, error) {
|
| coll := "ents:" + key.Namespace()
|
| ents := d.store.GetCollection(coll)
|
| if ents == nil {
|
| ents = d.store.SetCollection(coll, nil)
|
| }
|
|
|
| - if key.Incomplete() {
|
| + if helper.DSKeyIncomplete(key) {
|
| idKey := []byte(nil)
|
| if key.Parent() == nil {
|
| idKey = rootIDsKey(key.Kind())
|
| @@ -146,39 +116,40 @@ func (d *dataStoreData) entsKeyLocked(key *datastore.Key) (*memCollection, *data
|
| if err != nil {
|
| return nil, nil, err
|
| }
|
| - key = newKey(key.Namespace(), key.Kind(), "", id, key.Parent())
|
| + key = helper.NewDSKey(key.AppID(), key.Namespace(), key.Kind(), "", id, key.Parent())
|
| }
|
|
|
| return ents, key, nil
|
| }
|
|
|
| -func putPrelim(ns string, knr goon.KindNameResolver, src interface{}) (*datastore.Key, *propertyList, error) {
|
| - key := newKeyObj(ns, knr, src)
|
| - if !keyCouldBeValid(ns, key, userKeyOnly) {
|
| +func putPrelim(ns string, key gae.DSKey, src interface{}) (gae.DSPropertyMap, error) {
|
| + if !keyCouldBeValid(key, ns, false) {
|
| // TODO(riannucci): different error for Put-ing to reserved Keys?
|
| - return nil, nil, datastore.ErrInvalidKey
|
| + return nil, gae.ErrDSInvalidKey
|
| }
|
|
|
| - data, err := toPL(src)
|
| - return key, data, err
|
| + pls, err := helper.GetPLS(src)
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| + return pls.Save()
|
| }
|
|
|
| -func (d *dataStoreData) put(ns string, src interface{}) (*datastore.Key, error) {
|
| - key, plData, err := putPrelim(ns, d.KindNameResolver(), src)
|
| +func (d *dataStoreData) put(ns string, key gae.DSKey, src interface{}) (gae.DSKey, error) {
|
| + pmData, err := putPrelim(ns, key, src)
|
| if err != nil {
|
| return nil, err
|
| }
|
| - if key, err = d.putInner(key, plData); err != nil {
|
| + if key, err = d.putInner(key, pmData); err != nil {
|
| return nil, err
|
| }
|
| - return key, goon_internal.SetStructKey(src, key, d.KindNameResolver())
|
| + return key, nil
|
| }
|
|
|
| -func (d *dataStoreData) putInner(key *datastore.Key, data *propertyList) (*datastore.Key, error) {
|
| - dataBytes, err := data.MarshalBinary()
|
| - if err != nil {
|
| - return nil, err
|
| - }
|
| +func (d *dataStoreData) putInner(key gae.DSKey, data gae.DSPropertyMap) (gae.DSKey, error) {
|
| + buf := &bytes.Buffer{}
|
| + helper.WriteDSPropertyMap(buf, data, helper.WithoutContext)
|
| + dataBytes := buf.Bytes()
|
|
|
| d.rwlock.Lock()
|
| defer d.rwlock.Unlock()
|
| @@ -191,49 +162,57 @@ func (d *dataStoreData) putInner(key *datastore.Key, data *propertyList) (*datas
|
| return nil, err
|
| }
|
|
|
| - old := ents.Get(keyBytes(noNS, key))
|
| - oldPl := (*propertyList)(nil)
|
| + old := ents.Get(keyBytes(helper.WithoutContext, key))
|
| + oldPM := gae.DSPropertyMap(nil)
|
| if old != nil {
|
| - oldPl = &propertyList{}
|
| - if err = oldPl.UnmarshalBinary(old); err != nil {
|
| + if oldPM, err = rpmWoCtx(old, key.Namespace()); err != nil {
|
| return nil, err
|
| }
|
| }
|
| - if err = updateIndicies(d.store, key, oldPl, data); err != nil {
|
| + if err = updateIndicies(d.store, key, oldPM, data); err != nil {
|
| return nil, err
|
| }
|
|
|
| - ents.Set(keyBytes(noNS, key), dataBytes)
|
| + ents.Set(keyBytes(helper.WithoutContext, key), dataBytes)
|
|
|
| return key, nil
|
| }
|
|
|
| -func getInner(ns string, knr goon.KindNameResolver, dst interface{}, getColl func(*datastore.Key) (*memCollection, error)) error {
|
| - key := newKeyObj(ns, knr, dst)
|
| - if !keyValid(ns, key, allowSpecialKeys) {
|
| - return datastore.ErrInvalidKey
|
| +func getInner(ns string, key gae.DSKey, dst interface{}, getColl func() (*memCollection, error)) error {
|
| + if helper.DSKeyIncomplete(key) || !helper.DSKeyValid(key, ns, true) {
|
| + return gae.ErrDSInvalidKey
|
| }
|
|
|
| - ents, err := getColl(key)
|
| + ents, err := getColl()
|
| if err != nil {
|
| return err
|
| }
|
| if ents == nil {
|
| - return datastore.ErrNoSuchEntity
|
| + return gae.ErrDSNoSuchEntity
|
| }
|
| - pdata := ents.Get(keyBytes(noNS, key))
|
| + pdata := ents.Get(keyBytes(helper.WithoutContext, key))
|
| if pdata == nil {
|
| - return datastore.ErrNoSuchEntity
|
| + return gae.ErrDSNoSuchEntity
|
| + }
|
| +
|
| + pm, err := rpmWoCtx(pdata, ns)
|
| + if err != nil {
|
| + return err
|
| }
|
| - pl := &propertyList{}
|
| - if err = pl.UnmarshalBinary(pdata); err != nil {
|
| +
|
| + pls, err := helper.GetPLS(dst)
|
| + if err != nil {
|
| return err
|
| }
|
| - return fromPL(pl, dst)
|
| +
|
| + // TODO(riannucci): should the Get API reveal conversion errors instead of
|
| + // swallowing them?
|
| + _, err = pls.Load(pm)
|
| + return err
|
| }
|
|
|
| -func (d *dataStoreData) get(ns string, dst interface{}) error {
|
| - return getInner(ns, d.KindNameResolver(), dst, func(*datastore.Key) (*memCollection, error) {
|
| +func (d *dataStoreData) get(ns string, key gae.DSKey, dst interface{}) error {
|
| + return getInner(ns, key, dst, func() (*memCollection, error) {
|
| d.rwlock.RLock()
|
| s := d.store.Snapshot()
|
| d.rwlock.RUnlock()
|
| @@ -242,12 +221,12 @@ func (d *dataStoreData) get(ns string, dst interface{}) error {
|
| })
|
| }
|
|
|
| -func (d *dataStoreData) del(ns string, key *datastore.Key) error {
|
| - if !keyValid(ns, key, userKeyOnly) {
|
| - return datastore.ErrInvalidKey
|
| +func (d *dataStoreData) del(ns string, key gae.DSKey) (err error) {
|
| + if !helper.DSKeyValid(key, ns, false) {
|
| + return gae.ErrDSInvalidKey
|
| }
|
|
|
| - keyBuf := keyBytes(noNS, key)
|
| + keyBuf := keyBytes(helper.WithoutContext, key)
|
|
|
| d.rwlock.Lock()
|
| defer d.rwlock.Unlock()
|
| @@ -256,19 +235,18 @@ func (d *dataStoreData) del(ns string, key *datastore.Key) error {
|
| if ents == nil {
|
| return nil
|
| }
|
| - if _, err := incrementLocked(ents, groupMetaKey(key)); err != nil {
|
| - return err
|
| + if _, err = incrementLocked(ents, groupMetaKey(key)); err != nil {
|
| + return
|
| }
|
|
|
| old := ents.Get(keyBuf)
|
| - oldPl := (*propertyList)(nil)
|
| + oldPM := gae.DSPropertyMap(nil)
|
| if old != nil {
|
| - oldPl = &propertyList{}
|
| - if err := oldPl.UnmarshalBinary(old); err != nil {
|
| - return err
|
| + if oldPM, err = rpmWoCtx(old, ns); err != nil {
|
| + return
|
| }
|
| }
|
| - if err := updateIndicies(d.store, key, oldPl, nil); err != nil {
|
| + if err := updateIndicies(d.store, key, oldPM, nil); err != nil {
|
| return err
|
| }
|
|
|
| @@ -284,10 +262,11 @@ func (d *dataStoreData) canApplyTxn(obj memContextObj) bool {
|
| if len(muts) == 0 { // read-only
|
| continue
|
| }
|
| - k, err := keyFromByteString(withNS, rk, "")
|
| + k, err := helper.ReadDSKey(bytes.NewBufferString(rk), helper.WithContext, "", "")
|
| if err != nil {
|
| panic(err)
|
| }
|
| +
|
| entKey := "ents:" + k.Namespace()
|
| mkey := groupMetaKey(k)
|
| entsHead := d.store.GetCollection(entKey)
|
| @@ -327,13 +306,12 @@ func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) {
|
| }
|
| }
|
|
|
| -func (d *dataStoreData) mkTxn(o *datastore.TransactionOptions) (memContextObj, error) {
|
| +func (d *dataStoreData) mkTxn(o *gae.DSTransactionOptions) (memContextObj, error) {
|
| return &txnDataStoreData{
|
| // alias to the main datastore's so that testing code can have primitive
|
| // access to break features inside of transactions.
|
| BrokenFeatures: &d.BrokenFeatures,
|
| parent: d,
|
| - knrKeeper: knrKeeper{knrFunc: d.knrFunc},
|
| isXG: o != nil && o.XG,
|
| snap: d.store.Snapshot(),
|
| muts: map[string][]txnMutation{},
|
| @@ -345,13 +323,12 @@ func (d *dataStoreData) endTxn() {}
|
| /////////////////////////////// txnDataStoreData ///////////////////////////////
|
|
|
| type txnMutation struct {
|
| - key *datastore.Key
|
| - data *propertyList
|
| + key gae.DSKey
|
| + data gae.DSPropertyMap
|
| }
|
|
|
| type txnDataStoreData struct {
|
| - *wrapper.BrokenFeatures
|
| - knrKeeper
|
| + *gae.BrokenFeatures
|
| sync.Mutex
|
|
|
| parent *dataStoreData
|
| @@ -371,8 +348,7 @@ type txnDataStoreData struct {
|
| var (
|
| _ = memContextObj((*txnDataStoreData)(nil))
|
| _ = sync.Locker((*txnDataStoreData)(nil))
|
| - _ = wrapper.Testable((*txnDataStoreData)(nil))
|
| - _ = wrapper.DSKindSetter((*txnDataStoreData)(nil))
|
| + _ = gae.Testable((*txnDataStoreData)(nil))
|
| )
|
|
|
| const xgEGLimit = 25
|
| @@ -387,17 +363,17 @@ func (td *txnDataStoreData) endTxn() {
|
| func (*txnDataStoreData) applyTxn(context.Context, memContextObj) {
|
| panic("txnDataStoreData cannot apply transactions")
|
| }
|
| -func (*txnDataStoreData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) {
|
| +func (*txnDataStoreData) mkTxn(*gae.DSTransactionOptions) (memContextObj, error) {
|
| return nil, errors.New("datastore: nested transactions are not supported")
|
| }
|
|
|
| -func (td *txnDataStoreData) IsBroken() error {
|
| +func (td *txnDataStoreData) RunIfNotBroken(f func() error) error {
|
| // Slightly different from the SDK... datastore and taskqueue each implement
|
| // this here, where in the SDK only datastore.transaction.Call does.
|
| if atomic.LoadInt32(&td.closed) == 1 {
|
| return errors.New("datastore: transaction context has expired")
|
| }
|
| - return td.BrokenFeatures.IsBroken()
|
| + return td.BrokenFeatures.RunIfNotBroken(f)
|
| }
|
|
|
| // writeMutation ensures that this transaction can support the given key/value
|
| @@ -411,8 +387,8 @@ func (td *txnDataStoreData) IsBroken() error {
|
| //
|
| // Returns an error if this key causes the transaction to cross too many entity
|
| // groups.
|
| -func (td *txnDataStoreData) writeMutation(getOnly bool, key *datastore.Key, data *propertyList) error {
|
| - rk := string(keyBytes(withNS, rootKey(key)))
|
| +func (td *txnDataStoreData) writeMutation(getOnly bool, key gae.DSKey, data gae.DSPropertyMap) error {
|
| + rk := string(keyBytes(helper.WithContext, helper.DSKeyRoot(key)))
|
|
|
| td.Lock()
|
| defer td.Unlock()
|
| @@ -427,7 +403,7 @@ func (td *txnDataStoreData) writeMutation(getOnly bool, key *datastore.Key, data
|
| if td.isXG {
|
| msg = "operating on too many entity groups in a single transaction"
|
| }
|
| - return newDSError(pb.Error_BAD_REQUEST, msg)
|
| + return errors.New(msg)
|
| }
|
| td.muts[rk] = []txnMutation{}
|
| }
|
| @@ -438,8 +414,8 @@ func (td *txnDataStoreData) writeMutation(getOnly bool, key *datastore.Key, data
|
| return nil
|
| }
|
|
|
| -func (td *txnDataStoreData) put(ns string, src interface{}) (*datastore.Key, error) {
|
| - key, plData, err := putPrelim(ns, td.KindNameResolver(), src)
|
| +func (td *txnDataStoreData) put(ns string, key gae.DSKey, src interface{}) (gae.DSKey, error) {
|
| + pMap, err := putPrelim(ns, key, src)
|
| if err != nil {
|
| return nil, err
|
| }
|
| @@ -453,15 +429,15 @@ func (td *txnDataStoreData) put(ns string, src interface{}) (*datastore.Key, err
|
| return nil, err
|
| }
|
|
|
| - if err = td.writeMutation(false, key, plData); err != nil {
|
| + if err = td.writeMutation(false, key, pMap); err != nil {
|
| return nil, err
|
| }
|
|
|
| - return key, goon_internal.SetStructKey(src, key, td.KindNameResolver())
|
| + return key, nil
|
| }
|
|
|
| -func (td *txnDataStoreData) get(ns string, dst interface{}) error {
|
| - return getInner(ns, td.KindNameResolver(), dst, func(key *datastore.Key) (*memCollection, error) {
|
| +func (td *txnDataStoreData) get(ns string, key gae.DSKey, dst interface{}) error {
|
| + return getInner(ns, key, dst, func() (*memCollection, error) {
|
| if err := td.writeMutation(true, key, nil); err != nil {
|
| return nil, err
|
| }
|
| @@ -469,9 +445,31 @@ func (td *txnDataStoreData) get(ns string, dst interface{}) error {
|
| })
|
| }
|
|
|
| -func (td *txnDataStoreData) del(ns string, key *datastore.Key) error {
|
| - if !keyValid(ns, key, userKeyOnly) {
|
| - return datastore.ErrInvalidKey
|
| +func (td *txnDataStoreData) del(ns string, key gae.DSKey) error {
|
| + if !helper.DSKeyValid(key, ns, false) {
|
| + return gae.ErrDSInvalidKey
|
| }
|
| return td.writeMutation(false, key, nil)
|
| }
|
| +
|
| +func keyCouldBeValid(k gae.DSKey, ns string, allowSpecial bool) bool {
|
| + // adds an id to k if it's incomplete.
|
| + if helper.DSKeyIncomplete(k) {
|
| + k = helper.NewDSKey(k.AppID(), k.Namespace(), k.Kind(), "", 1, k.Parent())
|
| + }
|
| + return helper.DSKeyValid(k, ns, allowSpecial)
|
| +}
|
| +
|
| +func keyBytes(ctx helper.DSKeyContext, key gae.DSKey) []byte {
|
| + buf := &bytes.Buffer{}
|
| + helper.WriteDSKey(buf, ctx, key)
|
| + return buf.Bytes()
|
| +}
|
| +
|
| +func rpmWoCtx(data []byte, ns string) (gae.DSPropertyMap, error) {
|
| + return helper.ReadDSPropertyMap(bytes.NewBuffer(data), helper.WithoutContext, globalAppID, ns)
|
| +}
|
| +
|
| +func rpm(data []byte) (gae.DSPropertyMap, error) {
|
| + return helper.ReadDSPropertyMap(bytes.NewBuffer(data), helper.WithContext, "", "")
|
| +}
|
|
|