Chromium Code Reviews

| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package txnBuf | |
| 6 | |
| 7 import ( | |
| 8 "bytes" | |
| 9 "sort" | |
| 10 "sync" | |
| 11 | |
| 12 "github.com/luci/gae/impl/memory" | |
| 13 ds "github.com/luci/gae/service/datastore" | |
| 14 "github.com/luci/gae/service/datastore/serialize" | |
| 15 "github.com/luci/luci-go/common/stringset" | |
| 16 ) | |
| 17 | |
| 18 // queryToIter takes a FinalizedQuery and returns an iterator function which | |
| 19 // will produce either *items or errors. | |
| 20 // | |
| 21 // - d is the raw datastore to run this query on | |
| 22 // - filter is a function which will return true if the given key should be | |
| 23 // excluded from the result set. | |
| 24 func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterface, filter func(string) bool) func() (*item, error) { | |
| 25 c := make(chan *item) | |
| 26 | |
| 27 go func() { | |
| 28 defer close(c) | |
| 29 | |
| 30 err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorCB) (keepGoing bool) { | |
| 31 i := &item{key: k, data: pm} | |
| 32 encKey := i.getEncKey() | |
| 33 if filter != nil && filter(encKey) { | |
| 34 return true | |
| 35 } | |
| 36 | |
| 37 select { | |
| 38 case c <- i: | |
| 39 return true | |
| 40 case <-stopChan: | |
| 41 return false | |
| 42 } | |
| 43 }) | |
| 44 if err != nil { | |
| 45 c <- &item{err: err} | |
| 46 } | |
| 47 }() | |
| 48 | |
| 49 return func() (*item, error) { | |
| 50 for { | |
|
dnj (2015/09/30 16:35:26): No need for "for" loop; you always return.
iannucci (2015/09/30 17:10:28): wat. oh, I think i used to have the filter check i
| 51 itm := <-c | |
| 52 if itm == nil { | |
| 53 return nil, nil | |
| 54 } | |
| 55 if itm.err != nil { | |
| 56 return nil, itm.err | |
| 57 } | |
| 58 return itm, nil | |
| 59 } | |
| 60 } | |
| 61 } | |
| 62 | |
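The helper above is an instance of a common Go shape: a producer goroutine feeding a channel, a stop channel for early cancellation, and a closure acting as a pull-style iterator. For readers outside the CL, here is a minimal, self-contained sketch of that pattern (the names `result` and `iterOf` are hypothetical, not part of this change):

```go
package main

import "fmt"

// result stands in for the CL's *item type (hypothetical).
type result struct {
	val int
	err error
}

// iterOf mirrors the shape of queryToIter: a goroutine pushes values onto a
// channel until it runs out or the caller closes stopChan, and the returned
// closure yields one value per call, returning nil once the producer is done.
func iterOf(stopChan chan struct{}, vals []int) func() (*result, error) {
	c := make(chan *result)
	go func() {
		defer close(c)
		for _, v := range vals {
			select {
			case c <- &result{val: v}:
			case <-stopChan:
				return
			}
		}
	}()
	return func() (*result, error) {
		itm := <-c
		if itm == nil {
			return nil, nil // channel closed: iteration finished
		}
		if itm.err != nil {
			return nil, itm.err
		}
		return itm, nil
	}
}

func main() {
	stop := make(chan struct{})
	next := iterOf(stop, []int{1, 2, 3})
	for itm, err := next(); itm != nil && err == nil; itm, err = next() {
		fmt.Println(itm.val)
	}
	// Closing stop and draining once lets the producer goroutine exit; the
	// deferred cleanup in runMergedQueries (below) does the same for both
	// of its iterators.
	close(stop)
	next()
}
```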
| 63 // adjustQuery applies various mutations to the query to make it suitable for | |
| 64 // merging. In general, this removes limits, offsets and the 'distinct' modifier, | |
| 65 // and it ensures that if there are sort orders which won't appear in the | |
| 66 // result data, the query is transformed into a projection query which | |
| 67 // contains all of the data. A non-projection query will never be transformed | |
| 68 // in this way. | |
| 69 func adjustQuery(fq *ds.FinalizedQuery) (*ds.FinalizedQuery, error) { | |
| 70 q := fq.Original() | |
| 71 | |
| 72 // The limit and offset must be done in-memory because otherwise we may | |
| 73 // request too few entities from the underlying store if many matching | |
| 74 // entities have been deleted in the buffered transaction. | |
| 75 q = q.Limit(-1) | |
| 76 q = q.Offset(-1) | |
| 77 | |
| 78 // distinction must be done in-memory, because otherwise there's no way | |
| 79 // to merge in the effect of the in-flight changes (because there's no way | |
| 80 // to push back to the datastore "yeah, I know you told me that the (1, 2) | |
| 81 // result came from `/Bob,1`, but would you mind pretending that it didn't | |
| 82 // and tell me the next one instead?") | |
| 83 if fq.Distinct() { | |
|
dnj (2015/09/30 16:35:26): nit: ditch the "if", just say q = q.Distinct(false
iannucci (2015/09/30 17:10:28): done
| 84 q = q.Distinct(false) | |
| 85 } | |
| 86 | |
| 87 // since we need to merge results, we must have all order-related fields | |
| 88 // in each result. The only time we wouldn't have all the data available would | |
| 89 // be for a keys-only or projection query. To fix this, we convert all | |
| 90 // Projection and KeysOnly queries to project on /all/ Orders. | |
| 91 // | |
| 92 // FinalizedQuery already guarantees that all projected fields show up in | |
| 93 // the Orders, but the projected fields could be a subset of the orders. | |
| 94 // | |
| 95 // Additionally on a keys-only query, any orders other than __key__ require | |
| 96 // conversion of this query to a projection query including those orders in | |
| 97 // order to merge the results correctly. | |
| 98 // | |
| 99 // In both cases, the resulting objects returned to the higher layers of | |
| 100 // the stack will only include the information requested by the user; keys-only | |
|
dnj (2015/09/30 16:35:26): nit: keys-only
iannucci (2015/09/30 17:10:27): done
| 101 // queries will discard all PropertyMap data, and projection queries will | |
| 102 // discard any field data that the user didn't ask for. | |
| 103 orders := fq.Orders() | |
| 104 if len(fq.Project()) > 0 || (fq.KeysOnly() && len(orders) > 1) { | |
| 105 q = q.KeysOnly(false) | |
| 106 | |
| 107 for _, o := range orders { | |
| 108 if o.Property == "__key__" { | |
| 109 continue | |
| 110 } | |
| 111 q = q.Project(o.Property) | |
| 112 } | |
| 113 } | |
| 114 | |
| 115 return q.Finalize() | |
|
dnj (2015/09/30 16:35:26): All hail editable queries!
iannucci (2015/09/30 17:10:28): indeed :)
| 116 } | |
| 117 | |
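To make the adjustment rules above easier to follow, here is a rough sketch using simplified stand-in types rather than the real luci/gae API (`querySpec` and `adjustForMerge` are hypothetical): limit, offset and distinct are dropped so they can be re-applied in memory, and keys-only or projection queries are widened so every sort column shows up in the results.

```go
package main

import "fmt"

// querySpec is a simplified, hypothetical stand-in for a finalized query;
// just enough fields to show the transformation, not the real API.
type querySpec struct {
	Limit, Offset int
	Distinct      bool
	KeysOnly      bool
	Project       []string
	Orders        []string // sort columns, including "__key__"
}

// adjustForMerge mirrors adjustQuery: limits, offsets and distinct are
// removed (they are re-applied in memory after merging), and keys-only or
// projection queries are converted to project every non-__key__ sort column
// so both result streams carry the data needed to merge and order them.
func adjustForMerge(q querySpec) querySpec {
	q.Limit, q.Offset = -1, -1
	q.Distinct = false
	if len(q.Project) > 0 || (q.KeysOnly && len(q.Orders) > 1) {
		q.KeysOnly = false
		proj := make([]string, 0, len(q.Orders))
		for _, o := range q.Orders {
			if o != "__key__" {
				proj = append(proj, o)
			}
		}
		// A superset of the user's projection; the extra data is pruned by
		// the caller, as noted in the runMergedQueries doc comment.
		q.Project = proj
	}
	return q
}

func main() {
	q := querySpec{Limit: 10, KeysOnly: true, Orders: []string{"Age", "__key__"}}
	fmt.Printf("%+v\n", adjustForMerge(q))
	// {Limit:-1 Offset:-1 Distinct:false KeysOnly:false Project:[Age] Orders:[Age __key__]}
}
```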
| 118 // runMergedQueries executes a user query `fq` against the parent datastore as | |
| 119 // well as the in-memory datastore, calling `cb` with the merged result set. | |
| 120 // | |
| 121 // It's expected that the caller of this function will apply limit and offset | |
| 122 // if the query contains those restrictions. This may convert the query to | |
| 123 // an expanded projection query with more data than the user asked for. It's the | |
| 124 // caller's responsibility to prune away the extra data. | |
| 125 // | |
| 126 // See also `dsTxnBuf.Run()`. | |
| 127 func runMergedQueries(fq *ds.FinalizedQuery, sizes *sizeTracker, memDS, parentDS ds.RawInterface, cb func(k *ds.Key, data ds.PropertyMap) bool) error { | |
|
dnj (2015/09/30 16:35:26): nit: This function signature is huge. Can you line
iannucci (2015/09/30 17:10:28): done
| 128 toRun, err := adjustQuery(fq) | |
| 129 if err != nil { | |
| 130 return err | |
| 131 } | |
| 132 | |
| 133 cmpLower, cmpUpper := memory.GetBinaryBounds(fq) | |
| 134 cmpOrder := fq.Orders() | |
| 135 cmpFn := func(i *item) string { | |
| 136 return i.getCmpRow(cmpLower, cmpUpper, cmpOrder) | |
| 137 } | |
| 138 | |
| 139 dedup := stringset.Set(nil) | |
| 140 distinct := stringset.Set(nil) | |
| 141 distinctOrder := []ds.IndexColumn(nil) | |
| 142 if len(fq.Project()) > 0 { // the original query was a projection query | |
| 143 if fq.Distinct() { | |
| 144 // it was a distinct projection query, so we need to dedup by distinct | |
| 145 // options. | |
| 146 distinct = stringset.New(0) | |
| 147 proj := fq.Project() | |
| 148 distinctOrder = make([]ds.IndexColumn, len(proj)) | |
| 149 for i, p := range proj { | |
| 150 distinctOrder[i].Property = p | |
| 151 } | |
| 152 } | |
| 153 } else { | |
| 154 // the original was a normal or keys-only query, so we need to dedup by keys. | |
|
dnj (2015/09/30 16:35:26): nit: keys-only. Also period at the end of comment.
iannucci (2015/09/30 17:10:28): done
| 155 dedup = stringset.New(0) | |
| 156 } | |
| 157 | |
| 158 // need lock around dedup since it's not threadsafe and we read/write it in | |
| 159 // different goroutines. No lock is needed around state.entState.has because | |
| 160 // the whole transaction is locked during this query and so the entState is | |
| 161 // effectively read-only. | |
| 162 dedupLock := sync.Mutex{} | |
| 163 dedupFn := (func(string) bool)(nil) | |
| 164 if dedup != nil { | |
| 165 dedupFn = func(val string) bool { | |
| 166 dedupLock.Lock() | |
| 167 defer dedupLock.Unlock() | |
| 168 return dedup.Has(val) | |
| 169 } | |
| 170 } | |
| 171 | |
| 172 stopChan := make(chan struct{}) | |
| 173 | |
| 174 parItemGet := queryToIter(stopChan, toRun, parentDS, func(key string) bool { | |
| 175 return sizes.has(key) || (dedupFn != nil && dedupFn(key)) | |
| 176 }) | |
| 177 memItemGet := queryToIter(stopChan, toRun, memDS, dedupFn) | |
| 178 | |
| 179 defer func() { | |
| 180 close(stopChan) | |
| 181 parItemGet() | |
| 182 memItemGet() | |
| 183 }() | |
| 184 | |
| 185 pitm, err := parItemGet() | |
| 186 if err != nil { | |
| 187 return err | |
| 188 } | |
| 189 | |
| 190 mitm, err := memItemGet() | |
| 191 if err != nil { | |
| 192 return err | |
| 193 } | |
| 194 | |
| 195 for { | |
| 196 usePitm := pitm != nil | |
| 197 if pitm != nil && mitm != nil { | |
| 198 usePitm = cmpFn(pitm) < cmpFn(mitm) | |
| 199 } else if pitm == nil && mitm == nil { | |
| 200 break | |
| 201 } | |
| 202 | |
| 203 toUse := (*item)(nil) | |
| 204 if usePitm { | |
| 205 toUse = pitm | |
| 206 if pitm, err = parItemGet(); err != nil { | |
| 207 return err | |
|
dnj (2015/09/30 16:35:26): Should punt toUse before failing from error gettin
iannucci (2015/09/30 17:10:28): ah, yeah, ok.
| 208 } | |
| 209 } else { | |
| 210 toUse = mitm | |
| 211 if mitm, err = memItemGet(); err != nil { | |
| 212 return err | |
| 213 } | |
| 214 } | |
| 215 | |
| 216 if dedup != nil { | |
| 217 encKey := toUse.getEncKey() | |
| 218 dedupLock.Lock() | |
| 219 added := dedup.Add(encKey) | |
| 220 dedupLock.Unlock() | |
| 221 if !added { | |
| 222 continue | |
| 223 } | |
| 224 } | |
| 225 if distinct != nil { | |
| 226 // NOTE: We know that toUse will not be used after this point for | |
| 227 // comparison purposes, so re-use its cmpRow property for our distinct | |
| 228 // filter here. | |
| 229 toUse.cmpRow = "" | |
| 230 if !distinct.Add(toUse.getCmpRow(nil, nil, distinctOrder )) { | |
| 231 continue | |
| 232 } | |
| 233 } | |
| 234 if !cb(toUse.key, toUse.data) { | |
| 235 break | |
| 236 } | |
| 237 } | |
| 238 | |
| 239 return nil | |
| 240 } | |
| 241 | |
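Stripped of the datastore plumbing, the loop above is a two-way merge of sorted streams with de-duplication. A minimal sketch of that core, with plain sorted string slices standing in for the parent-store and in-memory iterators (`mergeSorted` is a hypothetical name):

```go
package main

import "fmt"

// mergeSorted consumes two already-sorted slices in comparison order,
// skipping values it has emitted before, and stops early when emit returns
// false, which is the same control flow as the merge loop in runMergedQueries.
func mergeSorted(par, mem []string, emit func(string) bool) {
	seen := map[string]bool{}
	i, j := 0, 0
	for i < len(par) || j < len(mem) {
		// Take from par when mem is exhausted or the par value sorts first;
		// ties go to the in-memory side, as in the real loop.
		usePar := j >= len(mem) || (i < len(par) && par[i] < mem[j])
		var v string
		if usePar {
			v, i = par[i], i+1
		} else {
			v, j = mem[j], j+1
		}
		if seen[v] {
			continue
		}
		seen[v] = true
		if !emit(v) {
			break
		}
	}
}

func main() {
	par := []string{"a", "c", "d"}
	mem := []string{"b", "c", "e"}
	mergeSorted(par, mem, func(v string) bool {
		fmt.Println(v) // prints a, b, c, d, e
		return true
	})
}
```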
| 242 // toComparableString computes the byte-sortable 'order' string for the given | |
| 243 // key/PropertyMap. | |
| 244 // | |
| 245 // * start/end are byte sequences which are the inequality bounds of the | |
| 246 // query, if any. Each is a serialized datastore.Property. If the | |
| 247 // inequality column is inverted, then start and end are also inverted and | |
| 248 // swapped with each other. | |
| 249 // * order is the list of sort orders in the actual executing queries. | |
| 250 // * k / pm are the data to derive a sortable string for. | |
| 251 // | |
| 252 // The result of this function is the series of serialized properties, one per | |
| 253 // order column, which represent this key/pm's first entry in the composite | |
| 254 // index that would point to it (e.g. the one with `order` sort orders). | |
| 255 func toComparableString(start, end []byte, order []ds.IndexColumn, k *ds.Key, pm ds.PropertyMap) (row, key []byte) { | |
| 256 doCmp := true | |
| 257 soFar := []byte{} | |
| 258 ps := serialize.PropertyMapPartially(k, nil) | |
| 259 for _, ord := range order { | |
| 260 row, ok := ps[ord.Property] | |
| 261 if !ok { | |
| 262 if vals, ok := pm[ord.Property]; ok { | |
| 263 row = serialize.PropertySlice(vals) | |
| 264 } | |
| 265 } | |
| 266 sort.Sort(row) | |
| 267 foundOne := false | |
| 268 for _, serialized := range row { | |
| 269 if ord.Descending { | |
| 270 serialized = serialize.Invert(serialized) | |
| 271 } | |
| 272 if doCmp { | |
| 273 maybe := serialize.Join(soFar, serialized) | |
| 274 cmp := bytes.Compare(maybe, start) | |
| 275 if cmp >= 0 { | |
| 276 foundOne = true | |
| 277 soFar = maybe | |
| 278 doCmp = len(soFar) < len(start) | |
| 279 break | |
| 280 } | |
| 281 } else { | |
| 282 foundOne = true | |
| 283 soFar = serialize.Join(soFar, serialized) | |
| 284 break | |
| 285 } | |
| 286 } | |
| 287 if !foundOne { | |
| 288 return nil, nil | |
| 289 } | |
| 290 } | |
| 291 if end != nil && bytes.Compare(soFar, end) >= 0 { | |
| 292 return nil, nil | |
| 293 } | |
| 294 return soFar, ps["__key__"][0] | |
| 295 } | |
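A note for future readers on the Descending branch above: inverting the serialized bytes is what lets a descending sort column participate in a single ascending bytes.Compare. A tiny self-contained illustration of the trick (the `invert` helper is hypothetical; the CL uses serialize.Invert on serialized properties):

```go
package main

import (
	"bytes"
	"fmt"
)

// invert flips every byte. When two byte strings differ at some position,
// this reverses the outcome of bytes.Compare, so a descending column can be
// folded into one ascending comparison. (Inputs where one is a prefix of the
// other keep their order; the serialized property encoding is assumed to
// avoid that ambiguity.)
func invert(b []byte) []byte {
	out := make([]byte, len(b))
	for i, c := range b {
		out[i] = 0xFF ^ c
	}
	return out
}

func main() {
	a, b := []byte("apple"), []byte("banana")
	fmt.Println(bytes.Compare(a, b))                 // -1: "apple" sorts first
	fmt.Println(bytes.Compare(invert(a), invert(b))) // 1: order is reversed
}
```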