Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(426)

Side by Side Diff: filter/txnBuf/query_merger.go

Issue 1309803004: Add transaction buffer filter. (Closed) Base URL: https://github.com/luci/gae.git@add_query_support
Patch Set: rebase Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package txnBuf
6
7 import (
8 "bytes"
9 "sort"
10 "sync"
11
12 "github.com/luci/gae/impl/memory"
13 ds "github.com/luci/gae/service/datastore"
14 "github.com/luci/gae/service/datastore/serialize"
15 "github.com/luci/luci-go/common/stringset"
16 )
17
18 // queryToIter takes a FinalizedQuery and returns an iterator function which
19 // will produce either *items or errors.
20 //
21 // - d is the raw datastore to run this query on
22 // - dedup is a function which will return true if the given key has already
23 // been used in the merged result set. If nil, then this check is skipped.
24 // - filter is a function which will return true if the given key appears
25 // in the in-memory result set. This means that all entries for this key
26 // will show up from the in-memory query and don't need to be merged. This
27 // also prevents deleted entities from showing up in a query.
28 func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterfac e, dedup, filter func(string) bool) func() (*item, error) {
29 c := make(chan *item)
30
31 go func() {
32 defer close(c)
33
34 err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorC B) (keepGoing bool) {
35 i := &item{key: k, data: pm}
36 encKey := i.getEncKey()
37 if filter != nil && filter(encKey) {
38 return true
39 }
40 if dedup != nil && dedup(encKey) {
Vadim Sh. 2015/09/30 01:11:28 why have both dedup & filter if they are used iden
iannucci 2015/09/30 02:00:10 Hm... ok.
41 return true
42 }
43
44 select {
45 case c <- i:
46 return true
47 case <-stopChan:
48 return false
49 }
50 })
51 if err != nil {
52 c <- &item{err: err}
53 }
54 }()
55
56 return func() (*item, error) {
57 for {
58 itm := <-c
59 if itm == nil {
60 return nil, nil
61 }
62 if itm.err != nil {
Vadim Sh. 2015/09/30 01:11:28 not sure it matters, but iteration function return
iannucci 2015/09/30 02:00:10 doesn't matter, we bail on the first error.
63 return nil, itm.err
64 }
65 return itm, nil
66 }
67 }
68 }
69
70 // adjustQuery applies various mutations to the query to make it suitable for
71 // merging. In general, this removes limits and offsets the 'distinct' modifier,
72 // and it ensures that if there are sort orders which won't appear in the
73 // result data that the query is transformed into a projection query which
74 // contains all of the data. A non-projection query will never be transformed
75 // in this way.
76 func adjustQuery(fq *ds.FinalizedQuery) (*ds.FinalizedQuery, error) {
77 q := fq.Original()
78
79 // The limit and offset must be done in-memory because otherwise we may
80 // request too few entities from the underlying store if many matching
81 // entities have been deleted in the buffered transaction.
82 q = q.Limit(-1)
83 q = q.Offset(-1)
84
85 // distinction must be done in-memory, because otherwise there's no way
86 // to merge in the effect of the in-flight changes (because there's no w ay
87 // to push back to the datastore "yeah, I know you told me that the (1, 2)
88 // result came from `/Bob,1`, but would you mind pretending that it didn 't
89 // and tell me next the one instead?
90 if fq.Distinct() {
91 q = q.Distinct(false)
92 }
93
94 // since we need to merge results, we must have all order-related fields
95 // in each result. The only time we wouldn't have all the data available would
96 // be for a keys-only or projection query. To fix this, we convert all
97 // Projection and KeysOnly queries to project on /all/ Orders.
98 //
99 // FinalizedQuery already guarantees that all projected fields show up i n
100 // the Orders, but the projected fields could be a subset of the orders.
101 //
102 // Additionally on a keys-only query, any orders other than __key__ requ ire
103 // conversion of this query to a projection query including those orders in
104 // order to merge the results correctly.
105 //
106 // In both cases, the resulting objects returned to the higher layers of
107 // the stack will only include the information requested by the user; ke ysonly
108 // queries will discard all PropertyMap data, and projection queries wil l
109 // discard any field data that the user didn't ask for.
110 orders := fq.Orders()
111 if len(fq.Project()) > 0 || (fq.KeysOnly() && len(orders) > 1) {
112 q = q.KeysOnly(false)
113
114 for _, o := range orders {
115 if o.Property == "__key__" {
116 continue
117 }
118 q = q.Project(o.Property)
119 }
120 }
121
122 return q.Finalize()
123 }
124
125 // runMergedQueries executes a user query `fq` against the parent datastore as
126 // well as the in-memory datastore, calling `cb` with the merged result set.
127 //
128 // It's expected that the caller of this function will apply limit and offset
129 // if the query contains those restrictions. This may convert the query to
130 // an expanded projection query with more data than the user asked for. It's the
131 // caller's responsibility to prune away the extra data.
132 //
133 // See also `dsTxnBuf.Run()`.
134 func runMergedQueries(fq *ds.FinalizedQuery, state *txnBufState, cb func(k *ds.K ey, data ds.PropertyMap) bool) error {
135 toRun, err := adjustQuery(fq)
136 if err != nil {
137 return err
138 }
139
140 stopChan := make(chan struct{})
141 defer close(stopChan)
142
143 cmpLower, cmpUpper := memory.GetBinaryBounds(fq)
144 cmpOrder := fq.Orders()
145 cmpFn := func(i *item) string {
146 return i.getCmpRow(cmpLower, cmpUpper, cmpOrder)
147 }
148
149 dedup := stringset.Set(nil)
150 distinct := stringset.Set(nil)
151 distinctOrder := []ds.IndexColumn(nil)
152 if len(fq.Project()) > 0 { // the original query was a projection query
153 if fq.Distinct() {
154 // it was a distinct projection query, so we need to ded up by distinct
155 // options.
156 distinct = stringset.New(0)
157 proj := fq.Project()
158 distinctOrder = make([]ds.IndexColumn, len(proj))
159 for i, p := range proj {
160 distinctOrder[i].Property = p
161 }
162 }
163 } else {
164 // the original was a normal or keysonly query, so we need to de dup by keys
165 dedup = stringset.New(0)
166 }
167
168 // need lock around dedup since it's not threadsafe and we read/write it in
169 // different goroutines. No lock is needed around state.entState.has bec ause
170 // the whole transaction is locked during this query and so the entState is
171 // effectively read-only.
172 dedupLock := sync.Mutex{}
173 dedupFn := (func(string) bool)(nil)
174 if dedup != nil {
175 dedupFn = func(val string) bool {
176 dedupLock.Lock()
177 defer dedupLock.Unlock()
178 return dedup.Has(val)
179 }
180 }
181
182 parItemGet := queryToIter(stopChan, toRun, state.parentDS, dedupFn, stat e.entState.has)
183 memItemGet := queryToIter(stopChan, toRun, state.memDS, dedupFn, nil)
184
185 pitm, err := parItemGet()
186 if err != nil {
187 return err
188 }
189
190 mitm, err := memItemGet()
191 if err != nil {
192 return err
193 }
194
195 for {
196 usePitm := pitm != nil
197 if pitm != nil && mitm != nil {
198 usePitm = cmpFn(pitm) < cmpFn(mitm)
199 } else if pitm == nil && mitm == nil {
200 break
201 }
202
203 toUse := (*item)(nil)
204 if usePitm {
205 toUse = pitm
206 if pitm, err = parItemGet(); err != nil {
207 return err
208 }
209 } else {
210 toUse = mitm
211 if mitm, err = memItemGet(); err != nil {
212 return err
213 }
214 }
215
216 if dedup != nil {
217 encKey := toUse.getEncKey()
218 dedupLock.Lock()
219 added := dedup.Add(encKey)
220 dedupLock.Unlock()
221 if !added {
222 continue
223 }
224 }
225 if distinct != nil {
226 // NOTE: We know that toUse will not be used after this point for
227 // comparison purposes, so re-use its cmpRow property fo r our distinct
228 // filter here.
229 toUse.cmpRow = ""
230 if !distinct.Add(toUse.getCmpRow(nil, nil, distinctOrder )) {
231 continue
232 }
233 }
234 if !cb(toUse.key, toUse.data) {
235 break
236 }
237 }
238
239 return nil
240 }
241
242 // toComparableString computes the byte-sortable 'order' string for the given
243 // key/PropertyMap.
244 //
245 // * start/end are byte sequences which are the inequality bounds of the
246 // query, if any. These are a serialized datastore.Property. If the
247 // inequality column is inverted, then start and end are also inverted and
248 // swapped with each other.
249 // * order is the list of sort orders in the actual executing queries.
250 // * k / pm are the data to derive a sortable string for.
251 //
252 // The result of this function is the series of serialized properties, one per
253 // order column, which represent this key/pm's first entry in the composite
254 // index that would point to it (e.g. the one with `order` sort orders).
255 func toComparableString(start, end []byte, order []ds.IndexColumn, k *ds.Key, pm ds.PropertyMap) (row, key []byte) {
256 doCmp := true
257 soFar := []byte{}
258 ps := serialize.PropertyMapPartially(k, nil)
259 for _, ord := range order {
260 row, ok := ps[ord.Property]
261 if !ok {
262 if vals, ok := pm[ord.Property]; ok {
263 row = serialize.PropertySlice(vals)
264 }
265 }
266 sort.Sort(row)
267 foundOne := false
268 for _, serialized := range row {
269 if ord.Descending {
270 serialized = serialize.Invert(serialized)
271 }
272 if doCmp {
273 maybe := serialize.Join(soFar, serialized)
274 cmp := bytes.Compare(maybe, start)
275 if cmp >= 0 {
276 foundOne = true
277 soFar = maybe
278 doCmp = len(soFar) < len(start)
279 break
280 }
281 } else {
282 foundOne = true
283 soFar = serialize.Join(soFar, serialized)
284 break
285 }
286 }
287 if !foundOne {
288 return nil, nil
289 }
290 }
291 if end != nil && bytes.Compare(soFar, end) >= 0 {
292 return nil, nil
293 }
294 return soFar, ps["__key__"][0]
295 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698