OLD | NEW |
---|---|
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "sync" | 9 "sync" |
10 | 10 |
11 "github.com/luci/gkvlite" | 11 "github.com/luci/gkvlite" |
12 ) | 12 ) |
13 | 13 |
14 // TODO(riannucci): Add a multi-iterator which allows: | 14 type iterDefinition struct { |
dnj (Google)
2015/08/23 06:50:07
Nice combination.
| |
15 // multiple collections | 15 » // The collection to iterate over |
16 // iterate smallest collection -> largest collection | 16 » c *memCollection |
17 // retain input order | |
18 // constant prefixes for each collection | |
19 // start + end partial suffixes | |
20 // * these are appended to the prefix for the collection to narrow the scan | |
21 // range. The collection scans will start at >= prefix+start suffix, and | |
22 // will scan until < prefix+end suffix. | |
23 // produces hits (a suffix+value) when all collections contain the same prefix | |
24 // and the same suffix | |
25 // * a hit value will have | |
26 // - a []byte suffix which is the matching suffix of every collection. | |
27 // The caller already knows the prefix for each collection. The caller | |
28 // may need to parse the suffix to separate the sort order(s) from the | |
29 // Key. | |
30 | 17 |
31 // TODO(riannucci): dedup and distinct will be postfilters | 18 » // The prefix to always assert for every row. A nil prefix matches every row. |
32 // dedup could be done by simply analyzing the retrieved entity and then seeing | 19 » prefix []byte |
33 // if it could come before the query parameters, but this will break in | |
34 // eventually-consistent scenarios. Say that you changed an entity from having | |
35 // a multiproperty with 100 values to only having 1. Then it shows up in the | |
36 // index in 100 places, but on retrieval will only have 1 entry (so "it could | |
37 // not possibly be a dup"). Then you'll serve the entity 100 times :( | |
38 // | |
39 // Similarly, it won't allow you to dedup on 'distinct' queries, since you'd | |
40 // be forced to retrieve and decode the entity, defeating the purpose. | |
41 | 20 |
42 func bjoin(itms ...[]byte) []byte { | 21 » // prefixLen is the number of prefix bytes that the caller cares about. It |
43 » total := 0 | 22 » // may be <= len(prefix). When doing a multiIterator, this number will be used |
44 » for _, i := range itms { | 23 » // to determine the amount of suffix to transfer across iterators. This is |
45 » » total += len(i) | 24 » // used specifically when using builtin indexes to service ancestor queries. |
46 » } | 25 » // The builtin index represents the ancestor key with prefix bytes, but in a |
47 » ret := make([]byte, 0, total) | 26 » // multiIterator context, it wants the entire key to be included in the |
48 » for _, i := range itms { | 27 » // suffix. |
49 » » ret = append(ret, i...) | 28 » prefixLen int |
50 » } | |
51 » return ret | |
52 } | |
53 | 29 |
54 type iterDefinition struct { | 30 » // The start cursor. It's appended to prefix to find the first row. |
55 » c *memCollection | 31 » start []byte |
56 » prefix []byte | 32 |
57 » start []byte | 33 » // The end cursor. It's appended to prefix to find the last row (which is not |
58 » end []byte | 34 » // included in the iteration result). If this is nil, then there's no end |
35 » // except the natural end of the collection. | |
36 » end []byte | |
59 } | 37 } |
60 | 38 |
61 func multiIterate(defs []*iterDefinition, cb func(suffix []byte) bool) { | 39 func multiIterate(defs []*iterDefinition, cb func(suffix []byte) bool) { |
40 if len(defs) == 0 { | |
41 return | |
42 } | |
43 | |
62 ts := make([]*iterator, len(defs)) | 44 ts := make([]*iterator, len(defs)) |
63 prefixLens := make([]int, len(defs)) | 45 prefixLens := make([]int, len(defs)) |
64 for i, def := range defs { | 46 for i, def := range defs { |
65 » » if def.prefix == nil { | 47 » » ts[i] = newIterator(def) |
66 » » » panic("got nil prefix") | 48 » » prefixLens[i] = def.prefixLen |
67 » » } | |
68 | |
69 » » end := []byte(nil) | |
70 » » if def.end != nil { | |
71 » » » end = bjoin(def.prefix, def.end) | |
72 » » } else { | |
73 » » » end = []byte(increment(string(def.prefix), true)) | |
74 » » } | |
75 | |
76 » » start := def.prefix | |
77 » » if def.start != nil { | |
78 » » » start = bjoin(def.prefix, def.start) | |
79 » » } | |
80 | |
81 » » ts[i] = newIterator(def.c, start, end) | |
82 » » prefixLens[i] = len(def.prefix) | |
83 defer ts[i].stop() | 49 defer ts[i].stop() |
dnj (Google)
2015/08/23 06:50:07
This "i" is bound to the last value held by the it
iannucci
2015/08/23 18:19:43
good catch! Done.
| |
84 } | 50 } |
85 | 51 |
86 suffix := []byte(nil) | 52 suffix := []byte(nil) |
87 skip := -1 | 53 skip := -1 |
88 | 54 |
89 for { | 55 for { |
90 stop := false | 56 stop := false |
91 restart := false | 57 restart := false |
92 | 58 |
93 for idx, it := range ts { | 59 for idx, it := range ts { |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
141 cb func(*gkvlite.Item) | 107 cb func(*gkvlite.Item) |
142 } | 108 } |
143 | 109 |
144 type iterator struct { | 110 type iterator struct { |
145 stopper sync.Once | 111 stopper sync.Once |
146 | 112 |
147 stopped bool | 113 stopped bool |
148 ch chan<- *cmd | 114 ch chan<- *cmd |
149 } | 115 } |
150 | 116 |
151 func newIterator(coll *memCollection, start, end []byte) *iterator { | 117 func newIterator(def *iterDefinition) *iterator { |
152 cmdChan := make(chan *cmd) | 118 cmdChan := make(chan *cmd) |
153 ret := &iterator{ | 119 ret := &iterator{ |
154 ch: cmdChan, | 120 ch: cmdChan, |
155 } | 121 } |
156 | 122 |
123 // convert the suffixes from the iterDefinition into full rows for the | |
124 // underlying storage. | |
125 start := make([]byte, len(def.prefix)+len(def.start)) | |
126 copy(start, def.prefix) | |
dnj (Google)
2015/08/23 06:50:07
Might as well use "bjoin" here.
iannucci
2015/08/23 18:19:43
Oh ya!
| |
127 copy(start[len(def.prefix):], def.start) | |
128 | |
129 end := []byte(nil) | |
130 if def.end != nil { | |
131 end = make([]byte, len(def.prefix)+len(def.end)) | |
dnj (Google)
2015/08/23 06:50:07
Might as well use "bjoin" here.
iannucci
2015/08/23 18:19:43
Done.
| |
132 copy(end, def.prefix) | |
133 copy(end[len(def.prefix):], def.end) | |
134 } | |
135 | |
157 go func() { | 136 go func() { |
158 c := (*cmd)(nil) | 137 c := (*cmd)(nil) |
159 ensureCmd := func() bool { | 138 ensureCmd := func() bool { |
160 if c == nil { | 139 if c == nil { |
161 c = <-cmdChan | 140 c = <-cmdChan |
162 if c == nil { // stop() | 141 if c == nil { // stop() |
163 return false | 142 return false |
164 } | 143 } |
165 } | 144 } |
166 return true | 145 return true |
167 } | 146 } |
168 if ensureCmd() { | 147 if ensureCmd() { |
169 if bytes.Compare(c.targ, start) < 0 { | 148 if bytes.Compare(c.targ, start) < 0 { |
170 c.targ = start | 149 c.targ = start |
171 } | 150 } |
172 } | 151 } |
173 | 152 |
174 for { | 153 for { |
154 defer ret.stop() | |
dnj (Google)
2015/08/23 06:50:07
You should do this one before the loop.
iannucci
2015/08/23 18:19:43
Done.
| |
175 if !ensureCmd() { | 155 if !ensureCmd() { |
176 return | 156 return |
177 } | 157 } |
178 » » » needCallback := true | 158 » » » terminalCallback := true |
179 » » » coll.VisitItemsAscend(c.targ, true, func(i *gkvlite.Item ) bool { | 159 » » » def.c.VisitItemsAscend(c.targ, true, func(i *gkvlite.Ite m) bool { |
180 if !ensureCmd() { | 160 if !ensureCmd() { |
181 return false | 161 return false |
182 } | 162 } |
183 » » » » if c.targ != nil && bytes.Compare(i.Key, c.targ) < 0 { | 163 » » » » if bytes.Compare(i.Key, c.targ) < 0 { |
184 // we need to start a new ascension function | 164 // we need to start a new ascension function |
dnj (Google)
2015/08/23 06:50:07
ascension
Is this actually a condition that can h
iannucci
2015/08/23 18:19:43
? This is when we have an iterator over
1
2
3
4
5
| |
185 » » » » » needCallback = false | 165 » » » » » terminalCallback = false |
166 » » » » » return false | |
167 » » » » } | |
168 » » » » if !bytes.HasPrefix(i.Key, def.prefix) { | |
169 » » » » » // we're no longer in prefix, terminate | |
186 return false | 170 return false |
187 } | 171 } |
188 if end != nil && bytes.Compare(i.Key, end) >= 0 { | 172 if end != nil && bytes.Compare(i.Key, end) >= 0 { |
189 » » » » » // we hit our cap, return nil in callback outside loop. | 173 » » » » » // we hit our cap, terminate. |
190 return false | 174 return false |
191 } | 175 } |
192 c.cb(i) | 176 c.cb(i) |
193 c = nil | 177 c = nil |
194 return true | 178 return true |
195 }) | 179 }) |
196 » » » if c != nil && needCallback { | 180 » » » if terminalCallback && ensureCmd() { |
197 c.cb(nil) | 181 c.cb(nil) |
198 c = nil | 182 c = nil |
199 } | 183 } |
200 } | 184 } |
201 }() | 185 }() |
202 | 186 |
203 return ret | 187 return ret |
204 } | 188 } |
205 | 189 |
206 func (t *iterator) stop() { | 190 func (t *iterator) stop() { |
(...skipping 13 matching lines...) Expand all Loading... | |
220 t.ch <- &cmd{targ, func(i *gkvlite.Item) { | 204 t.ch <- &cmd{targ, func(i *gkvlite.Item) { |
221 defer close(waiter) | 205 defer close(waiter) |
222 | 206 |
223 cb(i) | 207 cb(i) |
224 if i == nil { | 208 if i == nil { |
225 t.stop() | 209 t.stop() |
226 } | 210 } |
227 }} | 211 }} |
228 <-waiter | 212 <-waiter |
229 } | 213 } |
OLD | NEW |