Chromium Code Reviews| Index: go/src/infra/gae/libs/wrapper/memory/datastore_data.go | 
| diff --git a/go/src/infra/gae/libs/wrapper/memory/datastore_data.go b/go/src/infra/gae/libs/wrapper/memory/datastore_data.go | 
| new file mode 100644 | 
| index 0000000000000000000000000000000000000000..eb75eb6ccda3c0a232ca85cb2eaff041b932715d | 
| --- /dev/null | 
| +++ b/go/src/infra/gae/libs/wrapper/memory/datastore_data.go | 
| @@ -0,0 +1,469 @@ | 
| +// Copyright 2015 The Chromium Authors. All rights reserved. | 
| +// Use of this source code is governed by a BSD-style license that can be | 
| +// found in the LICENSE file. | 
| + | 
| +package memory | 
| + | 
| +import ( | 
| + "errors" | 
| + "infra/gae/libs/wrapper" | 
| + goon_internal "infra/gae/libs/wrapper/memory/internal/goon" | 
| + "math/rand" | 
| + "sync" | 
| + "sync/atomic" | 
| + | 
| + "github.com/mjibson/goon" | 
| + | 
| + "appengine/datastore" | 
| + pb "appengine_internal/datastore" | 
| +) | 
| + | 
| +////////////////////////////////// knrKeeper /////////////////////////////////// | 
| + | 
| +type knrKeeper struct { | 
| + knrLock sync.Mutex | 
| + knrFunc goon.KindNameResolver | 
| +} | 
| + | 
| +func (k *knrKeeper) KindNameResolver() goon.KindNameResolver { | 
| + k.knrLock.Lock() | 
| + defer k.knrLock.Unlock() | 
| + ret := k.knrFunc | 
| + if ret == nil { | 
| + ret = goon.DefaultKindName | 
| + k.knrFunc = ret | 
| + } | 
| + return ret | 
| +} | 
| + | 
| +func (k *knrKeeper) SetKindNameResolver(knr goon.KindNameResolver) { | 
| + k.knrLock.Lock() | 
| + defer k.knrLock.Unlock() | 
| + if knr == nil { | 
| + knr = goon.DefaultKindName | 
| + } | 
| + k.knrFunc = knr | 
| +} | 
| + | 
| +//////////////////////////////// dataStoreData ///////////////////////////////// | 
| + | 
| +type dataStoreData struct { | 
| + wrapper.BrokenFeatures | 
| + knrKeeper | 
| + | 
| + rwlock sync.RWMutex | 
| + store *memStore | 
| + snap *memStore | 
| + | 
| + /* collections: | 
| 
 
M-A Ruel
2015/05/28 22:42:38
FTR, I prefer to use // all the time so if I need
 
iannucci
2015/05/28 23:00:34
removed in favor of a comment pointing to the READ
 
 | 
| + * ents:ns -> key -> value | 
| + * (rootkind, rootid, __entity_group__,1) -> {__version__: int} | 
| + * (rootkind, rootid, __entity_group_ids__,1) -> {__version__: int} | 
| + * (__entity_group_ids__,1) -> {__version__: int} | 
| + * idx -> kind,A?,[-?prop]* | 
| + * idx:ns:kind -> key = nil | 
| + * idx:ns:kind|prop -> propval|key = [prev val] | 
| + * idx:ns:kind|-prop -> -propval|key = [next val] | 
| + * idx:ns:kind|A|?prop|?prop -> A|propval|propval|key = [prev/next val]|[prev/next val] | 
| + * idx:ns:kind|?prop|?prop -> propval|propval|key = [prev/next val]|[prev/next val] | 
| + */ | 
| +} | 
| + | 
| +////////////////////////////// New(dataStoreData) ////////////////////////////// | 
| + | 
| +func newDataStoreData() *dataStoreData { | 
| + store := newMemStore() | 
| + return &dataStoreData{ | 
| + BrokenFeatures: wrapper.BrokenFeatures{DefaultError: newDSError(pb.Error_INTERNAL_ERROR)}, | 
| + store: store, | 
| + snap: store.Snapshot(), // empty but better than a nil pointer. | 
| + } | 
| +} | 
| + | 
| +////////////////////////////// Locker(Datastore) /////////////////////////////// | 
| + | 
| +func (d *dataStoreData) Lock() { | 
| + d.rwlock.Lock() | 
| +} | 
| + | 
| +func (d *dataStoreData) Unlock() { | 
| + d.rwlock.Unlock() | 
| +} | 
| + | 
| +func groupMetaKey(key *datastore.Key) []byte { | 
| + return keyBytes(noNS, newKey("", "__entity_group__", "", 1, rootKey(key))) | 
| +} | 
| + | 
| +func groupIDsKey(key *datastore.Key) []byte { | 
| + return keyBytes(noNS, newKey("", "__entity_group_ids__", "", 1, rootKey(key))) | 
| +} | 
| + | 
| +func rootIDsKey(kind string) []byte { | 
| + return keyBytes(noNS, newKey("", "__entity_root_ids__", kind, 0, nil)) | 
| +} | 
| + | 
| +func curVersion(ents *memCollection, key []byte) (int64, error) { | 
| + var err error | 
| + v := ents.Get(key) | 
| + num := int64(0) | 
| + numData := &propertyList{} | 
| + if v != nil { | 
| + err = numData.UnmarshalBinary(v) | 
| + num = (*numData)[0].Value.(int64) | 
| + } | 
| + return num, err | 
| +} | 
| + | 
| +func incrementLocked(ents *memCollection, key []byte) (int64, error) { | 
| + v := ents.Get(key) | 
| + | 
| + num := int64(0) | 
| + numData := &propertyList{} | 
| + if v == nil { | 
| + num++ | 
| + *numData = append(*numData, datastore.Property{Name: "__version__"}) | 
| + } else { | 
| + err := numData.UnmarshalBinary(v) | 
| + if err != nil { | 
| + return 0, err | 
| + } | 
| + num = (*numData)[0].Value.(int64) | 
| + num++ | 
| 
 
M-A Ruel
2015/05/28 22:42:38
Remove both num++, insert before line 133:
num++
 
iannucci
2015/05/28 23:00:34
done
 
 | 
| + } | 
| + (*numData)[0].Value = num | 
| + incData, err := numData.MarshalBinary() | 
| + if err != nil { | 
| + return 0, err | 
| + } | 
| + ents.Set(key, incData) | 
| + | 
| + return num, nil | 
| +} | 
| + | 
| +func (d *dataStoreData) entsKeyLocked(key *datastore.Key) (*memCollection, *datastore.Key, error) { | 
| + coll := "ents:" + key.Namespace() | 
| + ents := d.store.GetCollection(coll) | 
| + if ents == nil { | 
| + ents = d.store.SetCollection(coll, nil) | 
| + } | 
| + | 
| + if key.Incomplete() { | 
| + var idKey []byte | 
| + if key.Parent() == nil { | 
| + idKey = rootIDsKey(key.Kind()) | 
| + } else { | 
| + idKey = groupIDsKey(key) | 
| + } | 
| + | 
| + id, err := incrementLocked(ents, idKey) | 
| + if err != nil { | 
| + return nil, nil, err | 
| + } | 
| + key = newKey(key.Namespace(), key.Kind(), "", id, key.Parent()) | 
| + } | 
| + | 
| + return ents, key, nil | 
| +} | 
| + | 
| +func putPrelim(ns string, knr goon.KindNameResolver, src interface{}) (*datastore.Key, *propertyList, error) { | 
| + key := newKeyObj(ns, knr, src) | 
| + if !KeyCouldBeValid(ns, key, UserKeyOnly) { | 
| + // TODO(riannucci): different error for Put-ing to reserved Keys? | 
| + return nil, nil, datastore.ErrInvalidKey | 
| + } | 
| + | 
| + data, err := toPL(src) | 
| + return key, data, err | 
| +} | 
| + | 
| +func (d *dataStoreData) put(ns string, src interface{}) (*datastore.Key, error) { | 
| + key, plData, err := putPrelim(ns, d.KindNameResolver(), src) | 
| + if err != nil { | 
| + return nil, err | 
| + } | 
| + key, err = d.putInner(key, plData) | 
| + if err != nil { | 
| + return nil, err | 
| + } | 
| + return key, goon_internal.SetStructKey(src, key, d.KindNameResolver()) | 
| +} | 
| + | 
| +func (d *dataStoreData) putInner(key *datastore.Key, data *propertyList) (*datastore.Key, error) { | 
| + dataBytes, err := data.MarshalBinary() | 
| + if err != nil { | 
| + return nil, err | 
| + } | 
| + | 
| + d.rwlock.Lock() | 
| + defer d.Unlock() | 
| 
 
M-A Ruel
2015/05/28 22:42:38
Stay symmetrical between the calls.
 
iannucci
2015/05/28 23:00:34
oops good catch.
 
 | 
| + | 
| + ents, key, err := d.entsKeyLocked(key) | 
| + if err != nil { | 
| + return nil, err | 
| + } | 
| + | 
| + if _, err = incrementLocked(ents, groupMetaKey(key)); err != nil { | 
| + return nil, err | 
| + } | 
| + | 
| + ents.Set(keyBytes(noNS, key), dataBytes) | 
| + | 
| + return key, nil | 
| +} | 
| + | 
| +func getInner(ns string, knr goon.KindNameResolver, dst interface{}, getColl func(*datastore.Key) (*memCollection, error)) error { | 
| + key := newKeyObj(ns, knr, dst) | 
| + if !KeyValid(ns, key, AllowSpecialKeys) { | 
| + return datastore.ErrInvalidKey | 
| + } | 
| + | 
| + ents, err := getColl(key) | 
| + if err != nil { | 
| + return err | 
| + } | 
| + if ents == nil { | 
| + return datastore.ErrNoSuchEntity | 
| + } | 
| + pdata := ents.Get(keyBytes(noNS, key)) | 
| + if pdata == nil { | 
| + return datastore.ErrNoSuchEntity | 
| + } | 
| + pl := &propertyList{} | 
| + err = pl.UnmarshalBinary(pdata) | 
| 
 
M-A Ruel
2015/05/28 22:42:38
merge lines 232 et 233.
 
iannucci
2015/05/28 23:00:34
done
 
 | 
| + if err != nil { | 
| + return err | 
| + } | 
| + return fromPL(pl, dst) | 
| +} | 
| + | 
| +func (d *dataStoreData) get(ns string, dst interface{}) error { | 
| + return getInner(ns, d.KindNameResolver(), dst, func(*datastore.Key) (*memCollection, error) { | 
| + d.rwlock.RLock() | 
| 
 
M-A Ruel
2015/05/28 22:42:38
There will be enough concurrent calls to warrant a
 
iannucci
2015/05/28 23:00:35
I think there will be. Writes are heavy (esp when
 
 | 
| + s := d.store.Snapshot() | 
| + d.rwlock.RUnlock() | 
| + | 
| + return s.GetCollection("ents:" + ns), nil | 
| + }) | 
| +} | 
| + | 
| +func (d *dataStoreData) del(ns string, key *datastore.Key) error { | 
| + if !KeyValid(ns, key, UserKeyOnly) { | 
| + return datastore.ErrInvalidKey | 
| + } | 
| + | 
| + keyBuf := keyBytes(noNS, key) | 
| + | 
| + d.rwlock.Lock() | 
| + defer d.Unlock() | 
| + | 
| + ents := d.store.GetCollection("ents:" + ns) | 
| + if ents == nil { | 
| + return nil | 
| + } | 
| + | 
| + _, err := incrementLocked(ents, groupMetaKey(key)) | 
| + if err != nil { | 
| + return err | 
| + } | 
| + | 
| + ents.Delete(keyBuf) | 
| + return nil | 
| +} | 
| + | 
| +///////////////////////// memContextObj(dataStoreData) ///////////////////////// | 
| + | 
| +func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { | 
| + // TODO(riannucci): implement with Flush/FlushRevert for persistance. | 
| + | 
| + txn := obj.(*txnDataStoreData) | 
| + for rk, muts := range txn.muts { | 
| + if len(muts) == 0 { // read-only | 
| + continue | 
| + } | 
| + k, err := keyFromByteString(withNS, rk) | 
| + if err != nil { | 
| + panic(err) | 
| + } | 
| + entKey := "ents:" + k.Namespace() | 
| + mkey := groupMetaKey(k) | 
| + entsHead := d.store.GetCollection(entKey) | 
| + entsSnap := txn.snap.GetCollection(entKey) | 
| + vHead, err := curVersion(entsHead, mkey) | 
| + if err != nil { | 
| + panic(err) | 
| + } | 
| + vSnap, err := curVersion(entsSnap, mkey) | 
| + if err != nil { | 
| + panic(err) | 
| + } | 
| + if vHead != vSnap { | 
| + return false | 
| + } | 
| + } | 
| + return true | 
| +} | 
| + | 
| +func (d *dataStoreData) applyTxn(r *rand.Rand, obj memContextObj) { | 
| + txn := obj.(*txnDataStoreData) | 
| + for _, muts := range txn.muts { | 
| + if len(muts) == 0 { // read-only | 
| + continue | 
| + } | 
| + for _, m := range muts { | 
| + var err error | 
| + if m.data == nil { | 
| + err = d.del(m.key.Namespace(), m.key) | 
| + } else { | 
| + _, err = d.putInner(m.key, m.data) | 
| + } | 
| + if err != nil { | 
| + panic(err) | 
| + } | 
| + } | 
| + } | 
| +} | 
| + | 
| +func (d *dataStoreData) mkTxn(o *datastore.TransactionOptions) (memContextObj, error) { | 
| + return &txnDataStoreData{ | 
| + // alias to the main datastore's so that testing code can have primitive | 
| + // access to break features inside of transactions. | 
| + BrokenFeatures: &d.BrokenFeatures, | 
| + parent: d, | 
| + knrKeeper: knrKeeper{knrFunc: d.knrFunc}, | 
| + isXG: o != nil && o.XG, | 
| + snap: d.store.Snapshot(), | 
| + muts: map[string][]txnMutation{}, | 
| + }, nil | 
| +} | 
| + | 
| +func (d *dataStoreData) endTxn() {} | 
| + | 
| +/////////////////////////////// txnDataStoreData /////////////////////////////// | 
| +type txnMutation struct { | 
| + key *datastore.Key | 
| + data *propertyList | 
| +} | 
| + | 
| +type txnDataStoreData struct { | 
| + *wrapper.BrokenFeatures | 
| + knrKeeper | 
| + sync.Mutex | 
| + | 
| + parent *dataStoreData | 
| + | 
| + // boolean 0 or 1, use atomic.*Int32 to access. | 
| + closed int32 | 
| + isXG bool | 
| + | 
| + snap *memStore | 
| + | 
| + // string is the raw-bytes encoding of the entity root incl. namespace | 
| + muts map[string][]txnMutation | 
| + // TODO(riannucci): account for 'transaction size' limit of 10MB by summing | 
| + // length of encoded keys + values. | 
| +} | 
| + | 
| +const xgEGLimit = 25 | 
| + | 
| +/////////////////////// memContextObj(txnDataStoreData) //////////////////////// | 
| + | 
| +func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } | 
| +func (td *txnDataStoreData) endTxn() { | 
| + if atomic.LoadInt32(&td.closed) == 1 { | 
| + panic("cannot end transaction twice") | 
| + } | 
| + atomic.StoreInt32(&td.closed, 1) | 
| +} | 
| +func (*txnDataStoreData) applyTxn(*rand.Rand, memContextObj) { | 
| + panic("txnDataStoreData cannot apply transactions") | 
| +} | 
| +func (*txnDataStoreData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) { | 
| + return nil, errors.New("datastore: nested transactions are not supported") | 
| +} | 
| + | 
| +/////////////////// wrapper.BrokenFeatures(txnDataStoreData) /////////////////// | 
| + | 
| +func (td *txnDataStoreData) IsBroken() error { | 
| + // Slightly different from the SDK... datastore and taskqueue each implement | 
| + // this here, where in the SDK only datastore.transaction.Call does. | 
| + if atomic.LoadInt32(&td.closed) == 1 { | 
| + return errors.New("datastore: transaction context has expired") | 
| + } | 
| + return td.BrokenFeatures.IsBroken() | 
| +} | 
| + | 
| +// writeMutation ensures that this transaction can support the given key/value | 
| +// mutation. | 
| +// | 
| +// if getOnly is true, don't record the actual mutation data, just ensure that | 
| +// the key is in an included entity group (or add an empty entry for that | 
| +// group). | 
| +// | 
| +// if !getOnly && data == nil, this counts as a deletion instead of a Put. | 
| +// | 
| +// Returns an error if this key causes the transaction to cross too many entity | 
| +// groups. | 
| +func (td *txnDataStoreData) writeMutation(getOnly bool, key *datastore.Key, data *propertyList) error { | 
| + rk := string(keyBytes(withNS, rootKey(key))) | 
| + | 
| + td.Lock() | 
| + defer td.Unlock() | 
| + | 
| + if _, ok := td.muts[rk]; !ok { | 
| + limit := 1 | 
| + if td.isXG { | 
| + limit = xgEGLimit | 
| + } | 
| + if len(td.muts)+1 > limit { | 
| + msg := "cross-group transaction need to be explicitly specified (xg=True)" | 
| + if td.isXG { | 
| + msg = "operating on too many entity groups in a single transaction" | 
| + } | 
| + return newDSError(pb.Error_BAD_REQUEST, msg) | 
| + } | 
| + td.muts[rk] = []txnMutation{} | 
| + } | 
| + if !getOnly { | 
| + td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) | 
| + } | 
| + | 
| + return nil | 
| +} | 
| + | 
| +func (td *txnDataStoreData) put(ns string, src interface{}) (*datastore.Key, error) { | 
| + key, plData, err := putPrelim(ns, td.KindNameResolver(), src) | 
| + if err != nil { | 
| + return nil, err | 
| + } | 
| + | 
| + func() { | 
| + td.parent.Lock() | 
| + defer td.parent.Unlock() | 
| + _, key, err = td.parent.entsKeyLocked(key) | 
| + }() | 
| + if err != nil { | 
| + return nil, err | 
| + } | 
| + | 
| + if err = td.writeMutation(false, key, plData); err != nil { | 
| + return nil, err | 
| + } | 
| + | 
| + return key, goon_internal.SetStructKey(src, key, td.KindNameResolver()) | 
| +} | 
| + | 
| +func (td *txnDataStoreData) get(ns string, dst interface{}) error { | 
| + return getInner(ns, td.KindNameResolver(), dst, func(key *datastore.Key) (*memCollection, error) { | 
| + if err := td.writeMutation(true, key, nil); err != nil { | 
| + return nil, err | 
| + } | 
| + return td.snap.GetCollection("ents:" + ns), nil | 
| + }) | 
| +} | 
| + | 
| +func (td *txnDataStoreData) del(ns string, key *datastore.Key) error { | 
| + if !KeyValid(ns, key, UserKeyOnly) { | 
| + return datastore.ErrInvalidKey | 
| + } | 
| + return td.writeMutation(false, key, nil) | 
| +} |