Chromium Code Reviews| Index: impl/memory/datastore_query_execution.go |
| diff --git a/impl/memory/datastore_query_execution.go b/impl/memory/datastore_query_execution.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..e219b524e2188ef81f0e47dee35b536af0c9433a |
| --- /dev/null |
| +++ b/impl/memory/datastore_query_execution.go |
| @@ -0,0 +1,235 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package memory |
| + |
| +import ( |
| + "bytes" |
| + "fmt" |
| + |
| + ds "github.com/luci/gae/service/datastore" |
| + "github.com/luci/gae/service/datastore/serialize" |
| + "github.com/luci/luci-go/common/cmpbin" |
| +) |
| + |
| +type queryStrategy interface { |
| + // handle applies the strategy to the embedded user callback. |
| + // - rawData is the slice of encoded Properties from the index row |
| + // (correctly de-inverted). |
| + // - data is the slice of decoded Properties from the index row |
| + // - key is the decoded Key from the index row (the last item in rawData and |
| + // data) |
| + // - gc is the getCursor function to be passed to the user's callback |
| + handle(rawData [][]byte, data []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool |
| +} |
| + |
| +type projectionLookup struct { |
| + suffixIndex int |
| + propertyName string |
| +} |
| + |
| +type projectionStrategy struct { |
| + cb ds.RawRunCB |
| + |
| + project []projectionLookup |
| + distinct map[string]struct{} |
| +} |
| + |
| +func newProjectionStrategy(q *queryImpl, rq *reducedQuery, cb ds.RawRunCB) queryStrategy { |
| + projectionLookups := make([]projectionLookup, len(q.project)) |
| + for i, prop := range q.project { |
| + projectionLookups[i].propertyName = prop |
| + foundIt := false |
| + for j, col := range rq.suffixFormat { |
| + if col.Property == prop { |
| + projectionLookups[i].suffixIndex = j |
| + foundIt = true |
| + break |
| + } |
| + } |
| + if !foundIt { |
| + impossible(fmt.Errorf("planning a strategy for an unfulfillable query?")) |
| + } |
| + } |
| + ret := &projectionStrategy{cb: cb, project: projectionLookups} |
| + if q.distinct { |
| + ret.distinct = map[string]struct{}{} |
| + } |
| + return ret |
| +} |
| + |
| +func (s *projectionStrategy) handle(rawData [][]byte, suffixData []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool { |
| + projectedRaw := [][]byte(nil) |
| + if s.distinct != nil { |
| + projectedRaw = make([][]byte, len(suffixData)) |
| + } |
| + pmap := make(ds.PropertyMap, len(s.project)) |
| + for i, p := range s.project { |
| + if s.distinct != nil { |
| + projectedRaw[i] = rawData[p.suffixIndex] |
| + } |
| + pmap[p.propertyName] = []ds.Property{suffixData[p.suffixIndex]} |
| + } |
| + if s.distinct != nil { |
| + rawString := string(bjoin(projectedRaw...)) |
| + if _, ok := s.distinct[rawString]; ok { |
| + return true |
| + } |
| + s.distinct[rawString] = struct{}{} |
| + } |
| + return s.cb(key, pmap, gc) |
| +} |
| + |
| +type keysOnlyStrategy ds.RawRunCB |
| + |
| +func (s keysOnlyStrategy) handle(_ [][]byte, _ []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool { |
| + return ds.RawRunCB(s)(key, nil, gc) |
| +} |
| + |
| +type normalStrategy struct { |
| + cb ds.RawRunCB |
| + |
| + ns string |
| + head *memCollection |
| +} |
| + |
| +func newNormalStrategy(ns string, cb ds.RawRunCB, head *memStore) queryStrategy { |
| + coll := head.GetCollection("ents:" + ns) |
| + if coll == nil { |
| + return nil |
| + } |
| + return &normalStrategy{cb, ns, coll} |
| +} |
| + |
| +func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool { |
| + rawEnt := s.head.Get(rawData[len(rawData)-1][1:]) |
| + if rawEnt == nil { |
| + // entity doesn't exist at head |
| + return true |
| + } |
| + pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.WithoutContext, globalAppID, s.ns) |
| + memoryCorruption(err) |
| + |
| + return s.cb(key, pm, gc) |
| +} |
| + |
| +func pickQueryStrategy(q *queryImpl, rq *reducedQuery, cb ds.RawRunCB, head *memStore) queryStrategy { |
| + if q.keysOnly { |
| + return keysOnlyStrategy(cb) |
| + } else if len(q.project) > 0 { |
| + return newProjectionStrategy(q, rq, cb) |
|
dnj (Google)
2015/08/23 06:50:06
No need for "else" here.
iannucci
2015/08/23 18:19:43
Done.
|
| + } |
| + return newNormalStrategy(rq.ns, cb, head) |
| +} |
| + |
| +func parseSuffix(ns string, suffixFormat []ds.IndexColumn, suffix []byte) (raw [][]byte, decoded []ds.Property) { |
| + buf := serialize.Invertible(bytes.NewBuffer(suffix)) |
| + decoded = make([]ds.Property, len(suffixFormat)) |
| + raw = make([][]byte, len(suffixFormat)) |
| + |
| + err := error(nil) |
| + prevOffset := 0 |
| + for i := range decoded { |
| + needInvert := suffixFormat[i].Direction == ds.DESCENDING |
| + |
| + buf.SetInvert(needInvert) |
|
dnj (Google)
2015/08/23 06:50:06
Just inline the condition here: buf.SetInvert(suff
iannucci
2015/08/23 18:19:42
Need this below (line 141) as well
|
| + decoded[i], err = serialize.ReadProperty(buf, serialize.WithoutContext, globalAppID, ns) |
| + memoryCorruption(err) |
| + |
| + offset := len(suffix) - buf.Len() |
| + if needInvert { |
| + raw[i] = invert(suffix[prevOffset:offset]) |
|
dnj (Google)
2015/08/23 06:50:06
Consider walking suffix:
suffix = suffix[:offset]
iannucci
2015/08/23 18:19:42
ya. done
|
| + } else { |
| + raw[i] = suffix[prevOffset:offset] |
| + } |
| + prevOffset = offset |
| + } |
| + |
| + return |
| +} |
| + |
| +func executeQuery(origQ ds.Query, ns string, isTxn bool, idx, head *memStore, cb ds.RawRunCB) error { |
| + q, ok := origQ.(*queryImpl) |
| + if !ok { |
| + return fmt.Errorf("unsupported Query implementation: %#v", q) |
|
dnj (Google)
2015/08/23 06:50:06
%T?
iannucci
2015/08/23 18:19:43
Done.
|
| + } |
| + |
| + rq, err := q.reduce(ns, isTxn) |
| + if err != nil { |
| + if err == errQueryDone { |
| + return nil |
| + } |
| + return err |
| + } |
| + |
| + strategy := pickQueryStrategy(q, rq, cb, head) |
| + if strategy == nil { |
| + // e.g. the normalStrategy found that there were NO entities in the current |
| + // namespace. |
| + return nil |
| + } |
| + |
| + dedup := map[string]struct{}{} |
| + |
| + offset := q.offset |
| + limit := q.limit |
| + hasLimit := limit < 0 |
|
dnj (Google)
2015/08/23 06:50:06
There's a limit if either (a) limit < 0, or (b) li
iannucci
2015/08/23 18:19:43
:)
"https://cloud.google.com/appengine/docs/go/da
|
| + |
| + cursorPrefix := []byte(nil) |
| + getCursorFn := func(suffix []byte) func() (ds.Cursor, error) { |
| + return func() (ds.Cursor, error) { |
| + if cursorPrefix == nil { |
| + buf := &bytes.Buffer{} |
| + _, err := cmpbin.WriteUint(buf, uint64(len(rq.suffixFormat))) |
| + memoryCorruption(err) |
| + |
| + for _, col := range rq.suffixFormat { |
| + err := serialize.WriteIndexColumn(buf, col) |
| + memoryCorruption(err) |
| + } |
| + cursorPrefix = buf.Bytes() |
| + } |
| + return queryCursor(bjoin(cursorPrefix, suffix)), nil |
| + } |
| + } |
| + |
| + idxs, err := getIndicies(rq, idx) |
| + if err == errQueryDone { |
| + return nil |
| + } else if err != nil { |
|
dnj (Google)
2015/08/23 06:50:06
No need for "else".
iannucci
2015/08/23 18:19:42
Done.
|
| + return err |
| + } |
| + |
| + multiIterate(idxs, func(suffix []byte) bool { |
| + if offset > 0 { |
| + offset-- |
| + return true |
| + } |
| + if hasLimit { |
| + if limit <= 0 { |
| + return false |
| + } |
| + limit-- |
| + } |
| + |
| + rawData, data := parseSuffix(ns, rq.suffixFormat, suffix) |
|
dnj (Google)
2015/08/23 06:50:06
I don't think "data" is a good name here. Maybe "d
iannucci
2015/08/23 18:19:42
Done.
|
| + |
| + keyProp := data[len(data)-1] |
| + if keyProp.Type() != ds.PTKey { |
| + impossible(fmt.Errorf("decoded index row doesn't end with a Key: %#v", data)) |
|
dnj (Google)
2015/08/23 06:50:06
%T?
iannucci
2015/08/23 18:19:43
Nah. Changed to keyProp tho.
|
| + } |
| + keyRaw := string(rawData[len(rawData)-1]) |
| + if _, ok := dedup[keyRaw]; ok { |
| + // dedup'd! |
| + return true |
| + } |
| + dedup[keyRaw] = struct{}{} |
| + |
| + return strategy.handle( |
| + rawData, data, keyProp.Value().(ds.Key), |
| + getCursorFn(suffix)) |
| + }) |
| + |
| + return nil |
| +} |