Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(473)

Side by Side Diff: impl/memory/datastore_data.go

Issue 1355783002: Refactor keys and queries in datastore service and implementation. (Closed) Base URL: https://github.com/luci/gae.git@master
Patch Set: Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 "sync" 10 "sync"
11 "sync/atomic" 11 "sync/atomic"
12 12
13 ds "github.com/luci/gae/service/datastore" 13 ds "github.com/luci/gae/service/datastore"
14 "github.com/luci/gae/service/datastore/dskey"
15 "github.com/luci/gae/service/datastore/serialize" 14 "github.com/luci/gae/service/datastore/serialize"
16 "github.com/luci/luci-go/common/errors" 15 "github.com/luci/luci-go/common/errors"
17 "golang.org/x/net/context" 16 "golang.org/x/net/context"
18 ) 17 )
19 18
20 //////////////////////////////// dataStoreData ///////////////////////////////// 19 //////////////////////////////// dataStoreData /////////////////////////////////
21 20
22 type dataStoreData struct { 21 type dataStoreData struct {
23 rwlock sync.RWMutex 22 rwlock sync.RWMutex
24 // See README.md for head schema. 23 // See README.md for head schema.
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
74 } 73 }
75 74
76 func (d *dataStoreData) catchupIndexes() { 75 func (d *dataStoreData) catchupIndexes() {
77 d.rwlock.Lock() 76 d.rwlock.Lock()
78 defer d.rwlock.Unlock() 77 defer d.rwlock.Unlock()
79 d.snap = d.head.Snapshot() 78 d.snap = d.head.Snapshot()
80 } 79 }
81 80
82 /////////////////////////// indexes(dataStoreData) //////////////////////////// 81 /////////////////////////// indexes(dataStoreData) ////////////////////////////
83 82
84 func groupMetaKey(key ds.Key) []byte { 83 func groupMetaKey(key *ds.Key) []byte {
85 » return keyBytes(dskey.New("", "", "__entity_group__", "", 1, dskey.Root( key))) 84 » return keyBytes(ds.NewKey("", "", "__entity_group__", "", 1, key.Root()) )
86 } 85 }
87 86
88 func groupIDsKey(key ds.Key) []byte { 87 func groupIDsKey(key *ds.Key) []byte {
89 » return keyBytes(dskey.New("", "", "__entity_group_ids__", "", 1, dskey.R oot(key))) 88 » return keyBytes(ds.NewKey("", "", "__entity_group_ids__", "", 1, key.Roo t()))
90 } 89 }
91 90
92 func rootIDsKey(kind string) []byte { 91 func rootIDsKey(kind string) []byte {
93 » return keyBytes(dskey.New("", "", "__entity_root_ids__", kind, 0, nil)) 92 » return keyBytes(ds.NewKey("", "", "__entity_root_ids__", kind, 0, nil))
94 } 93 }
95 94
96 func curVersion(ents *memCollection, key []byte) int64 { 95 func curVersion(ents *memCollection, key []byte) int64 {
97 if ents != nil { 96 if ents != nil {
98 if v := ents.Get(key); v != nil { 97 if v := ents.Get(key); v != nil {
99 pm, err := rpm(v) 98 pm, err := rpm(v)
100 memoryCorruption(err) 99 memoryCorruption(err)
101 100
102 pl, ok := pm["__version__"] 101 pl, ok := pm["__version__"]
103 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { 102 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt {
104 return pl[0].Value().(int64) 103 return pl[0].Value().(int64)
105 } 104 }
106 105
107 memoryCorruption(fmt.Errorf("__version__ property missin g or wrong: %v", pm)) 106 memoryCorruption(fmt.Errorf("__version__ property missin g or wrong: %v", pm))
108 } 107 }
109 } 108 }
110 return 0 109 return 0
111 } 110 }
112 111
113 func incrementLocked(ents *memCollection, key []byte) int64 { 112 func incrementLocked(ents *memCollection, key []byte) int64 {
114 ret := curVersion(ents, key) + 1 113 ret := curVersion(ents, key) + 1
115 ents.Set(key, serialize.ToBytes(ds.PropertyMap{ 114 ents.Set(key, serialize.ToBytes(ds.PropertyMap{
116 "__version__": {ds.MkPropertyNI(ret)}, 115 "__version__": {ds.MkPropertyNI(ret)},
117 })) 116 }))
118 return ret 117 return ret
119 } 118 }
120 119
121 func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) { 120 func (d *dataStoreData) entsKeyLocked(key *ds.Key) (*memCollection, *ds.Key) {
122 coll := "ents:" + key.Namespace() 121 coll := "ents:" + key.Namespace()
123 ents := d.head.GetCollection(coll) 122 ents := d.head.GetCollection(coll)
124 if ents == nil { 123 if ents == nil {
125 ents = d.head.SetCollection(coll, nil) 124 ents = d.head.SetCollection(coll, nil)
126 } 125 }
127 126
128 » if dskey.Incomplete(key) { 127 » if key.Incomplete() {
129 idKey := []byte(nil) 128 idKey := []byte(nil)
130 if key.Parent() == nil { 129 if key.Parent() == nil {
131 » » » idKey = rootIDsKey(key.Kind()) 130 » » » idKey = rootIDsKey(key.Last().Kind)
132 } else { 131 } else {
133 idKey = groupIDsKey(key) 132 idKey = groupIDsKey(key)
134 } 133 }
135 id := incrementLocked(ents, idKey) 134 id := incrementLocked(ents, idKey)
136 » » key = dskey.New(key.AppID(), key.Namespace(), key.Kind(), "", id , key.Parent()) 135 » » key = ds.NewKey(key.AppID(), key.Namespace(), key.Last().Kind, " ", id, key.Parent())
137 } 136 }
138 137
139 return ents, key 138 return ents, key
140 } 139 }
141 140
142 func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.Put MultiCB) { 141 func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Pu tMultiCB) {
143 for i, k := range keys { 142 for i, k := range keys {
144 pmap, _ := vals[i].Save(false) 143 pmap, _ := vals[i].Save(false)
145 dataBytes := serialize.ToBytes(pmap) 144 dataBytes := serialize.ToBytes(pmap)
146 145
147 » » k, err := func() (ret ds.Key, err error) { 146 » » k, err := func() (ret *ds.Key, err error) {
148 d.rwlock.Lock() 147 d.rwlock.Lock()
149 defer d.rwlock.Unlock() 148 defer d.rwlock.Unlock()
150 149
151 ents, ret := d.entsKeyLocked(k) 150 ents, ret := d.entsKeyLocked(k)
152 incrementLocked(ents, groupMetaKey(ret)) 151 incrementLocked(ents, groupMetaKey(ret))
153 152
154 old := ents.Get(keyBytes(ret)) 153 old := ents.Get(keyBytes(ret))
155 oldPM := ds.PropertyMap(nil) 154 oldPM := ds.PropertyMap(nil)
156 if old != nil { 155 if old != nil {
157 if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil { 156 if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil {
158 return 157 return
159 } 158 }
160 } 159 }
161 updateIndexes(d.head, ret, oldPM, pmap) 160 updateIndexes(d.head, ret, oldPM, pmap)
162 ents.Set(keyBytes(ret), dataBytes) 161 ents.Set(keyBytes(ret), dataBytes)
163 return 162 return
164 }() 163 }()
165 if cb != nil { 164 if cb != nil {
166 cb(k, err) 165 cb(k, err)
167 } 166 }
168 } 167 }
169 } 168 }
170 169
171 func getMultiInner(keys []ds.Key, cb ds.GetMultiCB, getColl func() (*memCollecti on, error)) error { 170 func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (*memCollect ion, error)) error {
172 ents, err := getColl() 171 ents, err := getColl()
173 if err != nil { 172 if err != nil {
174 return err 173 return err
175 } 174 }
176 if ents == nil { 175 if ents == nil {
177 for range keys { 176 for range keys {
178 cb(nil, ds.ErrNoSuchEntity) 177 cb(nil, ds.ErrNoSuchEntity)
179 } 178 }
180 return nil 179 return nil
181 } 180 }
182 181
183 for _, k := range keys { 182 for _, k := range keys {
184 pdata := ents.Get(keyBytes(k)) 183 pdata := ents.Get(keyBytes(k))
185 if pdata == nil { 184 if pdata == nil {
186 cb(nil, ds.ErrNoSuchEntity) 185 cb(nil, ds.ErrNoSuchEntity)
187 continue 186 continue
188 } 187 }
189 cb(rpmWoCtx(pdata, k.Namespace())) 188 cb(rpmWoCtx(pdata, k.Namespace()))
190 } 189 }
191 return nil 190 return nil
192 } 191 }
193 192
194 func (d *dataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { 193 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error {
195 getMultiInner(keys, cb, func() (*memCollection, error) { 194 getMultiInner(keys, cb, func() (*memCollection, error) {
196 s := d.takeSnapshot() 195 s := d.takeSnapshot()
197 196
198 return s.GetCollection("ents:" + keys[0].Namespace()), nil 197 return s.GetCollection("ents:" + keys[0].Namespace()), nil
199 }) 198 })
200 return nil 199 return nil
201 } 200 }
202 201
203 func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) { 202 func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) {
204 toDel := make([][]byte, 0, len(keys)) 203 toDel := make([][]byte, 0, len(keys))
205 for _, k := range keys { 204 for _, k := range keys {
206 toDel = append(toDel, keyBytes(k)) 205 toDel = append(toDel, keyBytes(k))
207 } 206 }
208 ns := keys[0].Namespace() 207 ns := keys[0].Namespace()
209 208
210 d.rwlock.Lock() 209 d.rwlock.Lock()
211 defer d.rwlock.Unlock() 210 defer d.rwlock.Unlock()
212 211
213 ents := d.head.GetCollection("ents:" + ns) 212 ents := d.head.GetCollection("ents:" + ns)
(...skipping 24 matching lines...) Expand all
238 // TODO(riannucci): implement with Flush/FlushRevert for persistance. 237 // TODO(riannucci): implement with Flush/FlushRevert for persistance.
239 238
240 txn := obj.(*txnDataStoreData) 239 txn := obj.(*txnDataStoreData)
241 for rk, muts := range txn.muts { 240 for rk, muts := range txn.muts {
242 if len(muts) == 0 { // read-only 241 if len(muts) == 0 { // read-only
243 continue 242 continue
244 } 243 }
245 prop, err := serialize.ReadProperty(bytes.NewBufferString(rk), s erialize.WithContext, "", "") 244 prop, err := serialize.ReadProperty(bytes.NewBufferString(rk), s erialize.WithContext, "", "")
246 memoryCorruption(err) 245 memoryCorruption(err)
247 246
248 » » k := prop.Value().(ds.Key) 247 » » k := prop.Value().(*ds.Key)
249 248
250 entKey := "ents:" + k.Namespace() 249 entKey := "ents:" + k.Namespace()
251 mkey := groupMetaKey(k) 250 mkey := groupMetaKey(k)
252 entsHead := d.head.GetCollection(entKey) 251 entsHead := d.head.GetCollection(entKey)
253 entsSnap := txn.snap.GetCollection(entKey) 252 entsSnap := txn.snap.GetCollection(entKey)
254 vHead := curVersion(entsHead, mkey) 253 vHead := curVersion(entsHead, mkey)
255 vSnap := curVersion(entsSnap, mkey) 254 vSnap := curVersion(entsSnap, mkey)
256 if vHead != vSnap { 255 if vHead != vSnap {
257 return false 256 return false
258 } 257 }
259 } 258 }
260 return true 259 return true
261 } 260 }
262 261
263 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) { 262 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) {
264 txn := obj.(*txnDataStoreData) 263 txn := obj.(*txnDataStoreData)
265 for _, muts := range txn.muts { 264 for _, muts := range txn.muts {
266 if len(muts) == 0 { // read-only 265 if len(muts) == 0 { // read-only
267 continue 266 continue
268 } 267 }
269 // TODO(riannucci): refactor to do just 1 putMulti, and 1 delMul ti 268 // TODO(riannucci): refactor to do just 1 putMulti, and 1 delMul ti
270 for _, m := range muts { 269 for _, m := range muts {
271 err := error(nil) 270 err := error(nil)
272 k := m.key 271 k := m.key
273 if m.data == nil { 272 if m.data == nil {
274 » » » » d.delMulti([]ds.Key{k}, 273 » » » » d.delMulti([]*ds.Key{k},
275 func(e error) { err = e }) 274 func(e error) { err = e })
276 } else { 275 } else {
277 » » » » d.putMulti([]ds.Key{m.key}, []ds.PropertyMap{m.d ata}, 276 » » » » d.putMulti([]*ds.Key{m.key}, []ds.PropertyMap{m. data},
278 » » » » » func(_ ds.Key, e error) { err = e }) 277 » » » » » func(_ *ds.Key, e error) { err = e })
279 } 278 }
280 impossible(err) 279 impossible(err)
281 } 280 }
282 } 281 }
283 } 282 }
284 283
285 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj { 284 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj {
286 return &txnDataStoreData{ 285 return &txnDataStoreData{
287 // alias to the main datastore's so that testing code can have p rimitive 286 // alias to the main datastore's so that testing code can have p rimitive
288 // access to break features inside of transactions. 287 // access to break features inside of transactions.
289 parent: d, 288 parent: d,
290 isXG: o != nil && o.XG, 289 isXG: o != nil && o.XG,
291 snap: d.head.Snapshot(), 290 snap: d.head.Snapshot(),
292 muts: map[string][]txnMutation{}, 291 muts: map[string][]txnMutation{},
293 } 292 }
294 } 293 }
295 294
296 func (d *dataStoreData) endTxn() {} 295 func (d *dataStoreData) endTxn() {}
297 296
298 /////////////////////////////// txnDataStoreData /////////////////////////////// 297 /////////////////////////////// txnDataStoreData ///////////////////////////////
299 298
300 type txnMutation struct { 299 type txnMutation struct {
301 » key ds.Key 300 » key *ds.Key
302 data ds.PropertyMap 301 data ds.PropertyMap
303 } 302 }
304 303
305 type txnDataStoreData struct { 304 type txnDataStoreData struct {
306 sync.Mutex 305 sync.Mutex
307 306
308 parent *dataStoreData 307 parent *dataStoreData
309 308
310 // boolean 0 or 1, use atomic.*Int32 to access. 309 // boolean 0 or 1, use atomic.*Int32 to access.
311 closed int32 310 closed int32
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
351 // mutation. 350 // mutation.
352 // 351 //
353 // if getOnly is true, don't record the actual mutation data, just ensure that 352 // if getOnly is true, don't record the actual mutation data, just ensure that
354 // the key is in an included entity group (or add an empty entry for tha t 353 // the key is in an included entity group (or add an empty entry for tha t
355 // group). 354 // group).
356 // 355 //
357 // if !getOnly && data == nil, this counts as a deletion instead of a Put. 356 // if !getOnly && data == nil, this counts as a deletion instead of a Put.
358 // 357 //
359 // Returns an error if this key causes the transaction to cross too many entity 358 // Returns an error if this key causes the transaction to cross too many entity
360 // groups. 359 // groups.
361 func (td *txnDataStoreData) writeMutation(getOnly bool, key ds.Key, data ds.Prop ertyMap) error { 360 func (td *txnDataStoreData) writeMutation(getOnly bool, key *ds.Key, data ds.Pro pertyMap) error {
362 » rk := string(keyBytes(dskey.Root(key))) 361 » rk := string(keyBytes(key.Root()))
363 362
364 td.Lock() 363 td.Lock()
365 defer td.Unlock() 364 defer td.Unlock()
366 365
367 if _, ok := td.muts[rk]; !ok { 366 if _, ok := td.muts[rk]; !ok {
368 limit := 1 367 limit := 1
369 if td.isXG { 368 if td.isXG {
370 limit = xgEGLimit 369 limit = xgEGLimit
371 } 370 }
372 if len(td.muts)+1 > limit { 371 if len(td.muts)+1 > limit {
373 msg := "cross-group transaction need to be explicitly sp ecified (xg=True)" 372 msg := "cross-group transaction need to be explicitly sp ecified (xg=True)"
374 if td.isXG { 373 if td.isXG {
375 msg = "operating on too many entity groups in a single transaction" 374 msg = "operating on too many entity groups in a single transaction"
376 } 375 }
377 return errors.New(msg) 376 return errors.New(msg)
378 } 377 }
379 td.muts[rk] = []txnMutation{} 378 td.muts[rk] = []txnMutation{}
380 } 379 }
381 if !getOnly { 380 if !getOnly {
382 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) 381 td.muts[rk] = append(td.muts[rk], txnMutation{key, data})
383 } 382 }
384 383
385 return nil 384 return nil
386 } 385 }
387 386
388 func (td *txnDataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds .PutMultiCB) { 387 func (td *txnDataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb d s.PutMultiCB) {
389 for i, k := range keys { 388 for i, k := range keys {
390 func() { 389 func() {
391 td.parent.Lock() 390 td.parent.Lock()
392 defer td.parent.Unlock() 391 defer td.parent.Unlock()
393 _, k = td.parent.entsKeyLocked(k) 392 _, k = td.parent.entsKeyLocked(k)
394 }() 393 }()
395 err := td.writeMutation(false, k, vals[i]) 394 err := td.writeMutation(false, k, vals[i])
396 if cb != nil { 395 if cb != nil {
397 cb(k, err) 396 cb(k, err)
398 } 397 }
399 } 398 }
400 } 399 }
401 400
402 func (td *txnDataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { 401 func (td *txnDataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error {
403 return getMultiInner(keys, cb, func() (*memCollection, error) { 402 return getMultiInner(keys, cb, func() (*memCollection, error) {
404 err := error(nil) 403 err := error(nil)
405 for _, key := range keys { 404 for _, key := range keys {
406 err = td.writeMutation(true, key, nil) 405 err = td.writeMutation(true, key, nil)
407 if err != nil { 406 if err != nil {
408 return nil, err 407 return nil, err
409 } 408 }
410 } 409 }
411 return td.snap.GetCollection("ents:" + keys[0].Namespace()), nil 410 return td.snap.GetCollection("ents:" + keys[0].Namespace()), nil
412 }) 411 })
413 } 412 }
414 413
415 func (td *txnDataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { 414 func (td *txnDataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error {
416 for _, k := range keys { 415 for _, k := range keys {
417 err := td.writeMutation(false, k, nil) 416 err := td.writeMutation(false, k, nil)
418 if cb != nil { 417 if cb != nil {
419 cb(err) 418 cb(err)
420 } 419 }
421 } 420 }
422 return nil 421 return nil
423 } 422 }
424 423
425 func keyBytes(key ds.Key) []byte { 424 func keyBytes(key *ds.Key) []byte {
426 return serialize.ToBytes(ds.MkProperty(key)) 425 return serialize.ToBytes(ds.MkProperty(key))
427 } 426 }
428 427
429 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { 428 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) {
430 return serialize.ReadPropertyMap(bytes.NewBuffer(data), 429 return serialize.ReadPropertyMap(bytes.NewBuffer(data),
431 serialize.WithoutContext, globalAppID, ns) 430 serialize.WithoutContext, globalAppID, ns)
432 } 431 }
433 432
434 func rpm(data []byte) (ds.PropertyMap, error) { 433 func rpm(data []byte) (ds.PropertyMap, error) {
435 return serialize.ReadPropertyMap(bytes.NewBuffer(data), 434 return serialize.ReadPropertyMap(bytes.NewBuffer(data),
436 serialize.WithContext, "", "") 435 serialize.WithContext, "", "")
437 } 436 }
438
439 type keyitem interface {
440 Key() ds.Key
441 }
iannucci 2015/09/18 04:31:52 stale code is stale!
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698