Index: impl/memory/datastore_query.go |
diff --git a/impl/memory/datastore_query.go b/impl/memory/datastore_query.go |
index 09460a87025ae5f7f6bb437155173ace54014a8b..47d9199df518b9c342ac62ac10c48fa87b04c8e8 100644 |
--- a/impl/memory/datastore_query.go |
+++ b/impl/memory/datastore_query.go |
@@ -5,6 +5,8 @@ |
package memory |
import ( |
+ "bytes" |
+ "encoding/base64" |
"errors" |
"fmt" |
"math" |
@@ -12,6 +14,7 @@ import ( |
ds "github.com/luci/gae/service/datastore" |
"github.com/luci/gae/service/datastore/serialize" |
+ "github.com/luci/luci-go/common/cmpbin" |
) |
// MaxQueryComponents was lifted from a hard-coded constant in dev_appserver. |
@@ -61,32 +64,56 @@ func parseFilter(f string) (prop string, op queryOp, err error) { |
return |
} |
-type queryCursor string |
+// A queryCursor is: |
+// {#orders} ++ IndexColumn* ++ RawRowData |
+type queryCursor []byte |
-func (q queryCursor) String() string { return string(q) } |
-func (q queryCursor) Valid() bool { return q != "" } |
+func (q queryCursor) String() string { |
+ return base64.URLEncoding.EncodeToString([]byte(q)) |
+} |
+ |
+func (q queryCursor) decode() (cols []ds.IndexColumn, data []byte, err error) { |
+ buf := bytes.NewBuffer([]byte(q)) |
+ count, _, err := cmpbin.ReadUint(buf) |
+ if err != nil { |
+ err = fmt.Errorf("invalid cursor") |
+ return |
+ } |
+ |
+ if count > ds.MaxIndexColumns { |
+ err = fmt.Errorf("invalid cursor") |
+ return |
+ } |
+ |
+ cols = make([]ds.IndexColumn, count) |
+ for i := range cols { |
+ if cols[i], err = serialize.ReadIndexColumn(buf); err != nil { |
+ err = fmt.Errorf("invalid cursor") |
+ return |
+ } |
+ } |
+ |
+ data = buf.Bytes() |
+ return |
+} |
type queryIneqFilter struct { |
prop string |
- low *string |
- high *string |
+ low []byte |
+ high []byte |
} |
-func increment(bstr string, positive bool) string { |
+func increment(bstr []byte) []byte { |
+ // Copy bstr |
+ bstr = bjoin(bstr) |
+ |
lastIdx := len(bstr) - 1 |
- last := bstr[lastIdx] |
- if positive { |
- if last == 0xFF { |
- return bstr + "\x00" |
- } |
- return bstr[:lastIdx] + string([]byte{last + 1}) |
- } else { |
- if last == 0 { |
- return bstr[:lastIdx-1] |
- } |
- return bstr[:lastIdx] + string([]byte{last - 1}) |
+ if bstr[lastIdx] == 0xFF { |
+ return append(bstr, 0) |
} |
+ bstr[lastIdx] += 1 |
+ return bstr |
} |
// constrain 'folds' a new inequality into the current inequality filter. |
@@ -95,24 +122,24 @@ func increment(bstr string, positive bool) string { |
// constraint does so. |
// |
// It returns true iff the filter is overconstrained (i.e. low > high) |
-func (q *queryIneqFilter) constrain(op queryOp, val string) bool { |
+func (q *queryIneqFilter) constrain(op queryOp, val []byte) bool { |
switch op { |
- case qLessThan: |
- val = increment(val, true) |
- fallthrough |
case qLessEq: |
+ val = increment(val) |
+ fallthrough |
+ case qLessThan: |
// adjust upper bound downwards |
- if q.high == nil || *q.high > val { |
- q.high = &val |
+ if q.high == nil || bytes.Compare(q.high, val) > 0 { |
+ q.high = val |
} |
case qGreaterThan: |
- val = increment(val, false) |
+ val = increment(val) |
fallthrough |
case qGreaterEq: |
// adjust lower bound upwards |
- if q.low == nil || *q.low < val { |
- q.low = &val |
+ if q.low == nil || bytes.Compare(q.low, val) < 0 { |
+ q.low = val |
} |
default: |
@@ -120,7 +147,7 @@ func (q *queryIneqFilter) constrain(op queryOp, val string) bool { |
} |
if q.low != nil && q.high != nil { |
- return *q.low > *q.high |
+ return bytes.Compare(q.low, q.high) >= 0 |
} |
return false |
} |
@@ -135,7 +162,7 @@ type queryImpl struct { |
eqFilters map[string]map[string]struct{} |
ineqFilter queryIneqFilter |
order []ds.IndexColumn |
- project map[string]struct{} |
+ project []string |
distinct bool |
eventualConsistency bool |
@@ -151,20 +178,31 @@ type queryImpl struct { |
var _ ds.Query = (*queryImpl)(nil) |
-func (q *queryImpl) valid(ns string, isTxn bool) (done bool, err error) { |
- if q.err == errQueryDone { |
- done = true |
- } else if q.err != nil { |
- err = q.err |
- } else if ns != q.ns { |
- err = errors.New( |
+func sortOrdersEqual(as, bs []ds.IndexColumn) bool { |
+ if len(as) != len(bs) { |
+ return false |
+ } |
+ for i, a := range as { |
+ if a != bs[i] { |
+ return false |
+ } |
+ } |
+ return true |
+} |
+ |
+func (q *queryImpl) prep(ns string, isTxn bool) (*reducedQuery, error) { |
+ if q.err != nil { |
+ return nil, q.err |
+ } |
+ if ns != q.ns { |
+ return nil, errors.New( |
"gae/memory: Namespace mismatched. Query and Datastore don't agree " + |
"on the current namespace") |
} else if isTxn && q.ancestor == nil { |
- err = errors.New( |
+ return nil, errors.New( |
"gae/memory: Only ancestor queries are allowed inside transactions") |
} else if q.numComponents() > MaxQueryComponents { |
- err = fmt.Errorf( |
+ return nil, fmt.Errorf( |
"gae/memory: query is too large. may not have more than "+ |
"%d filters + sort orders + ancestor total: had %d", |
MaxQueryComponents, q.numComponents()) |
@@ -172,10 +210,96 @@ func (q *queryImpl) valid(ns string, isTxn bool) (done bool, err error) { |
// This must be delayed, because q.Distinct().Project("foo") is a valid |
// construction. If we checked this in Distinct, it could be too early, and |
// checking it in Project doesn't matter. |
- err = errors.New( |
+ return nil, errors.New( |
"gae/memory: Distinct() only makes sense on projection queries.") |
} |
- return |
+ ret := &reducedQuery{ |
+ ns: q.ns, |
+ kind: q.kind, |
+ eqFilters: q.eqFilters, |
+ suffixFormat: q.order, |
+ } |
+ if len(ret.suffixFormat) == 0 && q.ineqFilter.prop != "" { |
+ ret.suffixFormat = []ds.IndexColumn{{Property: q.ineqFilter.prop}} |
+ } |
+ if q.ancestor != nil { |
+ ret.ancestor = serialize.ToBytes(q.ancestor) |
+ } |
+ |
+ // The inequality is specified in natural (ascending) order in the query, but |
+ // the order information may indicate to use a descending index column. If |
+ // that's the case, then we must invert, swap and increment the inequality |
+ // endpoints. |
+ // |
+ // Invert so that the desired numbers are represented correctly in the index. |
+ // Swap so that our iterators still go from >= low to < high. |
+ // Increment so that >= and < get correctly bounded (since the iterator is |
+ // still using natural ordering) |
+ shouldInvertIneq := false |
+ if len(ret.suffixFormat) > 0 && ret.suffixFormat[0].Direction == ds.DESCENDING { |
+ shouldInvertIneq = true |
+ } |
+ |
+ err := error(nil) |
+ cols := []ds.IndexColumn(nil) |
+ if q.start != nil { |
+ cols, ret.low, err = q.start.decode() |
+ if err != nil { |
+ return nil, err |
+ } |
+ if !sortOrdersEqual(cols, ret.suffixFormat) { |
+ return nil, errors.New("gae/memory: start cursor is invalid.") |
+ } |
+ } else { |
+ if shouldInvertIneq { |
+ ret.high = increment(invert(q.ineqFilter.low)) |
+ } else { |
+ ret.low = q.ineqFilter.low |
+ } |
+ } |
+ |
+ if q.end != nil { |
+ cols, ret.high, err = q.end.decode() |
+ if err != nil { |
+ return nil, err |
+ } |
+ if !sortOrdersEqual(cols, ret.suffixFormat) { |
+ return nil, errors.New("gae/memory: end cursor is invalid.") |
+ } |
+ } else { |
+ if shouldInvertIneq { |
+ ret.low = increment(invert(q.ineqFilter.high)) |
+ } else { |
+ ret.high = q.ineqFilter.high |
+ } |
+ } |
+ |
+ // Add any projection columns not mentioned in the user-defined order as |
+ // ASCENDING orders. Technically we could be smart and automatically use |
+ // a DESCENDING ordered index, if it fit, but the logic gets insane, since all |
+ // suffixes of all used indexes need to be PRECISELY equal (and so you'd have |
+ // to hunt/invalidate/something to find the combination of indexes that are |
+ // compatible with each other as well as the query). If you want to use |
+ // a DESCENDING column, just add it to the user sort order, and this loop will |
+ // not synthesize a new sort order for it. |
+ originalStop := len(ret.suffixFormat) |
+ for _, p := range q.project { |
+ needAdd := true |
+ // originalStop prevents this loop from getting longer every time we add |
+ // a projected property. |
+ for _, col := range ret.suffixFormat[:originalStop] { |
+ if col.Property == p { |
+ needAdd = false |
+ break |
+ } |
+ } |
+ if needAdd { |
+ ret.suffixFormat = append(ret.suffixFormat, ds.IndexColumn{Property: p}) |
+ } |
+ } |
+ |
+ return ret, nil |
+ |
} |
func (q *queryImpl) numComponents() int { |
@@ -197,21 +321,6 @@ func (q *queryImpl) numComponents() int { |
return numComponents |
} |
-func (q *queryImpl) calculateIndex() *ds.IndexDefinition { |
- // as a nod to simplicity in this code, we'll require that a single index |
- // is able to service the entire query. E.g. no zigzag merge joins or |
- // multiqueries. This will mean that the user will need to rely on |
- // dev_appserver to tell them what indicies they need for real, and for thier |
- // tests they'll need to specify the missing composite indices manually. |
- // |
- // This COULD lead to an exploding indicies problem, but we can fix that when |
- // we get to it. |
- |
- //sortOrders := []qSortBy{} |
- |
- return nil |
-} |
- |
// checkMutateClone sees if the query has an error. If not, it clones the query, |
// and assigns the output of `check` to the query error slot. If check returns |
// nil, it calls `mutate` on the cloned query. The (possibly new) query is then |
@@ -230,10 +339,8 @@ func (q *queryImpl) checkMutateClone(check func() error, mutate func(*queryImpl) |
} |
nq.order = make([]ds.IndexColumn, len(q.order)) |
copy(nq.order, q.order) |
- nq.project = make(map[string]struct{}, len(q.project)) |
- for f := range q.project { |
- nq.project[f] = struct{}{} |
- } |
+ nq.project = make([]string, len(q.project)) |
+ copy(nq.project, q.project) |
if check != nil { |
nq.err = check() |
} |
@@ -250,6 +357,9 @@ func (q *queryImpl) Ancestor(k ds.Key) ds.Query { |
// SDK has an explicit nil-check |
return errors.New("datastore: nil query ancestor") |
} |
+ if k.Namespace() != q.ns { |
+ return fmt.Errorf("bad namespace: %q (expected %q)", k.Namespace(), q.ns) |
+ } |
if !k.Valid(false, globalAppID, q.ns) { |
// technically the SDK implementation does a Weird Thing (tm) if both the |
// stringID and intID are set on a key; it only serializes the stringID in |
@@ -258,9 +368,6 @@ func (q *queryImpl) Ancestor(k ds.Key) ds.Query { |
// just swap to an error here. |
return ds.ErrInvalidKey |
} |
- if k.Namespace() != q.ns { |
- return fmt.Errorf("bad namespace: %q (expected %q)", k.Namespace(), q.ns) |
- } |
if q.ancestor != nil { |
return errors.New("cannot have more than one ancestor") |
} |
@@ -280,7 +387,7 @@ func (q *queryImpl) Distinct() ds.Query { |
func (q *queryImpl) Filter(fStr string, val interface{}) ds.Query { |
prop := "" |
op := qInvalid |
- binVal := "" |
+ binVal := []byte(nil) |
return q.checkMutateClone( |
func() error { |
var err error |
@@ -328,21 +435,25 @@ func (q *queryImpl) Filter(fStr string, val interface{}) ds.Query { |
"first sort order must match inequality filter: %q v %q", |
q.order[0].Property, prop) |
} |
- } else if _, ok := q.project[prop]; ok { |
- return fmt.Errorf( |
- "cannot project on field which is used in an equality filter: %q", |
- prop) |
+ } else { |
+ for _, p := range q.project { |
+ if p == prop { |
+ return fmt.Errorf( |
+ "cannot project on field which is used in an equality filter: %q", |
+ prop) |
+ } |
+ } |
} |
- binVal = string(serialize.ToBytes(p)) |
+ binVal = serialize.ToBytes(p) |
return err |
}, |
func(q *queryImpl) { |
if op == qEqual { |
// add it to eq filters |
if _, ok := q.eqFilters[prop]; !ok { |
- q.eqFilters[prop] = map[string]struct{}{binVal: {}} |
+ q.eqFilters[prop] = map[string]struct{}{string(binVal): {}} |
} else { |
- q.eqFilters[prop][binVal] = struct{}{} |
+ q.eqFilters[prop][string(binVal)] = struct{}{} |
} |
// remove it from sort orders. |
@@ -419,24 +530,32 @@ func (q *queryImpl) Project(fieldName ...string) ds.Query { |
if q.keysOnly { |
return errors.New("cannot project a keysOnly query") |
} |
+ dupCheck := map[string]struct{}{} |
for _, f := range fieldName { |
+ if _, ok := dupCheck[f]; ok { |
+ return fmt.Errorf("cannot project on the same field twice: %q", f) |
+ } |
+ dupCheck[f] = struct{}{} |
if f == "" { |
return errors.New("cannot project on an empty field name") |
} |
- if strings.HasPrefix(f, "__") && strings.HasSuffix(f, "__") { |
- return fmt.Errorf("cannot project on %q", f) |
+ if f == "__key__" { |
+ return fmt.Errorf("cannot project on __key__") |
} |
if _, ok := q.eqFilters[f]; ok { |
return fmt.Errorf( |
"cannot project on field which is used in an equality filter: %q", f) |
} |
+ for _, p := range q.project { |
+ if p == f { |
+ return fmt.Errorf("cannot project on the same field twice: %q", f) |
+ } |
+ } |
} |
return nil |
}, |
func(q *queryImpl) { |
- for _, f := range fieldName { |
- q.project[f] = struct{}{} |
- } |
+ q.project = append(q.project, fieldName...) |
}) |
} |
@@ -482,18 +601,24 @@ func (q *queryImpl) Offset(offset int) ds.Query { |
}) |
} |
+func queryCursorCheck(ns, flavor string, current queryCursor, newCursor ds.Cursor) (queryCursor, error) { |
+ if current != nil { |
+ return nil, fmt.Errorf("%s cursor is multiply defined", flavor) |
+ } |
+ curs, ok := newCursor.(queryCursor) |
+ if !ok { |
+ return nil, fmt.Errorf("%s cursor is unknown type: %T", flavor, newCursor) |
+ } |
+ _, _, err := curs.decode() |
+ return curs, err |
+} |
+ |
func (q *queryImpl) Start(c ds.Cursor) ds.Query { |
- curs := queryCursor("") |
+ curs := queryCursor(nil) |
return q.checkMutateClone( |
- func() error { |
- ok := false |
- if curs, ok = c.(queryCursor); !ok { |
- return fmt.Errorf("start cursor is unknown type: %T", c) |
- } |
- if !curs.Valid() { |
- return errors.New("datastore: invalid cursor") |
- } |
- return nil |
+ func() (err error) { |
+ curs, err = queryCursorCheck(q.ns, "start", q.start, c) |
+ return |
}, |
func(q *queryImpl) { |
q.start = curs |
@@ -501,17 +626,11 @@ func (q *queryImpl) Start(c ds.Cursor) ds.Query { |
} |
func (q *queryImpl) End(c ds.Cursor) ds.Query { |
- curs := queryCursor("") |
+ curs := queryCursor(nil) |
return q.checkMutateClone( |
- func() error { |
- ok := false |
- if curs, ok = c.(queryCursor); !ok { |
- return fmt.Errorf("end cursor is unknown type: %T", c) |
- } |
- if !curs.Valid() { |
- return errors.New("datastore: invalid cursor") |
- } |
- return nil |
+ func() (err error) { |
+ curs, err = queryCursorCheck(q.ns, "end", q.end, c) |
+ return |
}, |
func(q *queryImpl) { |
q.end = curs |