| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "fmt" | 9 "fmt" |
| 10 "sync" | 10 "sync" |
| 11 "sync/atomic" | 11 "sync/atomic" |
| 12 | 12 |
| 13 » rds "github.com/luci/gae/service/rawdatastore" | 13 » ds "github.com/luci/gae/service/datastore" |
| 14 "github.com/luci/luci-go/common/errors" | 14 "github.com/luci/luci-go/common/errors" |
| 15 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
| 16 ) | 16 ) |
| 17 | 17 |
| 18 //////////////////////////////// dataStoreData ///////////////////////////////// | 18 //////////////////////////////// dataStoreData ///////////////////////////////// |
| 19 | 19 |
| 20 type dataStoreData struct { | 20 type dataStoreData struct { |
| 21 rwlock sync.RWMutex | 21 rwlock sync.RWMutex |
| 22 // See README.md for store schema. | 22 // See README.md for store schema. |
| 23 store *memStore | 23 store *memStore |
| (...skipping 16 matching lines...) Expand all Loading... |
| 40 func (d *dataStoreData) Lock() { | 40 func (d *dataStoreData) Lock() { |
| 41 d.rwlock.Lock() | 41 d.rwlock.Lock() |
| 42 } | 42 } |
| 43 | 43 |
| 44 func (d *dataStoreData) Unlock() { | 44 func (d *dataStoreData) Unlock() { |
| 45 d.rwlock.Unlock() | 45 d.rwlock.Unlock() |
| 46 } | 46 } |
| 47 | 47 |
| 48 /////////////////////////// indicies(dataStoreData) //////////////////////////// | 48 /////////////////////////// indicies(dataStoreData) //////////////////////////// |
| 49 | 49 |
| 50 func groupMetaKey(key rds.Key) []byte { | 50 func groupMetaKey(key ds.Key) []byte { |
| 51 » return keyBytes(rds.WithoutContext, | 51 » return keyBytes(ds.WithoutContext, |
| 52 » » rds.NewKey("", "", "__entity_group__", "", 1, rds.KeyRoot(key))) | 52 » » ds.NewKey("", "", "__entity_group__", "", 1, ds.KeyRoot(key))) |
| 53 } | 53 } |
| 54 | 54 |
| 55 func groupIDsKey(key rds.Key) []byte { | 55 func groupIDsKey(key ds.Key) []byte { |
| 56 » return keyBytes(rds.WithoutContext, | 56 » return keyBytes(ds.WithoutContext, |
| 57 » » rds.NewKey("", "", "__entity_group_ids__", "", 1, rds.KeyRoot(ke
y))) | 57 » » ds.NewKey("", "", "__entity_group_ids__", "", 1, ds.KeyRoot(key)
)) |
| 58 } | 58 } |
| 59 | 59 |
| 60 func rootIDsKey(kind string) []byte { | 60 func rootIDsKey(kind string) []byte { |
| 61 » return keyBytes(rds.WithoutContext, | 61 » return keyBytes(ds.WithoutContext, |
| 62 » » rds.NewKey("", "", "__entity_root_ids__", kind, 0, nil)) | 62 » » ds.NewKey("", "", "__entity_root_ids__", kind, 0, nil)) |
| 63 } | 63 } |
| 64 | 64 |
| 65 func curVersion(ents *memCollection, key []byte) int64 { | 65 func curVersion(ents *memCollection, key []byte) int64 { |
| 66 if v := ents.Get(key); v != nil { | 66 if v := ents.Get(key); v != nil { |
| 67 pm, err := rpm(v) | 67 pm, err := rpm(v) |
| 68 if err != nil { | 68 if err != nil { |
| 69 panic(err) // memory corruption | 69 panic(err) // memory corruption |
| 70 } | 70 } |
| 71 pl, ok := pm["__version__"] | 71 pl, ok := pm["__version__"] |
| 72 » » if ok && len(pl) > 0 && pl[0].Type() == rds.PTInt { | 72 » » if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { |
| 73 return pl[0].Value().(int64) | 73 return pl[0].Value().(int64) |
| 74 } | 74 } |
| 75 panic(fmt.Errorf("__version__ property missing or wrong: %v", pm
)) | 75 panic(fmt.Errorf("__version__ property missing or wrong: %v", pm
)) |
| 76 } | 76 } |
| 77 return 0 | 77 return 0 |
| 78 } | 78 } |
| 79 | 79 |
| 80 func incrementLocked(ents *memCollection, key []byte) int64 { | 80 func incrementLocked(ents *memCollection, key []byte) int64 { |
| 81 ret := curVersion(ents, key) + 1 | 81 ret := curVersion(ents, key) + 1 |
| 82 buf := &bytes.Buffer{} | 82 buf := &bytes.Buffer{} |
| 83 » rds.PropertyMap{"__version__": {rds.MkPropertyNI(ret)}}.Write( | 83 » ds.PropertyMap{"__version__": {ds.MkPropertyNI(ret)}}.Write( |
| 84 » » buf, rds.WithContext) | 84 » » buf, ds.WithContext) |
| 85 ents.Set(key, buf.Bytes()) | 85 ents.Set(key, buf.Bytes()) |
| 86 return ret | 86 return ret |
| 87 } | 87 } |
| 88 | 88 |
| 89 func (d *dataStoreData) entsKeyLocked(key rds.Key) (*memCollection, rds.Key) { | 89 func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) { |
| 90 coll := "ents:" + key.Namespace() | 90 coll := "ents:" + key.Namespace() |
| 91 ents := d.store.GetCollection(coll) | 91 ents := d.store.GetCollection(coll) |
| 92 if ents == nil { | 92 if ents == nil { |
| 93 ents = d.store.SetCollection(coll, nil) | 93 ents = d.store.SetCollection(coll, nil) |
| 94 } | 94 } |
| 95 | 95 |
| 96 » if rds.KeyIncomplete(key) { | 96 » if ds.KeyIncomplete(key) { |
| 97 idKey := []byte(nil) | 97 idKey := []byte(nil) |
| 98 if key.Parent() == nil { | 98 if key.Parent() == nil { |
| 99 idKey = rootIDsKey(key.Kind()) | 99 idKey = rootIDsKey(key.Kind()) |
| 100 } else { | 100 } else { |
| 101 idKey = groupIDsKey(key) | 101 idKey = groupIDsKey(key) |
| 102 } | 102 } |
| 103 id := incrementLocked(ents, idKey) | 103 id := incrementLocked(ents, idKey) |
| 104 » » key = rds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", i
d, key.Parent()) | 104 » » key = ds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", id
, key.Parent()) |
| 105 } | 105 } |
| 106 | 106 |
| 107 return ents, key | 107 return ents, key |
| 108 } | 108 } |
| 109 | 109 |
| 110 func (d *dataStoreData) putMulti(keys []rds.Key, vals []rds.PropertyLoadSaver, c
b rds.PutMultiCB) { | 110 func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.Put
MultiCB) { |
| 111 for i, k := range keys { | 111 for i, k := range keys { |
| 112 buf := &bytes.Buffer{} | 112 buf := &bytes.Buffer{} |
| 113 » » pmap := vals[i].(rds.PropertyMap) | 113 » » pmap, _ := vals[i].Save(false) |
| 114 » » pmap.Write(buf, rds.WithoutContext) | 114 » » pmap.Write(buf, ds.WithoutContext) |
| 115 dataBytes := buf.Bytes() | 115 dataBytes := buf.Bytes() |
| 116 | 116 |
| 117 » » k, err := func() (ret rds.Key, err error) { | 117 » » k, err := func() (ret ds.Key, err error) { |
| 118 d.rwlock.Lock() | 118 d.rwlock.Lock() |
| 119 defer d.rwlock.Unlock() | 119 defer d.rwlock.Unlock() |
| 120 | 120 |
| 121 ents, ret := d.entsKeyLocked(k) | 121 ents, ret := d.entsKeyLocked(k) |
| 122 incrementLocked(ents, groupMetaKey(ret)) | 122 incrementLocked(ents, groupMetaKey(ret)) |
| 123 | 123 |
| 124 » » » old := ents.Get(keyBytes(rds.WithoutContext, ret)) | 124 » » » old := ents.Get(keyBytes(ds.WithoutContext, ret)) |
| 125 » » » oldPM := rds.PropertyMap(nil) | 125 » » » oldPM := ds.PropertyMap(nil) |
| 126 if old != nil { | 126 if old != nil { |
| 127 if oldPM, err = rpmWoCtx(old, ret.Namespace());
err != nil { | 127 if oldPM, err = rpmWoCtx(old, ret.Namespace());
err != nil { |
| 128 return | 128 return |
| 129 } | 129 } |
| 130 } | 130 } |
| 131 updateIndicies(d.store, ret, oldPM, pmap) | 131 updateIndicies(d.store, ret, oldPM, pmap) |
| 132 » » » ents.Set(keyBytes(rds.WithoutContext, ret), dataBytes) | 132 » » » ents.Set(keyBytes(ds.WithoutContext, ret), dataBytes) |
| 133 return | 133 return |
| 134 }() | 134 }() |
| 135 if cb != nil { | 135 if cb != nil { |
| 136 cb(k, err) | 136 cb(k, err) |
| 137 } | 137 } |
| 138 } | 138 } |
| 139 } | 139 } |
| 140 | 140 |
| 141 func getMultiInner(keys []rds.Key, cb rds.GetMultiCB, getColl func() (*memCollec
tion, error)) error { | 141 func getMultiInner(keys []ds.Key, cb ds.GetMultiCB, getColl func() (*memCollecti
on, error)) error { |
| 142 ents, err := getColl() | 142 ents, err := getColl() |
| 143 if err != nil { | 143 if err != nil { |
| 144 return err | 144 return err |
| 145 } | 145 } |
| 146 if ents == nil { | 146 if ents == nil { |
| 147 for range keys { | 147 for range keys { |
| 148 » » » cb(nil, rds.ErrNoSuchEntity) | 148 » » » cb(nil, ds.ErrNoSuchEntity) |
| 149 } | 149 } |
| 150 return nil | 150 return nil |
| 151 } | 151 } |
| 152 | 152 |
| 153 for _, k := range keys { | 153 for _, k := range keys { |
| 154 » » pdata := ents.Get(keyBytes(rds.WithoutContext, k)) | 154 » » pdata := ents.Get(keyBytes(ds.WithoutContext, k)) |
| 155 if pdata == nil { | 155 if pdata == nil { |
| 156 » » » cb(nil, rds.ErrNoSuchEntity) | 156 » » » cb(nil, ds.ErrNoSuchEntity) |
| 157 continue | 157 continue |
| 158 } | 158 } |
| 159 cb(rpmWoCtx(pdata, k.Namespace())) | 159 cb(rpmWoCtx(pdata, k.Namespace())) |
| 160 } | 160 } |
| 161 return nil | 161 return nil |
| 162 } | 162 } |
| 163 | 163 |
| 164 func (d *dataStoreData) getMulti(keys []rds.Key, cb rds.GetMultiCB) error { | 164 func (d *dataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { |
| 165 getMultiInner(keys, cb, func() (*memCollection, error) { | 165 getMultiInner(keys, cb, func() (*memCollection, error) { |
| 166 d.rwlock.RLock() | 166 d.rwlock.RLock() |
| 167 s := d.store.Snapshot() | 167 s := d.store.Snapshot() |
| 168 d.rwlock.RUnlock() | 168 d.rwlock.RUnlock() |
| 169 | 169 |
| 170 return s.GetCollection("ents:" + keys[0].Namespace()), nil | 170 return s.GetCollection("ents:" + keys[0].Namespace()), nil |
| 171 }) | 171 }) |
| 172 return nil | 172 return nil |
| 173 } | 173 } |
| 174 | 174 |
| 175 func (d *dataStoreData) delMulti(keys []rds.Key, cb rds.DeleteMultiCB) { | 175 func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) { |
| 176 toDel := make([][]byte, 0, len(keys)) | 176 toDel := make([][]byte, 0, len(keys)) |
| 177 for _, k := range keys { | 177 for _, k := range keys { |
| 178 » » toDel = append(toDel, keyBytes(rds.WithoutContext, k)) | 178 » » toDel = append(toDel, keyBytes(ds.WithoutContext, k)) |
| 179 } | 179 } |
| 180 ns := keys[0].Namespace() | 180 ns := keys[0].Namespace() |
| 181 | 181 |
| 182 d.rwlock.Lock() | 182 d.rwlock.Lock() |
| 183 defer d.rwlock.Unlock() | 183 defer d.rwlock.Unlock() |
| 184 | 184 |
| 185 ents := d.store.GetCollection("ents:" + ns) | 185 ents := d.store.GetCollection("ents:" + ns) |
| 186 | 186 |
| 187 for i, k := range keys { | 187 for i, k := range keys { |
| 188 if ents != nil { | 188 if ents != nil { |
| (...skipping 18 matching lines...) Expand all Loading... |
| 207 } | 207 } |
| 208 | 208 |
| 209 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { | 209 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { |
| 210 // TODO(riannucci): implement with Flush/FlushRevert for persistance. | 210 // TODO(riannucci): implement with Flush/FlushRevert for persistance. |
| 211 | 211 |
| 212 txn := obj.(*txnDataStoreData) | 212 txn := obj.(*txnDataStoreData) |
| 213 for rk, muts := range txn.muts { | 213 for rk, muts := range txn.muts { |
| 214 if len(muts) == 0 { // read-only | 214 if len(muts) == 0 { // read-only |
| 215 continue | 215 continue |
| 216 } | 216 } |
| 217 » » k, err := rds.ReadKey(bytes.NewBufferString(rk), rds.WithContext
, "", "") | 217 » » k, err := ds.ReadKey(bytes.NewBufferString(rk), ds.WithContext,
"", "") |
| 218 if err != nil { | 218 if err != nil { |
| 219 panic(err) | 219 panic(err) |
| 220 } | 220 } |
| 221 | 221 |
| 222 entKey := "ents:" + k.Namespace() | 222 entKey := "ents:" + k.Namespace() |
| 223 mkey := groupMetaKey(k) | 223 mkey := groupMetaKey(k) |
| 224 entsHead := d.store.GetCollection(entKey) | 224 entsHead := d.store.GetCollection(entKey) |
| 225 entsSnap := txn.snap.GetCollection(entKey) | 225 entsSnap := txn.snap.GetCollection(entKey) |
| 226 vHead := curVersion(entsHead, mkey) | 226 vHead := curVersion(entsHead, mkey) |
| 227 vSnap := curVersion(entsSnap, mkey) | 227 vSnap := curVersion(entsSnap, mkey) |
| 228 if vHead != vSnap { | 228 if vHead != vSnap { |
| 229 return false | 229 return false |
| 230 } | 230 } |
| 231 } | 231 } |
| 232 return true | 232 return true |
| 233 } | 233 } |
| 234 | 234 |
| 235 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) { | 235 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) { |
| 236 txn := obj.(*txnDataStoreData) | 236 txn := obj.(*txnDataStoreData) |
| 237 for _, muts := range txn.muts { | 237 for _, muts := range txn.muts { |
| 238 if len(muts) == 0 { // read-only | 238 if len(muts) == 0 { // read-only |
| 239 continue | 239 continue |
| 240 } | 240 } |
| 241 // TODO(riannucci): refactor to do just 1 putMulti, and 1 delMul
ti | 241 // TODO(riannucci): refactor to do just 1 putMulti, and 1 delMul
ti |
| 242 for _, m := range muts { | 242 for _, m := range muts { |
| 243 err := error(nil) | 243 err := error(nil) |
| 244 k := m.key | 244 k := m.key |
| 245 if m.data == nil { | 245 if m.data == nil { |
| 246 » » » » d.delMulti([]rds.Key{k}, | 246 » » » » d.delMulti([]ds.Key{k}, |
| 247 func(e error) { err = e }) | 247 func(e error) { err = e }) |
| 248 } else { | 248 } else { |
| 249 » » » » d.putMulti([]rds.Key{m.key}, []rds.PropertyLoadS
aver{m.data}, | 249 » » » » d.putMulti([]ds.Key{m.key}, []ds.PropertyMap{m.d
ata}, |
| 250 » » » » » func(_ rds.Key, e error) { err = e }) | 250 » » » » » func(_ ds.Key, e error) { err = e }) |
| 251 } | 251 } |
| 252 err = errors.SingleError(err) | 252 err = errors.SingleError(err) |
| 253 if err != nil { | 253 if err != nil { |
| 254 panic(err) | 254 panic(err) |
| 255 } | 255 } |
| 256 } | 256 } |
| 257 } | 257 } |
| 258 } | 258 } |
| 259 | 259 |
| 260 func (d *dataStoreData) mkTxn(o *rds.TransactionOptions) memContextObj { | 260 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj { |
| 261 return &txnDataStoreData{ | 261 return &txnDataStoreData{ |
| 262 // alias to the main datastore's so that testing code can have p
rimitive | 262 // alias to the main datastore's so that testing code can have p
rimitive |
| 263 // access to break features inside of transactions. | 263 // access to break features inside of transactions. |
| 264 parent: d, | 264 parent: d, |
| 265 isXG: o != nil && o.XG, | 265 isXG: o != nil && o.XG, |
| 266 snap: d.store.Snapshot(), | 266 snap: d.store.Snapshot(), |
| 267 muts: map[string][]txnMutation{}, | 267 muts: map[string][]txnMutation{}, |
| 268 } | 268 } |
| 269 } | 269 } |
| 270 | 270 |
| 271 func (d *dataStoreData) endTxn() {} | 271 func (d *dataStoreData) endTxn() {} |
| 272 | 272 |
| 273 /////////////////////////////// txnDataStoreData /////////////////////////////// | 273 /////////////////////////////// txnDataStoreData /////////////////////////////// |
| 274 | 274 |
| 275 type txnMutation struct { | 275 type txnMutation struct { |
| 276 » key rds.Key | 276 » key ds.Key |
| 277 » data rds.PropertyMap | 277 » data ds.PropertyMap |
| 278 } | 278 } |
| 279 | 279 |
| 280 type txnDataStoreData struct { | 280 type txnDataStoreData struct { |
| 281 sync.Mutex | 281 sync.Mutex |
| 282 | 282 |
| 283 parent *dataStoreData | 283 parent *dataStoreData |
| 284 | 284 |
| 285 // boolean 0 or 1, use atomic.*Int32 to access. | 285 // boolean 0 or 1, use atomic.*Int32 to access. |
| 286 closed int32 | 286 closed int32 |
| 287 isXG bool | 287 isXG bool |
| (...skipping 13 matching lines...) Expand all Loading... |
| 301 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } | 301 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } |
| 302 func (td *txnDataStoreData) endTxn() { | 302 func (td *txnDataStoreData) endTxn() { |
| 303 if atomic.LoadInt32(&td.closed) == 1 { | 303 if atomic.LoadInt32(&td.closed) == 1 { |
| 304 panic("cannot end transaction twice") | 304 panic("cannot end transaction twice") |
| 305 } | 305 } |
| 306 atomic.StoreInt32(&td.closed, 1) | 306 atomic.StoreInt32(&td.closed, 1) |
| 307 } | 307 } |
| 308 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { | 308 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { |
| 309 panic("txnDataStoreData cannot apply transactions") | 309 panic("txnDataStoreData cannot apply transactions") |
| 310 } | 310 } |
| 311 func (*txnDataStoreData) mkTxn(*rds.TransactionOptions) memContextObj { | 311 func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj { |
| 312 panic("impossible") | 312 panic("impossible") |
| 313 } | 313 } |
| 314 | 314 |
| 315 func (td *txnDataStoreData) run(f func() error) error { | 315 func (td *txnDataStoreData) run(f func() error) error { |
| 316 // Slightly different from the SDK... datastore and taskqueue each imple
ment | 316 // Slightly different from the SDK... datastore and taskqueue each imple
ment |
| 317 // this here, where in the SDK only datastore.transaction.Call does. | 317 // this here, where in the SDK only datastore.transaction.Call does. |
| 318 if atomic.LoadInt32(&td.closed) == 1 { | 318 if atomic.LoadInt32(&td.closed) == 1 { |
| 319 return errors.New("datastore: transaction context has expired") | 319 return errors.New("datastore: transaction context has expired") |
| 320 } | 320 } |
| 321 return f() | 321 return f() |
| 322 } | 322 } |
| 323 | 323 |
| 324 // writeMutation ensures that this transaction can support the given key/value | 324 // writeMutation ensures that this transaction can support the given key/value |
| 325 // mutation. | 325 // mutation. |
| 326 // | 326 // |
| 327 // if getOnly is true, don't record the actual mutation data, just ensure that | 327 // if getOnly is true, don't record the actual mutation data, just ensure that |
| 328 // the key is in an included entity group (or add an empty entry for tha
t | 328 // the key is in an included entity group (or add an empty entry for tha
t |
| 329 // group). | 329 // group). |
| 330 // | 330 // |
| 331 // if !getOnly && data == nil, this counts as a deletion instead of a Put. | 331 // if !getOnly && data == nil, this counts as a deletion instead of a Put. |
| 332 // | 332 // |
| 333 // Returns an error if this key causes the transaction to cross too many entity | 333 // Returns an error if this key causes the transaction to cross too many entity |
| 334 // groups. | 334 // groups. |
| 335 func (td *txnDataStoreData) writeMutation(getOnly bool, key rds.Key, data rds.Pr
opertyMap) error { | 335 func (td *txnDataStoreData) writeMutation(getOnly bool, key ds.Key, data ds.Prop
ertyMap) error { |
| 336 » rk := string(keyBytes(rds.WithContext, rds.KeyRoot(key))) | 336 » rk := string(keyBytes(ds.WithContext, ds.KeyRoot(key))) |
| 337 | 337 |
| 338 td.Lock() | 338 td.Lock() |
| 339 defer td.Unlock() | 339 defer td.Unlock() |
| 340 | 340 |
| 341 if _, ok := td.muts[rk]; !ok { | 341 if _, ok := td.muts[rk]; !ok { |
| 342 limit := 1 | 342 limit := 1 |
| 343 if td.isXG { | 343 if td.isXG { |
| 344 limit = xgEGLimit | 344 limit = xgEGLimit |
| 345 } | 345 } |
| 346 if len(td.muts)+1 > limit { | 346 if len(td.muts)+1 > limit { |
| 347 msg := "cross-group transaction need to be explicitly sp
ecified (xg=True)" | 347 msg := "cross-group transaction need to be explicitly sp
ecified (xg=True)" |
| 348 if td.isXG { | 348 if td.isXG { |
| 349 msg = "operating on too many entity groups in a
single transaction" | 349 msg = "operating on too many entity groups in a
single transaction" |
| 350 } | 350 } |
| 351 return errors.New(msg) | 351 return errors.New(msg) |
| 352 } | 352 } |
| 353 td.muts[rk] = []txnMutation{} | 353 td.muts[rk] = []txnMutation{} |
| 354 } | 354 } |
| 355 if !getOnly { | 355 if !getOnly { |
| 356 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) | 356 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) |
| 357 } | 357 } |
| 358 | 358 |
| 359 return nil | 359 return nil |
| 360 } | 360 } |
| 361 | 361 |
| 362 func (td *txnDataStoreData) putMulti(keys []rds.Key, vals []rds.PropertyLoadSave
r, cb rds.PutMultiCB) { | 362 func (td *txnDataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds
.PutMultiCB) { |
| 363 for i, k := range keys { | 363 for i, k := range keys { |
| 364 func() { | 364 func() { |
| 365 td.parent.Lock() | 365 td.parent.Lock() |
| 366 defer td.parent.Unlock() | 366 defer td.parent.Unlock() |
| 367 _, k = td.parent.entsKeyLocked(k) | 367 _, k = td.parent.entsKeyLocked(k) |
| 368 }() | 368 }() |
| 369 » » err := td.writeMutation(false, k, vals[i].(rds.PropertyMap)) | 369 » » err := td.writeMutation(false, k, vals[i]) |
| 370 if cb != nil { | 370 if cb != nil { |
| 371 cb(k, err) | 371 cb(k, err) |
| 372 } | 372 } |
| 373 } | 373 } |
| 374 } | 374 } |
| 375 | 375 |
| 376 func (td *txnDataStoreData) getMulti(keys []rds.Key, cb rds.GetMultiCB) error { | 376 func (td *txnDataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { |
| 377 return getMultiInner(keys, cb, func() (*memCollection, error) { | 377 return getMultiInner(keys, cb, func() (*memCollection, error) { |
| 378 err := error(nil) | 378 err := error(nil) |
| 379 for _, key := range keys { | 379 for _, key := range keys { |
| 380 err = td.writeMutation(true, key, nil) | 380 err = td.writeMutation(true, key, nil) |
| 381 if err != nil { | 381 if err != nil { |
| 382 return nil, err | 382 return nil, err |
| 383 } | 383 } |
| 384 } | 384 } |
| 385 return td.snap.GetCollection("ents:" + keys[0].Namespace()), nil | 385 return td.snap.GetCollection("ents:" + keys[0].Namespace()), nil |
| 386 }) | 386 }) |
| 387 } | 387 } |
| 388 | 388 |
| 389 func (td *txnDataStoreData) delMulti(keys []rds.Key, cb rds.DeleteMultiCB) error
{ | 389 func (td *txnDataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { |
| 390 for _, k := range keys { | 390 for _, k := range keys { |
| 391 err := td.writeMutation(false, k, nil) | 391 err := td.writeMutation(false, k, nil) |
| 392 if cb != nil { | 392 if cb != nil { |
| 393 cb(err) | 393 cb(err) |
| 394 } | 394 } |
| 395 } | 395 } |
| 396 return nil | 396 return nil |
| 397 } | 397 } |
| 398 | 398 |
| 399 func keyBytes(ctx rds.KeyContext, key rds.Key) []byte { | 399 func keyBytes(ctx ds.KeyContext, key ds.Key) []byte { |
| 400 buf := &bytes.Buffer{} | 400 buf := &bytes.Buffer{} |
| 401 » rds.WriteKey(buf, ctx, key) | 401 » ds.WriteKey(buf, ctx, key) |
| 402 return buf.Bytes() | 402 return buf.Bytes() |
| 403 } | 403 } |
| 404 | 404 |
| 405 func rpmWoCtx(data []byte, ns string) (rds.PropertyMap, error) { | 405 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { |
| 406 » ret := rds.PropertyMap{} | 406 » ret := ds.PropertyMap{} |
| 407 » err := ret.Read(bytes.NewBuffer(data), rds.WithoutContext, globalAppID,
ns) | 407 » err := ret.Read(bytes.NewBuffer(data), ds.WithoutContext, globalAppID, n
s) |
| 408 return ret, err | 408 return ret, err |
| 409 } | 409 } |
| 410 | 410 |
| 411 func rpm(data []byte) (rds.PropertyMap, error) { | 411 func rpm(data []byte) (ds.PropertyMap, error) { |
| 412 » ret := rds.PropertyMap{} | 412 » ret := ds.PropertyMap{} |
| 413 » err := ret.Read(bytes.NewBuffer(data), rds.WithContext, "", "") | 413 » err := ret.Read(bytes.NewBuffer(data), ds.WithContext, "", "") |
| 414 return ret, err | 414 return ret, err |
| 415 } | 415 } |
| 416 | 416 |
| 417 type keyitem interface { | 417 type keyitem interface { |
| 418 » Key() rds.Key | 418 » Key() ds.Key |
| 419 } | 419 } |
| OLD | NEW |