OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package txnBuf | |
6 | |
7 import ( | |
8 "bytes" | |
9 "sort" | |
10 "sync" | |
11 | |
12 "github.com/luci/gae/impl/memory" | |
13 ds "github.com/luci/gae/service/datastore" | |
14 "github.com/luci/gae/service/datastore/serialize" | |
15 "github.com/luci/luci-go/common/stringset" | |
16 ) | |
17 | |
18 // queryToIter takes a FinalizedQuery and returns an iterator function which | |
19 // will produce either *items or errors. | |
20 // | |
21 // - d is the raw datastore to run this query on | |
22 // - filter is a function which will return true if the given key should be | |
23 // excluded from the result set. | |
24 func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterface, filter func(string) bool) func() (*item, error) { | |
25 c := make(chan *item) | |
26 | |
27 go func() { | |
28 defer close(c) | |
29 | |
30 err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorCB) (keepGoing bool) { | |
31 i := &item{key: k, data: pm} | |
32 encKey := i.getEncKey() | |
33 if filter != nil && filter(encKey) { | |
34 return true | |
35 } | |
36 | |
37 select { | |
38 case c <- i: | |
39 return true | |
40 case <-stopChan: | |
41 return false | |
42 } | |
43 }) | |
44 if err != nil { | |
45 c <- &item{err: err} | |
46 } | |
47 }() | |
48 | |
49 return func() (*item, error) { | |
50 for { | |
dnj
2015/09/30 16:35:26
No need for "for" loop; you always return.
iannucci
2015/09/30 17:10:28
wat. oh, I think i used to have the filter check in here.
| |
51 itm := <-c | |
52 if itm == nil { | |
53 return nil, nil | |
54 } | |
55 if itm.err != nil { | |
56 return nil, itm.err | |
57 } | |
58 return itm, nil | |
59 } | |
60 } | |
61 } | |
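Following up on dnj's comment above: since every branch of the returned closure returns, the `for` wrapper is redundant. A minimal sketch of the simplified closure (same `item` struct and channel as above; presumably what the follow-up patchset would look like, not code from this CL):

	return func() (*item, error) {
		itm := <-c // nil once the producer goroutine closes the channel
		if itm == nil {
			return nil, nil // no more results
		}
		if itm.err != nil {
			return nil, itm.err
		}
		return itm, nil
	}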
62 | |
63 // adjustQuery applies various mutations to the query to make it suitable for | |
64 // merging. In general, this removes limits and offsets and the 'distinct' modifier, | |
65 // and it ensures that if there are sort orders which won't appear in the | |
66 // result data, the query is transformed into a projection query which | |
67 // contains all of the data. A non-projection query will never be transformed | |
68 // in this way. | |
69 func adjustQuery(fq *ds.FinalizedQuery) (*ds.FinalizedQuery, error) { | |
70 q := fq.Original() | |
71 | |
72 // The limit and offset must be done in-memory because otherwise we may | |
73 // request too few entities from the underlying store if many matching | |
74 // entities have been deleted in the buffered transaction. | |
75 q = q.Limit(-1) | |
76 q = q.Offset(-1) | |
77 | |
78 // distinction must be done in-memory, because otherwise there's no way | |
79 // to merge in the effect of the in-flight changes (because there's no way | |
80 // to push back to the datastore "yeah, I know you told me that the (1, 2) | |
81 // result came from `/Bob,1`, but would you mind pretending that it didn't | |
82 // and tell me the next one instead?") | |
83 if fq.Distinct() { | |
dnj
2015/09/30 16:35:26
nit: ditch the "if", just say q = q.Distinct(false)
iannucci
2015/09/30 17:10:28
done
| |
84 q = q.Distinct(false) | |
85 } | |
86 | |
87 // since we need to merge results, we must have all order-related fields | |
88 // in each result. The only time we wouldn't have all the data available would | |
89 // be for a keys-only or projection query. To fix this, we convert all | |
90 // Projection and KeysOnly queries to project on /all/ Orders. | |
91 // | |
92 // FinalizedQuery already guarantees that all projected fields show up in | |
93 // the Orders, but the projected fields could be a subset of the orders. | |
94 // | |
95 // Additionally on a keys-only query, any orders other than __key__ require | |
96 // conversion of this query to a projection query including those orders in | |
97 // order to merge the results correctly. | |
98 // | |
99 // In both cases, the resulting objects returned to the higher layers of | |
100 // the stack will only include the information requested by the user; keysonly | |
dnj
2015/09/30 16:35:26
nit: keys-only
iannucci
2015/09/30 17:10:27
done
| |
101 // queries will discard all PropertyMap data, and projection queries will | |
102 // discard any field data that the user didn't ask for. | |
103 orders := fq.Orders() | |
104 if len(fq.Project()) > 0 || (fq.KeysOnly() && len(orders) > 1) { | |
105 q = q.KeysOnly(false) | |
106 | |
107 for _, o := range orders { | |
108 if o.Property == "__key__" { | |
109 continue | |
110 } | |
111 q = q.Project(o.Property) | |
112 } | |
113 } | |
114 | |
115 return q.Finalize() | |
dnj
2015/09/30 16:35:26
All hail editable queries!
iannucci
2015/09/30 17:10:28
indeed :)
| |
116 } | |
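As a rough illustration of what adjustQuery does to a keys-only query with a non-key sort order (ds.NewQuery and Order are assumed from the datastore query-builder API, and "Person"/"age" are made-up names; none of this is part of the CL):

	// Hypothetical: a keys-only query ordered on "age" cannot be merged as-is,
	// because buffered and parent results must be compared on their "age" values.
	fq, _ := ds.NewQuery("Person").Order("age").KeysOnly(true).Limit(10).Finalize()

	adjusted, _ := adjustQuery(fq)
	// `adjusted` now behaves roughly like
	//   ds.NewQuery("Person").Order("age").Project("age").Finalize()
	// with no limit/offset: keys-only is dropped and every non-__key__ order
	// becomes a projection, so each merged row carries the data needed to sort it.
	_ = adjusted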
117 | |
118 // runMergedQueries executes a user query `fq` against the parent datastore as | |
119 // well as the in-memory datastore, calling `cb` with the merged result set. | |
120 // | |
121 // It's expected that the caller of this function will apply limit and offset | |
122 // if the query contains those restrictions. This may convert the query to | |
123 // an expanded projection query with more data than the user asked for. It's the | |
124 // caller's responsibility to prune away the extra data. | |
125 // | |
126 // See also `dsTxnBuf.Run()`. | |
127 func runMergedQueries(fq *ds.FinalizedQuery, sizes *sizeTracker, memDS, parentDS ds.RawInterface, cb func(k *ds.Key, data ds.PropertyMap) bool) error { | |
dnj
2015/09/30 16:35:26
nit: This function signature is huge. Can you line-wrap it?
iannucci
2015/09/30 17:10:28
done
| |
128 toRun, err := adjustQuery(fq) | |
129 if err != nil { | |
130 return err | |
131 } | |
132 | |
133 cmpLower, cmpUpper := memory.GetBinaryBounds(fq) | |
134 cmpOrder := fq.Orders() | |
135 cmpFn := func(i *item) string { | |
136 return i.getCmpRow(cmpLower, cmpUpper, cmpOrder) | |
137 } | |
138 | |
139 dedup := stringset.Set(nil) | |
140 distinct := stringset.Set(nil) | |
141 distinctOrder := []ds.IndexColumn(nil) | |
142 if len(fq.Project()) > 0 { // the original query was a projection query | |
143 if fq.Distinct() { | |
144 // it was a distinct projection query, so we need to dedup by distinct | |
145 // options. | |
146 distinct = stringset.New(0) | |
147 proj := fq.Project() | |
148 distinctOrder = make([]ds.IndexColumn, len(proj)) | |
149 for i, p := range proj { | |
150 distinctOrder[i].Property = p | |
151 } | |
152 } | |
153 } else { | |
154 // the original was a normal or keysonly query, so we need to dedup by keys | |
dnj
2015/09/30 16:35:26
nit: keys-only. Also period at the end of comment.
iannucci
2015/09/30 17:10:28
done
| |
155 dedup = stringset.New(0) | |
156 } | |
157 | |
158 // need lock around dedup since it's not threadsafe and we read/write it in | |
159 // different goroutines. No lock is needed around state.entState.has because | |
160 // the whole transaction is locked during this query and so the entState is | |
161 // effectively read-only. | |
162 dedupLock := sync.Mutex{} | |
163 dedupFn := (func(string) bool)(nil) | |
164 if dedup != nil { | |
165 dedupFn = func(val string) bool { | |
166 dedupLock.Lock() | |
167 defer dedupLock.Unlock() | |
168 return dedup.Has(val) | |
169 } | |
170 } | |
171 | |
172 stopChan := make(chan struct{}) | |
173 | |
174 parItemGet := queryToIter(stopChan, toRun, parentDS, func(key string) bool { | |
175 return sizes.has(key) || (dedupFn != nil && dedupFn(key)) | |
176 }) | |
177 memItemGet := queryToIter(stopChan, toRun, memDS, dedupFn) | |
178 | |
179 defer func() { | |
180 close(stopChan) | |
181 parItemGet() | |
182 memItemGet() | |
183 }() | |
184 | |
185 pitm, err := parItemGet() | |
186 if err != nil { | |
187 return err | |
188 } | |
189 | |
190 mitm, err := memItemGet() | |
191 if err != nil { | |
192 return err | |
193 } | |
194 | |
195 for { | |
196 usePitm := pitm != nil | |
197 if pitm != nil && mitm != nil { | |
198 usePitm = cmpFn(pitm) < cmpFn(mitm) | |
199 } else if pitm == nil && mitm == nil { | |
200 break | |
201 } | |
202 | |
203 toUse := (*item)(nil) | |
204 if usePitm { | |
205 toUse = pitm | |
206 if pitm, err = parItemGet(); err != nil { | |
207 return err | |
dnj
2015/09/30 16:35:26
Should punt toUse before failing from error getting the next item.
iannucci
2015/09/30 17:10:28
ah, yeah, ok.
| |
208 } | |
209 } else { | |
210 toUse = mitm | |
211 if mitm, err = memItemGet(); err != nil { | |
212 return err | |
213 } | |
214 } | |
215 | |
216 if dedup != nil { | |
217 encKey := toUse.getEncKey() | |
218 dedupLock.Lock() | |
219 added := dedup.Add(encKey) | |
220 dedupLock.Unlock() | |
221 if !added { | |
222 continue | |
223 } | |
224 } | |
225 if distinct != nil { | |
226 // NOTE: We know that toUse will not be used after this point for | |
227 // comparison purposes, so re-use its cmpRow property for our distinct | |
228 // filter here. | |
229 toUse.cmpRow = "" | |
230 if !distinct.Add(toUse.getCmpRow(nil, nil, distinctOrder)) { | |
231 continue | |
232 } | |
233 } | |
234 if !cb(toUse.key, toUse.data) { | |
235 break | |
236 } | |
237 } | |
238 | |
239 return nil | |
240 } | |
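Since the doc comment above leaves limit and offset to the caller, a hypothetical caller might apply them inside its callback. In this sketch, fq, sizes, memDS, parentDS and userCb are assumed to already be in scope; the wrapper itself is not part of this CL:

	offset, limit := 5, 10 // whatever the user's original query requested
	seen := 0
	err := runMergedQueries(fq, sizes, memDS, parentDS,
		func(k *ds.Key, data ds.PropertyMap) bool {
			seen++
			if seen <= offset {
				return true // still skipping the offset rows
			}
			if !userCb(k, data) {
				return false // the user's callback asked to stop early
			}
			return seen-offset < limit // stop once `limit` rows have been delivered
		})
	if err != nil {
		// propagate the error as usual
	}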
241 | |
242 // toComparableString computes the byte-sortable 'order' string for the given | |
243 // key/PropertyMap. | |
244 // | |
245 // * start/end are byte sequences which are the inequality bounds of the | |
246 // query, if any. These are a serialized datastore.Property. If the | |
247 // inequality column is inverted, then start and end are also inverted and | |
248 // swapped with each other. | |
249 // * order is the list of sort orders in the actual executing queries. | |
250 // * k / pm are the data to derive a sortable string for. | |
251 // | |
252 // The result of this function is the series of serialized properties, one per | |
253 // order column, which represent this key/pm's first entry in the composite | |
254 // index that would point to it (e.g. the one with `order` sort orders). | |
255 func toComparableString(start, end []byte, order []ds.IndexColumn, k *ds.Key, pm ds.PropertyMap) (row, key []byte) { | |
256 doCmp := true | |
257 soFar := []byte{} | |
258 ps := serialize.PropertyMapPartially(k, nil) | |
259 for _, ord := range order { | |
260 row, ok := ps[ord.Property] | |
261 if !ok { | |
262 if vals, ok := pm[ord.Property]; ok { | |
263 row = serialize.PropertySlice(vals) | |
264 } | |
265 } | |
266 sort.Sort(row) | |
267 foundOne := false | |
268 for _, serialized := range row { | |
269 if ord.Descending { | |
270 serialized = serialize.Invert(serialized) | |
271 } | |
272 if doCmp { | |
273 maybe := serialize.Join(soFar, serialized) | |
274 cmp := bytes.Compare(maybe, start) | |
275 if cmp >= 0 { | |
276 foundOne = true | |
277 soFar = maybe | |
278 doCmp = len(soFar) < len(start) | |
279 break | |
280 } | |
281 } else { | |
282 foundOne = true | |
283 soFar = serialize.Join(soFar, serialized) | |
284 break | |
285 } | |
286 } | |
287 if !foundOne { | |
288 return nil, nil | |
289 } | |
290 } | |
291 if end != nil && bytes.Compare(soFar, end) >= 0 { | |
292 return nil, nil | |
293 } | |
294 return soFar, ps["__key__"][0] | |
295 } | |
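For intuition, the merge only ever needs a total byte order over these rows: comparing two entities under the same bounds and sort orders reduces to bytes.Compare. In this sketch, start, end, order, keyA/pmA and keyB/pmB are hypothetical values in scope:

	rowA, _ := toComparableString(start, end, order, keyA, pmA)
	rowB, _ := toComparableString(start, end, order, keyB, pmB)
	switch {
	case rowA == nil || rowB == nil:
		// at least one entity falls outside the query's inequality bounds
	case bytes.Compare(rowA, rowB) < 0:
		// keyA sorts before keyB under `order`, so the merge yields it first
	}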