Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(111)

Side by Side Diff: impl/memory/gkvlite_iter.go

Issue 1302813003: impl/memory: Implement Queries (Closed) Base URL: https://github.com/luci/gae.git@add_multi_iterator
Patch Set: Baby's first query! Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "sync" 9 "sync"
10 10
11 "github.com/luci/gkvlite" 11 "github.com/luci/gkvlite"
12 ) 12 )
13 13
14 // TODO(riannucci): Add a multi-iterator which allows: 14 type iterDefinition struct {
dnj (Google) 2015/08/23 06:50:07 Nice combination.
15 // multiple collections 15 » // The collection to iterate over
16 // iterate smallest collection -> largest collection 16 » c *memCollection
17 // retain input order
18 // constant prefixes for each collection
19 // start + end partial suffixes
20 // * these are appended to the prefix for the collection to narrow the scan
21 // range. The collection scans will start at >= prefix+start suffix, and
22 // will scan until < prefix+end suffix.
23 // produces hits (a suffix+value) when all collections contain the same prefix
24 // and the same suffix
25 // * a hit value will have
26 // - a []byte suffix which is the matching suffix of every collection.
27 // The caller already knows the prefix for each collection. The caller
28 // may need to parse the suffix to separate the sort order(s) from the
29 // Key.
30 17
31 // TODO(riannucci): dedup and distinct will be postfilters 18 » // The prefix to always assert for every row. A nil prefix matches every row.
32 // dedup could be done by simply analyzing the retrieved entity and then seeing 19 » prefix []byte
33 // if it could come before the query parameters, but this will break in
34 // eventually-consistent scenarios. Say that you changed an entity from having
35 // a multiproperty with 100 values to only having 1. Then it shows up in the
36 // index in 100 places, but on retrieval will only have 1 entry (so "it could
37 // not possibly be a dup"). Then you'll serve the entity 100 times :(
38 //
39 // Similarly, it won't allow you to dedup on 'distinct' queries, since you'd
40 // be forced to retrieve and decode the entity, defeating the purpose.
41 20
42 func bjoin(itms ...[]byte) []byte { 21 » // prefixLen is the number of prefix bytes that the caller cares about. It
43 » total := 0 22 » // may be <= len(prefix). When doing a multiIterator, this number will b e used
44 » for _, i := range itms { 23 » // to determine the amount of suffix to transfer accross iterators. This is
45 » » total += len(i) 24 » // used specifically when using builtin indexes to service ancestor quer ies.
46 » } 25 » // The builtin index represents the ancestor key with prefix bytes, but in a
47 » ret := make([]byte, 0, total) 26 » // multiIterator context, it wants the entire key to be included in the
48 » for _, i := range itms { 27 » // suffix.
49 » » ret = append(ret, i...) 28 » prefixLen int
50 » }
51 » return ret
52 }
53 29
54 type iterDefinition struct { 30 » // The start cursor. It's appended to prefix to find the first row.
55 » c *memCollection 31 » start []byte
56 » prefix []byte 32
57 » start []byte 33 » // The end cursor. It's appended to prefix to find the last row (which i s not
58 » end []byte 34 » // included in the interation result). If this is nil, then there's no e nd
35 » // except the natural end of the collection.
36 » end []byte
59 } 37 }
60 38
61 func multiIterate(defs []*iterDefinition, cb func(suffix []byte) bool) { 39 func multiIterate(defs []*iterDefinition, cb func(suffix []byte) bool) {
40 if len(defs) == 0 {
41 return
42 }
43
62 ts := make([]*iterator, len(defs)) 44 ts := make([]*iterator, len(defs))
63 prefixLens := make([]int, len(defs)) 45 prefixLens := make([]int, len(defs))
64 for i, def := range defs { 46 for i, def := range defs {
65 » » if def.prefix == nil { 47 » » ts[i] = newIterator(def)
66 » » » panic("got nil prefix") 48 » » prefixLens[i] = def.prefixLen
67 » » }
68
69 » » end := []byte(nil)
70 » » if def.end != nil {
71 » » » end = bjoin(def.prefix, def.end)
72 » » } else {
73 » » » end = []byte(increment(string(def.prefix), true))
74 » » }
75
76 » » start := def.prefix
77 » » if def.start != nil {
78 » » » start = bjoin(def.prefix, def.start)
79 » » }
80
81 » » ts[i] = newIterator(def.c, start, end)
82 » » prefixLens[i] = len(def.prefix)
83 defer ts[i].stop() 49 defer ts[i].stop()
dnj (Google) 2015/08/23 06:50:07 This "i" is bound do the last value held by the it
iannucci 2015/08/23 18:19:43 good catch! Done.
84 } 50 }
85 51
86 suffix := []byte(nil) 52 suffix := []byte(nil)
87 skip := -1 53 skip := -1
88 54
89 for { 55 for {
90 stop := false 56 stop := false
91 restart := false 57 restart := false
92 58
93 for idx, it := range ts { 59 for idx, it := range ts {
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
141 cb func(*gkvlite.Item) 107 cb func(*gkvlite.Item)
142 } 108 }
143 109
144 type iterator struct { 110 type iterator struct {
145 stopper sync.Once 111 stopper sync.Once
146 112
147 stopped bool 113 stopped bool
148 ch chan<- *cmd 114 ch chan<- *cmd
149 } 115 }
150 116
151 func newIterator(coll *memCollection, start, end []byte) *iterator { 117 func newIterator(def *iterDefinition) *iterator {
152 cmdChan := make(chan *cmd) 118 cmdChan := make(chan *cmd)
153 ret := &iterator{ 119 ret := &iterator{
154 ch: cmdChan, 120 ch: cmdChan,
155 } 121 }
156 122
123 // convert the suffixes from the iterDefinition into full rows for the
124 // underlying storage.
125 start := make([]byte, len(def.prefix)+len(def.start))
126 copy(start, def.prefix)
dnj (Google) 2015/08/23 06:50:07 Might as well use "bjoin" here.
iannucci 2015/08/23 18:19:43 Oh ya!
127 copy(start[len(def.prefix):], def.start)
128
129 end := []byte(nil)
130 if def.end != nil {
131 end = make([]byte, len(def.prefix)+len(def.end))
dnj (Google) 2015/08/23 06:50:07 Might as well use "bjoin" here.
iannucci 2015/08/23 18:19:43 Done.
132 copy(end, def.prefix)
133 copy(end[len(def.prefix):], def.end)
134 }
135
157 go func() { 136 go func() {
158 c := (*cmd)(nil) 137 c := (*cmd)(nil)
159 ensureCmd := func() bool { 138 ensureCmd := func() bool {
160 if c == nil { 139 if c == nil {
161 c = <-cmdChan 140 c = <-cmdChan
162 if c == nil { // stop() 141 if c == nil { // stop()
163 return false 142 return false
164 } 143 }
165 } 144 }
166 return true 145 return true
167 } 146 }
168 if ensureCmd() { 147 if ensureCmd() {
169 if bytes.Compare(c.targ, start) < 0 { 148 if bytes.Compare(c.targ, start) < 0 {
170 c.targ = start 149 c.targ = start
171 } 150 }
172 } 151 }
173 152
174 for { 153 for {
154 defer ret.stop()
dnj (Google) 2015/08/23 06:50:07 You should do this one before the loop.
iannucci 2015/08/23 18:19:43 Done.
175 if !ensureCmd() { 155 if !ensureCmd() {
176 return 156 return
177 } 157 }
178 » » » needCallback := true 158 » » » terminalCallback := true
179 » » » coll.VisitItemsAscend(c.targ, true, func(i *gkvlite.Item ) bool { 159 » » » def.c.VisitItemsAscend(c.targ, true, func(i *gkvlite.Ite m) bool {
180 if !ensureCmd() { 160 if !ensureCmd() {
181 return false 161 return false
182 } 162 }
183 » » » » if c.targ != nil && bytes.Compare(i.Key, c.targ) < 0 { 163 » » » » if bytes.Compare(i.Key, c.targ) < 0 {
184 // we need to start a new ascention func tion 164 // we need to start a new ascention func tion
dnj (Google) 2015/08/23 06:50:07 ascension Is this actually a condition that can h
iannucci 2015/08/23 18:19:43 ? This is when we have an iterator over 1 2 3 4 5
185 » » » » » needCallback = false 165 » » » » » terminalCallback = false
166 » » » » » return false
167 » » » » }
168 » » » » if !bytes.HasPrefix(i.Key, def.prefix) {
169 » » » » » // we're no longer in prefix, terminate
186 return false 170 return false
187 } 171 }
188 if end != nil && bytes.Compare(i.Key, end) >= 0 { 172 if end != nil && bytes.Compare(i.Key, end) >= 0 {
189 » » » » » // we hit our cap, return nil in callbac k outside loop. 173 » » » » » // we hit our cap, terminate.
190 return false 174 return false
191 } 175 }
192 c.cb(i) 176 c.cb(i)
193 c = nil 177 c = nil
194 return true 178 return true
195 }) 179 })
196 » » » if c != nil && needCallback { 180 » » » if terminalCallback && ensureCmd() {
197 c.cb(nil) 181 c.cb(nil)
198 c = nil 182 c = nil
199 } 183 }
200 } 184 }
201 }() 185 }()
202 186
203 return ret 187 return ret
204 } 188 }
205 189
206 func (t *iterator) stop() { 190 func (t *iterator) stop() {
(...skipping 13 matching lines...) Expand all
220 t.ch <- &cmd{targ, func(i *gkvlite.Item) { 204 t.ch <- &cmd{targ, func(i *gkvlite.Item) {
221 defer close(waiter) 205 defer close(waiter)
222 206
223 cb(i) 207 cb(i)
224 if i == nil { 208 if i == nil {
225 t.stop() 209 t.stop()
226 } 210 }
227 }} 211 }}
228 <-waiter 212 <-waiter
229 } 213 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698