Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(338)

Unified Diff: impl/memory/datastore_query_execution.go

Issue 1302813003: impl/memory: Implement Queries (Closed) Base URL: https://github.com/luci/gae.git@add_multi_iterator
Patch Set: Baby's first query! Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: impl/memory/datastore_query_execution.go
diff --git a/impl/memory/datastore_query_execution.go b/impl/memory/datastore_query_execution.go
new file mode 100644
index 0000000000000000000000000000000000000000..e219b524e2188ef81f0e47dee35b536af0c9433a
--- /dev/null
+++ b/impl/memory/datastore_query_execution.go
@@ -0,0 +1,235 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package memory
+
+import (
+ "bytes"
+ "fmt"
+
+ ds "github.com/luci/gae/service/datastore"
+ "github.com/luci/gae/service/datastore/serialize"
+ "github.com/luci/luci-go/common/cmpbin"
+)
+
+type queryStrategy interface {
+ // handle applies the strategy to the embedded user callback.
+ // - rawData is the slice of encoded Properties from the index row
+ // (correctly de-inverted).
+ // - data is the slice of decoded Properties from the index row
+ // - key is the decoded Key from the index row (the last item in rawData and
+ // data)
+ // - gc is the getCursor function to be passed to the user's callback
+ handle(rawData [][]byte, data []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool
+}
+
+type projectionLookup struct {
+ suffixIndex int
+ propertyName string
+}
+
+type projectionStrategy struct {
+ cb ds.RawRunCB
+
+ project []projectionLookup
+ distinct map[string]struct{}
+}
+
+func newProjectionStrategy(q *queryImpl, rq *reducedQuery, cb ds.RawRunCB) queryStrategy {
+ projectionLookups := make([]projectionLookup, len(q.project))
+ for i, prop := range q.project {
+ projectionLookups[i].propertyName = prop
+ foundIt := false
+ for j, col := range rq.suffixFormat {
+ if col.Property == prop {
+ projectionLookups[i].suffixIndex = j
+ foundIt = true
+ break
+ }
+ }
+ if !foundIt {
+ impossible(fmt.Errorf("planning a strategy for an unfulfillable query?"))
+ }
+ }
+ ret := &projectionStrategy{cb: cb, project: projectionLookups}
+ if q.distinct {
+ ret.distinct = map[string]struct{}{}
+ }
+ return ret
+}
+
+func (s *projectionStrategy) handle(rawData [][]byte, suffixData []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool {
+ projectedRaw := [][]byte(nil)
+ if s.distinct != nil {
+ projectedRaw = make([][]byte, len(suffixData))
+ }
+ pmap := make(ds.PropertyMap, len(s.project))
+ for i, p := range s.project {
+ if s.distinct != nil {
+ projectedRaw[i] = rawData[p.suffixIndex]
+ }
+ pmap[p.propertyName] = []ds.Property{suffixData[p.suffixIndex]}
+ }
+ if s.distinct != nil {
+ rawString := string(bjoin(projectedRaw...))
+ if _, ok := s.distinct[rawString]; ok {
+ return true
+ }
+ s.distinct[rawString] = struct{}{}
+ }
+ return s.cb(key, pmap, gc)
+}
+
+type keysOnlyStrategy ds.RawRunCB
+
+func (s keysOnlyStrategy) handle(_ [][]byte, _ []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool {
+ return ds.RawRunCB(s)(key, nil, gc)
+}
+
+type normalStrategy struct {
+ cb ds.RawRunCB
+
+ ns string
+ head *memCollection
+}
+
+func newNormalStrategy(ns string, cb ds.RawRunCB, head *memStore) queryStrategy {
+ coll := head.GetCollection("ents:" + ns)
+ if coll == nil {
+ return nil
+ }
+ return &normalStrategy{cb, ns, coll}
+}
+
+func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool {
+ rawEnt := s.head.Get(rawData[len(rawData)-1][1:])
+ if rawEnt == nil {
+ // entity doesn't exist at head
+ return true
+ }
+ pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.WithoutContext, globalAppID, s.ns)
+ memoryCorruption(err)
+
+ return s.cb(key, pm, gc)
+}
+
+func pickQueryStrategy(q *queryImpl, rq *reducedQuery, cb ds.RawRunCB, head *memStore) queryStrategy {
+ if q.keysOnly {
+ return keysOnlyStrategy(cb)
+ } else if len(q.project) > 0 {
+ return newProjectionStrategy(q, rq, cb)
dnj (Google) 2015/08/23 06:50:06 No need for "else" here.
iannucci 2015/08/23 18:19:43 Done.
+ }
+ return newNormalStrategy(rq.ns, cb, head)
+}
+
+func parseSuffix(ns string, suffixFormat []ds.IndexColumn, suffix []byte) (raw [][]byte, decoded []ds.Property) {
+ buf := serialize.Invertible(bytes.NewBuffer(suffix))
+ decoded = make([]ds.Property, len(suffixFormat))
+ raw = make([][]byte, len(suffixFormat))
+
+ err := error(nil)
+ prevOffset := 0
+ for i := range decoded {
+ needInvert := suffixFormat[i].Direction == ds.DESCENDING
+
+ buf.SetInvert(needInvert)
dnj (Google) 2015/08/23 06:50:06 Just inline the condition here: buf.SetInvert(suff
iannucci 2015/08/23 18:19:42 Need this below (line 141) as well
+ decoded[i], err = serialize.ReadProperty(buf, serialize.WithoutContext, globalAppID, ns)
+ memoryCorruption(err)
+
+ offset := len(suffix) - buf.Len()
+ if needInvert {
+ raw[i] = invert(suffix[prevOffset:offset])
dnj (Google) 2015/08/23 06:50:06 Consider walking suffix: suffix = suffix[:offset]
iannucci 2015/08/23 18:19:42 ya. done
+ } else {
+ raw[i] = suffix[prevOffset:offset]
+ }
+ prevOffset = offset
+ }
+
+ return
+}
+
+func executeQuery(origQ ds.Query, ns string, isTxn bool, idx, head *memStore, cb ds.RawRunCB) error {
+ q, ok := origQ.(*queryImpl)
+ if !ok {
+ return fmt.Errorf("unsupported Query implementation: %#v", q)
dnj (Google) 2015/08/23 06:50:06 %T?
iannucci 2015/08/23 18:19:43 Done.
+ }
+
+ rq, err := q.reduce(ns, isTxn)
+ if err != nil {
+ if err == errQueryDone {
+ return nil
+ }
+ return err
+ }
+
+ strategy := pickQueryStrategy(q, rq, cb, head)
+ if strategy == nil {
+ // e.g. the normalStrategy found that there were NO entities in the current
+ // namespace.
+ return nil
+ }
+
+ dedup := map[string]struct{}{}
+
+ offset := q.offset
+ limit := q.limit
+ hasLimit := limit < 0
dnj (Google) 2015/08/23 06:50:06 There's a limit if either (a) limit < 0, or (b) li
iannucci 2015/08/23 18:19:43 :) "https://cloud.google.com/appengine/docs/go/da
+
+ cursorPrefix := []byte(nil)
+ getCursorFn := func(suffix []byte) func() (ds.Cursor, error) {
+ return func() (ds.Cursor, error) {
+ if cursorPrefix == nil {
+ buf := &bytes.Buffer{}
+ _, err := cmpbin.WriteUint(buf, uint64(len(rq.suffixFormat)))
+ memoryCorruption(err)
+
+ for _, col := range rq.suffixFormat {
+ err := serialize.WriteIndexColumn(buf, col)
+ memoryCorruption(err)
+ }
+ cursorPrefix = buf.Bytes()
+ }
+ return queryCursor(bjoin(cursorPrefix, suffix)), nil
+ }
+ }
+
+ idxs, err := getIndicies(rq, idx)
+ if err == errQueryDone {
+ return nil
+ } else if err != nil {
dnj (Google) 2015/08/23 06:50:06 No need for "else".
iannucci 2015/08/23 18:19:42 Done.
+ return err
+ }
+
+ multiIterate(idxs, func(suffix []byte) bool {
+ if offset > 0 {
+ offset--
+ return true
+ }
+ if hasLimit {
+ if limit <= 0 {
+ return false
+ }
+ limit--
+ }
+
+ rawData, data := parseSuffix(ns, rq.suffixFormat, suffix)
dnj (Google) 2015/08/23 06:50:06 I don't think "data" is a good name here. Maybe "d
iannucci 2015/08/23 18:19:42 Done.
+
+ keyProp := data[len(data)-1]
+ if keyProp.Type() != ds.PTKey {
+ impossible(fmt.Errorf("decoded index row doesn't end with a Key: %#v", data))
dnj (Google) 2015/08/23 06:50:06 %T?
iannucci 2015/08/23 18:19:43 Nah. Changed to keyProp tho.
+ }
+ keyRaw := string(rawData[len(rawData)-1])
+ if _, ok := dedup[keyRaw]; ok {
+ // dedup'd!
+ return true
+ }
+ dedup[keyRaw] = struct{}{}
+
+ return strategy.handle(
+ rawData, data, keyProp.Value().(ds.Key),
+ getCursorFn(suffix))
+ })
+
+ return nil
+}

Powered by Google App Engine
This is Rietveld 408576698