Index: filter/txnBuf/query_merger.go |
diff --git a/filter/txnBuf/query_merger.go b/filter/txnBuf/query_merger.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..e87d91ea5daed6de1b5ebc7f5999d377b46d30ae |
--- /dev/null |
+++ b/filter/txnBuf/query_merger.go |
@@ -0,0 +1,298 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+package txnBuf |
+ |
+import ( |
+ "bytes" |
+ "sort" |
+ |
+ "github.com/luci/gae/impl/memory" |
+ ds "github.com/luci/gae/service/datastore" |
+ "github.com/luci/gae/service/datastore/serialize" |
+ "github.com/luci/luci-go/common/stringset" |
+) |
+ |
+// queryToIter takes a FinalizedQuery and returns an iterator function which |
+// will produce either *items or errors. |
+// |
+// - d is the raw datastore to run this query on |
+// - filter is a function which will return true if the given key should be |
+// excluded from the result set. |
+func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterface) func() (*item, error) { |
+ c := make(chan *item) |
+ |
+ go func() { |
+ defer close(c) |
+ |
+ err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorCB) (keepGoing bool) { |
+ i := &item{key: k, data: pm} |
+ select { |
+ case c <- i: |
+ return true |
+ case <-stopChan: |
+ return false |
+ } |
+ }) |
+ if err != nil { |
+ c <- &item{err: err} |
+ } |
+ }() |
+ |
+ return func() (*item, error) { |
+ itm := <-c |
+ if itm == nil { |
+ return nil, nil |
+ } |
+ if itm.err != nil { |
+ return nil, itm.err |
+ } |
+ return itm, nil |
+ } |
+} |
+ |
+// adjustQuery applies various mutations to the query to make it suitable for |
+// merging. In general, this removes limits and offsets the 'distinct' modifier, |
+// and it ensures that if there are sort orders which won't appear in the |
+// result data that the query is transformed into a projection query which |
+// contains all of the data. A non-projection query will never be transformed |
+// in this way. |
+func adjustQuery(fq *ds.FinalizedQuery) (*ds.FinalizedQuery, error) { |
+ q := fq.Original() |
+ |
+ // The limit and offset must be done in-memory because otherwise we may |
+ // request too few entities from the underlying store if many matching |
+ // entities have been deleted in the buffered transaction. |
+ q = q.Limit(-1) |
+ q = q.Offset(-1) |
+ |
+ // distinction must be done in-memory, because otherwise there's no way |
+ // to merge in the effect of the in-flight changes (because there's no way |
+ // to push back to the datastore "yeah, I know you told me that the (1, 2) |
+ // result came from `/Bob,1`, but would you mind pretending that it didn't |
+ // and tell me next the one instead? |
+ q = q.Distinct(false) |
+ |
+ // since we need to merge results, we must have all order-related fields |
+ // in each result. The only time we wouldn't have all the data available would |
+ // be for a keys-only or projection query. To fix this, we convert all |
+ // Projection and KeysOnly queries to project on /all/ Orders. |
+ // |
+ // FinalizedQuery already guarantees that all projected fields show up in |
+ // the Orders, but the projected fields could be a subset of the orders. |
+ // |
+ // Additionally on a keys-only query, any orders other than __key__ require |
+ // conversion of this query to a projection query including those orders in |
+ // order to merge the results correctly. |
+ // |
+ // In both cases, the resulting objects returned to the higher layers of the |
+ // stack will only include the information requested by the user; keys-only |
+ // queries will discard all PropertyMap data, and projection queries will |
+ // discard any field data that the user didn't ask for. |
+ orders := fq.Orders() |
+ if len(fq.Project()) > 0 || (fq.KeysOnly() && len(orders) > 1) { |
+ q = q.KeysOnly(false) |
+ |
+ for _, o := range orders { |
+ if o.Property == "__key__" { |
+ continue |
+ } |
+ q = q.Project(o.Property) |
+ } |
+ } |
+ |
+ return q.Finalize() |
+} |
+ |
// runMergedQueries executes a user query `fq` against the parent datastore as
// well as the in-memory datastore, calling `cb` with the merged result set.
//
// It's expected that the caller of this function will apply limit and offset
// if the query contains those restrictions. This may convert the query to
// an expanded projection query with more data than the user asked for. It's the
// caller's responsibility to prune away the extra data.
//
// See also `dsTxnBuf.Run()`.
func runMergedQueries(fq *ds.FinalizedQuery, sizes *sizeTracker,
	memDS, parentDS ds.RawInterface, cb func(k *ds.Key, data ds.PropertyMap) bool) error {

	toRun, err := adjustQuery(fq)
	if err != nil {
		return err
	}

	// cmpFn derives the byte-sortable merge key for an item from the query's
	// inequality bounds and sort orders; it's used to decide which of the two
	// streams' heads comes first in query order.
	cmpLower, cmpUpper := memory.GetBinaryBounds(fq)
	cmpOrder := fq.Orders()
	cmpFn := func(i *item) string {
		return i.getCmpRow(cmpLower, cmpUpper, cmpOrder)
	}

	// Exactly one of `dedup` or `distinct` is non-nil:
	//   - distinct: the original query was a distinct projection query, so
	//     results are deduplicated by their projected-column values.
	//   - dedup: any other query is deduplicated by entity key (the same
	//     entity may appear in both the parent and in-memory streams).
	dedup := stringset.Set(nil)
	distinct := stringset.Set(nil)
	distinctOrder := []ds.IndexColumn(nil)
	if len(fq.Project()) > 0 { // the original query was a projection query
		if fq.Distinct() {
			// it was a distinct projection query, so we need to dedup by distinct
			// options.
			distinct = stringset.New(0)
			proj := fq.Project()
			distinctOrder = make([]ds.IndexColumn, len(proj))
			for i, p := range proj {
				distinctOrder[i].Property = p
			}
		}
	} else {
		// the original was a normal or keys-only query, so we need to dedup by keys.
		dedup = stringset.New(0)
	}

	// Closed (in the defer below) to tell both producer goroutines to stop.
	stopChan := make(chan struct{})

	parIter := queryToIter(stopChan, toRun, parentDS)
	memIter := queryToIter(stopChan, toRun, memDS)

	// parItemGet pulls the next parent-store item, skipping entities that are
	// shadowed by the buffered transaction (sizes.has) or already emitted.
	parItemGet := func() (*item, error) {
		for {
			itm, err := parIter()
			if itm == nil || err != nil {
				return nil, err
			}
			encKey := itm.getEncKey()
			if sizes.has(encKey) || (dedup != nil && dedup.Has(encKey)) {
				continue
			}
			return itm, nil
		}
	}
	// memItemGet pulls the next in-memory item, skipping already-emitted keys.
	memItemGet := func() (*item, error) {
		for {
			itm, err := memIter()
			if itm == nil || err != nil {
				return nil, err
			}
			if dedup != nil && dedup.Has(itm.getEncKey()) {
				continue
			}
			return itm, nil
		}
	}

	// Shut down both producers: close stopChan so they stop emitting, then
	// drain each iterator once so a producer blocked on a send can finish
	// and its channel gets closed (avoids leaking the goroutines).
	defer func() {
		close(stopChan)
		parItemGet()
		memItemGet()
	}()

	pitm, err := parItemGet()
	if err != nil {
		return err
	}

	mitm, err := memItemGet()
	if err != nil {
		return err
	}

	for {
		// the err can be set during the loop below. If we come around the bend and
		// it's set, then we need to return it. We don't check it immediately
		// because it's set after we already have a good result to return to the
		// user.
		if err != nil {
			return err
		}

		// Pick whichever stream head sorts first; when one stream is
		// exhausted, take from the other; when both are, we're done.
		usePitm := pitm != nil
		if pitm != nil && mitm != nil {
			usePitm = cmpFn(pitm) < cmpFn(mitm)
		} else if pitm == nil && mitm == nil {
			break
		}

		toUse := (*item)(nil)
		// we check the error at the beginning of the loop.
		if usePitm {
			toUse = pitm
			pitm, err = parItemGet()
		} else {
			toUse = mitm
			mitm, err = memItemGet()
		}

		if dedup != nil {
			// Add returns false if the key was already present, i.e. this
			// entity was already emitted from the other stream.
			if !dedup.Add(toUse.getEncKey()) {
				continue
			}
		}
		if distinct != nil {
			// NOTE: We know that toUse will not be used after this point for
			// comparison purposes, so re-use its cmpRow property for our distinct
			// filter here.
			toUse.cmpRow = ""
			if !distinct.Add(toUse.getCmpRow(nil, nil, distinctOrder)) {
				continue
			}
		}
		if !cb(toUse.key, toUse.data) {
			// Callback asked us to stop early.
			break
		}
	}

	return nil
}
+ |
// toComparableString computes the byte-sortable 'order' string for the given
// key/PropertyMap.
//
//   - start/end are byte sequences which are the inequality bounds of the
//     query, if any. These are a serialized datastore.Property. If the
//     inequality column is inverted, then start and end are also inverted and
//     swapped with each other.
//   - order is the list of sort orders in the actual executing queries.
//   - k / pm are the data to derive a sortable string for.
//
// The result of this function is the series of serialized properties, one per
// order column, which represent this key/pm's first entry in the composite
// index that would point to it (e.g. the one with `order` sort orders).
//
// Returns (nil, nil) if this key/pm falls outside the [start, end) bounds
// (or lacks a value for one of the order columns entirely).
func toComparableString(start, end []byte, order []ds.IndexColumn, k *ds.Key, pm ds.PropertyMap) (row, key []byte) {
	// doCmp is true while the accumulated prefix (soFar) is still short
	// enough that it must be checked against the `start` bound.
	doCmp := true
	soFar := []byte{}
	ps := serialize.PropertyMapPartially(k, nil)
	for _, ord := range order {
		// NOTE: this `row` intentionally shadows the named return value; the
		// named return is only populated by the final `return soFar, ...`.
		row, ok := ps[ord.Property]
		if !ok {
			// Not a synthesized key property; pull the value(s) out of the
			// user's PropertyMap instead.
			if vals, ok := pm[ord.Property]; ok {
				row = serialize.PropertySlice(vals)
			}
		}
		sort.Sort(row)
		foundOne := false
		// For a multi-valued property, pick the smallest serialized value
		// that keeps the accumulated prefix >= `start` — i.e. the entity's
		// FIRST composite-index entry that satisfies the lower bound.
		for _, serialized := range row {
			if ord.Descending {
				serialized = serialize.Invert(serialized)
			}
			if doCmp {
				maybe := serialize.Join(soFar, serialized)
				cmp := bytes.Compare(maybe, start)
				if cmp >= 0 {
					foundOne = true
					soFar = maybe
					// Once the prefix is at least as long as `start`, later
					// columns can't flip the comparison; stop comparing.
					doCmp = len(soFar) < len(start)
					break
				}
			} else {
				foundOne = true
				soFar = serialize.Join(soFar, serialized)
				break
			}
		}
		if !foundOne {
			// No value for this column clears the lower bound (or the column
			// has no values at all): this entity has no qualifying index row.
			return nil, nil
		}
	}
	// Enforce the exclusive upper bound, if any.
	if end != nil && bytes.Compare(soFar, end) >= 0 {
		return nil, nil
	}
	return soFar, ps["__key__"][0]
}