Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1068)

Side by Side Diff: impl/memory/datastore_query_execution.go

Issue 1302813003: impl/memory: Implement Queries (Closed) Base URL: https://github.com/luci/gae.git@add_multi_iterator
Patch Set: stringSet everywhere Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « impl/memory/datastore_query.go ('k') | impl/memory/datastore_query_execution_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package memory
6
7 import (
8 "bytes"
9 "fmt"
10
11 ds "github.com/luci/gae/service/datastore"
12 "github.com/luci/gae/service/datastore/serialize"
13 "github.com/luci/luci-go/common/cmpbin"
14 )
15
16 type queryStrategy interface {
17 // handle applies the strategy to the embedded user callback.
18 // - rawData is the slice of encoded Properties from the index row
19 // (correctly de-inverted).
20 // - decodedProps is the slice of decoded Properties from the index ro w
21 // - key is the decoded Key from the index row (the last item in rawDa ta and
22 // decodedProps)
23 // - gc is the getCursor function to be passed to the user's callback
24 handle(rawData [][]byte, decodedProps []ds.Property, key ds.Key, gc func () (ds.Cursor, error)) bool
25 }
26
27 type projectionLookup struct {
28 suffixIndex int
29 propertyName string
30 }
31
32 type projectionStrategy struct {
33 cb ds.RawRunCB
34
35 project []projectionLookup
36 distinct stringSet
37 }
38
39 func newProjectionStrategy(q *queryImpl, rq *reducedQuery, cb ds.RawRunCB) query Strategy {
40 projectionLookups := make([]projectionLookup, len(q.project))
41 for i, prop := range q.project {
42 projectionLookups[i].propertyName = prop
43 lookupErr := fmt.Errorf("planning a strategy for an unfulfillabl e query?")
44 for j, col := range rq.suffixFormat {
45 if col.Property == prop {
46 projectionLookups[i].suffixIndex = j
47 lookupErr = nil
48 break
49 }
50 }
51 impossible(lookupErr)
52 }
53 ret := &projectionStrategy{cb: cb, project: projectionLookups}
54 if q.distinct {
55 ret.distinct = stringSet{}
56 }
57 return ret
58 }
59
60 func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property , key ds.Key, gc func() (ds.Cursor, error)) bool {
61 projectedRaw := [][]byte(nil)
62 if s.distinct != nil {
63 projectedRaw = make([][]byte, len(decodedProps))
64 }
65 pmap := make(ds.PropertyMap, len(s.project))
66 for i, p := range s.project {
67 if s.distinct != nil {
68 projectedRaw[i] = rawData[p.suffixIndex]
69 }
70 pmap[p.propertyName] = []ds.Property{decodedProps[p.suffixIndex] }
71 }
72 if s.distinct != nil {
73 if !s.distinct.add(string(bjoin(projectedRaw...))) {
74 return true
75 }
76 }
77 return s.cb(key, pmap, gc)
78 }
79
80 type keysOnlyStrategy struct {
81 cb ds.RawRunCB
82
83 dedup stringSet
84 }
85
86 func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool {
87 if !s.dedup.add(string(rawData[len(rawData)-1])) {
88 return true
89 }
90 return s.cb(key, nil, gc)
91 }
92
93 type normalStrategy struct {
94 cb ds.RawRunCB
95
96 ns string
97 head *memCollection
98 dedup stringSet
99 }
100
101 func newNormalStrategy(ns string, cb ds.RawRunCB, head *memStore) queryStrategy {
102 coll := head.GetCollection("ents:" + ns)
103 if coll == nil {
104 return nil
105 }
106 return &normalStrategy{cb, ns, coll, stringSet{}}
107 }
108
109 func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key ds.Key, g c func() (ds.Cursor, error)) bool {
110 rawKey := rawData[len(rawData)-1]
111 if !s.dedup.add(string(rawKey)) {
112 return true
113 }
114
115 rawEnt := s.head.Get(rawKey)
116 if rawEnt == nil {
117 // entity doesn't exist at head
118 return true
119 }
120 pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize. WithoutContext, globalAppID, s.ns)
121 memoryCorruption(err)
122
123 return s.cb(key, pm, gc)
124 }
125
126 func pickQueryStrategy(q *queryImpl, rq *reducedQuery, cb ds.RawRunCB, head *mem Store) queryStrategy {
127 if q.keysOnly {
128 return &keysOnlyStrategy{cb, stringSet{}}
129 }
130 if len(q.project) > 0 {
131 return newProjectionStrategy(q, rq, cb)
132 }
133 return newNormalStrategy(rq.ns, cb, head)
134 }
135
136 func parseSuffix(ns string, suffixFormat []ds.IndexColumn, suffix []byte, count int) (raw [][]byte, decoded []ds.Property) {
137 buf := serialize.Invertible(bytes.NewBuffer(suffix))
138 decoded = make([]ds.Property, len(suffixFormat))
139 raw = make([][]byte, len(suffixFormat))
140
141 err := error(nil)
142 for i := range decoded {
143 if count > 0 && i > count {
144 break
145 }
146 needInvert := suffixFormat[i].Direction == ds.DESCENDING
147
148 buf.SetInvert(needInvert)
149 decoded[i], err = serialize.ReadProperty(buf, serialize.WithoutC ontext, globalAppID, ns)
150 memoryCorruption(err)
151
152 offset := len(suffix) - buf.Len()
153 raw[i] = suffix[:offset]
154 suffix = suffix[offset:]
155 if needInvert {
156 raw[i] = invert(raw[i])
157 }
158 }
159
160 return
161 }
162
163 func executeQuery(origQ ds.Query, ns string, isTxn bool, idx, head *memStore, cb ds.RawRunCB) error {
164 q := origQ.(*queryImpl)
165
166 rq, err := q.reduce(ns, isTxn)
167 if err == errQueryDone {
168 return nil
169 }
170 if err != nil {
171 return err
172 }
173
174 idxs, err := getIndexes(rq, idx)
175 if err == errQueryDone {
176 return nil
177 }
178 if err != nil {
179 return err
180 }
181
182 strategy := pickQueryStrategy(q, rq, cb, head)
183 if strategy == nil {
184 // e.g. the normalStrategy found that there were NO entities in the current
185 // namespace.
186 return nil
187 }
188
189 offset := q.offset
190 limit := q.limit
191 hasLimit := q.limitSet && limit >= 0
192
193 cursorPrefix := []byte(nil)
194 getCursorFn := func(suffix []byte) func() (ds.Cursor, error) {
195 return func() (ds.Cursor, error) {
196 if cursorPrefix == nil {
197 buf := &bytes.Buffer{}
198 _, err := cmpbin.WriteUint(buf, uint64(len(rq.su ffixFormat)))
199 memoryCorruption(err)
200
201 for _, col := range rq.suffixFormat {
202 err := serialize.WriteIndexColumn(buf, c ol)
203 memoryCorruption(err)
204 }
205 cursorPrefix = buf.Bytes()
206 }
207 // TODO(riannucci): Do we need to decrement suffix inste ad of increment
208 // if we're sorting by __key__ DESCENDING?
209 return queryCursor(bjoin(cursorPrefix, increment(suffix) )), nil
210 }
211 }
212
213 multiIterate(idxs, func(suffix []byte) bool {
214 if offset > 0 {
215 offset--
216 return true
217 }
218 if hasLimit {
219 if limit <= 0 {
220 return false
221 }
222 limit--
223 }
224
225 rawData, decodedProps := parseSuffix(ns, rq.suffixFormat, suffix , -1)
226
227 keyProp := decodedProps[len(decodedProps)-1]
228 if keyProp.Type() != ds.PTKey {
229 impossible(fmt.Errorf("decoded index row doesn't end wit h a Key: %#v", keyProp))
230 }
231
232 return strategy.handle(
233 rawData, decodedProps, keyProp.Value().(ds.Key),
234 getCursorFn(suffix))
235 })
236
237 return nil
238 }
OLDNEW
« no previous file with comments | « impl/memory/datastore_query.go ('k') | impl/memory/datastore_query_execution_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698