Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(162)

Side by Side Diff: impl/memory/datastore_data.go

Issue 1292913002: Split off serialization and key functions to their own packages. (Closed) Base URL: https://github.com/luci/gae.git@make_queries_better
Patch Set: rebase Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « impl/memory/datastore.go ('k') | impl/memory/datastore_index.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 "sync" 10 "sync"
11 "sync/atomic" 11 "sync/atomic"
12 12
13 ds "github.com/luci/gae/service/datastore" 13 ds "github.com/luci/gae/service/datastore"
14 "github.com/luci/gae/service/datastore/dskey"
15 "github.com/luci/gae/service/datastore/serialize"
14 "github.com/luci/luci-go/common/errors" 16 "github.com/luci/luci-go/common/errors"
15 "golang.org/x/net/context" 17 "golang.org/x/net/context"
16 ) 18 )
17 19
18 //////////////////////////////// dataStoreData ///////////////////////////////// 20 //////////////////////////////// dataStoreData /////////////////////////////////
19 21
20 type dataStoreData struct { 22 type dataStoreData struct {
21 rwlock sync.RWMutex 23 rwlock sync.RWMutex
22 // See README.md for store schema. 24 // See README.md for store schema.
23 store *memStore 25 store *memStore
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
71 73
72 func (d *dataStoreData) catchupIndexes() { 74 func (d *dataStoreData) catchupIndexes() {
73 d.rwlock.Lock() 75 d.rwlock.Lock()
74 defer d.rwlock.Unlock() 76 defer d.rwlock.Unlock()
75 d.snap = d.store.Snapshot() 77 d.snap = d.store.Snapshot()
76 } 78 }
77 79
78 /////////////////////////// indicies(dataStoreData) //////////////////////////// 80 /////////////////////////// indicies(dataStoreData) ////////////////////////////
79 81
80 func groupMetaKey(key ds.Key) []byte { 82 func groupMetaKey(key ds.Key) []byte {
81 » return keyBytes(ds.WithoutContext, 83 » return keyBytes(serialize.WithoutContext,
82 » » ds.NewKey("", "", "__entity_group__", "", 1, ds.KeyRoot(key))) 84 » » dskey.New("", "", "__entity_group__", "", 1, dskey.Root(key)))
83 } 85 }
84 86
85 func groupIDsKey(key ds.Key) []byte { 87 func groupIDsKey(key ds.Key) []byte {
86 » return keyBytes(ds.WithoutContext, 88 » return keyBytes(serialize.WithoutContext,
87 » » ds.NewKey("", "", "__entity_group_ids__", "", 1, ds.KeyRoot(key) )) 89 » » dskey.New("", "", "__entity_group_ids__", "", 1, dskey.Root(key) ))
88 } 90 }
89 91
90 func rootIDsKey(kind string) []byte { 92 func rootIDsKey(kind string) []byte {
91 » return keyBytes(ds.WithoutContext, 93 » return keyBytes(serialize.WithoutContext,
92 » » ds.NewKey("", "", "__entity_root_ids__", kind, 0, nil)) 94 » » dskey.New("", "", "__entity_root_ids__", kind, 0, nil))
93 } 95 }
94 96
95 func curVersion(ents *memCollection, key []byte) int64 { 97 func curVersion(ents *memCollection, key []byte) int64 {
96 if ents != nil { 98 if ents != nil {
97 if v := ents.Get(key); v != nil { 99 if v := ents.Get(key); v != nil {
98 pm, err := rpm(v) 100 pm, err := rpm(v)
99 if err != nil { 101 if err != nil {
100 panic(err) // memory corruption 102 panic(err) // memory corruption
101 } 103 }
102 pl, ok := pm["__version__"] 104 pl, ok := pm["__version__"]
103 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { 105 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt {
104 return pl[0].Value().(int64) 106 return pl[0].Value().(int64)
105 } 107 }
106 panic(fmt.Errorf("__version__ property missing or wrong: %v", pm)) 108 panic(fmt.Errorf("__version__ property missing or wrong: %v", pm))
107 } 109 }
108 } 110 }
109 return 0 111 return 0
110 } 112 }
111 113
112 func incrementLocked(ents *memCollection, key []byte) int64 { 114 func incrementLocked(ents *memCollection, key []byte) int64 {
113 ret := curVersion(ents, key) + 1 115 ret := curVersion(ents, key) + 1
114 buf := &bytes.Buffer{} 116 buf := &bytes.Buffer{}
115 » ds.PropertyMap{"__version__": {ds.MkPropertyNI(ret)}}.Write( 117 » serialize.WritePropertyMap(buf, serialize.WithContext, ds.PropertyMap{
116 » » buf, ds.WithContext) 118 » » "__version__": {ds.MkPropertyNI(ret)}})
117 ents.Set(key, buf.Bytes()) 119 ents.Set(key, buf.Bytes())
118 return ret 120 return ret
119 } 121 }
120 122
121 func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) { 123 func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) {
122 coll := "ents:" + key.Namespace() 124 coll := "ents:" + key.Namespace()
123 ents := d.store.GetCollection(coll) 125 ents := d.store.GetCollection(coll)
124 if ents == nil { 126 if ents == nil {
125 ents = d.store.SetCollection(coll, nil) 127 ents = d.store.SetCollection(coll, nil)
126 } 128 }
127 129
128 » if ds.KeyIncomplete(key) { 130 » if dskey.Incomplete(key) {
129 idKey := []byte(nil) 131 idKey := []byte(nil)
130 if key.Parent() == nil { 132 if key.Parent() == nil {
131 idKey = rootIDsKey(key.Kind()) 133 idKey = rootIDsKey(key.Kind())
132 } else { 134 } else {
133 idKey = groupIDsKey(key) 135 idKey = groupIDsKey(key)
134 } 136 }
135 id := incrementLocked(ents, idKey) 137 id := incrementLocked(ents, idKey)
136 » » key = ds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", id , key.Parent()) 138 » » key = dskey.New(key.AppID(), key.Namespace(), key.Kind(), "", id , key.Parent())
137 } 139 }
138 140
139 return ents, key 141 return ents, key
140 } 142 }
141 143
142 func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.Put MultiCB) { 144 func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.Put MultiCB) {
143 for i, k := range keys { 145 for i, k := range keys {
144 buf := &bytes.Buffer{} 146 buf := &bytes.Buffer{}
145 pmap, _ := vals[i].Save(false) 147 pmap, _ := vals[i].Save(false)
146 » » pmap.Write(buf, ds.WithoutContext) 148 » » serialize.WritePropertyMap(buf, serialize.WithoutContext, pmap)
147 dataBytes := buf.Bytes() 149 dataBytes := buf.Bytes()
148 150
149 k, err := func() (ret ds.Key, err error) { 151 k, err := func() (ret ds.Key, err error) {
150 d.rwlock.Lock() 152 d.rwlock.Lock()
151 defer d.rwlock.Unlock() 153 defer d.rwlock.Unlock()
152 154
153 ents, ret := d.entsKeyLocked(k) 155 ents, ret := d.entsKeyLocked(k)
154 incrementLocked(ents, groupMetaKey(ret)) 156 incrementLocked(ents, groupMetaKey(ret))
155 157
156 » » » old := ents.Get(keyBytes(ds.WithoutContext, ret)) 158 » » » old := ents.Get(keyBytes(serialize.WithoutContext, ret))
157 oldPM := ds.PropertyMap(nil) 159 oldPM := ds.PropertyMap(nil)
158 if old != nil { 160 if old != nil {
159 if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil { 161 if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil {
160 return 162 return
161 } 163 }
162 } 164 }
163 updateIndicies(d.store, ret, oldPM, pmap) 165 updateIndicies(d.store, ret, oldPM, pmap)
164 » » » ents.Set(keyBytes(ds.WithoutContext, ret), dataBytes) 166 » » » ents.Set(keyBytes(serialize.WithoutContext, ret), dataBy tes)
165 return 167 return
166 }() 168 }()
167 if cb != nil { 169 if cb != nil {
168 cb(k, err) 170 cb(k, err)
169 } 171 }
170 } 172 }
171 } 173 }
172 174
173 func getMultiInner(keys []ds.Key, cb ds.GetMultiCB, getColl func() (*memCollecti on, error)) error { 175 func getMultiInner(keys []ds.Key, cb ds.GetMultiCB, getColl func() (*memCollecti on, error)) error {
174 ents, err := getColl() 176 ents, err := getColl()
175 if err != nil { 177 if err != nil {
176 return err 178 return err
177 } 179 }
178 if ents == nil { 180 if ents == nil {
179 for range keys { 181 for range keys {
180 cb(nil, ds.ErrNoSuchEntity) 182 cb(nil, ds.ErrNoSuchEntity)
181 } 183 }
182 return nil 184 return nil
183 } 185 }
184 186
185 for _, k := range keys { 187 for _, k := range keys {
186 » » pdata := ents.Get(keyBytes(ds.WithoutContext, k)) 188 » » pdata := ents.Get(keyBytes(serialize.WithoutContext, k))
187 if pdata == nil { 189 if pdata == nil {
188 cb(nil, ds.ErrNoSuchEntity) 190 cb(nil, ds.ErrNoSuchEntity)
189 continue 191 continue
190 } 192 }
191 cb(rpmWoCtx(pdata, k.Namespace())) 193 cb(rpmWoCtx(pdata, k.Namespace()))
192 } 194 }
193 return nil 195 return nil
194 } 196 }
195 197
196 func (d *dataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { 198 func (d *dataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error {
197 getMultiInner(keys, cb, func() (*memCollection, error) { 199 getMultiInner(keys, cb, func() (*memCollection, error) {
198 s := d.takeSnapshot() 200 s := d.takeSnapshot()
199 201
200 return s.GetCollection("ents:" + keys[0].Namespace()), nil 202 return s.GetCollection("ents:" + keys[0].Namespace()), nil
201 }) 203 })
202 return nil 204 return nil
203 } 205 }
204 206
205 func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) { 207 func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) {
206 toDel := make([][]byte, 0, len(keys)) 208 toDel := make([][]byte, 0, len(keys))
207 for _, k := range keys { 209 for _, k := range keys {
208 » » toDel = append(toDel, keyBytes(ds.WithoutContext, k)) 210 » » toDel = append(toDel, keyBytes(serialize.WithoutContext, k))
209 } 211 }
210 ns := keys[0].Namespace() 212 ns := keys[0].Namespace()
211 213
212 d.rwlock.Lock() 214 d.rwlock.Lock()
213 defer d.rwlock.Unlock() 215 defer d.rwlock.Unlock()
214 216
215 ents := d.store.GetCollection("ents:" + ns) 217 ents := d.store.GetCollection("ents:" + ns)
216 218
217 for i, k := range keys { 219 for i, k := range keys {
218 if ents != nil { 220 if ents != nil {
(...skipping 18 matching lines...) Expand all
237 } 239 }
238 240
239 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { 241 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool {
240 // TODO(riannucci): implement with Flush/FlushRevert for persistance. 242 // TODO(riannucci): implement with Flush/FlushRevert for persistance.
241 243
242 txn := obj.(*txnDataStoreData) 244 txn := obj.(*txnDataStoreData)
243 for rk, muts := range txn.muts { 245 for rk, muts := range txn.muts {
244 if len(muts) == 0 { // read-only 246 if len(muts) == 0 { // read-only
245 continue 247 continue
246 } 248 }
247 » » k, err := ds.ReadKey(bytes.NewBufferString(rk), ds.WithContext, "", "") 249 » » k, err := serialize.ReadKey(bytes.NewBufferString(rk), serialize .WithContext, "", "")
248 if err != nil { 250 if err != nil {
249 panic(err) 251 panic(err)
250 } 252 }
251 253
252 entKey := "ents:" + k.Namespace() 254 entKey := "ents:" + k.Namespace()
253 mkey := groupMetaKey(k) 255 mkey := groupMetaKey(k)
254 entsHead := d.store.GetCollection(entKey) 256 entsHead := d.store.GetCollection(entKey)
255 entsSnap := txn.snap.GetCollection(entKey) 257 entsSnap := txn.snap.GetCollection(entKey)
256 vHead := curVersion(entsHead, mkey) 258 vHead := curVersion(entsHead, mkey)
257 vSnap := curVersion(entsSnap, mkey) 259 vSnap := curVersion(entsSnap, mkey)
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after
356 // 358 //
357 // if getOnly is true, don't record the actual mutation data, just ensure that 359 // if getOnly is true, don't record the actual mutation data, just ensure that
358 // the key is in an included entity group (or add an empty entry for tha t 360 // the key is in an included entity group (or add an empty entry for tha t
359 // group). 361 // group).
360 // 362 //
361 // if !getOnly && data == nil, this counts as a deletion instead of a Put. 363 // if !getOnly && data == nil, this counts as a deletion instead of a Put.
362 // 364 //
363 // Returns an error if this key causes the transaction to cross too many entity 365 // Returns an error if this key causes the transaction to cross too many entity
364 // groups. 366 // groups.
365 func (td *txnDataStoreData) writeMutation(getOnly bool, key ds.Key, data ds.Prop ertyMap) error { 367 func (td *txnDataStoreData) writeMutation(getOnly bool, key ds.Key, data ds.Prop ertyMap) error {
366 » rk := string(keyBytes(ds.WithContext, ds.KeyRoot(key))) 368 » rk := string(keyBytes(serialize.WithContext, dskey.Root(key)))
367 369
368 td.Lock() 370 td.Lock()
369 defer td.Unlock() 371 defer td.Unlock()
370 372
371 if _, ok := td.muts[rk]; !ok { 373 if _, ok := td.muts[rk]; !ok {
372 limit := 1 374 limit := 1
373 if td.isXG { 375 if td.isXG {
374 limit = xgEGLimit 376 limit = xgEGLimit
375 } 377 }
376 if len(td.muts)+1 > limit { 378 if len(td.muts)+1 > limit {
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
419 func (td *txnDataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { 421 func (td *txnDataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) error {
420 for _, k := range keys { 422 for _, k := range keys {
421 err := td.writeMutation(false, k, nil) 423 err := td.writeMutation(false, k, nil)
422 if cb != nil { 424 if cb != nil {
423 cb(err) 425 cb(err)
424 } 426 }
425 } 427 }
426 return nil 428 return nil
427 } 429 }
428 430
429 func keyBytes(ctx ds.KeyContext, key ds.Key) []byte { 431 func keyBytes(ctx serialize.KeyContext, key ds.Key) []byte {
430 buf := &bytes.Buffer{} 432 buf := &bytes.Buffer{}
431 » ds.WriteKey(buf, ctx, key) 433 » serialize.WriteKey(buf, ctx, key)
432 return buf.Bytes() 434 return buf.Bytes()
433 } 435 }
434 436
435 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { 437 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) {
436 » ret := ds.PropertyMap{} 438 » return serialize.ReadPropertyMap(bytes.NewBuffer(data),
437 » err := ret.Read(bytes.NewBuffer(data), ds.WithoutContext, globalAppID, n s) 439 » » serialize.WithoutContext, globalAppID, ns)
438 » return ret, err
439 } 440 }
440 441
441 func rpm(data []byte) (ds.PropertyMap, error) { 442 func rpm(data []byte) (ds.PropertyMap, error) {
442 » ret := ds.PropertyMap{} 443 » return serialize.ReadPropertyMap(bytes.NewBuffer(data),
443 » err := ret.Read(bytes.NewBuffer(data), ds.WithContext, "", "") 444 » » serialize.WithContext, "", "")
444 » return ret, err
445 } 445 }
446 446
447 type keyitem interface { 447 type keyitem interface {
448 Key() ds.Key 448 Key() ds.Key
449 } 449 }
OLDNEW
« no previous file with comments | « impl/memory/datastore.go ('k') | impl/memory/datastore_index.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698