Index: impl/memory/datastore_index.go |
diff --git a/impl/memory/datastore_index.go b/impl/memory/datastore_index.go |
index 879258316e9cb7ed309d8d96086aed267a609a82..46c2ec5173c92f6b283ba55edebd77ba8ef02c92 100644 |
--- a/impl/memory/datastore_index.go |
+++ b/impl/memory/datastore_index.go |
@@ -15,15 +15,15 @@ import ( |
var indexCreationDeterministic = false |
-type qIndexSlice []*qIndex |
+type qIndexSlice []*ds.IndexDefinition |
func (s qIndexSlice) Len() int { return len(s) } |
func (s qIndexSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } |
func (s qIndexSlice) Less(i, j int) bool { return s[i].Less(s[j]) } |
-func defaultIndicies(kind string, pmap ds.PropertyMap) []*qIndex { |
+func defaultIndicies(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition { |
ret := make(qIndexSlice, 0, 2*len(pmap)+1) |
- ret = append(ret, &qIndex{kind, false, nil}) |
+ ret = append(ret, &ds.IndexDefinition{Kind: kind}) |
for name, pvals := range pmap { |
needsIndex := false |
for _, v := range pvals { |
@@ -35,8 +35,8 @@ func defaultIndicies(kind string, pmap ds.PropertyMap) []*qIndex { |
if !needsIndex { |
continue |
} |
- ret = append(ret, &qIndex{kind, false, []qSortBy{{name, qASC}}}) |
- ret = append(ret, &qIndex{kind, false, []qSortBy{{name, qDEC}}}) |
+ ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.IndexColumn{{Property: name}}}) |
+ ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.IndexColumn{{Property: name, Direction: ds.DESCENDING}}}) |
} |
if indexCreationDeterministic { |
sort.Sort(ret) |
@@ -44,7 +44,7 @@ func defaultIndicies(kind string, pmap ds.PropertyMap) []*qIndex { |
return ret |
} |
-func indexEntriesWithBuiltins(k ds.Key, pm ds.PropertyMap, complexIdxs []*qIndex) *memStore { |
+func indexEntriesWithBuiltins(k ds.Key, pm ds.PropertyMap, complexIdxs []*ds.IndexDefinition) *memStore { |
sip := partiallySerialize(pm) |
return sip.indexEntries(k, append(defaultIndicies(k.Kind(), pm), complexIdxs...)) |
} |
@@ -87,10 +87,10 @@ func partiallySerialize(pm ds.PropertyMap) (ret serializedIndexablePmap) { |
} |
// indexRowGen contains enough information to generate all of the index rows which |
-// correspond with a propertyList and a qIndex. |
+// correspond with a propertyList and a ds.IndexDefinition. |
type indexRowGen struct { |
propVec []serializedPvals |
- orders []qDirection |
+ orders []ds.IndexDirection |
} |
// permute calls cb for each index row, in the sorted order of the rows. |
@@ -102,7 +102,7 @@ func (s indexRowGen) permute(cb func([]byte)) { |
for i := len(iVec) - 1; i >= 0; i-- { |
var done bool |
var newVal int |
- if s.orders[i] == qASC { |
+ if s.orders[i] == ds.ASCENDING { |
newVal = (iVec[i] + 1) % iVecLim[i] |
done = newVal != 0 |
} else { |
@@ -126,7 +126,7 @@ func (s indexRowGen) permute(cb func([]byte)) { |
} |
for i := range iVec { |
- if s.orders[i] == qDEC { |
+ if s.orders[i] == ds.DESCENDING { |
iVec[i] = iVecLim[i] - 1 |
} |
} |
@@ -139,7 +139,7 @@ func (s indexRowGen) permute(cb func([]byte)) { |
buf := bytes.NewBuffer(make([]byte, 0, bufsiz)) |
for pvalSliceIdx, pvalIdx := range iVec { |
data := s.propVec[pvalSliceIdx][pvalIdx] |
- if s.orders[pvalSliceIdx] == qASC { |
+ if s.orders[pvalSliceIdx] == ds.ASCENDING { |
buf.Write(data) |
} else { |
for _, b := range data { |
@@ -161,13 +161,13 @@ type matcher struct { |
// matcher.match checks to see if the mapped, serialized property values |
// match the index. If they do, it returns a indexRowGen. Do not write or modify |
// the data in the indexRowGen. |
-func (m *matcher) match(idx *qIndex, sip serializedIndexablePmap) (indexRowGen, bool) { |
+func (m *matcher) match(idx *ds.IndexDefinition, sip serializedIndexablePmap) (indexRowGen, bool) { |
m.buf.propVec = m.buf.propVec[:0] |
m.buf.orders = m.buf.orders[:0] |
- for _, sb := range idx.sortby { |
- if pv, ok := sip[sb.prop]; ok { |
+ for _, sb := range idx.SortBy { |
+ if pv, ok := sip[sb.Property]; ok { |
m.buf.propVec = append(m.buf.propVec, pv) |
- m.buf.orders = append(m.buf.orders, sb.dir) |
+ m.buf.orders = append(m.buf.orders, sb.Direction) |
} else { |
return indexRowGen{}, false |
} |
@@ -175,13 +175,13 @@ func (m *matcher) match(idx *qIndex, sip serializedIndexablePmap) (indexRowGen, |
return m.buf, true |
} |
-func (sip serializedIndexablePmap) indexEntries(k ds.Key, idxs []*qIndex) *memStore { |
+func (sip serializedIndexablePmap) indexEntries(k ds.Key, idxs []*ds.IndexDefinition) *memStore { |
ret := newMemStore() |
idxColl := ret.SetCollection("idx", nil) |
// getIdxEnts retrieves an index collection or adds it if it's not there. |
- getIdxEnts := func(qi *qIndex) *memCollection { |
+ getIdxEnts := func(qi *ds.IndexDefinition) *memCollection { |
buf := &bytes.Buffer{} |
- qi.WriteBinary(buf) |
+ qi.Write(buf) |
b := buf.Bytes() |
idxColl.Set(b, []byte{}) |
return ret.SetCollection(fmt.Sprintf("idx:%s:%s", k.Namespace(), b), nil) |
@@ -209,7 +209,7 @@ func (sip serializedIndexablePmap) indexEntries(k ds.Key, idxs []*qIndex) *memSt |
idxEnts := getIdxEnts(idx) |
if len(irg.propVec) == 0 { |
idxEnts.Set(keyData, []byte{}) // propless index, e.g. kind -> key = nil |
- } else if idx.ancestor { |
+ } else if idx.Ancestor { |
for ancKey := k; ancKey != nil; ancKey = ancKey.Parent() { |
buf := &bytes.Buffer{} |
ds.WriteKey(buf, ds.WithoutContext, ancKey) |
@@ -224,35 +224,42 @@ func (sip serializedIndexablePmap) indexEntries(k ds.Key, idxs []*qIndex) *memSt |
return ret |
} |
-func updateIndicies(store *memStore, key ds.Key, oldEnt, newEnt ds.PropertyMap) { |
- idxColl := store.GetCollection("idx") |
- if idxColl == nil { |
- idxColl = store.SetCollection("idx", nil) |
- } |
- |
+func getCompIdxs(idxColl *memCollection) []*ds.IndexDefinition { |
// load all current complex query index definitions. |
- compIdx := []*qIndex{} |
+ compIdx := []*ds.IndexDefinition{} |
+ complexQueryPrefix := ds.IndexComplexQueryPrefix() |
idxColl.VisitItemsAscend(complexQueryPrefix, false, func(i *gkvlite.Item) bool { |
if !bytes.HasPrefix(i.Key, complexQueryPrefix) { |
return false |
} |
- qi := &qIndex{} |
- if err := qi.ReadBinary(bytes.NewBuffer(i.Key)); err != nil { |
+ qi := &ds.IndexDefinition{} |
+ if err := qi.Read(bytes.NewBuffer(i.Key)); err != nil { |
panic(err) // memory corruption |
} |
compIdx = append(compIdx, qi) |
return true |
}) |
+ return compIdx |
+} |
- oldIdx := indexEntriesWithBuiltins(key, oldEnt, compIdx) |
- |
- newIdx := indexEntriesWithBuiltins(key, newEnt, compIdx) |
- |
- prefix := "idx:" + key.Namespace() + ":" |
+func getIdxColl(store *memStore) *memCollection { |
+ idxColl := store.GetCollection("idx") |
+ if idxColl == nil { |
+ idxColl = store.SetCollection("idx", nil) |
+ } |
+ return idxColl |
+} |
+func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) { |
+ idxColl := getIdxColl(store) |
+ prefix := "idx:" + ns + ":" |
gkvCollide(oldIdx.GetCollection("idx"), newIdx.GetCollection("idx"), func(k, ov, nv []byte) { |
ks := prefix + string(k) |
- idxColl.Set(k, []byte{}) |
+ if idxColl.Get(k) == nil { |
+ // avoids unnecessary mutation, otherwise the idx collection thrashes on |
+ // every update. |
+ idxColl.Set(k, []byte{}) |
+ } |
coll := store.GetCollection(ks) |
if coll == nil { |
@@ -287,3 +294,28 @@ func updateIndicies(store *memStore, key ds.Key, oldEnt, newEnt ds.PropertyMap) |
// when there are no index entries for that index any more. |
}) |
} |
+ |
+func addIndex(store *memStore, ns string, compIdx []*ds.IndexDefinition) { |
+ store.GetCollection("ents:"+ns).VisitItemsAscend(nil, true, func(i *gkvlite.Item) bool { |
+ pm, err := rpmWoCtx(i.Val, ns) |
+ if err != nil { |
+ panic(err) // memory corruption |
+ } |
+ k, err := ds.ReadKey(bytes.NewBuffer(i.Key), ds.WithoutContext, globalAppID, ns) |
+ if err != nil { |
+ panic(err) |
+ } |
+ sip := partiallySerialize(pm) |
+ mergeIndexes(ns, store, newMemStore(), sip.indexEntries(k, compIdx)) |
+ return true |
+ }) |
+} |
+ |
+func updateIndicies(store *memStore, key ds.Key, oldEnt, newEnt ds.PropertyMap) { |
+ // load all current complex query index definitions. |
+ compIdx := getCompIdxs(getIdxColl(store)) |
+ |
+ mergeIndexes(key.Namespace(), store, |
+ indexEntriesWithBuiltins(key, oldEnt, compIdx), |
+ indexEntriesWithBuiltins(key, newEnt, compIdx)) |
+} |