Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package memory | |
| 6 | |
| 7 import ( | |
| 8 "bytes" | |
| 9 "fmt" | |
| 10 | |
| 11 "github.com/luci/gkvlite" | |
| 12 ) | |
| 13 | |
| 14 // TODO(riannucci): Add a multi-iterator which allows: | |
| 15 // multiple collections | |
| 16 // sort smallest collection -> largest collection | |
| 17 // constant prefixes for each collection | |
| 18 // start + end partial suffixes | |
| 19 // * these are appended to the prefix for the collection to narrow the scan | |
| 20 // range. The collection scans will start at >= prefix+start suffix, and | |
| 21 // will scan until > prefix+end suffix (excluding this larger row). | |
| 22 // produces hits (a suffix+value) when all collections contain the same prefix | |
| 23 // and the same suffix | |
| 24 // * a hit value will have | |
| 25 // - a []byte suffix which is the matching suffix of every collection. | |
| 26 // The caller already knows the prefix for each collection. The caller | |
| 27 // may need to parse the suffix to separate the sort order(s) from the | |
| 28 // Key. | |
| 29 // dedup | |
| 30 // on each hit, the row values are concatenated and compared to the | |
| 31 // concatenated prefixes+startsuffix. If it's <=, it's skipped. | |
| 32 | |
| 33 type cmd struct { | |
| 34 targ []byte | |
| 35 cb func(*gkvlite.Item) | |
| 36 } | |
| 37 | |
| 38 type iterator struct { | |
| 39 stopped bool | |
| 40 prev []byte | |
| 41 ch chan<- *cmd | |
| 42 } | |
| 43 | |
| 44 func newIterable(coll *memCollection, end []byte) *iterator { | |
| 45 cmdChan := make(chan *cmd) | |
| 46 ret := &iterator{ | |
| 47 ch: cmdChan, | |
| 48 } | |
| 49 | |
| 50 go func() { | |
| 51 defer ret.stop() | |
| 52 c := (*cmd)(nil) | |
| 53 ensureCmd := func() bool { | |
| 54 if c == nil { | |
| 55 c = <-cmdChan | |
| 56 if c == nil { // stop() | |
| 57 return false | |
| 58 } | |
| 59 } | |
| 60 return true | |
| 61 } | |
| 62 for { | |
| 63 if !ensureCmd() { | |
| 64 return | |
| 65 } | |
| 66 previous := c.targ | |
| 67 needCallback := true | |
| 68 coll.VisitItemsAscend(c.targ, true, func(i *gkvlite.Item ) bool { | |
| 69 if !ensureCmd() { | |
| 70 return false | |
| 71 } | |
| 72 if !bytes.Equal(previous, c.targ) { | |
| 73 // we need to start a new ascention func tion | |
| 74 needCallback = false | |
| 75 return false | |
| 76 } | |
| 77 if end != nil && bytes.Compare(i.Key, end) >= 0 { | |
| 78 // we hit our cap | |
| 79 ret.stop() | |
| 80 return false | |
| 81 } | |
| 82 c.cb(i) | |
| 83 previous = i.Key | |
| 84 c = nil | |
| 85 return true | |
| 86 }) | |
| 87 if c != nil && needCallback { | |
| 88 c.cb(nil) | |
| 89 c = nil | |
| 90 } | |
| 91 } | |
| 92 }() | |
| 93 | |
| 94 return ret | |
| 95 } | |
| 96 | |
| 97 func (t *iterator) stop() { | |
| 98 t.stopped = true | |
| 99 defer func() { recover() }() | |
|
dnj (Google)
2015/08/15 02:00:31
If your goal is to close the channel once, wrap it
iannucci
2015/08/15 02:10:10
Done.
| |
| 100 close(t.ch) | |
| 101 } | |
| 102 | |
| 103 func (t *iterator) next(targ []byte, cb func(*gkvlite.Item)) { | |
| 104 if t.stopped { | |
| 105 cb(nil) | |
| 106 return | |
| 107 } | |
| 108 | |
| 109 if targ == nil { | |
| 110 targ = t.prev | |
| 111 if targ == nil { | |
| 112 targ = []byte{} | |
| 113 } | |
| 114 } else if bytes.Compare(t.prev, targ) >= 0 { | |
|
dnj (Google)
2015/08/15 02:00:31
Is this a necessary check? Could be expensive doin
iannucci
2015/08/15 02:10:10
it's only when you skip
| |
| 115 panic(fmt.Errorf( | |
| 116 "iterator was instructed to go backwards!? %q -> %q", | |
| 117 string(t.prev), string(targ))) | |
| 118 } | |
| 119 | |
| 120 waiter := make(chan struct{}) | |
|
dnj (Google)
2015/08/15 02:10:39
Idea:
itemC := make(chan *gkvlite.Item)
t.ch <- &
| |
| 121 t.ch <- &cmd{targ, func(i *gkvlite.Item) { | |
| 122 cb(i) | |
| 123 if i == nil { | |
| 124 t.stop() | |
| 125 } else { | |
| 126 t.prev = i.Key | |
| 127 } | |
| 128 close(waiter) | |
|
dnj (Google)
2015/08/15 02:00:31
Defer this close, dawg.
iannucci
2015/08/15 02:10:10
Done.
| |
| 129 }} | |
| 130 <-waiter | |
| 131 } | |
| OLD | NEW |