Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(174)

Side by Side Diff: impl/memory/datastore_index.go

Issue 1302813003: impl/memory: Implement Queries (Closed) Base URL: https://github.com/luci/gae.git@add_multi_iterator
Patch Set: minor fixes Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 "sort" 10 "sort"
11 11
12 ds "github.com/luci/gae/service/datastore" 12 ds "github.com/luci/gae/service/datastore"
13 "github.com/luci/gae/service/datastore/serialize" 13 "github.com/luci/gae/service/datastore/serialize"
14 "github.com/luci/gkvlite" 14 "github.com/luci/gkvlite"
15 ) 15 )
16 16
17 var indexCreationDeterministic = false
18
19 type qIndexSlice []*ds.IndexDefinition 17 type qIndexSlice []*ds.IndexDefinition
20 18
21 func (s qIndexSlice) Len() int { return len(s) } 19 func (s qIndexSlice) Len() int { return len(s) }
22 func (s qIndexSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } 20 func (s qIndexSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
23 func (s qIndexSlice) Less(i, j int) bool { return s[i].Less(s[j]) } 21 func (s qIndexSlice) Less(i, j int) bool { return s[i].Less(s[j]) }
24 22
25 func defaultIndicies(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition { 23 func defaultIndicies(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition {
26 ret := make(qIndexSlice, 0, 2*len(pmap)+1) 24 ret := make(qIndexSlice, 0, 2*len(pmap)+1)
27 ret = append(ret, &ds.IndexDefinition{Kind: kind}) 25 ret = append(ret, &ds.IndexDefinition{Kind: kind})
28 for name, pvals := range pmap { 26 for name, pvals := range pmap {
29 needsIndex := false 27 needsIndex := false
30 for _, v := range pvals { 28 for _, v := range pvals {
31 if v.IndexSetting() == ds.ShouldIndex { 29 if v.IndexSetting() == ds.ShouldIndex {
32 needsIndex = true 30 needsIndex = true
33 break 31 break
34 } 32 }
35 } 33 }
36 if !needsIndex { 34 if !needsIndex {
37 continue 35 continue
38 } 36 }
39 ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.IndexColumn{{Property: name}}}) 37 ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.IndexColumn{{Property: name}}})
40 ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.IndexColumn{{Property: name, Direction: ds.DESCENDING}}}) 38 ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.IndexColumn{{Property: name, Direction: ds.DESCENDING}}})
41 } 39 }
42 » if indexCreationDeterministic { 40 » if serializationDeterministic {
43 sort.Sort(ret) 41 sort.Sort(ret)
44 } 42 }
45 return ret 43 return ret
46 } 44 }
47 45
48 func indexEntriesWithBuiltins(k ds.Key, pm ds.PropertyMap, complexIdxs []*ds.IndexDefinition) *memStore { 46 func indexEntriesWithBuiltins(k ds.Key, pm ds.PropertyMap, complexIdxs []*ds.IndexDefinition) *memStore {
49 » sip := partiallySerialize(pm) 47 » sip := partiallySerialize(k, pm)
50 » return sip.indexEntries(k, append(defaultIndicies(k.Kind(), pm), complex Idxs...)) 48 » return sip.indexEntries(k.Namespace(), append(defaultIndicies(k.Kind(), pm), complexIdxs...))
51 } 49 }
52 50
53 // serializedPvals is all of the serialized DSProperty values in qASC order. 51 // serializedPvals is all of the serialized DSProperty values in qASC order.
54 type serializedPvals [][]byte 52 type serializedPvals [][]byte
55 53
56 func (s serializedPvals) Len() int { return len(s) } 54 func (s serializedPvals) Len() int { return len(s) }
57 func (s serializedPvals) Swap(i, j int) { s[i], s[j] = s[j], s[i] } 55 func (s serializedPvals) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
58 func (s serializedPvals) Less(i, j int) bool { return bytes.Compare(s[i], s[j]) < 0 } 56 func (s serializedPvals) Less(i, j int) bool { return bytes.Compare(s[i], s[j]) < 0 }
59 57
60 // prop name -> [<serialized DSProperty>, ...] 58 // prop name -> [<serialized DSProperty>, ...]
59 // includes special values '__key__' and '__ancestor__' which contains all of
60 // the ancestor entries for this key.
61 type serializedIndexablePmap map[string]serializedPvals 61 type serializedIndexablePmap map[string]serializedPvals
62 62
63 func partiallySerialize(pm ds.PropertyMap) (ret serializedIndexablePmap) { 63 func partiallySerialize(k ds.Key, pm ds.PropertyMap) (ret serializedIndexablePmap) {
64 » if len(pm) == 0 { 64 » ret = make(serializedIndexablePmap, len(pm)+2)
65 » » return 65 » ret["__key__"] = [][]byte{serialize.ToBytes(ds.MkProperty(k))}
66 » for k != nil {
67 » » ret["__ancestor__"] = append(ret["__ancestor__"], serialize.ToBy tes(ds.MkProperty(k)))
68 » » k = k.Parent()
66 } 69 }
67
68 buf := &bytes.Buffer{}
69 ret = make(serializedIndexablePmap, len(pm))
70 for k, vals := range pm { 70 for k, vals := range pm {
71 dups := map[string]struct{}{}
71 newVals := make(serializedPvals, 0, len(vals)) 72 newVals := make(serializedPvals, 0, len(vals))
72 for _, v := range vals { 73 for _, v := range vals {
73 if v.IndexSetting() == ds.NoIndex { 74 if v.IndexSetting() == ds.NoIndex {
74 continue 75 continue
75 } 76 }
76 » » » buf.Reset() 77 » » » data := serialize.ToBytes(v)
77 » » » serialize.WriteProperty(buf, serialize.WithoutContext, v ) 78 » » » dataS := string(data)
78 » » » newVal := make([]byte, buf.Len()) 79 » » » if _, ok := dups[dataS]; ok {
79 » » » copy(newVal, buf.Bytes()) 80 » » » » continue
80 » » » newVals = append(newVals, newVal) 81 » » » }
82 » » » dups[dataS] = struct{}{}
83 » » » newVals = append(newVals, data)
81 } 84 }
82 if len(newVals) > 0 { 85 if len(newVals) > 0 {
83 sort.Sort(newVals)
84 ret[k] = newVals 86 ret[k] = newVals
85 } 87 }
86 } 88 }
87 return 89 return
88 } 90 }
89 91
90 // indexRowGen contains enough information to generate all of the index rows which 92 // indexRowGen contains enough information to generate all of the index rows which
91 // correspond with a propertyList and a ds.IndexDefinition. 93 // correspond with a propertyList and a ds.IndexDefinition.
92 type indexRowGen struct { 94 type indexRowGen struct {
93 propVec []serializedPvals 95 propVec []serializedPvals
94 orders []ds.IndexDirection 96 orders []ds.IndexDirection
95 } 97 }
96 98
97 // permute calls cb for each index row, in the sorted order of the rows. 99 // permute calls cb for each index row, in the sorted order of the rows.
98 func (s indexRowGen) permute(cb func([]byte)) { 100 func (s indexRowGen) permute(collSetFn func(k, v []byte)) {
99 iVec := make([]int, len(s.propVec)) 101 iVec := make([]int, len(s.propVec))
100 iVecLim := make([]int, len(s.propVec)) 102 iVecLim := make([]int, len(s.propVec))
101 103
102 incPos := func() bool { 104 incPos := func() bool {
103 for i := len(iVec) - 1; i >= 0; i-- { 105 for i := len(iVec) - 1; i >= 0; i-- {
104 var done bool 106 var done bool
105 var newVal int 107 var newVal int
106 if s.orders[i] == ds.ASCENDING { 108 if s.orders[i] == ds.ASCENDING {
107 newVal = (iVec[i] + 1) % iVecLim[i] 109 newVal = (iVec[i] + 1) % iVecLim[i]
108 done = newVal != 0 110 done = newVal != 0
(...skipping 21 matching lines...) Expand all
130 if s.orders[i] == ds.DESCENDING { 132 if s.orders[i] == ds.DESCENDING {
131 iVec[i] = iVecLim[i] - 1 133 iVec[i] = iVecLim[i] - 1
132 } 134 }
133 } 135 }
134 136
135 for { 137 for {
136 bufsiz := 0 138 bufsiz := 0
137 for pvalSliceIdx, pvalIdx := range iVec { 139 for pvalSliceIdx, pvalIdx := range iVec {
138 bufsiz += len(s.propVec[pvalSliceIdx][pvalIdx]) 140 bufsiz += len(s.propVec[pvalSliceIdx][pvalIdx])
139 } 141 }
140 » » buf := bytes.NewBuffer(make([]byte, 0, bufsiz)) 142 » » buf := serialize.Invertible(bytes.NewBuffer(make([]byte, 0, bufsiz)))
141 for pvalSliceIdx, pvalIdx := range iVec { 143 for pvalSliceIdx, pvalIdx := range iVec {
142 data := s.propVec[pvalSliceIdx][pvalIdx] 144 data := s.propVec[pvalSliceIdx][pvalIdx]
143 » » » if s.orders[pvalSliceIdx] == ds.ASCENDING { 145 » » » buf.SetInvert(s.orders[pvalSliceIdx] == ds.DESCENDING)
144 » » » » buf.Write(data) 146 » » » buf.Write(data)
145 » » » } else {
146 » » » » for _, b := range data {
147 » » » » » buf.WriteByte(b ^ 0xFF)
148 » » » » }
149 » » » }
150 } 147 }
151 » » cb(buf.Bytes()) 148 » » collSetFn(buf.Bytes(), []byte{})
152 if !incPos() { 149 if !incPos() {
153 break 150 break
154 } 151 }
155 } 152 }
156 } 153 }
157 154
158 type matcher struct { 155 type matcher struct {
159 buf indexRowGen 156 buf indexRowGen
160 } 157 }
161 158
162 // matcher.match checks to see if the mapped, serialized property values 159 // matcher.match checks to see if the mapped, serialized property values
163 // match the index. If they do, it returns a indexRowGen. Do not write or modify 160 // match the index. If they do, it returns a indexRowGen. Do not write or modify
164 // the data in the indexRowGen. 161 // the data in the indexRowGen.
165 func (m *matcher) match(idx *ds.IndexDefinition, sip serializedIndexablePmap) (indexRowGen, bool) { 162 func (m *matcher) match(sortBy []ds.IndexColumn, sip serializedIndexablePmap) (indexRowGen, bool) {
166 m.buf.propVec = m.buf.propVec[:0] 163 m.buf.propVec = m.buf.propVec[:0]
167 m.buf.orders = m.buf.orders[:0] 164 m.buf.orders = m.buf.orders[:0]
168 » for _, sb := range idx.SortBy { 165 » for _, sb := range sortBy {
169 » » if sb.Property == "__key__" {
170 » » » panic("don't know how to build compound index on __key__ ")
171 » » }
172 if pv, ok := sip[sb.Property]; ok { 166 if pv, ok := sip[sb.Property]; ok {
173 m.buf.propVec = append(m.buf.propVec, pv) 167 m.buf.propVec = append(m.buf.propVec, pv)
174 m.buf.orders = append(m.buf.orders, sb.Direction) 168 m.buf.orders = append(m.buf.orders, sb.Direction)
175 } else { 169 } else {
176 return indexRowGen{}, false 170 return indexRowGen{}, false
177 } 171 }
178 } 172 }
179 return m.buf, true 173 return m.buf, true
180 } 174 }
181 175
182 func (sip serializedIndexablePmap) indexEntries(k ds.Key, idxs []*ds.IndexDefinition) *memStore { 176 func (sip serializedIndexablePmap) indexEntries(ns string, idxs []*ds.IndexDefinition) *memStore {
183 ret := newMemStore() 177 ret := newMemStore()
184 idxColl := ret.SetCollection("idx", nil) 178 idxColl := ret.SetCollection("idx", nil)
185 // getIdxEnts retrieves an index collection or adds it if it's not there .
186 getIdxEnts := func(qi *ds.IndexDefinition) *memCollection {
187 b := serialize.ToBytes(*qi)
188 idxColl.Set(b, []byte{})
189 return ret.SetCollection(fmt.Sprintf("idx:%s:%s", k.Namespace(), b), nil)
190 }
191
192 keyData := serialize.ToBytes(k)
193
194 walkPermutations := func(prefix []byte, irg indexRowGen, ents *memCollec tion) {
195 prev := []byte{} // intentionally make a non-nil slice, gkvlite hates nil.
196 irg.permute(func(data []byte) {
197 buf := bytes.NewBuffer(make([]byte, 0, len(prefix)+len(d ata)+len(keyData)))
198 buf.Write(prefix)
199 buf.Write(data)
200 buf.Write(keyData)
201 ents.Set(buf.Bytes(), prev)
202 prev = data
203 })
204 }
205 179
206 mtch := matcher{} 180 mtch := matcher{}
207 for _, idx := range idxs { 181 for _, idx := range idxs {
208 » » if irg, ok := mtch.match(idx, sip); ok { 182 » » idx = idx.Normalize()
209 » » » idxEnts := getIdxEnts(idx) 183 » » if irg, ok := mtch.match(idx.GetFullSortOrder(), sip); ok {
210 » » » if len(irg.propVec) == 0 { 184 » » » idxBin := serialize.ToBytes(*idx.PrepForIdxTable())
211 » » » » idxEnts.Set(keyData, []byte{}) // propless index , e.g. kind -> key = nil 185 » » » idxColl.Set(idxBin, []byte{})
dnj (Google) 2015/08/28 17:54:21 Use []byte(nil) here, as []byte{} allocates an emp
iannucci 2015/08/28 19:48:55 If gkvlite didn't panic on nil values, I would tot
212 » » » } else if idx.Ancestor { 186 » » » coll := ret.SetCollection(fmt.Sprintf("idx:%s:%s", ns, idxBin), nil)
213 » » » » for ancKey := k; ancKey != nil; ancKey = ancKey. Parent() { 187 » » » irg.permute(coll.Set)
214 » » » » » walkPermutations(serialize.ToBytes(ancKe y), irg, idxEnts)
215 » » » » }
216 » » » } else {
217 » » » » walkPermutations(nil, irg, idxEnts)
218 » » » }
219 } 188 }
220 } 189 }
221 190
222 return ret 191 return ret
223 } 192 }
224 193
225 func getCompIdxs(idxColl *memCollection) []*ds.IndexDefinition { 194 // walkCompIdxs walks the table of compound indexes in the store. If `endsWith`
226 » // load all current complex query index definitions. 195 // is provided, this will only walk over compound indexes which match
227 » compIdx := []*ds.IndexDefinition{} 196 // Kind, Ancestor, and whose SortBy has `endsWith.SortBy` as a suffix.
228 » complexQueryPrefix := ds.IndexComplexQueryPrefix() 197 func walkCompIdxs(store *memStore, endsWith *ds.IndexDefinition, cb func(*ds.IndexDefinition) bool) {
229 » idxColl.VisitItemsAscend(complexQueryPrefix, false, func(i *gkvlite.Item ) bool {
230 » » if !bytes.HasPrefix(i.Key, complexQueryPrefix) {
231 » » » return false
232 » » }
233 » » qi, err := serialize.ReadIndexDefinition(bytes.NewBuffer(i.Key))
234 » » if err != nil {
235 » » » panic(err) // memory corruption
236 » » }
237 » » compIdx = append(compIdx, &qi)
238 » » return true
239 » })
240 » return compIdx
241 }
242
243 func getIdxColl(store *memStore) *memCollection {
244 idxColl := store.GetCollection("idx") 198 idxColl := store.GetCollection("idx")
245 if idxColl == nil { 199 if idxColl == nil {
246 » » idxColl = store.SetCollection("idx", nil) 200 » » return
247 } 201 }
248 » return idxColl 202 » itrDef := iterDefinition{c: idxColl}
203
204 » if endsWith != nil {
205 » » full := serialize.ToBytes(*endsWith.Flip())
206 » » // chop off the null terminating byte
207 » » itrDef.prefix = full[:len(full)-1]
208 » }
209
210 » for it := itrDef.mkIter(); !it.stopped; {
211 » » it.next(nil, func(i *gkvlite.Item) {
212 » » » if i == nil {
213 » » » » return
214 » » » }
215 » » » qi, err := serialize.ReadIndexDefinition(bytes.NewBuffer (i.Key))
216 » » » memoryCorruption(err)
217 » » » if !cb(qi.Flip()) {
218 » » » » it.stop()
dnj (Google) 2015/08/28 17:54:21 It'd be better to arrange to do this via defer, as
iannucci 2015/08/28 19:48:55 I think you mean 'additionally do this via a defer
219 » » » }
220 » » })
221 » }
249 } 222 }
250 223
251 func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) { 224 func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) {
252 idxColl := getIdxColl(store)
253 prefix := "idx:" + ns + ":" 225 prefix := "idx:" + ns + ":"
254 gkvCollide(oldIdx.GetCollection("idx"), newIdx.GetCollection("idx"), func(k, ov, nv []byte) { 226 gkvCollide(oldIdx.GetCollection("idx"), newIdx.GetCollection("idx"), func(k, ov, nv []byte) {
255 ks := prefix + string(k) 227 ks := prefix + string(k)
dnj (Google) 2015/08/28 17:54:21 Since you do this a lot, you should probably alloc
iannucci 2015/08/28 19:48:55 done. though this is definitely a micro-optimizati
256 if idxColl.Get(k) == nil {
257 // avoids unnecessary mutation, otherwise the idx collec tion thrashes on
258 // every update.
259 idxColl.Set(k, []byte{})
260 }
261 228
262 coll := store.GetCollection(ks) 229 coll := store.GetCollection(ks)
263 if coll == nil { 230 if coll == nil {
264 coll = store.SetCollection(ks, nil) 231 coll = store.SetCollection(ks, nil)
265 } 232 }
233
266 oldColl := oldIdx.GetCollection(ks) 234 oldColl := oldIdx.GetCollection(ks)
267 newColl := newIdx.GetCollection(ks) 235 newColl := newIdx.GetCollection(ks)
268 236
269 switch { 237 switch {
270 case ov == nil && nv != nil: // all additions 238 case ov == nil && nv != nil: // all additions
271 newColl.VisitItemsAscend(nil, false, func(i *gkvlite.Item) bool { 239 newColl.VisitItemsAscend(nil, false, func(i *gkvlite.Item) bool {
272 » » » » coll.Set(i.Key, i.Val) 240 » » » » coll.Set(i.Key, []byte{})
273 return true 241 return true
274 }) 242 })
275 case ov != nil && nv == nil: // all deletions 243 case ov != nil && nv == nil: // all deletions
276 oldColl.VisitItemsAscend(nil, false, func(i *gkvlite.Ite m) bool { 244 oldColl.VisitItemsAscend(nil, false, func(i *gkvlite.Ite m) bool {
277 coll.Delete(i.Key) 245 coll.Delete(i.Key)
278 return true 246 return true
279 }) 247 })
280 case ov != nil && nv != nil: // merge 248 case ov != nil && nv != nil: // merge
281 gkvCollide(oldColl, newColl, func(k, ov, nv []byte) { 249 gkvCollide(oldColl, newColl, func(k, ov, nv []byte) {
282 if nv == nil { 250 if nv == nil {
283 coll.Delete(k) 251 coll.Delete(k)
284 } else { 252 } else {
285 » » » » » coll.Set(k, nv) 253 » » » » » coll.Set(k, []byte{})
286 } 254 }
287 }) 255 })
288 default: 256 default:
289 » » » panic("impossible") 257 » » » impossible(fmt.Errorf("both values from gkvCollide were nil?"))
290 } 258 }
291 // TODO(riannucci): remove entries from idxColl and remove index collections 259 // TODO(riannucci): remove entries from idxColl and remove index collections
292 // when there are no index entries for that index any more. 260 // when there are no index entries for that index any more.
293 }) 261 })
294 } 262 }
295 263
296 func addIndex(store *memStore, ns string, compIdx []*ds.IndexDefinition) { 264 func addIndex(store *memStore, ns string, compIdx []*ds.IndexDefinition) {
297 » store.GetCollection("ents:"+ns).VisitItemsAscend(nil, true, func(i *gkvl ite.Item) bool { 265 » normalized := make([]*ds.IndexDefinition, len(compIdx))
298 » » pm, err := rpmWoCtx(i.Val, ns) 266 » idxColl := store.SetCollection("idx", nil)
299 » » if err != nil { 267 » for i, idx := range compIdx {
300 » » » panic(err) // memory corruption 268 » » normalized[i] = idx.Normalize()
301 » » } 269 » » idxColl.Set(serialize.ToBytes(*normalized[i].PrepForIdxTable()), []byte{})
302 » » k, err := serialize.ReadKey(bytes.NewBuffer(i.Key), serialize.Wi thoutContext, globalAppID, ns) 270 » }
303 » » if err != nil { 271
304 » » » panic(err) 272 » if allEnts := store.GetCollection("ents:" + ns); allEnts != nil {
305 » » } 273 » » allEnts.VisitItemsAscend(nil, true, func(i *gkvlite.Item) bool {
306 » » sip := partiallySerialize(pm) 274 » » » pm, err := rpmWoCtx(i.Val, ns)
307 » » mergeIndexes(ns, store, newMemStore(), sip.indexEntries(k, compI dx)) 275 » » » memoryCorruption(err)
308 » » return true 276
309 » }) 277 » » » prop, err := serialize.ReadProperty(bytes.NewBuffer(i.Ke y), serialize.WithoutContext, globalAppID, ns)
278 » » » memoryCorruption(err)
279
280 » » » k := prop.Value().(ds.Key)
281
282 » » » sip := partiallySerialize(k, pm)
283
284 » » » mergeIndexes(ns, store,
285 » » » » newMemStore(),
286 » » » » sip.indexEntries(ns, normalized))
287 » » » return true
288 » » })
289 » }
310 } 290 }
311 291
312 func updateIndicies(store *memStore, key ds.Key, oldEnt, newEnt ds.PropertyMap) { 292 func updateIndicies(store *memStore, key ds.Key, oldEnt, newEnt ds.PropertyMap) {
313 // load all current complex query index definitions. 293 // load all current complex query index definitions.
314 » compIdx := getCompIdxs(getIdxColl(store)) 294 » compIdx := []*ds.IndexDefinition{}
295 » walkCompIdxs(store, nil, func(i *ds.IndexDefinition) bool {
296 » » compIdx = append(compIdx, i)
297 » » return true
298 » })
315 299
316 mergeIndexes(key.Namespace(), store, 300 mergeIndexes(key.Namespace(), store,
317 indexEntriesWithBuiltins(key, oldEnt, compIdx), 301 indexEntriesWithBuiltins(key, oldEnt, compIdx),
318 indexEntriesWithBuiltins(key, newEnt, compIdx)) 302 indexEntriesWithBuiltins(key, newEnt, compIdx))
319 } 303 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698