Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(650)

Side by Side Diff: filter/txnBuf/query_merger.go

Issue 1309803004: Add transaction buffer filter. (Closed) Base URL: https://github.com/luci/gae.git@add_query_support
Patch Set: add err for too many roots Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package txnBuf
6
7 import (
8 "bytes"
9 "sort"
10 "sync"
11
12 "github.com/luci/gae/impl/memory"
13 ds "github.com/luci/gae/service/datastore"
14 "github.com/luci/gae/service/datastore/serialize"
15 "github.com/luci/luci-go/common/stringset"
16 )
17
18 // queryToIter takes a FinalizedQuery and returns an iterator function which
19 // will produce either *items or errors.
20 //
21 // - d is the raw datastore to run this query on
22 // - filter is a function which will return true if the given key should be
23 // excluded from the result set.
24 func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterfac e, filter func(string) bool) func() (*item, error) {
25 c := make(chan *item)
26
27 go func() {
28 defer close(c)
29
30 err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorC B) (keepGoing bool) {
31 i := &item{key: k, data: pm}
32 encKey := i.getEncKey()
33 if filter != nil && filter(encKey) {
34 return true
35 }
36
37 select {
38 case c <- i:
39 return true
40 case <-stopChan:
41 return false
42 }
43 })
44 if err != nil {
45 c <- &item{err: err}
46 }
47 }()
48
49 return func() (*item, error) {
50 for {
dnj 2015/09/30 16:35:26 No need for "for" loop; you always return.
iannucci 2015/09/30 17:10:28 wat. oh, I think i used to have the filter check i
51 itm := <-c
52 if itm == nil {
53 return nil, nil
54 }
55 if itm.err != nil {
56 return nil, itm.err
57 }
58 return itm, nil
59 }
60 }
61 }
62
63 // adjustQuery applies various mutations to the query to make it suitable for
64 // merging. In general, this removes limits and offsets the 'distinct' modifier,
65 // and it ensures that if there are sort orders which won't appear in the
66 // result data that the query is transformed into a projection query which
67 // contains all of the data. A non-projection query will never be transformed
68 // in this way.
69 func adjustQuery(fq *ds.FinalizedQuery) (*ds.FinalizedQuery, error) {
70 q := fq.Original()
71
72 // The limit and offset must be done in-memory because otherwise we may
73 // request too few entities from the underlying store if many matching
74 // entities have been deleted in the buffered transaction.
75 q = q.Limit(-1)
76 q = q.Offset(-1)
77
78 // distinction must be done in-memory, because otherwise there's no way
79 // to merge in the effect of the in-flight changes (because there's no w ay
80 // to push back to the datastore "yeah, I know you told me that the (1, 2)
81 // result came from `/Bob,1`, but would you mind pretending that it didn 't
82 // and tell me next the one instead?
83 if fq.Distinct() {
dnj 2015/09/30 16:35:26 nit: ditch the "if", just say q = q.Distinct(false
iannucci 2015/09/30 17:10:28 done
84 q = q.Distinct(false)
85 }
86
87 // since we need to merge results, we must have all order-related fields
88 // in each result. The only time we wouldn't have all the data available would
89 // be for a keys-only or projection query. To fix this, we convert all
90 // Projection and KeysOnly queries to project on /all/ Orders.
91 //
92 // FinalizedQuery already guarantees that all projected fields show up i n
93 // the Orders, but the projected fields could be a subset of the orders.
94 //
95 // Additionally on a keys-only query, any orders other than __key__ requ ire
96 // conversion of this query to a projection query including those orders in
97 // order to merge the results correctly.
98 //
99 // In both cases, the resulting objects returned to the higher layers of
100 // the stack will only include the information requested by the user; ke ysonly
dnj 2015/09/30 16:35:26 nit: keys-only
iannucci 2015/09/30 17:10:27 done
101 // queries will discard all PropertyMap data, and projection queries wil l
102 // discard any field data that the user didn't ask for.
103 orders := fq.Orders()
104 if len(fq.Project()) > 0 || (fq.KeysOnly() && len(orders) > 1) {
105 q = q.KeysOnly(false)
106
107 for _, o := range orders {
108 if o.Property == "__key__" {
109 continue
110 }
111 q = q.Project(o.Property)
112 }
113 }
114
115 return q.Finalize()
dnj 2015/09/30 16:35:26 All hail editable queries!
iannucci 2015/09/30 17:10:28 indeed :)
116 }
117
118 // runMergedQueries executes a user query `fq` against the parent datastore as
119 // well as the in-memory datastore, calling `cb` with the merged result set.
120 //
121 // It's expected that the caller of this function will apply limit and offset
122 // if the query contains those restrictions. This may convert the query to
123 // an expanded projection query with more data than the user asked for. It's the
124 // caller's responsibility to prune away the extra data.
125 //
126 // See also `dsTxnBuf.Run()`.
127 func runMergedQueries(fq *ds.FinalizedQuery, sizes *sizeTracker, memDS, parentDS ds.RawInterface, cb func(k *ds.Key, data ds.PropertyMap) bool) error {
dnj 2015/09/30 16:35:26 nit: This function signature is huge. Can you line
iannucci 2015/09/30 17:10:28 done
128 toRun, err := adjustQuery(fq)
129 if err != nil {
130 return err
131 }
132
133 cmpLower, cmpUpper := memory.GetBinaryBounds(fq)
134 cmpOrder := fq.Orders()
135 cmpFn := func(i *item) string {
136 return i.getCmpRow(cmpLower, cmpUpper, cmpOrder)
137 }
138
139 dedup := stringset.Set(nil)
140 distinct := stringset.Set(nil)
141 distinctOrder := []ds.IndexColumn(nil)
142 if len(fq.Project()) > 0 { // the original query was a projection query
143 if fq.Distinct() {
144 // it was a distinct projection query, so we need to ded up by distinct
145 // options.
146 distinct = stringset.New(0)
147 proj := fq.Project()
148 distinctOrder = make([]ds.IndexColumn, len(proj))
149 for i, p := range proj {
150 distinctOrder[i].Property = p
151 }
152 }
153 } else {
154 // the original was a normal or keysonly query, so we need to de dup by keys
dnj 2015/09/30 16:35:26 nit: keys-only. Also period at the end of comment.
iannucci 2015/09/30 17:10:28 done
155 dedup = stringset.New(0)
156 }
157
158 // need lock around dedup since it's not threadsafe and we read/write it in
159 // different goroutines. No lock is needed around state.entState.has bec ause
160 // the whole transaction is locked during this query and so the entState is
161 // effectively read-only.
162 dedupLock := sync.Mutex{}
163 dedupFn := (func(string) bool)(nil)
164 if dedup != nil {
165 dedupFn = func(val string) bool {
166 dedupLock.Lock()
167 defer dedupLock.Unlock()
168 return dedup.Has(val)
169 }
170 }
171
172 stopChan := make(chan struct{})
173
174 parItemGet := queryToIter(stopChan, toRun, parentDS, func(key string) bo ol {
175 return sizes.has(key) || (dedupFn != nil && dedupFn(key))
176 })
177 memItemGet := queryToIter(stopChan, toRun, memDS, dedupFn)
178
179 defer func() {
180 close(stopChan)
181 parItemGet()
182 memItemGet()
183 }()
184
185 pitm, err := parItemGet()
186 if err != nil {
187 return err
188 }
189
190 mitm, err := memItemGet()
191 if err != nil {
192 return err
193 }
194
195 for {
196 usePitm := pitm != nil
197 if pitm != nil && mitm != nil {
198 usePitm = cmpFn(pitm) < cmpFn(mitm)
199 } else if pitm == nil && mitm == nil {
200 break
201 }
202
203 toUse := (*item)(nil)
204 if usePitm {
205 toUse = pitm
206 if pitm, err = parItemGet(); err != nil {
207 return err
dnj 2015/09/30 16:35:26 Should punt toUse before failing from error gettin
iannucci 2015/09/30 17:10:28 ah, yeah, ok.
208 }
209 } else {
210 toUse = mitm
211 if mitm, err = memItemGet(); err != nil {
212 return err
213 }
214 }
215
216 if dedup != nil {
217 encKey := toUse.getEncKey()
218 dedupLock.Lock()
219 added := dedup.Add(encKey)
220 dedupLock.Unlock()
221 if !added {
222 continue
223 }
224 }
225 if distinct != nil {
226 // NOTE: We know that toUse will not be used after this point for
227 // comparison purposes, so re-use its cmpRow property fo r our distinct
228 // filter here.
229 toUse.cmpRow = ""
230 if !distinct.Add(toUse.getCmpRow(nil, nil, distinctOrder )) {
231 continue
232 }
233 }
234 if !cb(toUse.key, toUse.data) {
235 break
236 }
237 }
238
239 return nil
240 }
241
242 // toComparableString computes the byte-sortable 'order' string for the given
243 // key/PropertyMap.
244 //
245 // * start/end are byte sequences which are the inequality bounds of the
246 // query, if any. These are a serialized datastore.Property. If the
247 // inequality column is inverted, then start and end are also inverted and
248 // swapped with each other.
249 // * order is the list of sort orders in the actual executing queries.
250 // * k / pm are the data to derive a sortable string for.
251 //
252 // The result of this function is the series of serialized properties, one per
253 // order column, which represent this key/pm's first entry in the composite
254 // index that would point to it (e.g. the one with `order` sort orders).
255 func toComparableString(start, end []byte, order []ds.IndexColumn, k *ds.Key, pm ds.PropertyMap) (row, key []byte) {
256 doCmp := true
257 soFar := []byte{}
258 ps := serialize.PropertyMapPartially(k, nil)
259 for _, ord := range order {
260 row, ok := ps[ord.Property]
261 if !ok {
262 if vals, ok := pm[ord.Property]; ok {
263 row = serialize.PropertySlice(vals)
264 }
265 }
266 sort.Sort(row)
267 foundOne := false
268 for _, serialized := range row {
269 if ord.Descending {
270 serialized = serialize.Invert(serialized)
271 }
272 if doCmp {
273 maybe := serialize.Join(soFar, serialized)
274 cmp := bytes.Compare(maybe, start)
275 if cmp >= 0 {
276 foundOne = true
277 soFar = maybe
278 doCmp = len(soFar) < len(start)
279 break
280 }
281 } else {
282 foundOne = true
283 soFar = serialize.Join(soFar, serialized)
284 break
285 }
286 }
287 if !foundOne {
288 return nil, nil
289 }
290 }
291 if end != nil && bytes.Compare(soFar, end) >= 0 {
292 return nil, nil
293 }
294 return soFar, ps["__key__"][0]
295 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698