| Index: impl/memory/datastore_index.go
|
| diff --git a/impl/memory/datastore_index.go b/impl/memory/datastore_index.go
|
| index f22a970a8e9ef62593c2b60be332c08bd98086b6..354584c006879ed4d814039e015ab814eab6f7fc 100644
|
| --- a/impl/memory/datastore_index.go
|
| +++ b/impl/memory/datastore_index.go
|
| @@ -14,8 +14,6 @@ import (
|
| "github.com/luci/gkvlite"
|
| )
|
|
|
| -var indexCreationDeterministic = false
|
| -
|
| type qIndexSlice []*ds.IndexDefinition
|
|
|
| func (s qIndexSlice) Len() int { return len(s) }
|
| @@ -39,15 +37,15 @@ func defaultIndicies(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition {
|
| ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.IndexColumn{{Property: name}}})
|
| ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.IndexColumn{{Property: name, Direction: ds.DESCENDING}}})
|
| }
|
| - if indexCreationDeterministic {
|
| + if serializationDeterministic {
|
| sort.Sort(ret)
|
| }
|
| return ret
|
| }
|
|
|
| func indexEntriesWithBuiltins(k ds.Key, pm ds.PropertyMap, complexIdxs []*ds.IndexDefinition) *memStore {
|
| - sip := partiallySerialize(pm)
|
| - return sip.indexEntries(k, append(defaultIndicies(k.Kind(), pm), complexIdxs...))
|
| + sip := partiallySerialize(k, pm)
|
| + return sip.indexEntries(k.Namespace(), append(defaultIndicies(k.Kind(), pm), complexIdxs...))
|
| }
|
|
|
| // serializedPvals is all of the serialized DSProperty values in qASC order.
|
| @@ -58,29 +56,33 @@ func (s serializedPvals) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
| func (s serializedPvals) Less(i, j int) bool { return bytes.Compare(s[i], s[j]) < 0 }
|
|
|
| // prop name -> [<serialized DSProperty>, ...]
|
| +// includes the special values '__key__' and '__ancestor__'; '__ancestor__'
|
| +// contains all of the ancestor entries for this key.
|
| type serializedIndexablePmap map[string]serializedPvals
|
|
|
| -func partiallySerialize(pm ds.PropertyMap) (ret serializedIndexablePmap) {
|
| - if len(pm) == 0 {
|
| - return
|
| +func partiallySerialize(k ds.Key, pm ds.PropertyMap) (ret serializedIndexablePmap) {
|
| + ret = make(serializedIndexablePmap, len(pm)+2)
|
| + ret["__key__"] = [][]byte{serialize.ToBytes(ds.MkProperty(k))}
|
| + for k != nil {
|
| + ret["__ancestor__"] = append(ret["__ancestor__"], serialize.ToBytes(ds.MkProperty(k)))
|
| + k = k.Parent()
|
| }
|
| -
|
| - buf := &bytes.Buffer{}
|
| - ret = make(serializedIndexablePmap, len(pm))
|
| for k, vals := range pm {
|
| + dups := map[string]struct{}{}
|
| newVals := make(serializedPvals, 0, len(vals))
|
| for _, v := range vals {
|
| if v.IndexSetting() == ds.NoIndex {
|
| continue
|
| }
|
| - buf.Reset()
|
| - serialize.WriteProperty(buf, serialize.WithoutContext, v)
|
| - newVal := make([]byte, buf.Len())
|
| - copy(newVal, buf.Bytes())
|
| - newVals = append(newVals, newVal)
|
| + data := serialize.ToBytes(v)
|
| + dataS := string(data)
|
| + if _, ok := dups[dataS]; ok {
|
| + continue
|
| + }
|
| + dups[dataS] = struct{}{}
|
| + newVals = append(newVals, data)
|
| }
|
| if len(newVals) > 0 {
|
| - sort.Sort(newVals)
|
| ret[k] = newVals
|
| }
|
| }
|
| @@ -95,7 +97,7 @@ type indexRowGen struct {
|
| }
|
|
|
| // permute calls cb for each index row, in the sorted order of the rows.
|
| -func (s indexRowGen) permute(cb func([]byte)) {
|
| +func (s indexRowGen) permute(collSetFn func(k, v []byte)) {
|
| iVec := make([]int, len(s.propVec))
|
| iVecLim := make([]int, len(s.propVec))
|
|
|
| @@ -137,18 +139,13 @@ func (s indexRowGen) permute(cb func([]byte)) {
|
| for pvalSliceIdx, pvalIdx := range iVec {
|
| bufsiz += len(s.propVec[pvalSliceIdx][pvalIdx])
|
| }
|
| - buf := bytes.NewBuffer(make([]byte, 0, bufsiz))
|
| + buf := serialize.Invertible(bytes.NewBuffer(make([]byte, 0, bufsiz)))
|
| for pvalSliceIdx, pvalIdx := range iVec {
|
| data := s.propVec[pvalSliceIdx][pvalIdx]
|
| - if s.orders[pvalSliceIdx] == ds.ASCENDING {
|
| - buf.Write(data)
|
| - } else {
|
| - for _, b := range data {
|
| - buf.WriteByte(b ^ 0xFF)
|
| - }
|
| - }
|
| + buf.SetInvert(s.orders[pvalSliceIdx] == ds.DESCENDING)
|
| + buf.Write(data)
|
| }
|
| - cb(buf.Bytes())
|
| + collSetFn(buf.Bytes(), []byte{})
|
| if !incPos() {
|
| break
|
| }
|
| @@ -162,13 +159,10 @@ type matcher struct {
|
| // matcher.match checks to see if the mapped, serialized property values
|
| // match the index. If they do, it returns a indexRowGen. Do not write or modify
|
| // the data in the indexRowGen.
|
| -func (m *matcher) match(idx *ds.IndexDefinition, sip serializedIndexablePmap) (indexRowGen, bool) {
|
| +func (m *matcher) match(sortBy []ds.IndexColumn, sip serializedIndexablePmap) (indexRowGen, bool) {
|
| m.buf.propVec = m.buf.propVec[:0]
|
| m.buf.orders = m.buf.orders[:0]
|
| - for _, sb := range idx.SortBy {
|
| - if sb.Property == "__key__" {
|
| - panic("don't know how to build compound index on __key__")
|
| - }
|
| + for _, sb := range sortBy {
|
| if pv, ok := sip[sb.Property]; ok {
|
| m.buf.propVec = append(m.buf.propVec, pv)
|
| m.buf.orders = append(m.buf.orders, sb.Direction)
|
| @@ -179,97 +173,75 @@ func (m *matcher) match(idx *ds.IndexDefinition, sip serializedIndexablePmap) (i
|
| return m.buf, true
|
| }
|
|
|
| -func (sip serializedIndexablePmap) indexEntries(k ds.Key, idxs []*ds.IndexDefinition) *memStore {
|
| +func (sip serializedIndexablePmap) indexEntries(ns string, idxs []*ds.IndexDefinition) *memStore {
|
| ret := newMemStore()
|
| idxColl := ret.SetCollection("idx", nil)
|
| - // getIdxEnts retrieves an index collection or adds it if it's not there.
|
| - getIdxEnts := func(qi *ds.IndexDefinition) *memCollection {
|
| - b := serialize.ToBytes(*qi)
|
| - idxColl.Set(b, []byte{})
|
| - return ret.SetCollection(fmt.Sprintf("idx:%s:%s", k.Namespace(), b), nil)
|
| - }
|
| -
|
| - keyData := serialize.ToBytes(k)
|
| -
|
| - walkPermutations := func(prefix []byte, irg indexRowGen, ents *memCollection) {
|
| - prev := []byte{} // intentionally make a non-nil slice, gkvlite hates nil.
|
| - irg.permute(func(data []byte) {
|
| - buf := bytes.NewBuffer(make([]byte, 0, len(prefix)+len(data)+len(keyData)))
|
| - buf.Write(prefix)
|
| - buf.Write(data)
|
| - buf.Write(keyData)
|
| - ents.Set(buf.Bytes(), prev)
|
| - prev = data
|
| - })
|
| - }
|
|
|
| mtch := matcher{}
|
| for _, idx := range idxs {
|
| - if irg, ok := mtch.match(idx, sip); ok {
|
| - idxEnts := getIdxEnts(idx)
|
| - if len(irg.propVec) == 0 {
|
| - idxEnts.Set(keyData, []byte{}) // propless index, e.g. kind -> key = nil
|
| - } else if idx.Ancestor {
|
| - for ancKey := k; ancKey != nil; ancKey = ancKey.Parent() {
|
| - walkPermutations(serialize.ToBytes(ancKey), irg, idxEnts)
|
| - }
|
| - } else {
|
| - walkPermutations(nil, irg, idxEnts)
|
| - }
|
| + idx = idx.Normalize()
|
| + if irg, ok := mtch.match(idx.GetFullSortOrder(), sip); ok {
|
| + idxBin := serialize.ToBytes(*idx.PrepForIdxTable())
|
| + idxColl.Set(idxBin, []byte{})
|
| + coll := ret.SetCollection(fmt.Sprintf("idx:%s:%s", ns, idxBin), nil)
|
| + irg.permute(coll.Set)
|
| }
|
| }
|
|
|
| return ret
|
| }
|
|
|
| -func getCompIdxs(idxColl *memCollection) []*ds.IndexDefinition {
|
| - // load all current complex query index definitions.
|
| - compIdx := []*ds.IndexDefinition{}
|
| - complexQueryPrefix := ds.IndexComplexQueryPrefix()
|
| - idxColl.VisitItemsAscend(complexQueryPrefix, false, func(i *gkvlite.Item) bool {
|
| - if !bytes.HasPrefix(i.Key, complexQueryPrefix) {
|
| - return false
|
| - }
|
| - qi, err := serialize.ReadIndexDefinition(bytes.NewBuffer(i.Key))
|
| - if err != nil {
|
| - panic(err) // memory corruption
|
| - }
|
| - compIdx = append(compIdx, &qi)
|
| - return true
|
| - })
|
| - return compIdx
|
| -}
|
| -
|
| -func getIdxColl(store *memStore) *memCollection {
|
| +// walkCompIdxs walks the table of compound indexes in the store. If `endsWith`
|
| +// is provided, this will only walk over compound indexes which match
|
| +// Kind, Ancestor, and whose SortBy has `endsWith.SortBy` as a suffix.
|
| +func walkCompIdxs(store *memStore, endsWith *ds.IndexDefinition, cb func(*ds.IndexDefinition) bool) {
|
| idxColl := store.GetCollection("idx")
|
| if idxColl == nil {
|
| - idxColl = store.SetCollection("idx", nil)
|
| + return
|
| + }
|
| + itrDef := iterDefinition{c: idxColl}
|
| +
|
| + if endsWith != nil {
|
| + full := serialize.ToBytes(*endsWith.Flip())
|
| + // chop off the null terminating byte
|
| + itrDef.prefix = full[:len(full)-1]
|
| + }
|
| +
|
| + it := itrDef.mkIter()
|
| + defer it.stop()
|
| + for !it.stopped {
|
| + it.next(nil, func(i *gkvlite.Item) {
|
| + if i == nil {
|
| + return
|
| + }
|
| + qi, err := serialize.ReadIndexDefinition(bytes.NewBuffer(i.Key))
|
| + memoryCorruption(err)
|
| + if !cb(qi.Flip()) {
|
| + it.stop()
|
| + }
|
| + })
|
| }
|
| - return idxColl
|
| }
|
|
|
| func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) {
|
| - idxColl := getIdxColl(store)
|
| - prefix := "idx:" + ns + ":"
|
| + prefixBuf := []byte("idx:" + ns + ":")
|
| + origPrefixBufLen := len(prefixBuf)
|
| gkvCollide(oldIdx.GetCollection("idx"), newIdx.GetCollection("idx"), func(k, ov, nv []byte) {
|
| - ks := prefix + string(k)
|
| - if idxColl.Get(k) == nil {
|
| - // avoids unnecessary mutation, otherwise the idx collection thrashes on
|
| - // every update.
|
| - idxColl.Set(k, []byte{})
|
| - }
|
| + prefixBuf = append(prefixBuf[:origPrefixBufLen], k...)
|
| + ks := string(prefixBuf)
|
|
|
| coll := store.GetCollection(ks)
|
| if coll == nil {
|
| coll = store.SetCollection(ks, nil)
|
| }
|
| +
|
| oldColl := oldIdx.GetCollection(ks)
|
| newColl := newIdx.GetCollection(ks)
|
|
|
| switch {
|
| case ov == nil && nv != nil: // all additions
|
| newColl.VisitItemsAscend(nil, false, func(i *gkvlite.Item) bool {
|
| - coll.Set(i.Key, i.Val)
|
| + coll.Set(i.Key, []byte{})
|
| return true
|
| })
|
| case ov != nil && nv == nil: // all deletions
|
| @@ -282,11 +254,11 @@ func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) {
|
| if nv == nil {
|
| coll.Delete(k)
|
| } else {
|
| - coll.Set(k, nv)
|
| + coll.Set(k, []byte{})
|
| }
|
| })
|
| default:
|
| - panic("impossible")
|
| + impossible(fmt.Errorf("both values from gkvCollide were nil?"))
|
| }
|
| // TODO(riannucci): remove entries from idxColl and remove index collections
|
| // when there are no index entries for that index any more.
|
| @@ -294,24 +266,40 @@ func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) {
|
| }
|
|
|
| func addIndex(store *memStore, ns string, compIdx []*ds.IndexDefinition) {
|
| - store.GetCollection("ents:"+ns).VisitItemsAscend(nil, true, func(i *gkvlite.Item) bool {
|
| - pm, err := rpmWoCtx(i.Val, ns)
|
| - if err != nil {
|
| - panic(err) // memory corruption
|
| - }
|
| - k, err := serialize.ReadKey(bytes.NewBuffer(i.Key), serialize.WithoutContext, globalAppID, ns)
|
| - if err != nil {
|
| - panic(err)
|
| - }
|
| - sip := partiallySerialize(pm)
|
| - mergeIndexes(ns, store, newMemStore(), sip.indexEntries(k, compIdx))
|
| - return true
|
| - })
|
| + normalized := make([]*ds.IndexDefinition, len(compIdx))
|
| + idxColl := store.SetCollection("idx", nil)
|
| + for i, idx := range compIdx {
|
| + normalized[i] = idx.Normalize()
|
| + idxColl.Set(serialize.ToBytes(*normalized[i].PrepForIdxTable()), []byte{})
|
| + }
|
| +
|
| + if allEnts := store.GetCollection("ents:" + ns); allEnts != nil {
|
| + allEnts.VisitItemsAscend(nil, true, func(i *gkvlite.Item) bool {
|
| + pm, err := rpmWoCtx(i.Val, ns)
|
| + memoryCorruption(err)
|
| +
|
| + prop, err := serialize.ReadProperty(bytes.NewBuffer(i.Key), serialize.WithoutContext, globalAppID, ns)
|
| + memoryCorruption(err)
|
| +
|
| + k := prop.Value().(ds.Key)
|
| +
|
| + sip := partiallySerialize(k, pm)
|
| +
|
| + mergeIndexes(ns, store,
|
| + newMemStore(),
|
| + sip.indexEntries(ns, normalized))
|
| + return true
|
| + })
|
| + }
|
| }
|
|
|
| func updateIndicies(store *memStore, key ds.Key, oldEnt, newEnt ds.PropertyMap) {
|
| // load all current complex query index definitions.
|
| - compIdx := getCompIdxs(getIdxColl(store))
|
| + compIdx := []*ds.IndexDefinition{}
|
| + walkCompIdxs(store, nil, func(i *ds.IndexDefinition) bool {
|
| + compIdx = append(compIdx, i)
|
| + return true
|
| + })
|
|
|
| mergeIndexes(key.Namespace(), store,
|
| indexEntriesWithBuiltins(key, oldEnt, compIdx),
|
|
|