| Index: filter/txnBuf/query_merger.go |
| diff --git a/filter/txnBuf/query_merger.go b/filter/txnBuf/query_merger.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..1e4ff44ccf92891208ee46d27eea9307c3ff9583 |
| --- /dev/null |
| +++ b/filter/txnBuf/query_merger.go |
| @@ -0,0 +1,295 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package txnBuf |
| + |
| +import ( |
| + "bytes" |
| + "sort" |
| + "sync" |
| + |
| + "github.com/luci/gae/impl/memory" |
| + ds "github.com/luci/gae/service/datastore" |
| + "github.com/luci/gae/service/datastore/serialize" |
| + "github.com/luci/luci-go/common/stringset" |
| +) |
| + |
| +// queryToIter takes a FinalizedQuery and returns a pull-style iterator |
| +// function which will produce either *item entries or errors. |
| +// |
| +// - stopChan, when closed, aborts the underlying query. |
| +// - fq is the FinalizedQuery to run. |
| +// - d is the raw datastore to run this query on. |
| +// - dedup is a function which will return true if the given key has already |
| +// been used in the merged result set. If nil, this check is skipped. |
| +// - filter is a function which will return true if the given key appears |
| +// in the in-memory result set. This means that all entries for this key |
| +// will show up from the in-memory query and don't need to be merged. It |
| +// also prevents deleted entities from showing up in a query. |
| +func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterface, dedup, filter func(string) bool) func() (*item, error) { |
| + c := make(chan *item) |
| + |
| + go func() { |
| + defer close(c) |
| + |
| + err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorCB) (keepGoing bool) { |
| + i := &item{key: k, data: pm} |
| + encKey := i.getEncKey() |
| + if filter != nil && filter(encKey) { |
| + return true |
| + } |
| + if dedup != nil && dedup(encKey) { |
| Vadim Sh. 2015/09/30 01:11:28: why have both dedup & filter if they are used iden… |
| iannucci 2015/09/30 02:00:10: Hm... ok. |
| + return true |
| + } |
| + |
| + select { |
| + case c <- i: |
| + return true |
| + case <-stopChan: |
| + return false |
| + } |
| + }) |
| + if err != nil { |
| + c <- &item{err: err} |
| + } |
| + }() |
| + |
| + return func() (*item, error) { |
| + for { |
| + itm := <-c |
| + if itm == nil { |
| + return nil, nil |
| + } |
| + if itm.err != nil { |
| Vadim Sh. 2015/09/30 01:11:28: not sure it matters, but iteration function return… |
| iannucci 2015/09/30 02:00:10: doesn't matter, we bail on the first error. |
| + return nil, itm.err |
| + } |
| + return itm, nil |
| + } |
| + } |
| +} |
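
(For readers of this review: a minimal sketch of how the pull-style iterator returned by queryToIter is meant to be consumed. The drainQuery helper below is hypothetical and not part of this CL; it only illustrates the (nil, nil) end-of-stream convention and the "bail on the first error" behavior that runMergedQueries relies on further down.)

// drainQuery is a hypothetical helper (not in this CL) showing the intended
// consumption pattern for queryToIter: keep calling next() until it returns
// (nil, nil) for end-of-stream, and bail out on the first error.
func drainQuery(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterface, visit func(*item) bool) error {
	next := queryToIter(stopChan, fq, d, nil, nil)
	for {
		itm, err := next()
		if err != nil {
			return err // first error aborts the iteration
		}
		if itm == nil {
			return nil // channel closed: the query is exhausted
		}
		if !visit(itm) {
			// The caller is expected to close stopChan (e.g. via defer) so the
			// producer goroutine also unblocks and exits.
			return nil
		}
	}
}
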
| + |
| +// adjustQuery applies various mutations to the query to make it suitable for |
| +// merging. In general, it removes limits and offsets as well as the |
| +// 'distinct' modifier, and it ensures that if there are sort orders whose |
| +// data wouldn't otherwise appear in the results, the query is transformed |
| +// into a projection query which contains all of that data. A normal |
| +// (full-entity) query is never transformed in this way, since it already |
| +// carries everything needed for merging. |
| +func adjustQuery(fq *ds.FinalizedQuery) (*ds.FinalizedQuery, error) { |
| + q := fq.Original() |
| + |
| + // The limit and offset must be done in-memory because otherwise we may |
| + // request too few entities from the underlying store if many matching |
| + // entities have been deleted in the buffered transaction. |
| + q = q.Limit(-1) |
| + q = q.Offset(-1) |
| + |
| +// Distinct-ness must be applied in-memory, because otherwise there's no way |
| +// to merge in the effect of the in-flight changes (there's no way to push |
| +// back to the datastore "yeah, I know you told me that the (1, 2) result |
| +// came from `/Bob,1`, but would you mind pretending that it didn't and |
| +// telling me the next one instead?"). |
| + if fq.Distinct() { |
| + q = q.Distinct(false) |
| + } |
| + |
| +// Since we need to merge results, we must have all order-related fields |
| +// in each result. The only time we wouldn't have all the data available is |
| +// for a keys-only or projection query. To fix this, we convert all |
| +// Projection and KeysOnly queries to project on /all/ Orders. |
| +// |
| +// FinalizedQuery already guarantees that all projected fields show up in |
| +// the Orders, but the projected fields could be a subset of the orders. |
| +// |
| +// Additionally, on a keys-only query, any orders other than __key__ require |
| +// converting the query to a projection query that includes those orders, so |
| +// that the results can be merged correctly. |
| +// |
| +// In both cases, the resulting objects returned to the higher layers of |
| +// the stack will only include the information requested by the user; |
| +// keys-only queries will discard all PropertyMap data, and projection |
| +// queries will discard any field data that the user didn't ask for. |
| + orders := fq.Orders() |
| + if len(fq.Project()) > 0 || (fq.KeysOnly() && len(orders) > 1) { |
| + q = q.KeysOnly(false) |
| + |
| + for _, o := range orders { |
| + if o.Property == "__key__" { |
| + continue |
| + } |
| + q = q.Project(o.Property) |
| + } |
| + } |
| + |
| + return q.Finalize() |
| +} |
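
(Illustration, not part of the CL: the sketch below shows the kind of widening adjustQuery performs on a keys-only query ordered on a non-key property. It assumes the chainable ds.NewQuery/Order/KeysOnly/Limit builder used elsewhere in this package; the kind and property names are made up.)

// adjustQueryExample is a hypothetical illustration of adjustQuery's effect.
// A keys-only query ordered on "Val" carries no "Val" data in its results, so
// it must be widened into a projection on "Val" before the two result streams
// can be order-merged; the Limit is stripped and re-applied by the caller.
func adjustQueryExample() (*ds.FinalizedQuery, error) {
	orig, err := ds.NewQuery("Foo").Order("Val").KeysOnly(true).Limit(10).Finalize()
	if err != nil {
		return nil, err
	}
	// Conceptually the adjusted query is:
	//   NewQuery("Foo").Order("Val").Project("Val")   // no KeysOnly, no Limit
	return adjustQuery(orig)
}
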
| + |
| +// runMergedQueries executes a user query `fq` against the parent datastore as |
| +// well as the in-memory datastore, calling `cb` with the merged result set. |
| +// |
| +// It's expected that the caller of this function will apply limit and offset |
| +// if the query contains those restrictions. runMergedQueries may also convert |
| +// the query to an expanded projection query with more data than the user |
| +// asked for; it's the caller's responsibility to prune away the extra data. |
| +// |
| +// See also `dsTxnBuf.Run()`. |
| +func runMergedQueries(fq *ds.FinalizedQuery, state *txnBufState, cb func(k *ds.Key, data ds.PropertyMap) bool) error { |
| + toRun, err := adjustQuery(fq) |
| + if err != nil { |
| + return err |
| + } |
| + |
| + stopChan := make(chan struct{}) |
| + defer close(stopChan) |
| + |
| + cmpLower, cmpUpper := memory.GetBinaryBounds(fq) |
| + cmpOrder := fq.Orders() |
| + cmpFn := func(i *item) string { |
| + return i.getCmpRow(cmpLower, cmpUpper, cmpOrder) |
| + } |
| + |
| + dedup := stringset.Set(nil) |
| + distinct := stringset.Set(nil) |
| + distinctOrder := []ds.IndexColumn(nil) |
| + if len(fq.Project()) > 0 { // the original query was a projection query |
| + if fq.Distinct() { |
| + // it was a distinct projection query, so we need to dedup by distinct |
| + // options. |
| + distinct = stringset.New(0) |
| + proj := fq.Project() |
| + distinctOrder = make([]ds.IndexColumn, len(proj)) |
| + for i, p := range proj { |
| + distinctOrder[i].Property = p |
| + } |
| + } |
| + } else { |
| + // the original was a normal or keysonly query, so we need to dedup by keys |
| + dedup = stringset.New(0) |
| + } |
| + |
| +// We need a lock around dedup since it's not thread-safe and we read/write |
| +// it from different goroutines. No lock is needed around state.entState.has |
| +// because the whole transaction is locked during this query, so the entState |
| +// is effectively read-only. |
| + dedupLock := sync.Mutex{} |
| + dedupFn := (func(string) bool)(nil) |
| + if dedup != nil { |
| + dedupFn = func(val string) bool { |
| + dedupLock.Lock() |
| + defer dedupLock.Unlock() |
| + return dedup.Has(val) |
| + } |
| + } |
| + |
| + parItemGet := queryToIter(stopChan, toRun, state.parentDS, dedupFn, state.entState.has) |
| + memItemGet := queryToIter(stopChan, toRun, state.memDS, dedupFn, nil) |
| + |
| + pitm, err := parItemGet() |
| + if err != nil { |
| + return err |
| + } |
| + |
| + mitm, err := memItemGet() |
| + if err != nil { |
| + return err |
| + } |
| + |
| + for { |
| + usePitm := pitm != nil |
| + if pitm != nil && mitm != nil { |
| + usePitm = cmpFn(pitm) < cmpFn(mitm) |
| + } else if pitm == nil && mitm == nil { |
| + break |
| + } |
| + |
| + toUse := (*item)(nil) |
| + if usePitm { |
| + toUse = pitm |
| + if pitm, err = parItemGet(); err != nil { |
| + return err |
| + } |
| + } else { |
| + toUse = mitm |
| + if mitm, err = memItemGet(); err != nil { |
| + return err |
| + } |
| + } |
| + |
| + if dedup != nil { |
| + encKey := toUse.getEncKey() |
| + dedupLock.Lock() |
| + added := dedup.Add(encKey) |
| + dedupLock.Unlock() |
| + if !added { |
| + continue |
| + } |
| + } |
| + if distinct != nil { |
| + // NOTE: We know that toUse will not be used after this point for |
| + // comparison purposes, so re-use its cmpRow property for our distinct |
| + // filter here. |
| + toUse.cmpRow = "" |
| + if !distinct.Add(toUse.getCmpRow(nil, nil, distinctOrder)) { |
| + continue |
| + } |
| + } |
| + if !cb(toUse.key, toUse.data) { |
| + break |
| + } |
| + } |
| + |
| + return nil |
| +} |
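
(Another hypothetical sketch, not part of the CL: since adjustQuery strips the user's limit and offset, the caller of runMergedQueries has to re-apply them over the merged stream, along the lines below; presumably dsTxnBuf.Run in this CL does the equivalent, plus the projection/keys-only pruning the doc comment mentions.)

// applyLimitOffset is a hypothetical wrapper (not the actual dsTxnBuf.Run)
// re-applying the offset/limit that adjustQuery removed: the merged stream may
// contain entities that the buffered transaction deleted or shadowed, so the
// window can only be applied after merging. Assumes limit > 0.
func applyLimitOffset(fq *ds.FinalizedQuery, state *txnBufState, offset, limit int32, cb func(*ds.Key, ds.PropertyMap) bool) error {
	skipped, returned := int32(0), int32(0)
	return runMergedQueries(fq, state, func(k *ds.Key, data ds.PropertyMap) bool {
		if skipped < offset {
			skipped++ // still inside the user's offset window; drop the row
			return true
		}
		if returned >= limit {
			return false // user's limit reached; stop both underlying queries
		}
		returned++
		return cb(k, data)
	})
}
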
| + |
| +// toComparableString computes the byte-sortable 'order' string for the given |
| +// key/PropertyMap. |
| +// |
| +// * start/end are byte sequences which are the inequality bounds of the |
| +// query, if any; each is a serialized datastore.Property. If the |
| +// inequality column is inverted, then start and end are also inverted and |
| +// swapped with each other. |
| +// * order is the list of sort orders in the actual executing queries. |
| +// * k / pm are the data to derive a sortable string for. |
| +// |
| +// The result of this function is the series of serialized properties, one |
| +// per order column, which represents this key/pm's first entry in the |
| +// composite index that would point to it (i.e. the index whose sort columns |
| +// are `order`). |
| +func toComparableString(start, end []byte, order []ds.IndexColumn, k *ds.Key, pm ds.PropertyMap) (row, key []byte) { |
| + doCmp := true |
| + soFar := []byte{} |
| + ps := serialize.PropertyMapPartially(k, nil) |
| + for _, ord := range order { |
| + row, ok := ps[ord.Property] |
| + if !ok { |
| + if vals, ok := pm[ord.Property]; ok { |
| + row = serialize.PropertySlice(vals) |
| + } |
| + } |
| + sort.Sort(row) |
| + foundOne := false |
| + for _, serialized := range row { |
| + if ord.Descending { |
| + serialized = serialize.Invert(serialized) |
| + } |
| + if doCmp { |
| + maybe := serialize.Join(soFar, serialized) |
| + cmp := bytes.Compare(maybe, start) |
| + if cmp >= 0 { |
| + foundOne = true |
| + soFar = maybe |
| + doCmp = len(soFar) < len(start) |
| + break |
| + } |
| + } else { |
| + foundOne = true |
| + soFar = serialize.Join(soFar, serialized) |
| + break |
| + } |
| + } |
| + if !foundOne { |
| + return nil, nil |
| + } |
| + } |
| + if end != nil && bytes.Compare(soFar, end) >= 0 { |
| + return nil, nil |
| + } |
| + return soFar, ps["__key__"][0] |
| +} |
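
(A last sketch to make the bounds handling concrete, also hypothetical and not part of the CL: this is how a caller could derive the byte-sortable merge row for a single entity, using the same GetBinaryBounds call that runMergedQueries uses.)

// cmpRowExample is a hypothetical illustration of toComparableString: given an
// entity and the finalized query it came from, it produces the byte string on
// which runMergedQueries interleaves the parent-store and in-memory streams.
// Entities falling outside the query's inequality bounds come back as (nil, nil).
func cmpRowExample(fq *ds.FinalizedQuery, k *ds.Key, pm ds.PropertyMap) (row, key []byte) {
	// GetBinaryBounds returns the serialized inequality bounds of the query
	// (nil when there is no inequality filter), exactly as in runMergedQueries.
	lower, upper := memory.GetBinaryBounds(fq)
	return toComparableString(lower, upper, fq.Orders(), k, pm)
}
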