Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1437)

Unified Diff: filter/txnBuf/query_merger.go

Issue 1309803004: Add transaction buffer filter. (Closed) Base URL: https://github.com/luci/gae.git@add_query_support
Patch Set: add err for too many roots Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: filter/txnBuf/query_merger.go
diff --git a/filter/txnBuf/query_merger.go b/filter/txnBuf/query_merger.go
new file mode 100644
index 0000000000000000000000000000000000000000..b21c1391ce93c1be256530df9c416718e24d1209
--- /dev/null
+++ b/filter/txnBuf/query_merger.go
@@ -0,0 +1,295 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package txnBuf
+
+import (
+ "bytes"
+ "sort"
+ "sync"
+
+ "github.com/luci/gae/impl/memory"
+ ds "github.com/luci/gae/service/datastore"
+ "github.com/luci/gae/service/datastore/serialize"
+ "github.com/luci/luci-go/common/stringset"
+)
+
+// queryToIter takes a FinalizedQuery and returns an iterator function which
+// will produce either *items or errors.
+//
+// - d is the raw datastore to run this query on
+// - filter is a function which will return true if the given key should be
+// excluded from the result set.
+func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterface, filter func(string) bool) func() (*item, error) {
+ c := make(chan *item)
+
+ go func() {
+ defer close(c)
+
+ err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorCB) (keepGoing bool) {
+ i := &item{key: k, data: pm}
+ encKey := i.getEncKey()
+ if filter != nil && filter(encKey) {
+ return true
+ }
+
+ select {
+ case c <- i:
+ return true
+ case <-stopChan:
+ return false
+ }
+ })
+ if err != nil {
+ c <- &item{err: err}
+ }
+ }()
+
+ return func() (*item, error) {
+ for {
dnj 2015/09/30 16:35:26 No need for "for" loop; you always return.
iannucci 2015/09/30 17:10:28 wat. oh, I think i used to have the filter check i
+ itm := <-c
+ if itm == nil {
+ return nil, nil
+ }
+ if itm.err != nil {
+ return nil, itm.err
+ }
+ return itm, nil
+ }
+ }
+}
+
+// adjustQuery applies various mutations to the query to make it suitable for
+// merging. In general, this removes limits and offsets the 'distinct' modifier,
+// and it ensures that if there are sort orders which won't appear in the
+// result data that the query is transformed into a projection query which
+// contains all of the data. A non-projection query will never be transformed
+// in this way.
+func adjustQuery(fq *ds.FinalizedQuery) (*ds.FinalizedQuery, error) {
+ q := fq.Original()
+
+ // The limit and offset must be done in-memory because otherwise we may
+ // request too few entities from the underlying store if many matching
+ // entities have been deleted in the buffered transaction.
+ q = q.Limit(-1)
+ q = q.Offset(-1)
+
+ // distinction must be done in-memory, because otherwise there's no way
+ // to merge in the effect of the in-flight changes (because there's no way
+ // to push back to the datastore "yeah, I know you told me that the (1, 2)
+ // result came from `/Bob,1`, but would you mind pretending that it didn't
+ // and tell me next the one instead?
+ if fq.Distinct() {
dnj 2015/09/30 16:35:26 nit: ditch the "if", just say q = q.Distinct(false
iannucci 2015/09/30 17:10:28 done
+ q = q.Distinct(false)
+ }
+
+ // since we need to merge results, we must have all order-related fields
+ // in each result. The only time we wouldn't have all the data available would
+ // be for a keys-only or projection query. To fix this, we convert all
+ // Projection and KeysOnly queries to project on /all/ Orders.
+ //
+ // FinalizedQuery already guarantees that all projected fields show up in
+ // the Orders, but the projected fields could be a subset of the orders.
+ //
+ // Additionally on a keys-only query, any orders other than __key__ require
+ // conversion of this query to a projection query including those orders in
+ // order to merge the results correctly.
+ //
+ // In both cases, the resulting objects returned to the higher layers of
+ // the stack will only include the information requested by the user; keysonly
dnj 2015/09/30 16:35:26 nit: keys-only
iannucci 2015/09/30 17:10:27 done
+ // queries will discard all PropertyMap data, and projection queries will
+ // discard any field data that the user didn't ask for.
+ orders := fq.Orders()
+ if len(fq.Project()) > 0 || (fq.KeysOnly() && len(orders) > 1) {
+ q = q.KeysOnly(false)
+
+ for _, o := range orders {
+ if o.Property == "__key__" {
+ continue
+ }
+ q = q.Project(o.Property)
+ }
+ }
+
+ return q.Finalize()
dnj 2015/09/30 16:35:26 All hail editable queries!
iannucci 2015/09/30 17:10:28 indeed :)
+}
+
+// runMergedQueries executes a user query `fq` against the parent datastore as
+// well as the in-memory datastore, calling `cb` with the merged result set.
+//
+// It's expected that the caller of this function will apply limit and offset
+// if the query contains those restrictions. This may convert the query to
+// an expanded projection query with more data than the user asked for. It's the
+// caller's responsibility to prune away the extra data.
+//
+// See also `dsTxnBuf.Run()`.
+func runMergedQueries(fq *ds.FinalizedQuery, sizes *sizeTracker, memDS, parentDS ds.RawInterface, cb func(k *ds.Key, data ds.PropertyMap) bool) error {
dnj 2015/09/30 16:35:26 nit: This function signature is huge. Can you line
iannucci 2015/09/30 17:10:28 done
+ toRun, err := adjustQuery(fq)
+ if err != nil {
+ return err
+ }
+
+ cmpLower, cmpUpper := memory.GetBinaryBounds(fq)
+ cmpOrder := fq.Orders()
+ cmpFn := func(i *item) string {
+ return i.getCmpRow(cmpLower, cmpUpper, cmpOrder)
+ }
+
+ dedup := stringset.Set(nil)
+ distinct := stringset.Set(nil)
+ distinctOrder := []ds.IndexColumn(nil)
+ if len(fq.Project()) > 0 { // the original query was a projection query
+ if fq.Distinct() {
+ // it was a distinct projection query, so we need to dedup by distinct
+ // options.
+ distinct = stringset.New(0)
+ proj := fq.Project()
+ distinctOrder = make([]ds.IndexColumn, len(proj))
+ for i, p := range proj {
+ distinctOrder[i].Property = p
+ }
+ }
+ } else {
+ // the original was a normal or keysonly query, so we need to dedup by keys
dnj 2015/09/30 16:35:26 nit: keys-only. Also period at the end of comment.
iannucci 2015/09/30 17:10:28 done
+ dedup = stringset.New(0)
+ }
+
+ // need lock around dedup since it's not threadsafe and we read/write it in
+ // different goroutines. No lock is needed around state.entState.has because
+ // the whole transaction is locked during this query and so the entState is
+ // effectively read-only.
+ dedupLock := sync.Mutex{}
+ dedupFn := (func(string) bool)(nil)
+ if dedup != nil {
+ dedupFn = func(val string) bool {
+ dedupLock.Lock()
+ defer dedupLock.Unlock()
+ return dedup.Has(val)
+ }
+ }
+
+ stopChan := make(chan struct{})
+
+ parItemGet := queryToIter(stopChan, toRun, parentDS, func(key string) bool {
+ return sizes.has(key) || (dedupFn != nil && dedupFn(key))
+ })
+ memItemGet := queryToIter(stopChan, toRun, memDS, dedupFn)
+
+ defer func() {
+ close(stopChan)
+ parItemGet()
+ memItemGet()
+ }()
+
+ pitm, err := parItemGet()
+ if err != nil {
+ return err
+ }
+
+ mitm, err := memItemGet()
+ if err != nil {
+ return err
+ }
+
+ for {
+ usePitm := pitm != nil
+ if pitm != nil && mitm != nil {
+ usePitm = cmpFn(pitm) < cmpFn(mitm)
+ } else if pitm == nil && mitm == nil {
+ break
+ }
+
+ toUse := (*item)(nil)
+ if usePitm {
+ toUse = pitm
+ if pitm, err = parItemGet(); err != nil {
+ return err
dnj 2015/09/30 16:35:26 Should punt toUse before failing from error gettin
iannucci 2015/09/30 17:10:28 ah, yeah, ok.
+ }
+ } else {
+ toUse = mitm
+ if mitm, err = memItemGet(); err != nil {
+ return err
+ }
+ }
+
+ if dedup != nil {
+ encKey := toUse.getEncKey()
+ dedupLock.Lock()
+ added := dedup.Add(encKey)
+ dedupLock.Unlock()
+ if !added {
+ continue
+ }
+ }
+ if distinct != nil {
+ // NOTE: We know that toUse will not be used after this point for
+ // comparison purposes, so re-use its cmpRow property for our distinct
+ // filter here.
+ toUse.cmpRow = ""
+ if !distinct.Add(toUse.getCmpRow(nil, nil, distinctOrder)) {
+ continue
+ }
+ }
+ if !cb(toUse.key, toUse.data) {
+ break
+ }
+ }
+
+ return nil
+}
+
+// toComparableString computes the byte-sortable 'order' string for the given
+// key/PropertyMap.
+//
+// * start/end are byte sequences which are the inequality bounds of the
+// query, if any. These are a serialized datastore.Property. If the
+// inequality column is inverted, then start and end are also inverted and
+// swapped with each other.
+// * order is the list of sort orders in the actual executing queries.
+// * k / pm are the data to derive a sortable string for.
+//
+// The result of this function is the series of serialized properties, one per
+// order column, which represent this key/pm's first entry in the composite
+// index that would point to it (e.g. the one with `order` sort orders).
+func toComparableString(start, end []byte, order []ds.IndexColumn, k *ds.Key, pm ds.PropertyMap) (row, key []byte) {
+ doCmp := true
+ soFar := []byte{}
+ ps := serialize.PropertyMapPartially(k, nil)
+ for _, ord := range order {
+ row, ok := ps[ord.Property]
+ if !ok {
+ if vals, ok := pm[ord.Property]; ok {
+ row = serialize.PropertySlice(vals)
+ }
+ }
+ sort.Sort(row)
+ foundOne := false
+ for _, serialized := range row {
+ if ord.Descending {
+ serialized = serialize.Invert(serialized)
+ }
+ if doCmp {
+ maybe := serialize.Join(soFar, serialized)
+ cmp := bytes.Compare(maybe, start)
+ if cmp >= 0 {
+ foundOne = true
+ soFar = maybe
+ doCmp = len(soFar) < len(start)
+ break
+ }
+ } else {
+ foundOne = true
+ soFar = serialize.Join(soFar, serialized)
+ break
+ }
+ }
+ if !foundOne {
+ return nil, nil
+ }
+ }
+ if end != nil && bytes.Compare(soFar, end) >= 0 {
+ return nil, nil
+ }
+ return soFar, ps["__key__"][0]
+}

Powered by Google App Engine
This is Rietveld 408576698