| Index: impl/memory/gkvlite_iter.go
|
| diff --git a/impl/memory/gkvlite_iter.go b/impl/memory/gkvlite_iter.go
|
| index cf407970920d016aafbf7b75367f6dd9e83d7c95..5bfc726f86d16f9ecc27c8add00b02d448fc8b23 100644
|
| --- a/impl/memory/gkvlite_iter.go
|
| +++ b/impl/memory/gkvlite_iter.go
|
| @@ -11,75 +11,41 @@ import (
|
| "github.com/luci/gkvlite"
|
| )
|
|
|
| -// TODO(riannucci): Add a multi-iterator which allows:
|
| -// multiple collections
|
| -// iterate smallest collection -> largest collection
|
| -// retain input order
|
| -// constant prefixes for each collection
|
| -// start + end partial suffixes
|
| -// * these are appended to the prefix for the collection to narrow the scan
|
| -// range. The collection scans will start at >= prefix+start suffix, and
|
| -// will scan until < prefix+end suffix.
|
| -// produces hits (a suffix+value) when all collections contain the same prefix
|
| -// and the same suffix
|
| -// * a hit value will have
|
| -// - a []byte suffix which is the matching suffix of every collection.
|
| -// The caller already knows the prefix for each collection. The caller
|
| -// may need to parse the suffix to separate the sort order(s) from the
|
| -// Key.
|
| -
|
| -// TODO(riannucci): dedup and distinct will be postfilters
|
| -// dedup could be done by simply analyzing the retrieved entity and then seeing
|
| -// if it could come before the query parameters, but this will break in
|
| -// eventually-consistent scenarios. Say that you changed an entity from having
|
| -// a multiproperty with 100 values to only having 1. Then it shows up in the
|
| -// index in 100 places, but on retrieval will only have 1 entry (so "it could
|
| -// not possibly be a dup"). Then you'll serve the entity 100 times :(
|
| -//
|
| -// Similarly, it won't allow you to dedup on 'distinct' queries, since you'd
|
| -// be forced to retrieve and decode the entity, defeating the purpose.
|
| -
|
| -func bjoin(itms ...[]byte) []byte {
|
| - total := 0
|
| - for _, i := range itms {
|
| - total += len(i)
|
| - }
|
| - ret := make([]byte, 0, total)
|
| - for _, i := range itms {
|
| - ret = append(ret, i...)
|
| - }
|
| - return ret
|
| -}
|
| -
|
| type iterDefinition struct {
|
| - c *memCollection
|
| + // The collection to iterate over
|
| + c *memCollection
|
| +
|
| + // The prefix to always assert for every row. A nil prefix matches every row.
|
| prefix []byte
|
| - start []byte
|
| - end []byte
|
| +
|
| + // prefixLen is the number of prefix bytes that the caller cares about. It
|
| + // may be <= len(prefix). When doing a multiIterator, this number will be used
|
| + // to determine the amount of suffix to transfer accross iterators. This is
|
| + // used specifically when using builtin indexes to service ancestor queries.
|
| + // The builtin index represents the ancestor key with prefix bytes, but in a
|
| + // multiIterator context, it wants the entire key to be included in the
|
| + // suffix.
|
| + prefixLen int
|
| +
|
| + // The start cursor. It's appended to prefix to find the first row.
|
| + start []byte
|
| +
|
| + // The end cursor. It's appended to prefix to find the last row (which is not
|
| + // included in the interation result). If this is nil, then there's no end
|
| + // except the natural end of the collection.
|
| + end []byte
|
| }
|
|
|
| func multiIterate(defs []*iterDefinition, cb func(suffix []byte) bool) {
|
| + if len(defs) == 0 {
|
| + return
|
| + }
|
| +
|
| ts := make([]*iterator, len(defs))
|
| prefixLens := make([]int, len(defs))
|
| for i, def := range defs {
|
| - if def.prefix == nil {
|
| - panic("got nil prefix")
|
| - }
|
| -
|
| - end := []byte(nil)
|
| - if def.end != nil {
|
| - end = bjoin(def.prefix, def.end)
|
| - } else {
|
| - end = []byte(increment(string(def.prefix), true))
|
| - }
|
| -
|
| - start := def.prefix
|
| - if def.start != nil {
|
| - start = bjoin(def.prefix, def.start)
|
| - }
|
| -
|
| - ts[i] = newIterator(def.c, start, end)
|
| - prefixLens[i] = len(def.prefix)
|
| + ts[i] = newIterator(def)
|
| + prefixLens[i] = def.prefixLen
|
| defer ts[i].stop()
|
| }
|
|
|
| @@ -148,12 +114,25 @@ type iterator struct {
|
| ch chan<- *cmd
|
| }
|
|
|
| -func newIterator(coll *memCollection, start, end []byte) *iterator {
|
| +func newIterator(def *iterDefinition) *iterator {
|
| cmdChan := make(chan *cmd)
|
| ret := &iterator{
|
| ch: cmdChan,
|
| }
|
|
|
| + // convert the suffixes from the iterDefinition into full rows for the
|
| + // underlying storage.
|
| + start := make([]byte, len(def.prefix)+len(def.start))
|
| + copy(start, def.prefix)
|
| + copy(start[len(def.prefix):], def.start)
|
| +
|
| + end := []byte(nil)
|
| + if def.end != nil {
|
| + end = make([]byte, len(def.prefix)+len(def.end))
|
| + copy(end, def.prefix)
|
| + copy(end[len(def.prefix):], def.end)
|
| + }
|
| +
|
| go func() {
|
| c := (*cmd)(nil)
|
| ensureCmd := func() bool {
|
| @@ -172,28 +151,33 @@ func newIterator(coll *memCollection, start, end []byte) *iterator {
|
| }
|
|
|
| for {
|
| + defer ret.stop()
|
| if !ensureCmd() {
|
| return
|
| }
|
| - needCallback := true
|
| - coll.VisitItemsAscend(c.targ, true, func(i *gkvlite.Item) bool {
|
| + terminalCallback := true
|
| + def.c.VisitItemsAscend(c.targ, true, func(i *gkvlite.Item) bool {
|
| if !ensureCmd() {
|
| return false
|
| }
|
| - if c.targ != nil && bytes.Compare(i.Key, c.targ) < 0 {
|
| + if bytes.Compare(i.Key, c.targ) < 0 {
|
| // we need to start a new ascention function
|
| - needCallback = false
|
| + terminalCallback = false
|
| + return false
|
| + }
|
| + if !bytes.HasPrefix(i.Key, def.prefix) {
|
| + // we're no longer in prefix, terminate
|
| return false
|
| }
|
| if end != nil && bytes.Compare(i.Key, end) >= 0 {
|
| - // we hit our cap, return nil in callback outside loop.
|
| + // we hit our cap, terminate.
|
| return false
|
| }
|
| c.cb(i)
|
| c = nil
|
| return true
|
| })
|
| - if c != nil && needCallback {
|
| + if terminalCallback && ensureCmd() {
|
| c.cb(nil)
|
| c = nil
|
| }
|
|
|