| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "fmt" | 9 "fmt" |
| 10 "sort" | 10 "sort" |
| 11 | 11 |
| 12 ds "github.com/luci/gae/service/datastore" | 12 ds "github.com/luci/gae/service/datastore" |
| 13 "github.com/luci/gae/service/datastore/serialize" | 13 "github.com/luci/gae/service/datastore/serialize" |
| 14 "github.com/luci/gkvlite" | 14 "github.com/luci/gkvlite" |
| 15 ) | 15 ) |
| 16 | 16 |
| 17 var indexCreationDeterministic = false | |
| 18 | |
| 19 type qIndexSlice []*ds.IndexDefinition | 17 type qIndexSlice []*ds.IndexDefinition |
| 20 | 18 |
| 21 func (s qIndexSlice) Len() int { return len(s) } | 19 func (s qIndexSlice) Len() int { return len(s) } |
| 22 func (s qIndexSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } | 20 func (s qIndexSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } |
| 23 func (s qIndexSlice) Less(i, j int) bool { return s[i].Less(s[j]) } | 21 func (s qIndexSlice) Less(i, j int) bool { return s[i].Less(s[j]) } |
| 24 | 22 |
| 25 func defaultIndicies(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition { | 23 func defaultIndicies(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition { |
| 26 ret := make(qIndexSlice, 0, 2*len(pmap)+1) | 24 ret := make(qIndexSlice, 0, 2*len(pmap)+1) |
| 27 ret = append(ret, &ds.IndexDefinition{Kind: kind}) | 25 ret = append(ret, &ds.IndexDefinition{Kind: kind}) |
| 28 for name, pvals := range pmap { | 26 for name, pvals := range pmap { |
| 29 needsIndex := false | 27 needsIndex := false |
| 30 for _, v := range pvals { | 28 for _, v := range pvals { |
| 31 if v.IndexSetting() == ds.ShouldIndex { | 29 if v.IndexSetting() == ds.ShouldIndex { |
| 32 needsIndex = true | 30 needsIndex = true |
| 33 break | 31 break |
| 34 } | 32 } |
| 35 } | 33 } |
| 36 if !needsIndex { | 34 if !needsIndex { |
| 37 continue | 35 continue |
| 38 } | 36 } |
| 39 ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.I
ndexColumn{{Property: name}}}) | 37 ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.I
ndexColumn{{Property: name}}}) |
| 40 ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.I
ndexColumn{{Property: name, Direction: ds.DESCENDING}}}) | 38 ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.I
ndexColumn{{Property: name, Direction: ds.DESCENDING}}}) |
| 41 } | 39 } |
| 42 » if indexCreationDeterministic { | 40 » if serializationDeterministic { |
| 43 sort.Sort(ret) | 41 sort.Sort(ret) |
| 44 } | 42 } |
| 45 return ret | 43 return ret |
| 46 } | 44 } |
| 47 | 45 |
| 48 func indexEntriesWithBuiltins(k ds.Key, pm ds.PropertyMap, complexIdxs []*ds.Ind
exDefinition) *memStore { | 46 func indexEntriesWithBuiltins(k ds.Key, pm ds.PropertyMap, complexIdxs []*ds.Ind
exDefinition) *memStore { |
| 49 » sip := partiallySerialize(pm) | 47 » sip := partiallySerialize(k, pm) |
| 50 » return sip.indexEntries(k, append(defaultIndicies(k.Kind(), pm), complex
Idxs...)) | 48 » return sip.indexEntries(k.Namespace(), append(defaultIndicies(k.Kind(),
pm), complexIdxs...)) |
| 51 } | 49 } |
| 52 | 50 |
| 53 // serializedPvals is all of the serialized DSProperty values in qASC order. | 51 // serializedPvals is all of the serialized DSProperty values in qASC order. |
| 54 type serializedPvals [][]byte | 52 type serializedPvals [][]byte |
| 55 | 53 |
| 56 func (s serializedPvals) Len() int { return len(s) } | 54 func (s serializedPvals) Len() int { return len(s) } |
| 57 func (s serializedPvals) Swap(i, j int) { s[i], s[j] = s[j], s[i] } | 55 func (s serializedPvals) Swap(i, j int) { s[i], s[j] = s[j], s[i] } |
| 58 func (s serializedPvals) Less(i, j int) bool { return bytes.Compare(s[i], s[j])
< 0 } | 56 func (s serializedPvals) Less(i, j int) bool { return bytes.Compare(s[i], s[j])
< 0 } |
| 59 | 57 |
| 60 // prop name -> [<serialized DSProperty>, ...] | 58 // prop name -> [<serialized DSProperty>, ...] |
| 59 // includes special values '__key__' and '__ancestor__' which contains all of |
| 60 // the ancestor entries for this key. |
| 61 type serializedIndexablePmap map[string]serializedPvals | 61 type serializedIndexablePmap map[string]serializedPvals |
| 62 | 62 |
| 63 func partiallySerialize(pm ds.PropertyMap) (ret serializedIndexablePmap) { | 63 func partiallySerialize(k ds.Key, pm ds.PropertyMap) (ret serializedIndexablePma
p) { |
| 64 » if len(pm) == 0 { | 64 » ret = make(serializedIndexablePmap, len(pm)+2) |
| 65 » » return | 65 » ret["__key__"] = [][]byte{serialize.ToBytes(ds.MkProperty(k))} |
| 66 » for k != nil { |
| 67 » » ret["__ancestor__"] = append(ret["__ancestor__"], serialize.ToBy
tes(ds.MkProperty(k))) |
| 68 » » k = k.Parent() |
| 66 } | 69 } |
| 67 | |
| 68 buf := &bytes.Buffer{} | |
| 69 ret = make(serializedIndexablePmap, len(pm)) | |
| 70 for k, vals := range pm { | 70 for k, vals := range pm { |
| 71 newVals := make(serializedPvals, 0, len(vals)) | 71 newVals := make(serializedPvals, 0, len(vals)) |
| 72 for _, v := range vals { | 72 for _, v := range vals { |
| 73 if v.IndexSetting() == ds.NoIndex { | 73 if v.IndexSetting() == ds.NoIndex { |
| 74 continue | 74 continue |
| 75 } | 75 } |
| 76 » » » buf.Reset() | 76 » » » newVals = append(newVals, serialize.ToBytes(v)) |
| 77 » » » serialize.WriteProperty(buf, serialize.WithoutContext, v
) | |
| 78 » » » newVal := make([]byte, buf.Len()) | |
| 79 » » » copy(newVal, buf.Bytes()) | |
| 80 » » » newVals = append(newVals, newVal) | |
| 81 } | 77 } |
| 82 if len(newVals) > 0 { | 78 if len(newVals) > 0 { |
| 83 sort.Sort(newVals) | 79 sort.Sort(newVals) |
| 84 ret[k] = newVals | 80 ret[k] = newVals |
| 85 } | 81 } |
| 86 } | 82 } |
| 87 return | 83 return |
| 88 } | 84 } |
| 89 | 85 |
| 90 // indexRowGen contains enough information to generate all of the index rows whi
ch | 86 // indexRowGen contains enough information to generate all of the index rows whi
ch |
| 91 // correspond with a propertyList and a ds.IndexDefinition. | 87 // correspond with a propertyList and a ds.IndexDefinition. |
| 92 type indexRowGen struct { | 88 type indexRowGen struct { |
| 93 propVec []serializedPvals | 89 propVec []serializedPvals |
| 94 orders []ds.IndexDirection | 90 orders []ds.IndexDirection |
| 95 } | 91 } |
| 96 | 92 |
| 97 // permute calls cb for each index row, in the sorted order of the rows. | 93 // permute calls cb for each index row, in the sorted order of the rows. |
| 98 func (s indexRowGen) permute(cb func([]byte)) { | 94 func (s indexRowGen) permute(collSetFn func(k, v []byte)) { |
| 99 iVec := make([]int, len(s.propVec)) | 95 iVec := make([]int, len(s.propVec)) |
| 100 iVecLim := make([]int, len(s.propVec)) | 96 iVecLim := make([]int, len(s.propVec)) |
| 101 | 97 |
| 102 incPos := func() bool { | 98 incPos := func() bool { |
| 103 for i := len(iVec) - 1; i >= 0; i-- { | 99 for i := len(iVec) - 1; i >= 0; i-- { |
| 104 var done bool | 100 var done bool |
| 105 var newVal int | 101 var newVal int |
| 106 if s.orders[i] == ds.ASCENDING { | 102 if s.orders[i] == ds.ASCENDING { |
| 107 newVal = (iVec[i] + 1) % iVecLim[i] | 103 newVal = (iVec[i] + 1) % iVecLim[i] |
| 108 done = newVal != 0 | 104 done = newVal != 0 |
| (...skipping 21 matching lines...) Expand all Loading... |
| 130 if s.orders[i] == ds.DESCENDING { | 126 if s.orders[i] == ds.DESCENDING { |
| 131 iVec[i] = iVecLim[i] - 1 | 127 iVec[i] = iVecLim[i] - 1 |
| 132 } | 128 } |
| 133 } | 129 } |
| 134 | 130 |
| 135 for { | 131 for { |
| 136 bufsiz := 0 | 132 bufsiz := 0 |
| 137 for pvalSliceIdx, pvalIdx := range iVec { | 133 for pvalSliceIdx, pvalIdx := range iVec { |
| 138 bufsiz += len(s.propVec[pvalSliceIdx][pvalIdx]) | 134 bufsiz += len(s.propVec[pvalSliceIdx][pvalIdx]) |
| 139 } | 135 } |
| 140 » » buf := bytes.NewBuffer(make([]byte, 0, bufsiz)) | 136 » » buf := serialize.Invertible(bytes.NewBuffer(make([]byte, 0, bufs
iz))) |
| 141 for pvalSliceIdx, pvalIdx := range iVec { | 137 for pvalSliceIdx, pvalIdx := range iVec { |
| 142 data := s.propVec[pvalSliceIdx][pvalIdx] | 138 data := s.propVec[pvalSliceIdx][pvalIdx] |
| 143 » » » if s.orders[pvalSliceIdx] == ds.ASCENDING { | 139 » » » buf.SetInvert(s.orders[pvalSliceIdx] == ds.DESCENDING) |
| 144 » » » » buf.Write(data) | 140 » » » buf.Write(data) |
| 145 » » » } else { | |
| 146 » » » » for _, b := range data { | |
| 147 » » » » » buf.WriteByte(b ^ 0xFF) | |
| 148 » » » » } | |
| 149 » » » } | |
| 150 } | 141 } |
| 151 » » cb(buf.Bytes()) | 142 » » collSetFn(buf.Bytes(), []byte{}) |
| 152 if !incPos() { | 143 if !incPos() { |
| 153 break | 144 break |
| 154 } | 145 } |
| 155 } | 146 } |
| 156 } | 147 } |
| 157 | 148 |
| 158 type matcher struct { | 149 type matcher struct { |
| 159 buf indexRowGen | 150 buf indexRowGen |
| 160 } | 151 } |
| 161 | 152 |
| 162 // matcher.match checks to see if the mapped, serialized property values | 153 // matcher.match checks to see if the mapped, serialized property values |
| 163 // match the index. If they do, it returns a indexRowGen. Do not write or modify | 154 // match the index. If they do, it returns a indexRowGen. Do not write or modify |
| 164 // the data in the indexRowGen. | 155 // the data in the indexRowGen. |
| 165 func (m *matcher) match(idx *ds.IndexDefinition, sip serializedIndexablePmap) (i
ndexRowGen, bool) { | 156 func (m *matcher) match(sortBy []ds.IndexColumn, sip serializedIndexablePmap) (i
ndexRowGen, bool) { |
| 166 m.buf.propVec = m.buf.propVec[:0] | 157 m.buf.propVec = m.buf.propVec[:0] |
| 167 m.buf.orders = m.buf.orders[:0] | 158 m.buf.orders = m.buf.orders[:0] |
| 168 » for _, sb := range idx.SortBy { | 159 » for _, sb := range sortBy { |
| 169 » » if sb.Property == "__key__" { | |
| 170 » » » panic("don't know how to build compound index on __key__
") | |
| 171 » » } | |
| 172 if pv, ok := sip[sb.Property]; ok { | 160 if pv, ok := sip[sb.Property]; ok { |
| 173 m.buf.propVec = append(m.buf.propVec, pv) | 161 m.buf.propVec = append(m.buf.propVec, pv) |
| 174 m.buf.orders = append(m.buf.orders, sb.Direction) | 162 m.buf.orders = append(m.buf.orders, sb.Direction) |
| 175 } else { | 163 } else { |
| 176 return indexRowGen{}, false | 164 return indexRowGen{}, false |
| 177 } | 165 } |
| 178 } | 166 } |
| 179 return m.buf, true | 167 return m.buf, true |
| 180 } | 168 } |
| 181 | 169 |
| 182 func (sip serializedIndexablePmap) indexEntries(k ds.Key, idxs []*ds.IndexDefini
tion) *memStore { | 170 func (sip serializedIndexablePmap) indexEntries(ns string, idxs []*ds.IndexDefin
ition) *memStore { |
| 183 ret := newMemStore() | 171 ret := newMemStore() |
| 184 idxColl := ret.SetCollection("idx", nil) | 172 idxColl := ret.SetCollection("idx", nil) |
| 185 // getIdxEnts retrieves an index collection or adds it if it's not there
. | |
| 186 getIdxEnts := func(qi *ds.IndexDefinition) *memCollection { | |
| 187 b := serialize.ToBytes(*qi) | |
| 188 idxColl.Set(b, []byte{}) | |
| 189 return ret.SetCollection(fmt.Sprintf("idx:%s:%s", k.Namespace(),
b), nil) | |
| 190 } | |
| 191 | |
| 192 keyData := serialize.ToBytes(k) | |
| 193 | |
| 194 walkPermutations := func(prefix []byte, irg indexRowGen, ents *memCollec
tion) { | |
| 195 irg.permute(func(data []byte) { | |
| 196 buf := bytes.NewBuffer(make([]byte, 0, len(prefix)+len(d
ata)+len(keyData))) | |
| 197 buf.Write(prefix) | |
| 198 buf.Write(data) | |
| 199 buf.Write(keyData) | |
| 200 ents.Set(buf.Bytes(), []byte{}) | |
| 201 }) | |
| 202 } | |
| 203 | 173 |
| 204 mtch := matcher{} | 174 mtch := matcher{} |
| 205 for _, idx := range idxs { | 175 for _, idx := range idxs { |
| 206 » » if irg, ok := mtch.match(idx, sip); ok { | 176 » » if irg, ok := mtch.match(idx.NormalizeOrder(), sip); ok { |
| 207 » » » idxEnts := getIdxEnts(idx) | 177 » » » idxBin := serialize.ToBytes(*idx) |
| 208 » » » if len(irg.propVec) == 0 { | 178 » » » idxColl.Set(idxBin, []byte{}) |
| 209 » » » » idxEnts.Set(keyData, []byte{}) // propless index
, e.g. kind -> key = nil | 179 » » » coll := ret.SetCollection(fmt.Sprintf("idx:%s:%s", ns, i
dxBin), nil) |
| 210 » » » } else if idx.Ancestor { | 180 » » » irg.permute(coll.Set) |
| 211 » » » » for ancKey := k; ancKey != nil; ancKey = ancKey.
Parent() { | |
| 212 » » » » » walkPermutations(serialize.ToBytes(ancKe
y), irg, idxEnts) | |
| 213 » » » » } | |
| 214 » » » } else { | |
| 215 » » » » walkPermutations(nil, irg, idxEnts) | |
| 216 » » » } | |
| 217 } | 181 } |
| 218 } | 182 } |
| 219 | 183 |
| 220 return ret | 184 return ret |
| 221 } | 185 } |
| 222 | 186 |
| 223 func getCompIdxs(idxColl *memCollection) []*ds.IndexDefinition { | 187 func getCompIdxs(idxColl *memCollection) []*ds.IndexDefinition { |
| 224 // load all current complex query index definitions. | 188 // load all current complex query index definitions. |
| 225 compIdx := []*ds.IndexDefinition{} | 189 compIdx := []*ds.IndexDefinition{} |
| 226 complexQueryPrefix := ds.IndexComplexQueryPrefix() | 190 complexQueryPrefix := ds.IndexComplexQueryPrefix() |
| 227 idxColl.VisitItemsAscend(complexQueryPrefix, false, func(i *gkvlite.Item
) bool { | 191 idxColl.VisitItemsAscend(complexQueryPrefix, false, func(i *gkvlite.Item
) bool { |
| 228 if !bytes.HasPrefix(i.Key, complexQueryPrefix) { | 192 if !bytes.HasPrefix(i.Key, complexQueryPrefix) { |
| 229 return false | 193 return false |
| 230 } | 194 } |
| 231 qi, err := serialize.ReadIndexDefinition(bytes.NewBuffer(i.Key)) | 195 qi, err := serialize.ReadIndexDefinition(bytes.NewBuffer(i.Key)) |
| 232 » » if err != nil { | 196 » » memoryCorruption(err) |
| 233 » » » panic(err) // memory corruption | |
| 234 » » } | |
| 235 compIdx = append(compIdx, &qi) | 197 compIdx = append(compIdx, &qi) |
| 236 return true | 198 return true |
| 237 }) | 199 }) |
| 238 return compIdx | 200 return compIdx |
| 239 } | 201 } |
| 240 | 202 |
| 241 func getIdxColl(store *memStore) *memCollection { | 203 func getIdxColl(store *memStore) *memCollection { |
| 242 idxColl := store.GetCollection("idx") | 204 idxColl := store.GetCollection("idx") |
| 243 if idxColl == nil { | 205 if idxColl == nil { |
| 244 idxColl = store.SetCollection("idx", nil) | 206 idxColl = store.SetCollection("idx", nil) |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 277 }) | 239 }) |
| 278 case ov != nil && nv != nil: // merge | 240 case ov != nil && nv != nil: // merge |
| 279 gkvCollide(oldColl, newColl, func(k, ov, nv []byte) { | 241 gkvCollide(oldColl, newColl, func(k, ov, nv []byte) { |
| 280 if nv == nil { | 242 if nv == nil { |
| 281 coll.Delete(k) | 243 coll.Delete(k) |
| 282 } else { | 244 } else { |
| 283 coll.Set(k, nv) | 245 coll.Set(k, nv) |
| 284 } | 246 } |
| 285 }) | 247 }) |
| 286 default: | 248 default: |
| 287 » » » panic("impossible") | 249 » » » impossible(fmt.Errorf("both values from gkvCollide were
nil?")) |
| 288 } | 250 } |
| 289 // TODO(riannucci): remove entries from idxColl and remove index
collections | 251 // TODO(riannucci): remove entries from idxColl and remove index
collections |
| 290 // when there are no index entries for that index any more. | 252 // when there are no index entries for that index any more. |
| 291 }) | 253 }) |
| 292 } | 254 } |
| 293 | 255 |
| 294 func addIndex(store *memStore, ns string, compIdx []*ds.IndexDefinition) { | 256 func addIndex(store *memStore, ns string, compIdx []*ds.IndexDefinition) { |
| 295 store.GetCollection("ents:"+ns).VisitItemsAscend(nil, true, func(i *gkvl
ite.Item) bool { | 257 store.GetCollection("ents:"+ns).VisitItemsAscend(nil, true, func(i *gkvl
ite.Item) bool { |
| 296 pm, err := rpmWoCtx(i.Val, ns) | 258 pm, err := rpmWoCtx(i.Val, ns) |
| 297 » » if err != nil { | 259 » » memoryCorruption(err) |
| 298 » » » panic(err) // memory corruption | 260 |
| 299 » » } | |
| 300 k, err := serialize.ReadKey(bytes.NewBuffer(i.Key), serialize.Wi
thoutContext, globalAppID, ns) | 261 k, err := serialize.ReadKey(bytes.NewBuffer(i.Key), serialize.Wi
thoutContext, globalAppID, ns) |
| 301 » » if err != nil { | 262 » » memoryCorruption(err) |
| 302 » » » panic(err) | 263 |
| 303 » » } | 264 » » sip := partiallySerialize(k, pm) |
| 304 » » sip := partiallySerialize(pm) | 265 » » mergeIndexes(ns, store, newMemStore(), sip.indexEntries(k.Namesp
ace(), compIdx)) |
| 305 » » mergeIndexes(ns, store, newMemStore(), sip.indexEntries(k, compI
dx)) | |
| 306 return true | 266 return true |
| 307 }) | 267 }) |
| 308 } | 268 } |
| 309 | 269 |
| 310 func updateIndicies(store *memStore, key ds.Key, oldEnt, newEnt ds.PropertyMap)
{ | 270 func updateIndicies(store *memStore, key ds.Key, oldEnt, newEnt ds.PropertyMap)
{ |
| 311 // load all current complex query index definitions. | 271 // load all current complex query index definitions. |
| 312 compIdx := getCompIdxs(getIdxColl(store)) | 272 compIdx := getCompIdxs(getIdxColl(store)) |
| 313 | 273 |
| 314 mergeIndexes(key.Namespace(), store, | 274 mergeIndexes(key.Namespace(), store, |
| 315 indexEntriesWithBuiltins(key, oldEnt, compIdx), | 275 indexEntriesWithBuiltins(key, oldEnt, compIdx), |
| 316 indexEntriesWithBuiltins(key, newEnt, compIdx)) | 276 indexEntriesWithBuiltins(key, newEnt, compIdx)) |
| 317 } | 277 } |
| OLD | NEW |