| Index: impl/memory/datastore_query.go
|
| diff --git a/impl/memory/datastore_query.go b/impl/memory/datastore_query.go
|
| index 09460a87025ae5f7f6bb437155173ace54014a8b..47d9199df518b9c342ac62ac10c48fa87b04c8e8 100644
|
| --- a/impl/memory/datastore_query.go
|
| +++ b/impl/memory/datastore_query.go
|
| @@ -5,6 +5,8 @@
|
| package memory
|
|
|
| import (
|
| + "bytes"
|
| + "encoding/base64"
|
| "errors"
|
| "fmt"
|
| "math"
|
| @@ -12,6 +14,7 @@ import (
|
|
|
| ds "github.com/luci/gae/service/datastore"
|
| "github.com/luci/gae/service/datastore/serialize"
|
| + "github.com/luci/luci-go/common/cmpbin"
|
| )
|
|
|
| // MaxQueryComponents was lifted from a hard-coded constant in dev_appserver.
|
| @@ -61,32 +64,56 @@ func parseFilter(f string) (prop string, op queryOp, err error) {
|
| return
|
| }
|
|
|
| -type queryCursor string
|
| +// A queryCursor is:
|
| +// {#orders} ++ IndexColumn* ++ RawRowData
|
| +type queryCursor []byte
|
|
|
| -func (q queryCursor) String() string { return string(q) }
|
| -func (q queryCursor) Valid() bool { return q != "" }
|
| +func (q queryCursor) String() string {
|
| + return base64.URLEncoding.EncodeToString([]byte(q))
|
| +}
|
| +
|
| +func (q queryCursor) decode() (cols []ds.IndexColumn, data []byte, err error) {
|
| + buf := bytes.NewBuffer([]byte(q))
|
| + count, _, err := cmpbin.ReadUint(buf)
|
| + if err != nil {
|
| + err = fmt.Errorf("invalid cursor")
|
| + return
|
| + }
|
| +
|
| + if count > ds.MaxIndexColumns {
|
| + err = fmt.Errorf("invalid cursor")
|
| + return
|
| + }
|
| +
|
| + cols = make([]ds.IndexColumn, count)
|
| + for i := range cols {
|
| + if cols[i], err = serialize.ReadIndexColumn(buf); err != nil {
|
| + err = fmt.Errorf("invalid cursor")
|
| + return
|
| + }
|
| + }
|
| +
|
| + data = buf.Bytes()
|
| + return
|
| +}
|
|
|
| type queryIneqFilter struct {
|
| prop string
|
|
|
| - low *string
|
| - high *string
|
| + low []byte
|
| + high []byte
|
| }
|
|
|
| -func increment(bstr string, positive bool) string {
|
| +func increment(bstr []byte) []byte {
|
| + // Copy bstr
|
| + bstr = bjoin(bstr)
|
| +
|
| lastIdx := len(bstr) - 1
|
| - last := bstr[lastIdx]
|
| - if positive {
|
| - if last == 0xFF {
|
| - return bstr + "\x00"
|
| - }
|
| - return bstr[:lastIdx] + string([]byte{last + 1})
|
| - } else {
|
| - if last == 0 {
|
| - return bstr[:lastIdx-1]
|
| - }
|
| - return bstr[:lastIdx] + string([]byte{last - 1})
|
| + if bstr[lastIdx] == 0xFF {
|
| + return append(bstr, 0)
|
| }
|
| + bstr[lastIdx] += 1
|
| + return bstr
|
| }
|
|
|
| // constrain 'folds' a new inequality into the current inequality filter.
|
| @@ -95,24 +122,24 @@ func increment(bstr string, positive bool) string {
|
| // constraint does so.
|
| //
|
| // It returns true iff the filter is overconstrained (i.e. low > high)
|
| -func (q *queryIneqFilter) constrain(op queryOp, val string) bool {
|
| +func (q *queryIneqFilter) constrain(op queryOp, val []byte) bool {
|
| switch op {
|
| - case qLessThan:
|
| - val = increment(val, true)
|
| - fallthrough
|
| case qLessEq:
|
| + val = increment(val)
|
| + fallthrough
|
| + case qLessThan:
|
| // adjust upper bound downwards
|
| - if q.high == nil || *q.high > val {
|
| - q.high = &val
|
| + if q.high == nil || bytes.Compare(q.high, val) > 0 {
|
| + q.high = val
|
| }
|
|
|
| case qGreaterThan:
|
| - val = increment(val, false)
|
| + val = increment(val)
|
| fallthrough
|
| case qGreaterEq:
|
| // adjust lower bound upwards
|
| - if q.low == nil || *q.low < val {
|
| - q.low = &val
|
| + if q.low == nil || bytes.Compare(q.low, val) < 0 {
|
| + q.low = val
|
| }
|
|
|
| default:
|
| @@ -120,7 +147,7 @@ func (q *queryIneqFilter) constrain(op queryOp, val string) bool {
|
| }
|
|
|
| if q.low != nil && q.high != nil {
|
| - return *q.low > *q.high
|
| + return bytes.Compare(q.low, q.high) >= 0
|
| }
|
| return false
|
| }
|
| @@ -135,7 +162,7 @@ type queryImpl struct {
|
| eqFilters map[string]map[string]struct{}
|
| ineqFilter queryIneqFilter
|
| order []ds.IndexColumn
|
| - project map[string]struct{}
|
| + project []string
|
|
|
| distinct bool
|
| eventualConsistency bool
|
| @@ -151,20 +178,31 @@ type queryImpl struct {
|
|
|
| var _ ds.Query = (*queryImpl)(nil)
|
|
|
| -func (q *queryImpl) valid(ns string, isTxn bool) (done bool, err error) {
|
| - if q.err == errQueryDone {
|
| - done = true
|
| - } else if q.err != nil {
|
| - err = q.err
|
| - } else if ns != q.ns {
|
| - err = errors.New(
|
| +func sortOrdersEqual(as, bs []ds.IndexColumn) bool {
|
| + if len(as) != len(bs) {
|
| + return false
|
| + }
|
| + for i, a := range as {
|
| + if a != bs[i] {
|
| + return false
|
| + }
|
| + }
|
| + return true
|
| +}
|
| +
|
| +func (q *queryImpl) prep(ns string, isTxn bool) (*reducedQuery, error) {
|
| + if q.err != nil {
|
| + return nil, q.err
|
| + }
|
| + if ns != q.ns {
|
| + return nil, errors.New(
|
| "gae/memory: Namespace mismatched. Query and Datastore don't agree " +
|
| "on the current namespace")
|
| } else if isTxn && q.ancestor == nil {
|
| - err = errors.New(
|
| + return nil, errors.New(
|
| "gae/memory: Only ancestor queries are allowed inside transactions")
|
| } else if q.numComponents() > MaxQueryComponents {
|
| - err = fmt.Errorf(
|
| + return nil, fmt.Errorf(
|
| "gae/memory: query is too large. may not have more than "+
|
| "%d filters + sort orders + ancestor total: had %d",
|
| MaxQueryComponents, q.numComponents())
|
| @@ -172,10 +210,96 @@ func (q *queryImpl) valid(ns string, isTxn bool) (done bool, err error) {
|
| // This must be delayed, because q.Distinct().Project("foo") is a valid
|
| // construction. If we checked this in Distinct, it could be too early, and
|
| // checking it in Project doesn't matter.
|
| - err = errors.New(
|
| + return nil, errors.New(
|
| "gae/memory: Distinct() only makes sense on projection queries.")
|
| }
|
| - return
|
| + ret := &reducedQuery{
|
| + ns: q.ns,
|
| + kind: q.kind,
|
| + eqFilters: q.eqFilters,
|
| + suffixFormat: q.order,
|
| + }
|
| + if len(ret.suffixFormat) == 0 && q.ineqFilter.prop != "" {
|
| + ret.suffixFormat = []ds.IndexColumn{{Property: q.ineqFilter.prop}}
|
| + }
|
| + if q.ancestor != nil {
|
| + ret.ancestor = serialize.ToBytes(q.ancestor)
|
| + }
|
| +
|
| + // The inequality is specified in natural (ascending) order in the query, but
|
| + // the order information may indicate to use a descending index column. If
|
| + // that's the case, then we must invert, swap and increment the inequality
|
| + // endpoints.
|
| + //
|
| + // Invert so that the desired numbers are represented correctly in the index.
|
| + // Swap so that our iterators still go from >= low to < high.
|
| + // Increment so that >= and < get correctly bounded (since the iterator is
|
| + // still using natural ordering)
|
| + shouldInvertIneq := false
|
| + if len(ret.suffixFormat) > 0 && ret.suffixFormat[0].Direction == ds.DESCENDING {
|
| + shouldInvertIneq = true
|
| + }
|
| +
|
| + err := error(nil)
|
| + cols := []ds.IndexColumn(nil)
|
| + if q.start != nil {
|
| + cols, ret.low, err = q.start.decode()
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| + if !sortOrdersEqual(cols, ret.suffixFormat) {
|
| + return nil, errors.New("gae/memory: start cursor is invalid.")
|
| + }
|
| + } else {
|
| + if shouldInvertIneq {
|
| + ret.high = increment(invert(q.ineqFilter.low))
|
| + } else {
|
| + ret.low = q.ineqFilter.low
|
| + }
|
| + }
|
| +
|
| + if q.end != nil {
|
| + cols, ret.high, err = q.end.decode()
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| + if !sortOrdersEqual(cols, ret.suffixFormat) {
|
| + return nil, errors.New("gae/memory: end cursor is invalid.")
|
| + }
|
| + } else {
|
| + if shouldInvertIneq {
|
| + ret.low = increment(invert(q.ineqFilter.high))
|
| + } else {
|
| + ret.high = q.ineqFilter.high
|
| + }
|
| + }
|
| +
|
| + // Add any projection columns not mentioned in the user-defined order as
|
| + // ASCENDING orders. Technically we could be smart and automatically use
|
| + // a DESCENDING ordered index, if it fit, but the logic gets insane, since all
|
| + // suffixes of all used indexes need to be PRECISELY equal (and so you'd have
|
| + // to hunt/invalidate/something to find the combination of indexes that are
|
| + // compatible with each other as well as the query). If you want to use
|
| + // a DESCENDING column, just add it to the user sort order, and this loop will
|
| + // not synthesize a new sort order for it.
|
| + originalStop := len(ret.suffixFormat)
|
| + for _, p := range q.project {
|
| + needAdd := true
|
| + // originalStop prevents this loop from getting longer every time we add
|
| + // a projected property.
|
| + for _, col := range ret.suffixFormat[:originalStop] {
|
| + if col.Property == p {
|
| + needAdd = false
|
| + break
|
| + }
|
| + }
|
| + if needAdd {
|
| + ret.suffixFormat = append(ret.suffixFormat, ds.IndexColumn{Property: p})
|
| + }
|
| + }
|
| +
|
| + return ret, nil
|
| +
|
| }
|
|
|
| func (q *queryImpl) numComponents() int {
|
| @@ -197,21 +321,6 @@ func (q *queryImpl) numComponents() int {
|
| return numComponents
|
| }
|
|
|
| -func (q *queryImpl) calculateIndex() *ds.IndexDefinition {
|
| - // as a nod to simplicity in this code, we'll require that a single index
|
| - // is able to service the entire query. E.g. no zigzag merge joins or
|
| - // multiqueries. This will mean that the user will need to rely on
|
| - // dev_appserver to tell them what indicies they need for real, and for thier
|
| - // tests they'll need to specify the missing composite indices manually.
|
| - //
|
| - // This COULD lead to an exploding indicies problem, but we can fix that when
|
| - // we get to it.
|
| -
|
| - //sortOrders := []qSortBy{}
|
| -
|
| - return nil
|
| -}
|
| -
|
| // checkMutateClone sees if the query has an error. If not, it clones the query,
|
| // and assigns the output of `check` to the query error slot. If check returns
|
| // nil, it calls `mutate` on the cloned query. The (possibly new) query is then
|
| @@ -230,10 +339,8 @@ func (q *queryImpl) checkMutateClone(check func() error, mutate func(*queryImpl)
|
| }
|
| nq.order = make([]ds.IndexColumn, len(q.order))
|
| copy(nq.order, q.order)
|
| - nq.project = make(map[string]struct{}, len(q.project))
|
| - for f := range q.project {
|
| - nq.project[f] = struct{}{}
|
| - }
|
| + nq.project = make([]string, len(q.project))
|
| + copy(nq.project, q.project)
|
| if check != nil {
|
| nq.err = check()
|
| }
|
| @@ -250,6 +357,9 @@ func (q *queryImpl) Ancestor(k ds.Key) ds.Query {
|
| // SDK has an explicit nil-check
|
| return errors.New("datastore: nil query ancestor")
|
| }
|
| + if k.Namespace() != q.ns {
|
| + return fmt.Errorf("bad namespace: %q (expected %q)", k.Namespace(), q.ns)
|
| + }
|
| if !k.Valid(false, globalAppID, q.ns) {
|
| // technically the SDK implementation does a Weird Thing (tm) if both the
|
| // stringID and intID are set on a key; it only serializes the stringID in
|
| @@ -258,9 +368,6 @@ func (q *queryImpl) Ancestor(k ds.Key) ds.Query {
|
| // just swap to an error here.
|
| return ds.ErrInvalidKey
|
| }
|
| - if k.Namespace() != q.ns {
|
| - return fmt.Errorf("bad namespace: %q (expected %q)", k.Namespace(), q.ns)
|
| - }
|
| if q.ancestor != nil {
|
| return errors.New("cannot have more than one ancestor")
|
| }
|
| @@ -280,7 +387,7 @@ func (q *queryImpl) Distinct() ds.Query {
|
| func (q *queryImpl) Filter(fStr string, val interface{}) ds.Query {
|
| prop := ""
|
| op := qInvalid
|
| - binVal := ""
|
| + binVal := []byte(nil)
|
| return q.checkMutateClone(
|
| func() error {
|
| var err error
|
| @@ -328,21 +435,25 @@ func (q *queryImpl) Filter(fStr string, val interface{}) ds.Query {
|
| "first sort order must match inequality filter: %q v %q",
|
| q.order[0].Property, prop)
|
| }
|
| - } else if _, ok := q.project[prop]; ok {
|
| - return fmt.Errorf(
|
| - "cannot project on field which is used in an equality filter: %q",
|
| - prop)
|
| + } else {
|
| + for _, p := range q.project {
|
| + if p == prop {
|
| + return fmt.Errorf(
|
| + "cannot project on field which is used in an equality filter: %q",
|
| + prop)
|
| + }
|
| + }
|
| }
|
| - binVal = string(serialize.ToBytes(p))
|
| + binVal = serialize.ToBytes(p)
|
| return err
|
| },
|
| func(q *queryImpl) {
|
| if op == qEqual {
|
| // add it to eq filters
|
| if _, ok := q.eqFilters[prop]; !ok {
|
| - q.eqFilters[prop] = map[string]struct{}{binVal: {}}
|
| + q.eqFilters[prop] = map[string]struct{}{string(binVal): {}}
|
| } else {
|
| - q.eqFilters[prop][binVal] = struct{}{}
|
| + q.eqFilters[prop][string(binVal)] = struct{}{}
|
| }
|
|
|
| // remove it from sort orders.
|
| @@ -419,24 +530,32 @@ func (q *queryImpl) Project(fieldName ...string) ds.Query {
|
| if q.keysOnly {
|
| return errors.New("cannot project a keysOnly query")
|
| }
|
| + dupCheck := map[string]struct{}{}
|
| for _, f := range fieldName {
|
| + if _, ok := dupCheck[f]; ok {
|
| + return fmt.Errorf("cannot project on the same field twice: %q", f)
|
| + }
|
| + dupCheck[f] = struct{}{}
|
| if f == "" {
|
| return errors.New("cannot project on an empty field name")
|
| }
|
| - if strings.HasPrefix(f, "__") && strings.HasSuffix(f, "__") {
|
| - return fmt.Errorf("cannot project on %q", f)
|
| + if f == "__key__" {
|
| + return fmt.Errorf("cannot project on __key__")
|
| }
|
| if _, ok := q.eqFilters[f]; ok {
|
| return fmt.Errorf(
|
| "cannot project on field which is used in an equality filter: %q", f)
|
| }
|
| + for _, p := range q.project {
|
| + if p == f {
|
| + return fmt.Errorf("cannot project on the same field twice: %q", f)
|
| + }
|
| + }
|
| }
|
| return nil
|
| },
|
| func(q *queryImpl) {
|
| - for _, f := range fieldName {
|
| - q.project[f] = struct{}{}
|
| - }
|
| + q.project = append(q.project, fieldName...)
|
| })
|
| }
|
|
|
| @@ -482,18 +601,24 @@ func (q *queryImpl) Offset(offset int) ds.Query {
|
| })
|
| }
|
|
|
| +func queryCursorCheck(ns, flavor string, current queryCursor, newCursor ds.Cursor) (queryCursor, error) {
|
| + if current != nil {
|
| + return nil, fmt.Errorf("%s cursor is multiply defined", flavor)
|
| + }
|
| + curs, ok := newCursor.(queryCursor)
|
| + if !ok {
|
| + return nil, fmt.Errorf("%s cursor is unknown type: %T", flavor, curs)
|
| + }
|
| + _, _, err := curs.decode()
|
| + return curs, err
|
| +}
|
| +
|
| func (q *queryImpl) Start(c ds.Cursor) ds.Query {
|
| - curs := queryCursor("")
|
| + curs := queryCursor(nil)
|
| return q.checkMutateClone(
|
| - func() error {
|
| - ok := false
|
| - if curs, ok = c.(queryCursor); !ok {
|
| - return fmt.Errorf("start cursor is unknown type: %T", c)
|
| - }
|
| - if !curs.Valid() {
|
| - return errors.New("datastore: invalid cursor")
|
| - }
|
| - return nil
|
| + func() (err error) {
|
| + curs, err = queryCursorCheck(q.ns, "start", q.start, c)
|
| + return
|
| },
|
| func(q *queryImpl) {
|
| q.start = curs
|
| @@ -501,17 +626,11 @@ func (q *queryImpl) Start(c ds.Cursor) ds.Query {
|
| }
|
|
|
| func (q *queryImpl) End(c ds.Cursor) ds.Query {
|
| - curs := queryCursor("")
|
| + curs := queryCursor(nil)
|
| return q.checkMutateClone(
|
| - func() error {
|
| - ok := false
|
| - if curs, ok = c.(queryCursor); !ok {
|
| - return fmt.Errorf("end cursor is unknown type: %T", c)
|
| - }
|
| - if !curs.Valid() {
|
| - return errors.New("datastore: invalid cursor")
|
| - }
|
| - return nil
|
| + func() (err error) {
|
| + curs, err = queryCursorCheck(q.ns, "end", q.end, c)
|
| + return
|
| },
|
| func(q *queryImpl) {
|
| q.end = curs
|
|
|