Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "sync" | 9 "sync" |
| 10 | 10 |
| 11 "github.com/luci/gkvlite" | 11 "github.com/luci/gkvlite" |
| 12 ) | 12 ) |
| 13 | 13 |
| 14 // TODO(riannucci): Add a multi-iterator which allows: | 14 type iterDefinition struct { |
|
dnj (Google)
2015/08/23 06:50:07
Nice combination.
| |
| 15 // multiple collections | 15 » // The collection to iterate over |
| 16 // iterate smallest collection -> largest collection | 16 » c *memCollection |
| 17 // retain input order | |
| 18 // constant prefixes for each collection | |
| 19 // start + end partial suffixes | |
| 20 // * these are appended to the prefix for the collection to narrow the scan | |
| 21 // range. The collection scans will start at >= prefix+start suffix, and | |
| 22 // will scan until < prefix+end suffix. | |
| 23 // produces hits (a suffix+value) when all collections contain the same prefix | |
| 24 // and the same suffix | |
| 25 // * a hit value will have | |
| 26 // - a []byte suffix which is the matching suffix of every collection. | |
| 27 // The caller already knows the prefix for each collection. The caller | |
| 28 // may need to parse the suffix to separate the sort order(s) from the | |
| 29 // Key. | |
| 30 | 17 |
| 31 // TODO(riannucci): dedup and distinct will be postfilters | 18 » // The prefix to always assert for every row. A nil prefix matches every row. |
| 32 // dedup could be done by simply analyzing the retrieved entity and then seeing | 19 » prefix []byte |
| 33 // if it could come before the query parameters, but this will break in | |
| 34 // eventually-consistent scenarios. Say that you changed an entity from having | |
| 35 // a multiproperty with 100 values to only having 1. Then it shows up in the | |
| 36 // index in 100 places, but on retrieval will only have 1 entry (so "it could | |
| 37 // not possibly be a dup"). Then you'll serve the entity 100 times :( | |
| 38 // | |
| 39 // Similarly, it won't allow you to dedup on 'distinct' queries, since you'd | |
| 40 // be forced to retrieve and decode the entity, defeating the purpose. | |
| 41 | 20 |
| 42 func bjoin(itms ...[]byte) []byte { | 21 » // prefixLen is the number of prefix bytes that the caller cares about. It |
| 43 » total := 0 | 22 » // may be <= len(prefix). When doing a multiIterator, this number will b e used |
| 44 » for _, i := range itms { | 23 » // to determine the amount of suffix to transfer accross iterators. This is |
| 45 » » total += len(i) | 24 » // used specifically when using builtin indexes to service ancestor quer ies. |
| 46 » } | 25 » // The builtin index represents the ancestor key with prefix bytes, but in a |
| 47 » ret := make([]byte, 0, total) | 26 » // multiIterator context, it wants the entire key to be included in the |
| 48 » for _, i := range itms { | 27 » // suffix. |
| 49 » » ret = append(ret, i...) | 28 » prefixLen int |
| 50 » } | |
| 51 » return ret | |
| 52 } | |
| 53 | 29 |
| 54 type iterDefinition struct { | 30 » // The start cursor. It's appended to prefix to find the first row. |
| 55 » c *memCollection | 31 » start []byte |
| 56 » prefix []byte | 32 |
| 57 » start []byte | 33 » // The end cursor. It's appended to prefix to find the last row (which i s not |
| 58 » end []byte | 34 » // included in the interation result). If this is nil, then there's no e nd |
| 35 » // except the natural end of the collection. | |
| 36 » end []byte | |
| 59 } | 37 } |
| 60 | 38 |
| 61 func multiIterate(defs []*iterDefinition, cb func(suffix []byte) bool) { | 39 func multiIterate(defs []*iterDefinition, cb func(suffix []byte) bool) { |
| 40 if len(defs) == 0 { | |
| 41 return | |
| 42 } | |
| 43 | |
| 62 ts := make([]*iterator, len(defs)) | 44 ts := make([]*iterator, len(defs)) |
| 63 prefixLens := make([]int, len(defs)) | 45 prefixLens := make([]int, len(defs)) |
| 64 for i, def := range defs { | 46 for i, def := range defs { |
| 65 » » if def.prefix == nil { | 47 » » ts[i] = newIterator(def) |
| 66 » » » panic("got nil prefix") | 48 » » prefixLens[i] = def.prefixLen |
| 67 » » } | |
| 68 | |
| 69 » » end := []byte(nil) | |
| 70 » » if def.end != nil { | |
| 71 » » » end = bjoin(def.prefix, def.end) | |
| 72 » » } else { | |
| 73 » » » end = []byte(increment(string(def.prefix), true)) | |
| 74 » » } | |
| 75 | |
| 76 » » start := def.prefix | |
| 77 » » if def.start != nil { | |
| 78 » » » start = bjoin(def.prefix, def.start) | |
| 79 » » } | |
| 80 | |
| 81 » » ts[i] = newIterator(def.c, start, end) | |
| 82 » » prefixLens[i] = len(def.prefix) | |
| 83 defer ts[i].stop() | 49 defer ts[i].stop() |
|
dnj (Google)
2015/08/23 06:50:07
This "i" is bound do the last value held by the it
iannucci
2015/08/23 18:19:43
good catch! Done.
| |
| 84 } | 50 } |
| 85 | 51 |
| 86 suffix := []byte(nil) | 52 suffix := []byte(nil) |
| 87 skip := -1 | 53 skip := -1 |
| 88 | 54 |
| 89 for { | 55 for { |
| 90 stop := false | 56 stop := false |
| 91 restart := false | 57 restart := false |
| 92 | 58 |
| 93 for idx, it := range ts { | 59 for idx, it := range ts { |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 141 cb func(*gkvlite.Item) | 107 cb func(*gkvlite.Item) |
| 142 } | 108 } |
| 143 | 109 |
| 144 type iterator struct { | 110 type iterator struct { |
| 145 stopper sync.Once | 111 stopper sync.Once |
| 146 | 112 |
| 147 stopped bool | 113 stopped bool |
| 148 ch chan<- *cmd | 114 ch chan<- *cmd |
| 149 } | 115 } |
| 150 | 116 |
| 151 func newIterator(coll *memCollection, start, end []byte) *iterator { | 117 func newIterator(def *iterDefinition) *iterator { |
| 152 cmdChan := make(chan *cmd) | 118 cmdChan := make(chan *cmd) |
| 153 ret := &iterator{ | 119 ret := &iterator{ |
| 154 ch: cmdChan, | 120 ch: cmdChan, |
| 155 } | 121 } |
| 156 | 122 |
| 123 // convert the suffixes from the iterDefinition into full rows for the | |
| 124 // underlying storage. | |
| 125 start := make([]byte, len(def.prefix)+len(def.start)) | |
| 126 copy(start, def.prefix) | |
|
dnj (Google)
2015/08/23 06:50:07
Might as well use "bjoin" here.
iannucci
2015/08/23 18:19:43
Oh ya!
| |
| 127 copy(start[len(def.prefix):], def.start) | |
| 128 | |
| 129 end := []byte(nil) | |
| 130 if def.end != nil { | |
| 131 end = make([]byte, len(def.prefix)+len(def.end)) | |
|
dnj (Google)
2015/08/23 06:50:07
Might as well use "bjoin" here.
iannucci
2015/08/23 18:19:43
Done.
| |
| 132 copy(end, def.prefix) | |
| 133 copy(end[len(def.prefix):], def.end) | |
| 134 } | |
| 135 | |
| 157 go func() { | 136 go func() { |
| 158 c := (*cmd)(nil) | 137 c := (*cmd)(nil) |
| 159 ensureCmd := func() bool { | 138 ensureCmd := func() bool { |
| 160 if c == nil { | 139 if c == nil { |
| 161 c = <-cmdChan | 140 c = <-cmdChan |
| 162 if c == nil { // stop() | 141 if c == nil { // stop() |
| 163 return false | 142 return false |
| 164 } | 143 } |
| 165 } | 144 } |
| 166 return true | 145 return true |
| 167 } | 146 } |
| 168 if ensureCmd() { | 147 if ensureCmd() { |
| 169 if bytes.Compare(c.targ, start) < 0 { | 148 if bytes.Compare(c.targ, start) < 0 { |
| 170 c.targ = start | 149 c.targ = start |
| 171 } | 150 } |
| 172 } | 151 } |
| 173 | 152 |
| 174 for { | 153 for { |
| 154 defer ret.stop() | |
|
dnj (Google)
2015/08/23 06:50:07
You should do this one before the loop.
iannucci
2015/08/23 18:19:43
Done.
| |
| 175 if !ensureCmd() { | 155 if !ensureCmd() { |
| 176 return | 156 return |
| 177 } | 157 } |
| 178 » » » needCallback := true | 158 » » » terminalCallback := true |
| 179 » » » coll.VisitItemsAscend(c.targ, true, func(i *gkvlite.Item ) bool { | 159 » » » def.c.VisitItemsAscend(c.targ, true, func(i *gkvlite.Ite m) bool { |
| 180 if !ensureCmd() { | 160 if !ensureCmd() { |
| 181 return false | 161 return false |
| 182 } | 162 } |
| 183 » » » » if c.targ != nil && bytes.Compare(i.Key, c.targ) < 0 { | 163 » » » » if bytes.Compare(i.Key, c.targ) < 0 { |
| 184 // we need to start a new ascention func tion | 164 // we need to start a new ascention func tion |
|
dnj (Google)
2015/08/23 06:50:07
ascension
Is this actually a condition that can h
iannucci
2015/08/23 18:19:43
? This is when we have an iterator over
1
2
3
4
5
| |
| 185 » » » » » needCallback = false | 165 » » » » » terminalCallback = false |
| 166 » » » » » return false | |
| 167 » » » » } | |
| 168 » » » » if !bytes.HasPrefix(i.Key, def.prefix) { | |
| 169 » » » » » // we're no longer in prefix, terminate | |
| 186 return false | 170 return false |
| 187 } | 171 } |
| 188 if end != nil && bytes.Compare(i.Key, end) >= 0 { | 172 if end != nil && bytes.Compare(i.Key, end) >= 0 { |
| 189 » » » » » // we hit our cap, return nil in callbac k outside loop. | 173 » » » » » // we hit our cap, terminate. |
| 190 return false | 174 return false |
| 191 } | 175 } |
| 192 c.cb(i) | 176 c.cb(i) |
| 193 c = nil | 177 c = nil |
| 194 return true | 178 return true |
| 195 }) | 179 }) |
| 196 » » » if c != nil && needCallback { | 180 » » » if terminalCallback && ensureCmd() { |
| 197 c.cb(nil) | 181 c.cb(nil) |
| 198 c = nil | 182 c = nil |
| 199 } | 183 } |
| 200 } | 184 } |
| 201 }() | 185 }() |
| 202 | 186 |
| 203 return ret | 187 return ret |
| 204 } | 188 } |
| 205 | 189 |
| 206 func (t *iterator) stop() { | 190 func (t *iterator) stop() { |
| (...skipping 13 matching lines...) Expand all Loading... | |
| 220 t.ch <- &cmd{targ, func(i *gkvlite.Item) { | 204 t.ch <- &cmd{targ, func(i *gkvlite.Item) { |
| 221 defer close(waiter) | 205 defer close(waiter) |
| 222 | 206 |
| 223 cb(i) | 207 cb(i) |
| 224 if i == nil { | 208 if i == nil { |
| 225 t.stop() | 209 t.stop() |
| 226 } | 210 } |
| 227 }} | 211 }} |
| 228 <-waiter | 212 <-waiter |
| 229 } | 213 } |
| OLD | NEW |