Index: filter/txnBuf/ds_txn.go |
diff --git a/filter/txnBuf/ds_txn.go b/filter/txnBuf/ds_txn.go |
index ef426ef55164d332385d830744a3d6672a7a8df0..29868f959326e1088b7082050428e9c498b2ef23 100644 |
--- a/filter/txnBuf/ds_txn.go |
+++ b/filter/txnBuf/ds_txn.go |
@@ -21,8 +21,9 @@ var ErrTooManyRoots = errors.New( |
"operating on too many entity groups in nested transaction") |
type dsTxnBuf struct { |
- ic context.Context |
- state *txnBufState |
+ ic context.Context |
+ state *txnBufState |
+ haveLock bool |
dnj
2015/11/11 16:08:28
WDYT about replacing this with a "sync.Locker" tha
iannucci
2015/11/11 18:06:40
I agree it would be cleaner (fewer conditionals),
|
} |
var _ ds.RawInterface = (*dsTxnBuf)(nil) |
@@ -36,105 +37,15 @@ func (d *dsTxnBuf) AllocateIDs(incomplete *ds.Key, n int) (start int64, err erro |
} |
func (d *dsTxnBuf) GetMulti(keys []*ds.Key, metas ds.MultiMetaGetter, cb ds.GetMultiCB) error { |
- data, err := d.state.getMulti(keys) |
- if err != nil { |
- return err |
- } |
- |
- idxMap := []int(nil) |
- getKeys := []*ds.Key(nil) |
- getMetas := ds.MultiMetaGetter(nil) |
- lme := errors.NewLazyMultiError(len(keys)) |
- |
- for i, itm := range data { |
- if !itm.buffered { |
- idxMap = append(idxMap, i) |
- getKeys = append(getKeys, itm.key) |
- getMetas = append(getMetas, metas.GetSingle(i)) |
- } |
- } |
- |
- if len(idxMap) > 0 { |
- j := 0 |
- err := d.state.parentDS.GetMulti(getKeys, getMetas, func(pm ds.PropertyMap, err error) { |
- if err != ds.ErrNoSuchEntity { |
- i := idxMap[j] |
- if !lme.Assign(i, err) { |
- data[i].data = pm |
- } |
- } |
- j++ |
- }) |
- if err != nil { |
- return err |
- } |
- } |
- |
- for i, itm := range data { |
- err := lme.GetOne(i) |
- if err != nil { |
- cb(nil, err) |
- } else if itm.data == nil { |
- cb(nil, ds.ErrNoSuchEntity) |
- } else { |
- cb(itm.data, nil) |
- } |
- } |
- return nil |
+ return d.state.getMulti(keys, metas, cb, d.haveLock) |
} |
func (d *dsTxnBuf) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.PutMultiCB) error { |
- lme := errors.NewLazyMultiError(len(keys)) |
- realKeys := []*ds.Key(nil) |
- for i, key := range keys { |
- if key.Incomplete() { |
- start, err := d.AllocateIDs(key, 1) |
- if !lme.Assign(i, err) { |
- if realKeys == nil { |
- realKeys = make([]*ds.Key, len(keys)) |
- copy(realKeys, keys) |
- } |
- |
- aid, ns, toks := key.Split() |
- toks[len(toks)-1].IntID = start |
- realKeys[i] = ds.NewKeyToks(aid, ns, toks) |
- } |
- } |
- } |
- if err := lme.Get(); err != nil { |
- for _, e := range err.(errors.MultiError) { |
- if e == nil { |
- e = errors.New("putMulti failed because some keys were unable to AllocateIDs") |
- } |
- cb(nil, e) |
- } |
- return nil |
- } |
- |
- if realKeys == nil { |
- realKeys = keys |
- } |
- |
- err := d.state.putMulti(realKeys, vals) |
- if err != nil { |
- return err |
- } |
- |
- for _, k := range realKeys { |
- cb(k, nil) |
- } |
- return nil |
+ return d.state.putMulti(keys, vals, cb, d.haveLock) |
} |
func (d *dsTxnBuf) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error { |
- if err := d.state.deleteMulti(keys); err != nil { |
- return err |
- } |
- |
- for range keys { |
- cb(nil) |
- } |
- return nil |
+ return d.state.deleteMulti(keys, cb, d.haveLock) |
} |
func (d *dsTxnBuf) Count(fq *ds.FinalizedQuery) (count int64, err error) { |
@@ -165,13 +76,17 @@ func (d *dsTxnBuf) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error { |
project := fq.Project() |
- d.state.Lock() |
- memDS := d.state.memDS |
+ if !d.haveLock { |
+ d.state.Lock() |
+ } |
+ bufDS := d.state.bufDS |
parentDS := d.state.parentDS |
sizes := d.state.entState.dup() |
- d.state.Unlock() |
+ if !d.haveLock { |
dnj
2015/11/11 16:08:28
Can we make this block an inline function and use
iannucci
2015/11/11 18:06:40
yep. I guess the `dup` call could crash.
|
+ d.state.Unlock() |
+ } |
- return runMergedQueries(fq, sizes, memDS, parentDS, func(key *ds.Key, data ds.PropertyMap) bool { |
+ return runMergedQueries(fq, sizes, bufDS, parentDS, func(key *ds.Key, data ds.PropertyMap) bool { |
if offset > 0 { |
offset-- |
return true |
@@ -196,6 +111,10 @@ func (d *dsTxnBuf) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error { |
} |
func (d *dsTxnBuf) RunInTransaction(cb func(context.Context) error, opts *ds.TransactionOptions) error { |
+ if !d.haveLock { |
+ d.state.Lock() |
+ defer d.state.Unlock() |
+ } |
return withTxnBuf(d.ic, cb, opts) |
} |