Chromium Code Reviews

Unified Diff: impl/memory/datastore_query.go

Issue 1302813003: impl/memory: Implement Queries (Closed) Base URL: https://github.com/luci/gae.git@add_multi_iterator
Patch Set: Created 5 years, 4 months ago
Index: impl/memory/datastore_query.go
diff --git a/impl/memory/datastore_query.go b/impl/memory/datastore_query.go
index 09460a87025ae5f7f6bb437155173ace54014a8b..47d9199df518b9c342ac62ac10c48fa87b04c8e8 100644
--- a/impl/memory/datastore_query.go
+++ b/impl/memory/datastore_query.go
@@ -5,6 +5,8 @@
package memory
import (
+ "bytes"
+ "encoding/base64"
"errors"
"fmt"
"math"
@@ -12,6 +14,7 @@ import (
ds "github.com/luci/gae/service/datastore"
"github.com/luci/gae/service/datastore/serialize"
+ "github.com/luci/luci-go/common/cmpbin"
)
// MaxQueryComponents was lifted from a hard-coded constant in dev_appserver.
@@ -61,32 +64,56 @@ func parseFilter(f string) (prop string, op queryOp, err error) {
return
}
-type queryCursor string
+// A queryCursor is:
+// {#orders} ++ IndexColumn* ++ RawRowData
+type queryCursor []byte
-func (q queryCursor) String() string { return string(q) }
-func (q queryCursor) Valid() bool { return q != "" }
+func (q queryCursor) String() string {
+ return base64.URLEncoding.EncodeToString([]byte(q))
+}
+
+func (q queryCursor) decode() (cols []ds.IndexColumn, data []byte, err error) {
+ buf := bytes.NewBuffer([]byte(q))
+ count, _, err := cmpbin.ReadUint(buf)
+ if err != nil {
+ err = fmt.Errorf("invalid cursor")
+ return
+ }
+
+ if count > ds.MaxIndexColumns {
+ err = fmt.Errorf("invalid cursor")
+ return
+ }
+
+ cols = make([]ds.IndexColumn, count)
+ for i := range cols {
+ if cols[i], err = serialize.ReadIndexColumn(buf); err != nil {
+ err = fmt.Errorf("invalid cursor")
+ return
+ }
+ }
+
+ data = buf.Bytes()
+ return
+}
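
For illustration only (not part of this patch): a cursor string produced by String() above can be unpacked again with decode(). The helper below is hypothetical and uses only the standard library plus the functions in this file:

func decodeCursorString(s string) (cols []ds.IndexColumn, data []byte, err error) {
	// Reverse the base64.URLEncoding applied by queryCursor.String().
	raw, err := base64.URLEncoding.DecodeString(s)
	if err != nil {
		return nil, nil, fmt.Errorf("invalid cursor")
	}
	// cols is the cursor's sort-order prefix; data is the RawRowData portion.
	return queryCursor(raw).decode()
}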
type queryIneqFilter struct {
prop string
- low *string
- high *string
+ low []byte
+ high []byte
}
-func increment(bstr string, positive bool) string {
+func increment(bstr []byte) []byte {
+ // Copy bstr
+ bstr = bjoin(bstr)
+
lastIdx := len(bstr) - 1
- last := bstr[lastIdx]
- if positive {
- if last == 0xFF {
- return bstr + "\x00"
- }
- return bstr[:lastIdx] + string([]byte{last + 1})
- } else {
- if last == 0 {
- return bstr[:lastIdx-1]
- }
- return bstr[:lastIdx] + string([]byte{last - 1})
+ if bstr[lastIdx] == 0xFF {
+ return append(bstr, 0)
}
+ bstr[lastIdx] += 1
+ return bstr
}
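
For illustration only (not part of this patch), increment now always steps a copy of its input upward:

	// increment([]byte{0x01, 0x02}) == []byte{0x01, 0x03}
	// increment([]byte{0x01, 0xff}) == []byte{0x01, 0xff, 0x00}

constrain (below) uses this to turn the inclusive bound of qLessEq into an exclusive upper bound, and the exclusive bound of qGreaterThan into an inclusive lower bound.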
// constrain 'folds' a new inequality into the current inequality filter.
@@ -95,24 +122,24 @@ func increment(bstr string, positive bool) string {
// constraint does so.
//
// It returns true iff the filter is overconstrained (i.e. low > high)
-func (q *queryIneqFilter) constrain(op queryOp, val string) bool {
+func (q *queryIneqFilter) constrain(op queryOp, val []byte) bool {
switch op {
- case qLessThan:
- val = increment(val, true)
- fallthrough
case qLessEq:
+ val = increment(val)
+ fallthrough
+ case qLessThan:
// adjust upper bound downwards
- if q.high == nil || *q.high > val {
- q.high = &val
+ if q.high == nil || bytes.Compare(q.high, val) > 0 {
+ q.high = val
}
case qGreaterThan:
- val = increment(val, false)
+ val = increment(val)
fallthrough
case qGreaterEq:
// adjust lower bound upwards
- if q.low == nil || *q.low < val {
- q.low = &val
+ if q.low == nil || bytes.Compare(q.low, val) < 0 {
+ q.low = val
}
default:
@@ -120,7 +147,7 @@ func (q *queryIneqFilter) constrain(op queryOp, val string) bool {
}
if q.low != nil && q.high != nil {
- return *q.low > *q.high
+ return bytes.Compare(q.low, q.high) >= 0
}
return false
}
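
For illustration only (not part of this patch), folding "age >= low AND age < high" now operates directly on serialized byte values; the slices below are stand-ins for serialize.ToBytes output:

func exampleConstrain() bool {
	f := queryIneqFilter{prop: "age"}
	low := []byte{0x01, 0x12}  // stand-in serialized lower value
	high := []byte{0x01, 0x41} // stand-in serialized upper value
	f.constrain(qGreaterEq, low)        // sets f.low
	return f.constrain(qLessThan, high) // sets f.high; false while f.low < f.high
}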
@@ -135,7 +162,7 @@ type queryImpl struct {
eqFilters map[string]map[string]struct{}
ineqFilter queryIneqFilter
order []ds.IndexColumn
- project map[string]struct{}
+ project []string
distinct bool
eventualConsistency bool
@@ -151,20 +178,31 @@ type queryImpl struct {
var _ ds.Query = (*queryImpl)(nil)
-func (q *queryImpl) valid(ns string, isTxn bool) (done bool, err error) {
- if q.err == errQueryDone {
- done = true
- } else if q.err != nil {
- err = q.err
- } else if ns != q.ns {
- err = errors.New(
+func sortOrdersEqual(as, bs []ds.IndexColumn) bool {
+ if len(as) != len(bs) {
+ return false
+ }
+ for i, a := range as {
+ if a != bs[i] {
+ return false
+ }
+ }
+ return true
+}
+
+func (q *queryImpl) prep(ns string, isTxn bool) (*reducedQuery, error) {
+ if q.err != nil {
+ return nil, q.err
+ }
+ if ns != q.ns {
+ return nil, errors.New(
"gae/memory: Namespace mismatched. Query and Datastore don't agree " +
"on the current namespace")
} else if isTxn && q.ancestor == nil {
- err = errors.New(
+ return nil, errors.New(
"gae/memory: Only ancestor queries are allowed inside transactions")
} else if q.numComponents() > MaxQueryComponents {
- err = fmt.Errorf(
+ return nil, fmt.Errorf(
"gae/memory: query is too large. may not have more than "+
"%d filters + sort orders + ancestor total: had %d",
MaxQueryComponents, q.numComponents())
@@ -172,10 +210,96 @@ func (q *queryImpl) valid(ns string, isTxn bool) (done bool, err error) {
// This must be delayed, because q.Distinct().Project("foo") is a valid
// construction. If we checked this in Distinct, it could be too early, and
// checking it in Project doesn't matter.
- err = errors.New(
+ return nil, errors.New(
"gae/memory: Distinct() only makes sense on projection queries.")
}
- return
+ ret := &reducedQuery{
+ ns: q.ns,
+ kind: q.kind,
+ eqFilters: q.eqFilters,
+ suffixFormat: q.order,
+ }
+ if len(ret.suffixFormat) == 0 && q.ineqFilter.prop != "" {
+ ret.suffixFormat = []ds.IndexColumn{{Property: q.ineqFilter.prop}}
+ }
+ if q.ancestor != nil {
+ ret.ancestor = serialize.ToBytes(q.ancestor)
+ }
+
+ // The inequality is specified in natural (ascending) order in the query, but
+ // the order information may indicate use of a descending index column. If
+ // that's the case, then we must invert, swap and increment the inequality
+ // endpoints.
+ //
+ // Invert so that the desired numbers are represented correctly in the index.
+ // Swap so that our iterators still go from >= low to < high.
+ // Increment so that >= and < get correctly bounded (since the iterator is
+ // still using natural ordering)
+ shouldInvertIneq := false
+ if len(ret.suffixFormat) > 0 && ret.suffixFormat[0].Direction == ds.DESCENDING {
+ shouldInvertIneq = true
+ }
+
+ err := error(nil)
+ cols := []ds.IndexColumn(nil)
+ if q.start != nil {
+ cols, ret.low, err = q.start.decode()
+ if err != nil {
+ return nil, err
+ }
+ if !sortOrdersEqual(cols, ret.suffixFormat) {
+ return nil, errors.New("gae/memory: start cursor is invalid.")
+ }
+ } else {
+ if shouldInvertIneq {
+ ret.high = increment(invert(q.ineqFilter.low))
+ } else {
+ ret.low = q.ineqFilter.low
+ }
+ }
+
+ if q.end != nil {
+ cols, ret.high, err = q.end.decode()
+ if err != nil {
+ return nil, err
+ }
+ if !sortOrdersEqual(cols, ret.suffixFormat) {
+ return nil, errors.New("gae/memory: end cursor is invalid.")
+ }
+ } else {
+ if shouldInvertIneq {
+ ret.low = increment(invert(q.ineqFilter.high))
+ } else {
+ ret.high = q.ineqFilter.high
+ }
+ }
+
+ // Add any projection columns not mentioned in the user-defined order as
+ // ASCENDING orders. Technically we could be smart and automatically use
+ // a DESCENDING ordered index, if it fit, but the logic gets insane, since all
+ // suffixes of all used indexes need to be PRECISELY equal (and so you'd have
+ // to hunt/invalidate/something to find the combination of indexes that are
+ // compatible with each other as well as the query). If you want to use
+ // a DESCENDING column, just add it to the user sort order, and this loop will
+ // not synthesize a new sort order for it.
+ originalStop := len(ret.suffixFormat)
+ for _, p := range q.project {
+ needAdd := true
+ // originalStop prevents this loop from getting longer every time we add
+ // a projected property.
+ for _, col := range ret.suffixFormat[:originalStop] {
+ if col.Property == p {
+ needAdd = false
+ break
+ }
+ }
+ if needAdd {
+ ret.suffixFormat = append(ret.suffixFormat, ds.IndexColumn{Property: p})
+ }
+ }
+
+ return ret, nil
+
}
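
For illustration only (not part of this patch), the suffix construction above works out as follows for a projection query with one user-specified sort order (property names are made up):

	// q.order   = []ds.IndexColumn{{Property: "when", Direction: ds.DESCENDING}}
	// q.project = []string{"when", "who"}
	//
	// prep produces:
	//   ret.suffixFormat = []ds.IndexColumn{
	//     {Property: "when", Direction: ds.DESCENDING}, // from q.order
	//     {Property: "who"},                            // synthesized as ASCENDING
	//   }
	// and, because the first suffix column is DESCENDING, the inequality
	// endpoints are inverted, swapped and incremented as described above.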
func (q *queryImpl) numComponents() int {
@@ -197,21 +321,6 @@ func (q *queryImpl) numComponents() int {
return numComponents
}
-func (q *queryImpl) calculateIndex() *ds.IndexDefinition {
- // as a nod to simplicity in this code, we'll require that a single index
- // is able to service the entire query. E.g. no zigzag merge joins or
- // multiqueries. This will mean that the user will need to rely on
- // dev_appserver to tell them what indicies they need for real, and for thier
- // tests they'll need to specify the missing composite indices manually.
- //
- // This COULD lead to an exploding indicies problem, but we can fix that when
- // we get to it.
-
- //sortOrders := []qSortBy{}
-
- return nil
-}
-
// checkMutateClone sees if the query has an error. If not, it clones the query,
// and assigns the output of `check` to the query error slot. If check returns
// nil, it calls `mutate` on the cloned query. The (possibly new) query is then
@@ -230,10 +339,8 @@ func (q *queryImpl) checkMutateClone(check func() error, mutate func(*queryImpl)
}
nq.order = make([]ds.IndexColumn, len(q.order))
copy(nq.order, q.order)
- nq.project = make(map[string]struct{}, len(q.project))
- for f := range q.project {
- nq.project[f] = struct{}{}
- }
+ nq.project = make([]string, len(q.project))
+ copy(nq.project, q.project)
if check != nil {
nq.err = check()
}
@@ -250,6 +357,9 @@ func (q *queryImpl) Ancestor(k ds.Key) ds.Query {
// SDK has an explicit nil-check
return errors.New("datastore: nil query ancestor")
}
+ if k.Namespace() != q.ns {
+ return fmt.Errorf("bad namespace: %q (expected %q)", k.Namespace(), q.ns)
+ }
if !k.Valid(false, globalAppID, q.ns) {
// technically the SDK implementation does a Weird Thing (tm) if both the
// stringID and intID are set on a key; it only serializes the stringID in
@@ -258,9 +368,6 @@ func (q *queryImpl) Ancestor(k ds.Key) ds.Query {
// just swap to an error here.
return ds.ErrInvalidKey
}
- if k.Namespace() != q.ns {
- return fmt.Errorf("bad namespace: %q (expected %q)", k.Namespace(), q.ns)
- }
if q.ancestor != nil {
return errors.New("cannot have more than one ancestor")
}
@@ -280,7 +387,7 @@ func (q *queryImpl) Distinct() ds.Query {
func (q *queryImpl) Filter(fStr string, val interface{}) ds.Query {
prop := ""
op := qInvalid
- binVal := ""
+ binVal := []byte(nil)
return q.checkMutateClone(
func() error {
var err error
@@ -328,21 +435,25 @@ func (q *queryImpl) Filter(fStr string, val interface{}) ds.Query {
"first sort order must match inequality filter: %q v %q",
q.order[0].Property, prop)
}
- } else if _, ok := q.project[prop]; ok {
- return fmt.Errorf(
- "cannot project on field which is used in an equality filter: %q",
- prop)
+ } else {
+ for _, p := range q.project {
+ if p == prop {
+ return fmt.Errorf(
+ "cannot project on field which is used in an equality filter: %q",
+ prop)
+ }
+ }
}
- binVal = string(serialize.ToBytes(p))
+ binVal = serialize.ToBytes(p)
return err
},
func(q *queryImpl) {
if op == qEqual {
// add it to eq filters
if _, ok := q.eqFilters[prop]; !ok {
- q.eqFilters[prop] = map[string]struct{}{binVal: {}}
+ q.eqFilters[prop] = map[string]struct{}{string(binVal): {}}
} else {
- q.eqFilters[prop][binVal] = struct{}{}
+ q.eqFilters[prop][string(binVal)] = struct{}{}
}
// remove it from sort orders.
@@ -419,24 +530,32 @@ func (q *queryImpl) Project(fieldName ...string) ds.Query {
if q.keysOnly {
return errors.New("cannot project a keysOnly query")
}
+ dupCheck := map[string]struct{}{}
for _, f := range fieldName {
+ if _, ok := dupCheck[f]; ok {
+ return fmt.Errorf("cannot project on the same field twice: %q", f)
+ }
+ dupCheck[f] = struct{}{}
if f == "" {
return errors.New("cannot project on an empty field name")
}
- if strings.HasPrefix(f, "__") && strings.HasSuffix(f, "__") {
- return fmt.Errorf("cannot project on %q", f)
+ if f == "__key__" {
+ return fmt.Errorf("cannot project on __key__")
}
if _, ok := q.eqFilters[f]; ok {
return fmt.Errorf(
"cannot project on field which is used in an equality filter: %q", f)
}
+ for _, p := range q.project {
+ if p == f {
+ return fmt.Errorf("cannot project on the same field twice: %q", f)
+ }
+ }
}
return nil
},
func(q *queryImpl) {
- for _, f := range fieldName {
- q.project[f] = struct{}{}
- }
+ q.project = append(q.project, fieldName...)
})
}
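
For illustration only (not part of this patch): projection order is now preserved, and duplicates are rejected both within one Project call and across calls:

	// q.Project("who", "when")        // ok: q.project == []string{"who", "when"}
	// q.Project("who", "who")         // error: cannot project on the same field twice: "who"
	// q.Project("who").Project("who") // error: cannot project on the same field twice: "who"
	// q.Project("__key__")            // error: cannot project on __key__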
@@ -482,18 +601,24 @@ func (q *queryImpl) Offset(offset int) ds.Query {
})
}
+func queryCursorCheck(ns, flavor string, current queryCursor, newCursor ds.Cursor) (queryCursor, error) {
+ if current != nil {
+ return nil, fmt.Errorf("%s cursor is multiply defined", flavor)
+ }
+ curs, ok := newCursor.(queryCursor)
+ if !ok {
+ return nil, fmt.Errorf("%s cursor is unknown type: %T", flavor, curs)
+ }
+ _, _, err := curs.decode()
+ return curs, err
+}
+
func (q *queryImpl) Start(c ds.Cursor) ds.Query {
- curs := queryCursor("")
+ curs := queryCursor(nil)
return q.checkMutateClone(
- func() error {
- ok := false
- if curs, ok = c.(queryCursor); !ok {
- return fmt.Errorf("start cursor is unknown type: %T", c)
- }
- if !curs.Valid() {
- return errors.New("datastore: invalid cursor")
- }
- return nil
+ func() (err error) {
+ curs, err = queryCursorCheck(q.ns, "start", q.start, c)
+ return
},
func(q *queryImpl) {
q.start = curs
@@ -501,17 +626,11 @@ func (q *queryImpl) Start(c ds.Cursor) ds.Query {
}
func (q *queryImpl) End(c ds.Cursor) ds.Query {
- curs := queryCursor("")
+ curs := queryCursor(nil)
return q.checkMutateClone(
- func() error {
- ok := false
- if curs, ok = c.(queryCursor); !ok {
- return fmt.Errorf("end cursor is unknown type: %T", c)
- }
- if !curs.Valid() {
- return errors.New("datastore: invalid cursor")
- }
- return nil
+ func() (err error) {
+ curs, err = queryCursorCheck(q.ns, "end", q.end, c)
+ return
},
func(q *queryImpl) {
q.end = curs
