Index: impl/memory/gkvlite_iter.go |
diff --git a/impl/memory/gkvlite_iter.go b/impl/memory/gkvlite_iter.go |
index cf407970920d016aafbf7b75367f6dd9e83d7c95..5bfc726f86d16f9ecc27c8add00b02d448fc8b23 100644 |
--- a/impl/memory/gkvlite_iter.go |
+++ b/impl/memory/gkvlite_iter.go |
@@ -11,75 +11,41 @@ import ( |
"github.com/luci/gkvlite" |
) |
-// TODO(riannucci): Add a multi-iterator which allows: |
-// multiple collections |
-// iterate smallest collection -> largest collection |
-// retain input order |
-// constant prefixes for each collection |
-// start + end partial suffixes |
-// * these are appended to the prefix for the collection to narrow the scan |
-// range. The collection scans will start at >= prefix+start suffix, and |
-// will scan until < prefix+end suffix. |
-// produces hits (a suffix+value) when all collections contain the same prefix |
-// and the same suffix |
-// * a hit value will have |
-// - a []byte suffix which is the matching suffix of every collection. |
-// The caller already knows the prefix for each collection. The caller |
-// may need to parse the suffix to separate the sort order(s) from the |
-// Key. |
- |
-// TODO(riannucci): dedup and distinct will be postfilters |
-// dedup could be done by simply analyzing the retrieved entity and then seeing |
-// if it could come before the query parameters, but this will break in |
-// eventually-consistent scenarios. Say that you changed an entity from having |
-// a multiproperty with 100 values to only having 1. Then it shows up in the |
-// index in 100 places, but on retrieval will only have 1 entry (so "it could |
-// not possibly be a dup"). Then you'll serve the entity 100 times :( |
-// |
-// Similarly, it won't allow you to dedup on 'distinct' queries, since you'd |
-// be forced to retrieve and decode the entity, defeating the purpose. |
- |
-func bjoin(itms ...[]byte) []byte { |
- total := 0 |
- for _, i := range itms { |
- total += len(i) |
- } |
- ret := make([]byte, 0, total) |
- for _, i := range itms { |
- ret = append(ret, i...) |
- } |
- return ret |
-} |
- |
type iterDefinition struct { |
dnj (Google)
2015/08/23 06:50:07
Nice combination.
|
- c *memCollection |
+ // The collection to iterate over |
+ c *memCollection |
+ |
+ // The prefix to always assert for every row. A nil prefix matches every row. |
prefix []byte |
- start []byte |
- end []byte |
+ |
+ // prefixLen is the number of prefix bytes that the caller cares about. It |
+ // may be <= len(prefix). When doing a multiIterator, this number will be used |
+ // to determine the amount of suffix to transfer across iterators. This is |
+ // used specifically when using builtin indexes to service ancestor queries. |
+ // The builtin index represents the ancestor key with prefix bytes, but in a |
+ // multiIterator context, it wants the entire key to be included in the |
+ // suffix. |
+ prefixLen int |
+ |
+ // The start cursor. It's appended to prefix to find the first row. |
+ start []byte |
+ |
+ // The end cursor. It's appended to prefix to find the last row (which is not |
+ // included in the iteration result). If this is nil, then there's no end |
+ // except the natural end of the collection. |
+ end []byte |
} |
func multiIterate(defs []*iterDefinition, cb func(suffix []byte) bool) { |
+ if len(defs) == 0 { |
+ return |
+ } |
+ |
ts := make([]*iterator, len(defs)) |
prefixLens := make([]int, len(defs)) |
for i, def := range defs { |
- if def.prefix == nil { |
- panic("got nil prefix") |
- } |
- |
- end := []byte(nil) |
- if def.end != nil { |
- end = bjoin(def.prefix, def.end) |
- } else { |
- end = []byte(increment(string(def.prefix), true)) |
- } |
- |
- start := def.prefix |
- if def.start != nil { |
- start = bjoin(def.prefix, def.start) |
- } |
- |
- ts[i] = newIterator(def.c, start, end) |
- prefixLens[i] = len(def.prefix) |
+ ts[i] = newIterator(def) |
+ prefixLens[i] = def.prefixLen |
defer ts[i].stop() |
dnj (Google)
2015/08/23 06:50:07
This "i" is bound to the last value held by the it
iannucci
2015/08/23 18:19:43
good catch! Done.
|
} |
@@ -148,12 +114,25 @@ type iterator struct { |
ch chan<- *cmd |
} |
-func newIterator(coll *memCollection, start, end []byte) *iterator { |
+func newIterator(def *iterDefinition) *iterator { |
cmdChan := make(chan *cmd) |
ret := &iterator{ |
ch: cmdChan, |
} |
+ // convert the suffixes from the iterDefinition into full rows for the |
+ // underlying storage. |
+ start := make([]byte, len(def.prefix)+len(def.start)) |
+ copy(start, def.prefix) |
dnj (Google)
2015/08/23 06:50:07
Might as well use "bjoin" here.
iannucci
2015/08/23 18:19:43
Oh ya!
|
+ copy(start[len(def.prefix):], def.start) |
+ |
+ end := []byte(nil) |
+ if def.end != nil { |
+ end = make([]byte, len(def.prefix)+len(def.end)) |
dnj (Google)
2015/08/23 06:50:07
Might as well use "bjoin" here.
iannucci
2015/08/23 18:19:43
Done.
|
+ copy(end, def.prefix) |
+ copy(end[len(def.prefix):], def.end) |
+ } |
+ |
go func() { |
c := (*cmd)(nil) |
ensureCmd := func() bool { |
@@ -172,28 +151,33 @@ func newIterator(coll *memCollection, start, end []byte) *iterator { |
} |
for { |
+ defer ret.stop() |
dnj (Google)
2015/08/23 06:50:07
You should do this one before the loop.
iannucci
2015/08/23 18:19:43
Done.
|
if !ensureCmd() { |
return |
} |
- needCallback := true |
- coll.VisitItemsAscend(c.targ, true, func(i *gkvlite.Item) bool { |
+ terminalCallback := true |
+ def.c.VisitItemsAscend(c.targ, true, func(i *gkvlite.Item) bool { |
if !ensureCmd() { |
return false |
} |
- if c.targ != nil && bytes.Compare(i.Key, c.targ) < 0 { |
+ if bytes.Compare(i.Key, c.targ) < 0 { |
// we need to start a new ascension function |
dnj (Google)
2015/08/23 06:50:07
ascension
Is this actually a condition that can h
iannucci
2015/08/23 18:19:43
? This is when we have an iterator over
1
2
3
4
5
|
- needCallback = false |
+ terminalCallback = false |
+ return false |
+ } |
+ if !bytes.HasPrefix(i.Key, def.prefix) { |
+ // we're no longer in prefix, terminate |
return false |
} |
if end != nil && bytes.Compare(i.Key, end) >= 0 { |
- // we hit our cap, return nil in callback outside loop. |
+ // we hit our cap, terminate. |
return false |
} |
c.cb(i) |
c = nil |
return true |
}) |
- if c != nil && needCallback { |
+ if terminalCallback && ensureCmd() { |
c.cb(nil) |
c = nil |
} |