Index: impl/memory/datastore_index.go
diff --git a/impl/memory/datastore_index.go b/impl/memory/datastore_index.go
index f22a970a8e9ef62593c2b60be332c08bd98086b6..4c37e7136b4aec16e0c167b6a2ce9be42cfaf059 100644
--- a/impl/memory/datastore_index.go
+++ b/impl/memory/datastore_index.go
@@ -14,8 +14,6 @@ import (
  "github.com/luci/gkvlite"
)

-var indexCreationDeterministic = false
-
type qIndexSlice []*ds.IndexDefinition

func (s qIndexSlice) Len() int { return len(s) }

@@ -39,15 +37,15 @@ func defaultIndicies(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition {
    ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.IndexColumn{{Property: name}}})
    ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.IndexColumn{{Property: name, Direction: ds.DESCENDING}}})
  }
-  if indexCreationDeterministic {
+  if serializationDeterministic {
    sort.Sort(ret)
  }
  return ret
}

func indexEntriesWithBuiltins(k ds.Key, pm ds.PropertyMap, complexIdxs []*ds.IndexDefinition) *memStore {
-  sip := partiallySerialize(pm)
-  return sip.indexEntries(k, append(defaultIndicies(k.Kind(), pm), complexIdxs...))
+  sip := partiallySerialize(k, pm)
+  return sip.indexEntries(k.Namespace(), append(defaultIndicies(k.Kind(), pm), complexIdxs...))
}

// serializedPvals is all of the serialized DSProperty values in qASC order.
@@ -58,29 +56,33 @@ func (s serializedPvals) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s serializedPvals) Less(i, j int) bool { return bytes.Compare(s[i], s[j]) < 0 }

// prop name -> [<serialized DSProperty>, ...]
+// includes the special values '__key__' and '__ancestor__'; '__ancestor__'
+// contains all of the ancestor entries for this key.
type serializedIndexablePmap map[string]serializedPvals

-func partiallySerialize(pm ds.PropertyMap) (ret serializedIndexablePmap) {
-  if len(pm) == 0 {
-    return
+func partiallySerialize(k ds.Key, pm ds.PropertyMap) (ret serializedIndexablePmap) {
+  ret = make(serializedIndexablePmap, len(pm)+2)
+  ret["__key__"] = [][]byte{serialize.ToBytes(ds.MkProperty(k))}
+  for k != nil {
+    ret["__ancestor__"] = append(ret["__ancestor__"], serialize.ToBytes(ds.MkProperty(k)))
+    k = k.Parent()
  }
-
-  buf := &bytes.Buffer{}
-  ret = make(serializedIndexablePmap, len(pm))
  for k, vals := range pm {
+    dups := map[string]struct{}{}
    newVals := make(serializedPvals, 0, len(vals))
    for _, v := range vals {
      if v.IndexSetting() == ds.NoIndex {
        continue
      }
-      buf.Reset()
-      serialize.WriteProperty(buf, serialize.WithoutContext, v)
-      newVal := make([]byte, buf.Len())
-      copy(newVal, buf.Bytes())
-      newVals = append(newVals, newVal)
+      data := serialize.ToBytes(v)
+      dataS := string(data)
+      if _, ok := dups[dataS]; ok {
+        continue
+      }
+      dups[dataS] = struct{}{}
+      newVals = append(newVals, data)
    }
    if len(newVals) > 0 {
-      sort.Sort(newVals)
      ret[k] = newVals
    }
  }
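
Aside on the new '__key__'/'__ancestor__' values above: '__ancestor__' gets one
serialized entry per key in the entity's ancestor chain, presumably so the
matcher below can treat ancestors like any other sorted column (see
GetFullSortOrder further down). A minimal standalone sketch of that walk, using
a toy key type rather than ds.Key:

package main

import "fmt"

// key is a toy stand-in for ds.Key: a kind/id pair plus an optional parent.
type key struct {
  kind   string
  id     int
  parent *key
}

func (k *key) String() string {
  s := fmt.Sprintf("%s,%d", k.kind, k.id)
  if k.parent != nil {
    return k.parent.String() + "/" + s
  }
  return s
}

func main() {
  k := &key{"Child", 2, &key{"Parent", 1, nil}}
  // Mirrors the loop in partiallySerialize: walk from k up through its
  // parents, appending each key to the '__ancestor__' value list.
  anc := []string{}
  for cur := k; cur != nil; cur = cur.parent {
    anc = append(anc, cur.String())
  }
  fmt.Println(anc) // [Parent,1/Child,2 Parent,1]
}
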
@@ -95,7 +97,7 @@ type indexRowGen struct {
}

// permute calls cb for each index row, in the sorted order of the rows.
-func (s indexRowGen) permute(cb func([]byte)) {
+func (s indexRowGen) permute(collSetFn func(k, v []byte)) {
  iVec := make([]int, len(s.propVec))
  iVecLim := make([]int, len(s.propVec))

@@ -137,18 +139,13 @@ func (s indexRowGen) permute(cb func([]byte)) {
    for pvalSliceIdx, pvalIdx := range iVec {
      bufsiz += len(s.propVec[pvalSliceIdx][pvalIdx])
    }
-    buf := bytes.NewBuffer(make([]byte, 0, bufsiz))
+    buf := serialize.Invertible(bytes.NewBuffer(make([]byte, 0, bufsiz)))
    for pvalSliceIdx, pvalIdx := range iVec {
      data := s.propVec[pvalSliceIdx][pvalIdx]
-      if s.orders[pvalSliceIdx] == ds.ASCENDING {
-        buf.Write(data)
-      } else {
-        for _, b := range data {
-          buf.WriteByte(b ^ 0xFF)
-        }
-      }
+      buf.SetInvert(s.orders[pvalSliceIdx] == ds.DESCENDING)
+      buf.Write(data)
    }
-    cb(buf.Bytes())
+    collSetFn(buf.Bytes(), []byte{})
    if !incPos() {
      break
    }
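
Aside on the serialize.Invertible change above: descending columns are stored
with every byte XOR'd with 0xFF, so a single ascending byte-compare over the
whole row sorts those columns in reverse; the CL just moves the XOR loop behind
a buffer wrapper. A sketch of the technique (the type below is illustrative,
not luci's actual serialize.Invertible API):

package main

import (
  "bytes"
  "fmt"
)

// invertibleBuf wraps bytes.Buffer; while invert is set, every written byte
// is XOR'd with 0xFF, so descending columns sort backwards under a plain
// ascending byte comparison.
type invertibleBuf struct {
  bytes.Buffer
  invert bool
}

func (b *invertibleBuf) SetInvert(on bool) { b.invert = on }

func (b *invertibleBuf) Write(p []byte) (int, error) {
  if !b.invert {
    return b.Buffer.Write(p)
  }
  inv := make([]byte, len(p))
  for i, c := range p {
    inv[i] = c ^ 0xFF
  }
  return b.Buffer.Write(inv)
}

func main() {
  buf := &invertibleBuf{}
  buf.SetInvert(true)
  buf.Write([]byte{0x01, 0x02})
  fmt.Printf("% x\n", buf.Bytes()) // fe fd
}
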
@@ -162,13 +159,10 @@ type matcher struct {
}

// matcher.match checks to see if the mapped, serialized property values
// match the index. If they do, it returns an indexRowGen. Do not write or modify
// the data in the indexRowGen.
-func (m *matcher) match(idx *ds.IndexDefinition, sip serializedIndexablePmap) (indexRowGen, bool) {
+func (m *matcher) match(sortBy []ds.IndexColumn, sip serializedIndexablePmap) (indexRowGen, bool) {
  m.buf.propVec = m.buf.propVec[:0]
  m.buf.orders = m.buf.orders[:0]
-  for _, sb := range idx.SortBy {
-    if sb.Property == "__key__" {
-      panic("don't know how to build compound index on __key__")
-    }
+  for _, sb := range sortBy {
    if pv, ok := sip[sb.Property]; ok {
      m.buf.propVec = append(m.buf.propVec, pv)
      m.buf.orders = append(m.buf.orders, sb.Direction)

@@ -179,97 +173,71 @@ func (m *matcher) match(idx *ds.IndexDefinition, sip serializedIndexablePmap) (i
  return m.buf, true
}

-func (sip serializedIndexablePmap) indexEntries(k ds.Key, idxs []*ds.IndexDefinition) *memStore {
+func (sip serializedIndexablePmap) indexEntries(ns string, idxs []*ds.IndexDefinition) *memStore {
  ret := newMemStore()
  idxColl := ret.SetCollection("idx", nil)
-  // getIdxEnts retrieves an index collection or adds it if it's not there.
-  getIdxEnts := func(qi *ds.IndexDefinition) *memCollection {
-    b := serialize.ToBytes(*qi)
-    idxColl.Set(b, []byte{})
-    return ret.SetCollection(fmt.Sprintf("idx:%s:%s", k.Namespace(), b), nil)
-  }
-
-  keyData := serialize.ToBytes(k)
-
-  walkPermutations := func(prefix []byte, irg indexRowGen, ents *memCollection) {
-    prev := []byte{} // intentionally make a non-nil slice, gkvlite hates nil.
-    irg.permute(func(data []byte) {
-      buf := bytes.NewBuffer(make([]byte, 0, len(prefix)+len(data)+len(keyData)))
-      buf.Write(prefix)
-      buf.Write(data)
-      buf.Write(keyData)
-      ents.Set(buf.Bytes(), prev)
-      prev = data
-    })
-  }

  mtch := matcher{}
  for _, idx := range idxs {
-    if irg, ok := mtch.match(idx, sip); ok {
-      idxEnts := getIdxEnts(idx)
-      if len(irg.propVec) == 0 {
-        idxEnts.Set(keyData, []byte{}) // propless index, e.g. kind -> key = nil
-      } else if idx.Ancestor {
-        for ancKey := k; ancKey != nil; ancKey = ancKey.Parent() {
-          walkPermutations(serialize.ToBytes(ancKey), irg, idxEnts)
-        }
-      } else {
-        walkPermutations(nil, irg, idxEnts)
-      }
+    idx = idx.Normalize()
+    if irg, ok := mtch.match(idx.GetFullSortOrder(), sip); ok {
+      idxBin := serialize.ToBytes(*idx.PrepForIdxTable())
+      idxColl.Set(idxBin, []byte{})
dnj (Google) 2015/08/28 17:54:21:
Use []byte(nil) here, as []byte{} allocates an empty slice.

iannucci 2015/08/28 19:48:55:
If gkvlite didn't panic on nil values, I would totally do that.
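
For the thread above, the distinction in miniature: []byte(nil) is a nil slice
with no backing allocation, while []byte{} is a non-nil zero-length slice.
Since (per iannucci) gkvlite panics on nil values, the CL keeps paying for the
empty one. A small standalone illustration:

package main

import "fmt"

func main() {
  nilVal := []byte(nil) // nil slice: no backing allocation
  emptyVal := []byte{}  // non-nil, zero-length slice

  fmt.Println(nilVal == nil, len(nilVal))     // true 0
  fmt.Println(emptyVal == nil, len(emptyVal)) // false 0

  // A store that rejects nil values (as gkvlite reportedly does) accepts
  // the empty slice but not the nil one.
  set := func(v []byte) {
    if v == nil {
      panic("nil value")
    }
  }
  set(emptyVal) // fine
  // set(nilVal) // would panic
}
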
+      coll := ret.SetCollection(fmt.Sprintf("idx:%s:%s", ns, idxBin), nil)
+      irg.permute(coll.Set)
    }
  }
  return ret
}

-func getCompIdxs(idxColl *memCollection) []*ds.IndexDefinition {
-  // load all current complex query index definitions.
-  compIdx := []*ds.IndexDefinition{}
-  complexQueryPrefix := ds.IndexComplexQueryPrefix()
-  idxColl.VisitItemsAscend(complexQueryPrefix, false, func(i *gkvlite.Item) bool {
-    if !bytes.HasPrefix(i.Key, complexQueryPrefix) {
-      return false
-    }
-    qi, err := serialize.ReadIndexDefinition(bytes.NewBuffer(i.Key))
-    if err != nil {
-      panic(err) // memory corruption
-    }
-    compIdx = append(compIdx, &qi)
-    return true
-  })
-  return compIdx
-}
-
-func getIdxColl(store *memStore) *memCollection {
+// walkCompIdxs walks the table of compound indexes in the store. If `endsWith`
+// is provided, this will only walk over compound indexes which match
+// Kind, Ancestor, and whose SortBy has `endsWith.SortBy` as a suffix.
+func walkCompIdxs(store *memStore, endsWith *ds.IndexDefinition, cb func(*ds.IndexDefinition) bool) {
  idxColl := store.GetCollection("idx")
  if idxColl == nil {
-    idxColl = store.SetCollection("idx", nil)
+    return
+  }
+  itrDef := iterDefinition{c: idxColl}
+
+  if endsWith != nil {
+    full := serialize.ToBytes(*endsWith.Flip())
+    // chop off the null-terminating byte
+    itrDef.prefix = full[:len(full)-1]
+  }
+
+  for it := itrDef.mkIter(); !it.stopped; {
+    it.next(nil, func(i *gkvlite.Item) {
+      if i == nil {
+        return
+      }
+      qi, err := serialize.ReadIndexDefinition(bytes.NewBuffer(i.Key))
+      memoryCorruption(err)
+      if !cb(qi.Flip()) {
+        it.stop()
dnj (Google) 2015/08/28 17:54:21:
It'd be better to arrange to do this via defer, as …

iannucci 2015/08/28 19:48:55:
I think you mean 'additionally do this via a defer' …
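
A sketch of the defer arrangement dnj is suggesting, with simplified stand-ins
for the CL's iterator (mkIter/next/stop are not reproduced here); the point is
that a deferred stop() runs on every exit path, early return or panic included:

package main

import "fmt"

// iter is a toy iterator; stop is idempotent, like the CL's it.stop().
type iter struct{ stopped bool }

func (it *iter) stop() { it.stopped = true }

// walk guarantees the iterator is stopped no matter how it exits, instead of
// relying only on an inline it.stop() when the callback returns false.
func walk(it *iter, cb func(int) bool) {
  defer it.stop()
  for i := 0; !it.stopped && i < 10; i++ {
    if !cb(i) {
      return
    }
  }
}

func main() {
  it := &iter{}
  walk(it, func(i int) bool { return i < 3 })
  fmt.Println(it.stopped) // true
}
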
+      }
+    })
  }
-  return idxColl
-}

func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) {
-  idxColl := getIdxColl(store)
  prefix := "idx:" + ns + ":"
  gkvCollide(oldIdx.GetCollection("idx"), newIdx.GetCollection("idx"), func(k, ov, nv []byte) {
    ks := prefix + string(k)
dnj (Google) 2015/08/28 17:54:21:
Since you do this a lot, you should probably alloc…

iannucci 2015/08/28 19:48:55:
done. though this is definitely a micro-optimization.
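
On dnj's allocation point: ks := prefix + string(k) can allocate a fresh string
on every gkvCollide callback. One way to amortize that with a reusable scratch
buffer, sketched with made-up values (the CL itself settles for hoisting the
prefix, calling the rest a micro-optimization):

package main

import "fmt"

func main() {
  prefix := "idx:ns:"
  keys := [][]byte{[]byte("a"), []byte("bb")}

  // Keep the prefix in a scratch buffer with spare capacity; each iteration
  // re-truncates to the prefix and appends the key bytes in place, so only
  // the final string conversion allocates.
  scratch := append(make([]byte, 0, 64), prefix...)
  for _, k := range keys {
    ks := string(append(scratch[:len(prefix)], k...))
    fmt.Println(ks) // idx:ns:a, then idx:ns:bb
  }
}
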
-    if idxColl.Get(k) == nil {
-      // avoids unnecessary mutation, otherwise the idx collection thrashes on
-      // every update.
-      idxColl.Set(k, []byte{})
-    }
    coll := store.GetCollection(ks)
    if coll == nil {
      coll = store.SetCollection(ks, nil)
    }
+
    oldColl := oldIdx.GetCollection(ks)
    newColl := newIdx.GetCollection(ks)
    switch {
    case ov == nil && nv != nil: // all additions
      newColl.VisitItemsAscend(nil, false, func(i *gkvlite.Item) bool {
-        coll.Set(i.Key, i.Val)
+        coll.Set(i.Key, []byte{})
        return true
      })
    case ov != nil && nv == nil: // all deletions
@@ -282,11 +250,11 @@ func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) {
      if nv == nil {
        coll.Delete(k)
      } else {
-        coll.Set(k, nv)
+        coll.Set(k, []byte{})
      }
    })
    default:
-      panic("impossible")
+      impossible(fmt.Errorf("both values from gkvCollide were nil?"))
    }
    // TODO(riannucci): remove entries from idxColl and remove index collections
    // when there are no index entries for that index any more.
@@ -294,24 +262,40 @@ }
}

func addIndex(store *memStore, ns string, compIdx []*ds.IndexDefinition) {
-  store.GetCollection("ents:"+ns).VisitItemsAscend(nil, true, func(i *gkvlite.Item) bool {
-    pm, err := rpmWoCtx(i.Val, ns)
-    if err != nil {
-      panic(err) // memory corruption
-    }
-    k, err := serialize.ReadKey(bytes.NewBuffer(i.Key), serialize.WithoutContext, globalAppID, ns)
-    if err != nil {
-      panic(err)
-    }
-    sip := partiallySerialize(pm)
-    mergeIndexes(ns, store, newMemStore(), sip.indexEntries(k, compIdx))
-    return true
-  })
+  normalized := make([]*ds.IndexDefinition, len(compIdx))
+  idxColl := store.SetCollection("idx", nil)
+  for i, idx := range compIdx {
+    normalized[i] = idx.Normalize()
+    idxColl.Set(serialize.ToBytes(*normalized[i].PrepForIdxTable()), []byte{})
+  }
+
+  if allEnts := store.GetCollection("ents:" + ns); allEnts != nil {
+    allEnts.VisitItemsAscend(nil, true, func(i *gkvlite.Item) bool {
+      pm, err := rpmWoCtx(i.Val, ns)
+      memoryCorruption(err)
+
+      prop, err := serialize.ReadProperty(bytes.NewBuffer(i.Key), serialize.WithoutContext, globalAppID, ns)
+      memoryCorruption(err)
+
+      k := prop.Value().(ds.Key)
+
+      sip := partiallySerialize(k, pm)
+
+      mergeIndexes(ns, store,
+        newMemStore(),
+        sip.indexEntries(ns, normalized))
+      return true
+    })
+  }
}

func updateIndicies(store *memStore, key ds.Key, oldEnt, newEnt ds.PropertyMap) {
  // load all current complex query index definitions.
-  compIdx := getCompIdxs(getIdxColl(store))
+  compIdx := []*ds.IndexDefinition{}
+  walkCompIdxs(store, nil, func(i *ds.IndexDefinition) bool {
+    compIdx = append(compIdx, i)
+    return true
+  })
  mergeIndexes(key.Namespace(), store,
    indexEntriesWithBuiltins(key, oldEnt, compIdx),