OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "fmt" | 9 "fmt" |
10 "sync" | 10 "sync" |
11 "sync/atomic" | 11 "sync/atomic" |
12 | 12 |
13 » rds "github.com/luci/gae/service/rawdatastore" | 13 » ds "github.com/luci/gae/service/datastore" |
14 "github.com/luci/luci-go/common/errors" | 14 "github.com/luci/luci-go/common/errors" |
15 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
16 ) | 16 ) |
17 | 17 |
18 //////////////////////////////// dataStoreData ///////////////////////////////// | 18 //////////////////////////////// dataStoreData ///////////////////////////////// |
19 | 19 |
20 type dataStoreData struct { | 20 type dataStoreData struct { |
21 rwlock sync.RWMutex | 21 rwlock sync.RWMutex |
22 // See README.md for store schema. | 22 // See README.md for store schema. |
23 store *memStore | 23 store *memStore |
(...skipping 16 matching lines...) Expand all Loading... |
40 func (d *dataStoreData) Lock() { | 40 func (d *dataStoreData) Lock() { |
41 d.rwlock.Lock() | 41 d.rwlock.Lock() |
42 } | 42 } |
43 | 43 |
44 func (d *dataStoreData) Unlock() { | 44 func (d *dataStoreData) Unlock() { |
45 d.rwlock.Unlock() | 45 d.rwlock.Unlock() |
46 } | 46 } |
47 | 47 |
48 /////////////////////////// indicies(dataStoreData) //////////////////////////// | 48 /////////////////////////// indicies(dataStoreData) //////////////////////////// |
49 | 49 |
50 func groupMetaKey(key rds.Key) []byte { | 50 func groupMetaKey(key ds.Key) []byte { |
51 » return keyBytes(rds.WithoutContext, | 51 » return keyBytes(ds.WithoutContext, |
52 » » rds.NewKey("", "", "__entity_group__", "", 1, rds.KeyRoot(key))) | 52 » » ds.NewKey("", "", "__entity_group__", "", 1, ds.KeyRoot(key))) |
53 } | 53 } |
54 | 54 |
55 func groupIDsKey(key rds.Key) []byte { | 55 func groupIDsKey(key ds.Key) []byte { |
56 » return keyBytes(rds.WithoutContext, | 56 » return keyBytes(ds.WithoutContext, |
57 » » rds.NewKey("", "", "__entity_group_ids__", "", 1, rds.KeyRoot(ke
y))) | 57 » » ds.NewKey("", "", "__entity_group_ids__", "", 1, ds.KeyRoot(key)
)) |
58 } | 58 } |
59 | 59 |
60 func rootIDsKey(kind string) []byte { | 60 func rootIDsKey(kind string) []byte { |
61 » return keyBytes(rds.WithoutContext, | 61 » return keyBytes(ds.WithoutContext, |
62 » » rds.NewKey("", "", "__entity_root_ids__", kind, 0, nil)) | 62 » » ds.NewKey("", "", "__entity_root_ids__", kind, 0, nil)) |
63 } | 63 } |
64 | 64 |
65 func curVersion(ents *memCollection, key []byte) int64 { | 65 func curVersion(ents *memCollection, key []byte) int64 { |
66 if v := ents.Get(key); v != nil { | 66 if v := ents.Get(key); v != nil { |
67 pm, err := rpm(v) | 67 pm, err := rpm(v) |
68 if err != nil { | 68 if err != nil { |
69 panic(err) // memory corruption | 69 panic(err) // memory corruption |
70 } | 70 } |
71 pl, ok := pm["__version__"] | 71 pl, ok := pm["__version__"] |
72 » » if ok && len(pl) > 0 && pl[0].Type() == rds.PTInt { | 72 » » if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { |
73 return pl[0].Value().(int64) | 73 return pl[0].Value().(int64) |
74 } | 74 } |
75 panic(fmt.Errorf("__version__ property missing or wrong: %v", pm
)) | 75 panic(fmt.Errorf("__version__ property missing or wrong: %v", pm
)) |
76 } | 76 } |
77 return 0 | 77 return 0 |
78 } | 78 } |
79 | 79 |
80 func incrementLocked(ents *memCollection, key []byte) int64 { | 80 func incrementLocked(ents *memCollection, key []byte) int64 { |
81 ret := curVersion(ents, key) + 1 | 81 ret := curVersion(ents, key) + 1 |
82 buf := &bytes.Buffer{} | 82 buf := &bytes.Buffer{} |
83 » rds.PropertyMap{"__version__": {rds.MkPropertyNI(ret)}}.Write( | 83 » ds.PropertyMap{"__version__": {ds.MkPropertyNI(ret)}}.Write( |
84 » » buf, rds.WithContext) | 84 » » buf, ds.WithContext) |
85 ents.Set(key, buf.Bytes()) | 85 ents.Set(key, buf.Bytes()) |
86 return ret | 86 return ret |
87 } | 87 } |
88 | 88 |
89 func (d *dataStoreData) entsKeyLocked(key rds.Key) (*memCollection, rds.Key) { | 89 func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) { |
90 coll := "ents:" + key.Namespace() | 90 coll := "ents:" + key.Namespace() |
91 ents := d.store.GetCollection(coll) | 91 ents := d.store.GetCollection(coll) |
92 if ents == nil { | 92 if ents == nil { |
93 ents = d.store.SetCollection(coll, nil) | 93 ents = d.store.SetCollection(coll, nil) |
94 } | 94 } |
95 | 95 |
96 » if rds.KeyIncomplete(key) { | 96 » if ds.KeyIncomplete(key) { |
97 idKey := []byte(nil) | 97 idKey := []byte(nil) |
98 if key.Parent() == nil { | 98 if key.Parent() == nil { |
99 idKey = rootIDsKey(key.Kind()) | 99 idKey = rootIDsKey(key.Kind()) |
100 } else { | 100 } else { |
101 idKey = groupIDsKey(key) | 101 idKey = groupIDsKey(key) |
102 } | 102 } |
103 id := incrementLocked(ents, idKey) | 103 id := incrementLocked(ents, idKey) |
104 » » key = rds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", i
d, key.Parent()) | 104 » » key = ds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", id
, key.Parent()) |
105 } | 105 } |
106 | 106 |
107 return ents, key | 107 return ents, key |
108 } | 108 } |
109 | 109 |
110 func (d *dataStoreData) putMulti(keys []rds.Key, vals []rds.PropertyLoadSaver, c
b rds.PutMultiCB) { | 110 func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.Put
MultiCB) { |
111 for i, k := range keys { | 111 for i, k := range keys { |
112 buf := &bytes.Buffer{} | 112 buf := &bytes.Buffer{} |
113 » » pmap := vals[i].(rds.PropertyMap) | 113 » » pmap, _ := vals[i].Save(false) |
114 » » pmap.Write(buf, rds.WithoutContext) | 114 » » pmap.Write(buf, ds.WithoutContext) |
115 dataBytes := buf.Bytes() | 115 dataBytes := buf.Bytes() |
116 | 116 |
117 » » k, err := func() (ret rds.Key, err error) { | 117 » » k, err := func() (ret ds.Key, err error) { |
118 d.rwlock.Lock() | 118 d.rwlock.Lock() |
119 defer d.rwlock.Unlock() | 119 defer d.rwlock.Unlock() |
120 | 120 |
121 ents, ret := d.entsKeyLocked(k) | 121 ents, ret := d.entsKeyLocked(k) |
122 incrementLocked(ents, groupMetaKey(ret)) | 122 incrementLocked(ents, groupMetaKey(ret)) |
123 | 123 |
124 » » » old := ents.Get(keyBytes(rds.WithoutContext, ret)) | 124 » » » old := ents.Get(keyBytes(ds.WithoutContext, ret)) |
125 » » » oldPM := rds.PropertyMap(nil) | 125 » » » oldPM := ds.PropertyMap(nil) |
126 if old != nil { | 126 if old != nil { |
127 if oldPM, err = rpmWoCtx(old, ret.Namespace());
err != nil { | 127 if oldPM, err = rpmWoCtx(old, ret.Namespace());
err != nil { |
128 return | 128 return |
129 } | 129 } |
130 } | 130 } |
131 updateIndicies(d.store, ret, oldPM, pmap) | 131 updateIndicies(d.store, ret, oldPM, pmap) |
132 » » » ents.Set(keyBytes(rds.WithoutContext, ret), dataBytes) | 132 » » » ents.Set(keyBytes(ds.WithoutContext, ret), dataBytes) |
133 return | 133 return |
134 }() | 134 }() |
135 if cb != nil { | 135 if cb != nil { |
136 cb(k, err) | 136 cb(k, err) |
137 } | 137 } |
138 } | 138 } |
139 } | 139 } |
140 | 140 |
141 func getMultiInner(keys []rds.Key, cb rds.GetMultiCB, getColl func() (*memCollec
tion, error)) error { | 141 func getMultiInner(keys []ds.Key, cb ds.GetMultiCB, getColl func() (*memCollecti
on, error)) error { |
142 ents, err := getColl() | 142 ents, err := getColl() |
143 if err != nil { | 143 if err != nil { |
144 return err | 144 return err |
145 } | 145 } |
146 if ents == nil { | 146 if ents == nil { |
147 for range keys { | 147 for range keys { |
148 » » » cb(nil, rds.ErrNoSuchEntity) | 148 » » » cb(nil, ds.ErrNoSuchEntity) |
149 } | 149 } |
150 return nil | 150 return nil |
151 } | 151 } |
152 | 152 |
153 for _, k := range keys { | 153 for _, k := range keys { |
154 » » pdata := ents.Get(keyBytes(rds.WithoutContext, k)) | 154 » » pdata := ents.Get(keyBytes(ds.WithoutContext, k)) |
155 if pdata == nil { | 155 if pdata == nil { |
156 » » » cb(nil, rds.ErrNoSuchEntity) | 156 » » » cb(nil, ds.ErrNoSuchEntity) |
157 continue | 157 continue |
158 } | 158 } |
159 cb(rpmWoCtx(pdata, k.Namespace())) | 159 cb(rpmWoCtx(pdata, k.Namespace())) |
160 } | 160 } |
161 return nil | 161 return nil |
162 } | 162 } |
163 | 163 |
164 func (d *dataStoreData) getMulti(keys []rds.Key, cb rds.GetMultiCB) error { | 164 func (d *dataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { |
165 getMultiInner(keys, cb, func() (*memCollection, error) { | 165 getMultiInner(keys, cb, func() (*memCollection, error) { |
166 d.rwlock.RLock() | 166 d.rwlock.RLock() |
167 s := d.store.Snapshot() | 167 s := d.store.Snapshot() |
168 d.rwlock.RUnlock() | 168 d.rwlock.RUnlock() |
169 | 169 |
170 return s.GetCollection("ents:" + keys[0].Namespace()), nil | 170 return s.GetCollection("ents:" + keys[0].Namespace()), nil |
171 }) | 171 }) |
172 return nil | 172 return nil |
173 } | 173 } |
174 | 174 |
175 func (d *dataStoreData) delMulti(keys []rds.Key, cb rds.DeleteMultiCB) { | 175 func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) { |
176 toDel := make([][]byte, 0, len(keys)) | 176 toDel := make([][]byte, 0, len(keys)) |
177 for _, k := range keys { | 177 for _, k := range keys { |
178 » » toDel = append(toDel, keyBytes(rds.WithoutContext, k)) | 178 » » toDel = append(toDel, keyBytes(ds.WithoutContext, k)) |
179 } | 179 } |
180 ns := keys[0].Namespace() | 180 ns := keys[0].Namespace() |
181 | 181 |
182 d.rwlock.Lock() | 182 d.rwlock.Lock() |
183 defer d.rwlock.Unlock() | 183 defer d.rwlock.Unlock() |
184 | 184 |
185 ents := d.store.GetCollection("ents:" + ns) | 185 ents := d.store.GetCollection("ents:" + ns) |
186 | 186 |
187 for i, k := range keys { | 187 for i, k := range keys { |
188 if ents != nil { | 188 if ents != nil { |
(...skipping 18 matching lines...) Expand all Loading... |
207 } | 207 } |
208 | 208 |
209 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { | 209 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { |
210 // TODO(riannucci): implement with Flush/FlushRevert for persistance. | 210 // TODO(riannucci): implement with Flush/FlushRevert for persistance. |
211 | 211 |
212 txn := obj.(*txnDataStoreData) | 212 txn := obj.(*txnDataStoreData) |
213 for rk, muts := range txn.muts { | 213 for rk, muts := range txn.muts { |
214 if len(muts) == 0 { // read-only | 214 if len(muts) == 0 { // read-only |
215 continue | 215 continue |
216 } | 216 } |
217 » » k, err := rds.ReadKey(bytes.NewBufferString(rk), rds.WithContext
, "", "") | 217 » » k, err := ds.ReadKey(bytes.NewBufferString(rk), ds.WithContext,
"", "") |
218 if err != nil { | 218 if err != nil { |
219 panic(err) | 219 panic(err) |
220 } | 220 } |
221 | 221 |
222 entKey := "ents:" + k.Namespace() | 222 entKey := "ents:" + k.Namespace() |
223 mkey := groupMetaKey(k) | 223 mkey := groupMetaKey(k) |
224 entsHead := d.store.GetCollection(entKey) | 224 entsHead := d.store.GetCollection(entKey) |
225 entsSnap := txn.snap.GetCollection(entKey) | 225 entsSnap := txn.snap.GetCollection(entKey) |
226 vHead := curVersion(entsHead, mkey) | 226 vHead := curVersion(entsHead, mkey) |
227 vSnap := curVersion(entsSnap, mkey) | 227 vSnap := curVersion(entsSnap, mkey) |
228 if vHead != vSnap { | 228 if vHead != vSnap { |
229 return false | 229 return false |
230 } | 230 } |
231 } | 231 } |
232 return true | 232 return true |
233 } | 233 } |
234 | 234 |
235 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) { | 235 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) { |
236 txn := obj.(*txnDataStoreData) | 236 txn := obj.(*txnDataStoreData) |
237 for _, muts := range txn.muts { | 237 for _, muts := range txn.muts { |
238 if len(muts) == 0 { // read-only | 238 if len(muts) == 0 { // read-only |
239 continue | 239 continue |
240 } | 240 } |
241 // TODO(riannucci): refactor to do just 1 putMulti, and 1 delMul
ti | 241 // TODO(riannucci): refactor to do just 1 putMulti, and 1 delMul
ti |
242 for _, m := range muts { | 242 for _, m := range muts { |
243 err := error(nil) | 243 err := error(nil) |
244 k := m.key | 244 k := m.key |
245 if m.data == nil { | 245 if m.data == nil { |
246 » » » » d.delMulti([]rds.Key{k}, | 246 » » » » d.delMulti([]ds.Key{k}, |
247 func(e error) { err = e }) | 247 func(e error) { err = e }) |
248 } else { | 248 } else { |
249 » » » » d.putMulti([]rds.Key{m.key}, []rds.PropertyLoadS
aver{m.data}, | 249 » » » » d.putMulti([]ds.Key{m.key}, []ds.PropertyMap{m.d
ata}, |
250 » » » » » func(_ rds.Key, e error) { err = e }) | 250 » » » » » func(_ ds.Key, e error) { err = e }) |
251 } | 251 } |
252 err = errors.SingleError(err) | 252 err = errors.SingleError(err) |
253 if err != nil { | 253 if err != nil { |
254 panic(err) | 254 panic(err) |
255 } | 255 } |
256 } | 256 } |
257 } | 257 } |
258 } | 258 } |
259 | 259 |
260 func (d *dataStoreData) mkTxn(o *rds.TransactionOptions) memContextObj { | 260 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj { |
261 return &txnDataStoreData{ | 261 return &txnDataStoreData{ |
262 // alias to the main datastore's so that testing code can have p
rimitive | 262 // alias to the main datastore's so that testing code can have p
rimitive |
263 // access to break features inside of transactions. | 263 // access to break features inside of transactions. |
264 parent: d, | 264 parent: d, |
265 isXG: o != nil && o.XG, | 265 isXG: o != nil && o.XG, |
266 snap: d.store.Snapshot(), | 266 snap: d.store.Snapshot(), |
267 muts: map[string][]txnMutation{}, | 267 muts: map[string][]txnMutation{}, |
268 } | 268 } |
269 } | 269 } |
270 | 270 |
271 func (d *dataStoreData) endTxn() {} | 271 func (d *dataStoreData) endTxn() {} |
272 | 272 |
273 /////////////////////////////// txnDataStoreData /////////////////////////////// | 273 /////////////////////////////// txnDataStoreData /////////////////////////////// |
274 | 274 |
275 type txnMutation struct { | 275 type txnMutation struct { |
276 » key rds.Key | 276 » key ds.Key |
277 » data rds.PropertyMap | 277 » data ds.PropertyMap |
278 } | 278 } |
279 | 279 |
280 type txnDataStoreData struct { | 280 type txnDataStoreData struct { |
281 sync.Mutex | 281 sync.Mutex |
282 | 282 |
283 parent *dataStoreData | 283 parent *dataStoreData |
284 | 284 |
285 // boolean 0 or 1, use atomic.*Int32 to access. | 285 // boolean 0 or 1, use atomic.*Int32 to access. |
286 closed int32 | 286 closed int32 |
287 isXG bool | 287 isXG bool |
(...skipping 13 matching lines...) Expand all Loading... |
301 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } | 301 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } |
302 func (td *txnDataStoreData) endTxn() { | 302 func (td *txnDataStoreData) endTxn() { |
303 if atomic.LoadInt32(&td.closed) == 1 { | 303 if atomic.LoadInt32(&td.closed) == 1 { |
304 panic("cannot end transaction twice") | 304 panic("cannot end transaction twice") |
305 } | 305 } |
306 atomic.StoreInt32(&td.closed, 1) | 306 atomic.StoreInt32(&td.closed, 1) |
307 } | 307 } |
308 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { | 308 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { |
309 panic("txnDataStoreData cannot apply transactions") | 309 panic("txnDataStoreData cannot apply transactions") |
310 } | 310 } |
311 func (*txnDataStoreData) mkTxn(*rds.TransactionOptions) memContextObj { | 311 func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj { |
312 panic("impossible") | 312 panic("impossible") |
313 } | 313 } |
314 | 314 |
315 func (td *txnDataStoreData) run(f func() error) error { | 315 func (td *txnDataStoreData) run(f func() error) error { |
316 // Slightly different from the SDK... datastore and taskqueue each imple
ment | 316 // Slightly different from the SDK... datastore and taskqueue each imple
ment |
317 // this here, where in the SDK only datastore.transaction.Call does. | 317 // this here, where in the SDK only datastore.transaction.Call does. |
318 if atomic.LoadInt32(&td.closed) == 1 { | 318 if atomic.LoadInt32(&td.closed) == 1 { |
319 return errors.New("datastore: transaction context has expired") | 319 return errors.New("datastore: transaction context has expired") |
320 } | 320 } |
321 return f() | 321 return f() |
322 } | 322 } |
323 | 323 |
324 // writeMutation ensures that this transaction can support the given key/value | 324 // writeMutation ensures that this transaction can support the given key/value |
325 // mutation. | 325 // mutation. |
326 // | 326 // |
327 // if getOnly is true, don't record the actual mutation data, just ensure that | 327 // if getOnly is true, don't record the actual mutation data, just ensure that |
328 // the key is in an included entity group (or add an empty entry for tha
t | 328 // the key is in an included entity group (or add an empty entry for tha
t |
329 // group). | 329 // group). |
330 // | 330 // |
331 // if !getOnly && data == nil, this counts as a deletion instead of a Put. | 331 // if !getOnly && data == nil, this counts as a deletion instead of a Put. |
332 // | 332 // |
333 // Returns an error if this key causes the transaction to cross too many entity | 333 // Returns an error if this key causes the transaction to cross too many entity |
334 // groups. | 334 // groups. |
335 func (td *txnDataStoreData) writeMutation(getOnly bool, key rds.Key, data rds.Pr
opertyMap) error { | 335 func (td *txnDataStoreData) writeMutation(getOnly bool, key ds.Key, data ds.Prop
ertyMap) error { |
336 » rk := string(keyBytes(rds.WithContext, rds.KeyRoot(key))) | 336 » rk := string(keyBytes(ds.WithContext, ds.KeyRoot(key))) |
337 | 337 |
338 td.Lock() | 338 td.Lock() |
339 defer td.Unlock() | 339 defer td.Unlock() |
340 | 340 |
341 if _, ok := td.muts[rk]; !ok { | 341 if _, ok := td.muts[rk]; !ok { |
342 limit := 1 | 342 limit := 1 |
343 if td.isXG { | 343 if td.isXG { |
344 limit = xgEGLimit | 344 limit = xgEGLimit |
345 } | 345 } |
346 if len(td.muts)+1 > limit { | 346 if len(td.muts)+1 > limit { |
347 msg := "cross-group transaction need to be explicitly sp
ecified (xg=True)" | 347 msg := "cross-group transaction need to be explicitly sp
ecified (xg=True)" |
348 if td.isXG { | 348 if td.isXG { |
349 msg = "operating on too many entity groups in a
single transaction" | 349 msg = "operating on too many entity groups in a
single transaction" |
350 } | 350 } |
351 return errors.New(msg) | 351 return errors.New(msg) |
352 } | 352 } |
353 td.muts[rk] = []txnMutation{} | 353 td.muts[rk] = []txnMutation{} |
354 } | 354 } |
355 if !getOnly { | 355 if !getOnly { |
356 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) | 356 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) |
357 } | 357 } |
358 | 358 |
359 return nil | 359 return nil |
360 } | 360 } |
361 | 361 |
362 func (td *txnDataStoreData) putMulti(keys []rds.Key, vals []rds.PropertyLoadSave
r, cb rds.PutMultiCB) { | 362 func (td *txnDataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds
.PutMultiCB) { |
363 for i, k := range keys { | 363 for i, k := range keys { |
364 func() { | 364 func() { |
365 td.parent.Lock() | 365 td.parent.Lock() |
366 defer td.parent.Unlock() | 366 defer td.parent.Unlock() |
367 _, k = td.parent.entsKeyLocked(k) | 367 _, k = td.parent.entsKeyLocked(k) |
368 }() | 368 }() |
369 » » err := td.writeMutation(false, k, vals[i].(rds.PropertyMap)) | 369 » » err := td.writeMutation(false, k, vals[i]) |
370 if cb != nil { | 370 if cb != nil { |
371 cb(k, err) | 371 cb(k, err) |
372 } | 372 } |
373 } | 373 } |
374 } | 374 } |
375 | 375 |
376 func (td *txnDataStoreData) getMulti(keys []rds.Key, cb rds.GetMultiCB) error { | 376 func (td *txnDataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { |
377 return getMultiInner(keys, cb, func() (*memCollection, error) { | 377 return getMultiInner(keys, cb, func() (*memCollection, error) { |
378 err := error(nil) | 378 err := error(nil) |
379 for _, key := range keys { | 379 for _, key := range keys { |
380 err = td.writeMutation(true, key, nil) | 380 err = td.writeMutation(true, key, nil) |
381 if err != nil { | 381 if err != nil { |
382 return nil, err | 382 return nil, err |
383 } | 383 } |
384 } | 384 } |
385 return td.snap.GetCollection("ents:" + keys[0].Namespace()), nil | 385 return td.snap.GetCollection("ents:" + keys[0].Namespace()), nil |
386 }) | 386 }) |
387 } | 387 } |
388 | 388 |
389 func (td *txnDataStoreData) delMulti(keys []rds.Key, cb rds.DeleteMultiCB) error
{ | 389 func (td *txnDataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { |
390 for _, k := range keys { | 390 for _, k := range keys { |
391 err := td.writeMutation(false, k, nil) | 391 err := td.writeMutation(false, k, nil) |
392 if cb != nil { | 392 if cb != nil { |
393 cb(err) | 393 cb(err) |
394 } | 394 } |
395 } | 395 } |
396 return nil | 396 return nil |
397 } | 397 } |
398 | 398 |
399 func keyBytes(ctx rds.KeyContext, key rds.Key) []byte { | 399 func keyBytes(ctx ds.KeyContext, key ds.Key) []byte { |
400 buf := &bytes.Buffer{} | 400 buf := &bytes.Buffer{} |
401 » rds.WriteKey(buf, ctx, key) | 401 » ds.WriteKey(buf, ctx, key) |
402 return buf.Bytes() | 402 return buf.Bytes() |
403 } | 403 } |
404 | 404 |
405 func rpmWoCtx(data []byte, ns string) (rds.PropertyMap, error) { | 405 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { |
406 » ret := rds.PropertyMap{} | 406 » ret := ds.PropertyMap{} |
407 » err := ret.Read(bytes.NewBuffer(data), rds.WithoutContext, globalAppID,
ns) | 407 » err := ret.Read(bytes.NewBuffer(data), ds.WithoutContext, globalAppID, n
s) |
408 return ret, err | 408 return ret, err |
409 } | 409 } |
410 | 410 |
411 func rpm(data []byte) (rds.PropertyMap, error) { | 411 func rpm(data []byte) (ds.PropertyMap, error) { |
412 » ret := rds.PropertyMap{} | 412 » ret := ds.PropertyMap{} |
413 » err := ret.Read(bytes.NewBuffer(data), rds.WithContext, "", "") | 413 » err := ret.Read(bytes.NewBuffer(data), ds.WithContext, "", "") |
414 return ret, err | 414 return ret, err |
415 } | 415 } |
416 | 416 |
417 type keyitem interface { | 417 type keyitem interface { |
418 » Key() rds.Key | 418 » Key() ds.Key |
419 } | 419 } |
OLD | NEW |