Chromium Code Reviews| Index: filter/txnBuf/query_merger.go | 
| diff --git a/filter/txnBuf/query_merger.go b/filter/txnBuf/query_merger.go | 
| new file mode 100644 | 
| index 0000000000000000000000000000000000000000..b21c1391ce93c1be256530df9c416718e24d1209 | 
| --- /dev/null | 
| +++ b/filter/txnBuf/query_merger.go | 
| @@ -0,0 +1,295 @@ | 
| +// Copyright 2015 The Chromium Authors. All rights reserved. | 
| +// Use of this source code is governed by a BSD-style license that can be | 
| +// found in the LICENSE file. | 
| + | 
| +package txnBuf | 
| + | 
| +import ( | 
| + "bytes" | 
| + "sort" | 
| + "sync" | 
| + | 
| + "github.com/luci/gae/impl/memory" | 
| + ds "github.com/luci/gae/service/datastore" | 
| + "github.com/luci/gae/service/datastore/serialize" | 
| + "github.com/luci/luci-go/common/stringset" | 
| +) | 
| + | 
| +// queryToIter takes a FinalizedQuery and returns an iterator function which | 
| +// will produce either *items or errors. | 
| +// | 
| +// - d is the raw datastore to run this query on | 
| +// - filter is a function which will return true if the given key should be | 
| +// excluded from the result set. | 
| +func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterface, filter func(string) bool) func() (*item, error) { | 
| + c := make(chan *item) | 
| + | 
| + go func() { | 
| + defer close(c) | 
| + | 
| + err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorCB) (keepGoing bool) { | 
| + i := &item{key: k, data: pm} | 
| + encKey := i.getEncKey() | 
| + if filter != nil && filter(encKey) { | 
| + return true | 
| + } | 
| + | 
| + select { | 
| + case c <- i: | 
| + return true | 
| + case <-stopChan: | 
| + return false | 
| + } | 
| + }) | 
| + if err != nil { | 
| + c <- &item{err: err} | 
| + } | 
| + }() | 
| + | 
| + return func() (*item, error) { | 
| + for { | 
| 
 
dnj
2015/09/30 16:35:26
No need for "for" loop; you always return.
 
iannucci
2015/09/30 17:10:28
wat. oh, I think i used to have the filter check i
 
 | 
| + itm := <-c | 
| + if itm == nil { | 
| + return nil, nil | 
| + } | 
| + if itm.err != nil { | 
| + return nil, itm.err | 
| + } | 
| + return itm, nil | 
| + } | 
| + } | 
| +} | 
| + | 
| +// adjustQuery applies various mutations to the query to make it suitable for | 
| +// merging. In general, this removes limits, offsets, and the 'distinct' | 
| +// modifier, and it ensures that if there are sort orders which won't appear | 
| +// in the result data that the query is transformed into a projection query | 
| +// which contains all of the data. A non-projection query will never be | 
| +// transformed in this way. | 
| +func adjustQuery(fq *ds.FinalizedQuery) (*ds.FinalizedQuery, error) { | 
| + q := fq.Original() | 
| + | 
| + // The limit and offset must be done in-memory because otherwise we may | 
| + // request too few entities from the underlying store if many matching | 
| + // entities have been deleted in the buffered transaction. | 
| + q = q.Limit(-1) | 
| + q = q.Offset(-1) | 
| + | 
| + // distinction must be done in-memory, because otherwise there's no way | 
| + // to merge in the effect of the in-flight changes (because there's no way | 
| + // to push back to the datastore "yeah, I know you told me that the (1, 2) | 
| + // result came from `/Bob,1`, but would you mind pretending that it didn't | 
| + // and tell me the next one instead?"). | 
| + if fq.Distinct() { | 
| 
 
dnj
2015/09/30 16:35:26
nit: ditch the "if", just say q = q.Distinct(false
 
iannucci
2015/09/30 17:10:28
done
 
 | 
| + q = q.Distinct(false) | 
| + } | 
| + | 
| + // since we need to merge results, we must have all order-related fields | 
| + // in each result. The only time we wouldn't have all the data available would | 
| + // be for a keys-only or projection query. To fix this, we convert all | 
| + // Projection and KeysOnly queries to project on /all/ Orders. | 
| + // | 
| + // FinalizedQuery already guarantees that all projected fields show up in | 
| + // the Orders, but the projected fields could be a subset of the orders. | 
| + // | 
| + // Additionally on a keys-only query, any orders other than __key__ require | 
| + // conversion of this query to a projection query including those orders in | 
| + // order to merge the results correctly. | 
| + // | 
| + // In both cases, the resulting objects returned to the higher layers of | 
| + // the stack will only include the information requested by the user; keys-only | 
| 
 
dnj
2015/09/30 16:35:26
nit: keys-only
 
iannucci
2015/09/30 17:10:27
done
 
 | 
| + // queries will discard all PropertyMap data, and projection queries will | 
| + // discard any field data that the user didn't ask for. | 
| + orders := fq.Orders() | 
| + if len(fq.Project()) > 0 || (fq.KeysOnly() && len(orders) > 1) { | 
| + q = q.KeysOnly(false) | 
| + | 
| + for _, o := range orders { | 
| + if o.Property == "__key__" { | 
| + continue | 
| + } | 
| + q = q.Project(o.Property) | 
| + } | 
| + } | 
| + | 
| + return q.Finalize() | 
| 
 
dnj
2015/09/30 16:35:26
All hail editable queries!
 
iannucci
2015/09/30 17:10:28
indeed :)
 
 | 
| +} | 
| + | 
| +// runMergedQueries executes a user query `fq` against the parent datastore as | 
| +// well as the in-memory datastore, calling `cb` with the merged result set. | 
| +// | 
| +// It's expected that the caller of this function will apply limit and offset | 
| +// if the query contains those restrictions. This may convert the query to | 
| +// an expanded projection query with more data than the user asked for. It's the | 
| +// caller's responsibility to prune away the extra data. | 
| +// | 
| +// See also `dsTxnBuf.Run()`. | 
| +func runMergedQueries(fq *ds.FinalizedQuery, sizes *sizeTracker, memDS, parentDS ds.RawInterface, cb func(k *ds.Key, data ds.PropertyMap) bool) error { | 
| 
 
dnj
2015/09/30 16:35:26
nit: This function signature is huge. Can you line
 
iannucci
2015/09/30 17:10:28
done
 
 | 
| + toRun, err := adjustQuery(fq) | 
| + if err != nil { | 
| + return err | 
| + } | 
| + | 
| + cmpLower, cmpUpper := memory.GetBinaryBounds(fq) | 
| + cmpOrder := fq.Orders() | 
| + cmpFn := func(i *item) string { | 
| + return i.getCmpRow(cmpLower, cmpUpper, cmpOrder) | 
| + } | 
| + | 
| + dedup := stringset.Set(nil) | 
| + distinct := stringset.Set(nil) | 
| + distinctOrder := []ds.IndexColumn(nil) | 
| + if len(fq.Project()) > 0 { // the original query was a projection query | 
| + if fq.Distinct() { | 
| + // it was a distinct projection query, so we need to dedup by distinct | 
| + // options. | 
| + distinct = stringset.New(0) | 
| + proj := fq.Project() | 
| + distinctOrder = make([]ds.IndexColumn, len(proj)) | 
| + for i, p := range proj { | 
| + distinctOrder[i].Property = p | 
| + } | 
| + } | 
| + } else { | 
| + // the original was a normal or keys-only query, so we need to dedup by keys. | 
| 
 
dnj
2015/09/30 16:35:26
nit: keys-only. Also period at the end of comment.
 
iannucci
2015/09/30 17:10:28
done
 
 | 
| + dedup = stringset.New(0) | 
| + } | 
| + | 
| + // We need a lock around dedup since it's not threadsafe and we read/write it | 
| + // in different goroutines. No lock is needed around sizes.has because the | 
| + // whole transaction is locked during this query and so the sizeTracker is | 
| + // effectively read-only. | 
| + dedupLock := sync.Mutex{} | 
| + dedupFn := (func(string) bool)(nil) | 
| + if dedup != nil { | 
| + dedupFn = func(val string) bool { | 
| + dedupLock.Lock() | 
| + defer dedupLock.Unlock() | 
| + return dedup.Has(val) | 
| + } | 
| + } | 
| + | 
| + stopChan := make(chan struct{}) | 
| + | 
| + parItemGet := queryToIter(stopChan, toRun, parentDS, func(key string) bool { | 
| + return sizes.has(key) || (dedupFn != nil && dedupFn(key)) | 
| + }) | 
| + memItemGet := queryToIter(stopChan, toRun, memDS, dedupFn) | 
| + | 
| + defer func() { | 
| + close(stopChan) | 
| + parItemGet() | 
| + memItemGet() | 
| + }() | 
| + | 
| + pitm, err := parItemGet() | 
| + if err != nil { | 
| + return err | 
| + } | 
| + | 
| + mitm, err := memItemGet() | 
| + if err != nil { | 
| + return err | 
| + } | 
| + | 
| + for { | 
| + usePitm := pitm != nil | 
| + if pitm != nil && mitm != nil { | 
| + usePitm = cmpFn(pitm) < cmpFn(mitm) | 
| + } else if pitm == nil && mitm == nil { | 
| + break | 
| + } | 
| + | 
| + toUse := (*item)(nil) | 
| + if usePitm { | 
| + toUse = pitm | 
| + if pitm, err = parItemGet(); err != nil { | 
| + return err | 
| 
 
dnj
2015/09/30 16:35:26
Should punt toUse before failing from error gettin
 
iannucci
2015/09/30 17:10:28
ah, yeah, ok.
 
 | 
| + } | 
| + } else { | 
| + toUse = mitm | 
| + if mitm, err = memItemGet(); err != nil { | 
| + return err | 
| + } | 
| + } | 
| + | 
| + if dedup != nil { | 
| + encKey := toUse.getEncKey() | 
| + dedupLock.Lock() | 
| + added := dedup.Add(encKey) | 
| + dedupLock.Unlock() | 
| + if !added { | 
| + continue | 
| + } | 
| + } | 
| + if distinct != nil { | 
| + // NOTE: We know that toUse will not be used after this point for | 
| + // comparison purposes, so re-use its cmpRow property for our distinct | 
| + // filter here. | 
| + toUse.cmpRow = "" | 
| + if !distinct.Add(toUse.getCmpRow(nil, nil, distinctOrder)) { | 
| + continue | 
| + } | 
| + } | 
| + if !cb(toUse.key, toUse.data) { | 
| + break | 
| + } | 
| + } | 
| + | 
| + return nil | 
| +} | 
| + | 
| +// toComparableString computes the byte-sortable 'order' string for the given | 
| +// key/PropertyMap. | 
| +// | 
| +// * start/end are byte sequences which are the inequality bounds of the | 
| +// query, if any. These are a serialized datastore.Property. If the | 
| +// inequality column is inverted, then start and end are also inverted and | 
| +// swapped with each other. | 
| +// * order is the list of sort orders in the actual executing queries. | 
| +// * k / pm are the data to derive a sortable string for. | 
| +// | 
| +// The result of this function is the series of serialized properties, one per | 
| +// order column, which represent this key/pm's first entry in the composite | 
| +// index that would point to it (i.e. the one with `order` sort orders). | 
| +func toComparableString(start, end []byte, order []ds.IndexColumn, k *ds.Key, pm ds.PropertyMap) (row, key []byte) { | 
| + doCmp := true | 
| + soFar := []byte{} | 
| + ps := serialize.PropertyMapPartially(k, nil) | 
| + for _, ord := range order { | 
| + row, ok := ps[ord.Property] | 
| + if !ok { | 
| + if vals, ok := pm[ord.Property]; ok { | 
| + row = serialize.PropertySlice(vals) | 
| + } | 
| + } | 
| + sort.Sort(row) | 
| + foundOne := false | 
| + for _, serialized := range row { | 
| + if ord.Descending { | 
| + serialized = serialize.Invert(serialized) | 
| + } | 
| + if doCmp { | 
| + maybe := serialize.Join(soFar, serialized) | 
| + cmp := bytes.Compare(maybe, start) | 
| + if cmp >= 0 { | 
| + foundOne = true | 
| + soFar = maybe | 
| + doCmp = len(soFar) < len(start) | 
| + break | 
| + } | 
| + } else { | 
| + foundOne = true | 
| + soFar = serialize.Join(soFar, serialized) | 
| + break | 
| + } | 
| + } | 
| + if !foundOne { | 
| + return nil, nil | 
| + } | 
| + } | 
| + if end != nil && bytes.Compare(soFar, end) >= 0 { | 
| + return nil, nil | 
| + } | 
| + return soFar, ps["__key__"][0] | 
| +} |