Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(192)

Side by Side Diff: impl/memory/datastore_query_execution.go

Issue 1302813003: impl/memory: Implement Queries (Closed) Base URL: https://github.com/luci/gae.git@add_multi_iterator
Patch Set: remove limit double-set restriction Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package memory
6
7 import (
8 "bytes"
9 "fmt"
10
11 ds "github.com/luci/gae/service/datastore"
12 "github.com/luci/gae/service/datastore/serialize"
13 "github.com/luci/luci-go/common/cmpbin"
14 )
15
16 type queryStrategy interface {
17 // handle applies the strategy to the embedded user callback.
18 // - rawData is the slice of encoded Properties from the index row
19 // (correctly de-inverted).
20 // - decodedProps is the slice of decoded Properties from the index ro w
21 // - key is the decoded Key from the index row (the last item in rawDa ta and
22 // decodedProps)
23 // - gc is the getCursor function to be passed to the user's callback
24 handle(rawData [][]byte, decodedProps []ds.Property, key ds.Key, gc func () (ds.Cursor, error)) bool
25 }
26
27 type projectionLookup struct {
28 suffixIndex int
29 propertyName string
30 }
31
32 type projectionStrategy struct {
33 cb ds.RawRunCB
34
35 project []projectionLookup
36 distinct map[string]struct{}
37 }
38
39 func newProjectionStrategy(q *queryImpl, rq *reducedQuery, cb ds.RawRunCB) query Strategy {
40 projectionLookups := make([]projectionLookup, len(q.project))
41 for i, prop := range q.project {
42 projectionLookups[i].propertyName = prop
43 lookupErr := fmt.Errorf("planning a strategy for an unfulfillabl e query?")
44 for j, col := range rq.suffixFormat {
45 if col.Property == prop {
46 projectionLookups[i].suffixIndex = j
47 lookupErr = nil
48 break
49 }
50 }
51 impossible(lookupErr)
52 }
53 ret := &projectionStrategy{cb: cb, project: projectionLookups}
54 if q.distinct {
55 ret.distinct = map[string]struct{}{}
56 }
57 return ret
58 }
59
60 func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property , key ds.Key, gc func() (ds.Cursor, error)) bool {
61 projectedRaw := [][]byte(nil)
62 if s.distinct != nil {
63 projectedRaw = make([][]byte, len(decodedProps))
64 }
65 pmap := make(ds.PropertyMap, len(s.project))
66 for i, p := range s.project {
67 if s.distinct != nil {
68 projectedRaw[i] = rawData[p.suffixIndex]
69 }
70 pmap[p.propertyName] = []ds.Property{decodedProps[p.suffixIndex] }
71 }
72 if s.distinct != nil {
73 rawString := string(bjoin(projectedRaw...))
74 if _, ok := s.distinct[rawString]; ok {
75 return true
76 }
77 s.distinct[rawString] = struct{}{}
78 }
79 return s.cb(key, pmap, gc)
80 }
81
82 type keysOnlyStrategy struct {
83 cb ds.RawRunCB
84
85 dedup map[string]struct{}
86 }
87
88 func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool {
89 strKey := string(rawData[len(rawData)-1])
90 if _, ok := s.dedup[strKey]; ok {
91 return true
92 }
93 s.dedup[strKey] = struct{}{}
94 return s.cb(key, nil, gc)
95 }
96
97 type normalStrategy struct {
98 cb ds.RawRunCB
99
100 ns string
101 head *memCollection
102 dedup map[string]struct{}
103 }
104
105 func newNormalStrategy(ns string, cb ds.RawRunCB, head *memStore) queryStrategy {
106 coll := head.GetCollection("ents:" + ns)
107 if coll == nil {
108 return nil
109 }
110 return &normalStrategy{cb, ns, coll, map[string]struct{}{}}
111 }
112
113 func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key ds.Key, g c func() (ds.Cursor, error)) bool {
114 rawKey := rawData[len(rawData)-1]
115 strKey := string(rawKey)
116 if _, ok := s.dedup[strKey]; ok {
117 return true
118 }
119 s.dedup[strKey] = struct{}{}
120
121 rawEnt := s.head.Get(rawKey)
122 if rawEnt == nil {
123 // entity doesn't exist at head
124 return true
125 }
126 pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize. WithoutContext, globalAppID, s.ns)
127 memoryCorruption(err)
128
129 return s.cb(key, pm, gc)
130 }
131
132 func pickQueryStrategy(q *queryImpl, rq *reducedQuery, cb ds.RawRunCB, head *mem Store) queryStrategy {
133 if q.keysOnly {
134 return &keysOnlyStrategy{cb, map[string]struct{}{}}
135 }
136 if len(q.project) > 0 {
137 return newProjectionStrategy(q, rq, cb)
138 }
139 return newNormalStrategy(rq.ns, cb, head)
140 }
141
142 func parseSuffix(ns string, suffixFormat []ds.IndexColumn, suffix []byte, count int) (raw [][]byte, decoded []ds.Property) {
143 buf := serialize.Invertible(bytes.NewBuffer(suffix))
144 decoded = make([]ds.Property, len(suffixFormat))
145 raw = make([][]byte, len(suffixFormat))
146
147 err := error(nil)
148 for i := range decoded {
149 if count > 0 && i > count {
150 break
151 }
152 needInvert := suffixFormat[i].Direction == ds.DESCENDING
153
154 buf.SetInvert(needInvert)
155 decoded[i], err = serialize.ReadProperty(buf, serialize.WithoutC ontext, globalAppID, ns)
156 memoryCorruption(err)
157
158 offset := len(suffix) - buf.Len()
159 raw[i] = suffix[:offset]
160 suffix = suffix[offset:]
161 if needInvert {
162 raw[i] = invert(raw[i])
163 }
164 }
165
166 return
167 }
168
169 func executeQuery(origQ ds.Query, ns string, isTxn bool, idx, head *memStore, cb ds.RawRunCB) error {
170 q := origQ.(*queryImpl)
171
172 rq, err := q.reduce(ns, isTxn)
173 if err == errQueryDone {
174 return nil
175 }
176 if err != nil {
177 return err
178 }
179
180 idxs, err := getIndexes(rq, idx)
181 if err == errQueryDone {
182 return nil
183 }
184 if err != nil {
185 return err
186 }
187
188 strategy := pickQueryStrategy(q, rq, cb, head)
189 if strategy == nil {
190 // e.g. the normalStrategy found that there were NO entities in the current
191 // namespace.
192 return nil
193 }
194
195 offset := q.offset
196 limit := q.limit
197 hasLimit := q.limitSet && limit >= 0
198
199 cursorPrefix := []byte(nil)
200 getCursorFn := func(suffix []byte) func() (ds.Cursor, error) {
201 return func() (ds.Cursor, error) {
202 if cursorPrefix == nil {
203 buf := &bytes.Buffer{}
204 _, err := cmpbin.WriteUint(buf, uint64(len(rq.su ffixFormat)))
205 memoryCorruption(err)
206
207 for _, col := range rq.suffixFormat {
208 err := serialize.WriteIndexColumn(buf, c ol)
209 memoryCorruption(err)
210 }
211 cursorPrefix = buf.Bytes()
212 }
213 // TODO(riannucci): Do we need to decrement suffix inste ad of increment
214 // if we're sorting by __key__ DESCENDING?
215 return queryCursor(bjoin(cursorPrefix, increment(suffix) )), nil
216 }
217 }
218
219 multiIterate(idxs, func(suffix []byte) bool {
220 if offset > 0 {
221 offset--
222 return true
223 }
224 if hasLimit {
225 if limit <= 0 {
226 return false
227 }
228 limit--
229 }
230
231 rawData, decodedProps := parseSuffix(ns, rq.suffixFormat, suffix , -1)
232
233 keyProp := decodedProps[len(decodedProps)-1]
234 if keyProp.Type() != ds.PTKey {
235 impossible(fmt.Errorf("decoded index row doesn't end wit h a Key: %#v", keyProp))
236 }
237
238 return strategy.handle(
239 rawData, decodedProps, keyProp.Value().(ds.Key),
240 getCursorFn(suffix))
241 })
242
243 return nil
244 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698