Index: impl/memory/datastore_data.go |
diff --git a/impl/memory/datastore_data.go b/impl/memory/datastore_data.go |
index ad8515ee62919e1cf7a84fe942870127170a4043..ed7e56b0bd5c729577e5a0ae163f37b80633791c 100644 |
--- a/impl/memory/datastore_data.go |
+++ b/impl/memory/datastore_data.go |
@@ -9,11 +9,11 @@ import ( |
"fmt" |
"strings" |
"sync" |
- "sync/atomic" |
ds "github.com/luci/gae/service/datastore" |
"github.com/luci/gae/service/datastore/serialize" |
"github.com/luci/luci-go/common/errors" |
+ |
"golang.org/x/net/context" |
) |
@@ -176,15 +176,15 @@ func (d *dataStoreData) namespaces() []string { |
/////////////////////////// indexes(dataStoreData) //////////////////////////// |
func groupMetaKey(key *ds.Key) []byte { |
- return keyBytes(ds.NewKey("", "", "__entity_group__", "", 1, key.Root())) |
+ return keyBytes(ds.KeyContext{}.NewKey("__entity_group__", "", 1, key.Root())) |
} |
func groupIDsKey(key *ds.Key) []byte { |
- return keyBytes(ds.NewKey("", "", "__entity_group_ids__", "", 1, key.Root())) |
+ return keyBytes(ds.KeyContext{}.NewKey("__entity_group_ids__", "", 1, key.Root())) |
} |
func rootIDsKey(kind string) []byte { |
- return keyBytes(ds.NewKey("", "", "__entity_root_ids__", kind, 0, nil)) |
+ return keyBytes(ds.KeyContext{}.NewKey("__entity_root_ids__", kind, 0, nil)) |
} |
func curVersion(ents memCollection, key []byte) int64 { |
@@ -279,7 +279,7 @@ func (d *dataStoreData) fixKeyLocked(ents memCollection, key *ds.Key) (*ds.Key, |
if err != nil { |
return key, err |
} |
- key = ds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", id, key.Parent()) |
+ key = key.KeyContext().NewKey(key.Kind(), "", id, key.Parent()) |
} |
return key, nil |
} |
@@ -412,7 +412,7 @@ func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { |
if len(muts) == 0 { // read-only |
continue |
} |
- prop, err := serialize.ReadProperty(bytes.NewBufferString(rk), serialize.WithContext, "", "") |
+ prop, err := serialize.ReadProperty(bytes.NewBufferString(rk), serialize.WithContext, ds.KeyContext{}) |
memoryCorruption(err) |
k := prop.Value().(*ds.Key) |
@@ -455,9 +455,11 @@ func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj { |
// alias to the main datastore's so that testing code can have primitive |
// access to break features inside of transactions. |
parent: d, |
- isXG: o != nil && o.XG, |
- snap: d.head.Snapshot(), |
- muts: map[string][]txnMutation{}, |
+ txn: &transactionImpl{ |
+ isXG: o != nil && o.XG, |
+ }, |
+ snap: d.head.Snapshot(), |
+ muts: map[string][]txnMutation{}, |
} |
} |
@@ -475,9 +477,7 @@ type txnDataStoreData struct { |
parent *dataStoreData |
- // boolean 0 or 1, use atomic.*Int32 to access. |
- closed int32 |
- isXG bool |
+ txn *transactionImpl |
snap memStore |
@@ -492,11 +492,11 @@ var _ memContextObj = (*txnDataStoreData)(nil) |
const xgEGLimit = 25 |
func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } |
+ |
func (td *txnDataStoreData) endTxn() { |
- if atomic.LoadInt32(&td.closed) == 1 { |
- panic("cannot end transaction twice") |
+ if err := td.txn.close(); err != nil { |
+ panic(err) |
} |
- atomic.StoreInt32(&td.closed, 1) |
} |
func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { |
impossible(fmt.Errorf("cannot create a recursive transaction")) |
@@ -509,8 +509,8 @@ func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj { |
func (td *txnDataStoreData) run(f func() error) error { |
// Slightly different from the SDK... datastore and taskqueue each implement |
// this here, where in the SDK only datastore.transaction.Call does. |
- if atomic.LoadInt32(&td.closed) == 1 { |
- return errors.New("datastore: transaction context has expired") |
+ if err := td.txn.valid(); err != nil { |
+ return err |
} |
return f() |
} |
@@ -534,12 +534,12 @@ func (td *txnDataStoreData) writeMutation(getOnly bool, key *ds.Key, data ds.Pro |
if _, ok := td.muts[rk]; !ok { |
limit := 1 |
- if td.isXG { |
+ if td.txn.isXG { |
limit = xgEGLimit |
} |
if len(td.muts)+1 > limit { |
msg := "cross-group transaction need to be explicitly specified (xg=True)" |
- if td.isXG { |
+ if td.txn.isXG { |
msg = "operating on too many entity groups in a single transaction" |
} |
return errors.New(msg) |
@@ -603,7 +603,7 @@ func keyBytes(key *ds.Key) []byte { |
func rpm(data []byte) (ds.PropertyMap, error) { |
return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
- serialize.WithContext, "", "") |
+ serialize.WithContext, ds.KeyContext{}) |
} |
func namespaces(store memStore) []string { |