Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(29)

Side by Side Diff: impl/memory/datastore_query_execution.go

Issue 1302813003: impl/memory: Implement Queries (Closed) Base URL: https://github.com/luci/gae.git@add_multi_iterator
Patch Set: Baby's first query! Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package memory
6
7 import (
8 "bytes"
9 "fmt"
10
11 ds "github.com/luci/gae/service/datastore"
12 "github.com/luci/gae/service/datastore/serialize"
13 "github.com/luci/luci-go/common/cmpbin"
14 )
15
16 type queryStrategy interface {
17 // handle applies the strategy to the embedded user callback.
18 // - rawData is the slice of encoded Properties from the index row
19 // (correctly de-inverted).
20 // - data is the slice of decoded Properties from the index row
21 // - key is the decoded Key from the index row (the last item in rawDa ta and
22 // data)
23 // - gc is the getCursor function to be passed to the user's callback
24 handle(rawData [][]byte, data []ds.Property, key ds.Key, gc func() (ds.C ursor, error)) bool
25 }
26
27 type projectionLookup struct {
28 suffixIndex int
29 propertyName string
30 }
31
32 type projectionStrategy struct {
33 cb ds.RawRunCB
34
35 project []projectionLookup
36 distinct map[string]struct{}
37 }
38
39 func newProjectionStrategy(q *queryImpl, rq *reducedQuery, cb ds.RawRunCB) query Strategy {
40 projectionLookups := make([]projectionLookup, len(q.project))
41 for i, prop := range q.project {
42 projectionLookups[i].propertyName = prop
43 foundIt := false
44 for j, col := range rq.suffixFormat {
45 if col.Property == prop {
46 projectionLookups[i].suffixIndex = j
47 foundIt = true
48 break
49 }
50 }
51 if !foundIt {
52 impossible(fmt.Errorf("planning a strategy for an unfulf illable query?"))
53 }
54 }
55 ret := &projectionStrategy{cb: cb, project: projectionLookups}
56 if q.distinct {
57 ret.distinct = map[string]struct{}{}
58 }
59 return ret
60 }
61
62 func (s *projectionStrategy) handle(rawData [][]byte, suffixData []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool {
63 projectedRaw := [][]byte(nil)
64 if s.distinct != nil {
65 projectedRaw = make([][]byte, len(suffixData))
66 }
67 pmap := make(ds.PropertyMap, len(s.project))
68 for i, p := range s.project {
69 if s.distinct != nil {
70 projectedRaw[i] = rawData[p.suffixIndex]
71 }
72 pmap[p.propertyName] = []ds.Property{suffixData[p.suffixIndex]}
73 }
74 if s.distinct != nil {
75 rawString := string(bjoin(projectedRaw...))
76 if _, ok := s.distinct[rawString]; ok {
77 return true
78 }
79 s.distinct[rawString] = struct{}{}
80 }
81 return s.cb(key, pmap, gc)
82 }
83
84 type keysOnlyStrategy ds.RawRunCB
85
86 func (s keysOnlyStrategy) handle(_ [][]byte, _ []ds.Property, key ds.Key, gc fun c() (ds.Cursor, error)) bool {
87 return ds.RawRunCB(s)(key, nil, gc)
88 }
89
90 type normalStrategy struct {
91 cb ds.RawRunCB
92
93 ns string
94 head *memCollection
95 }
96
97 func newNormalStrategy(ns string, cb ds.RawRunCB, head *memStore) queryStrategy {
98 coll := head.GetCollection("ents:" + ns)
99 if coll == nil {
100 return nil
101 }
102 return &normalStrategy{cb, ns, coll}
103 }
104
105 func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key ds.Key, g c func() (ds.Cursor, error)) bool {
106 rawEnt := s.head.Get(rawData[len(rawData)-1][1:])
107 if rawEnt == nil {
108 // entity doesn't exist at head
109 return true
110 }
111 pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize. WithoutContext, globalAppID, s.ns)
112 memoryCorruption(err)
113
114 return s.cb(key, pm, gc)
115 }
116
117 func pickQueryStrategy(q *queryImpl, rq *reducedQuery, cb ds.RawRunCB, head *mem Store) queryStrategy {
118 if q.keysOnly {
119 return keysOnlyStrategy(cb)
120 } else if len(q.project) > 0 {
121 return newProjectionStrategy(q, rq, cb)
dnj (Google) 2015/08/23 06:50:06 No need for "else" here.
iannucci 2015/08/23 18:19:43 Done.
122 }
123 return newNormalStrategy(rq.ns, cb, head)
124 }
125
126 func parseSuffix(ns string, suffixFormat []ds.IndexColumn, suffix []byte) (raw [ ][]byte, decoded []ds.Property) {
127 buf := serialize.Invertible(bytes.NewBuffer(suffix))
128 decoded = make([]ds.Property, len(suffixFormat))
129 raw = make([][]byte, len(suffixFormat))
130
131 err := error(nil)
132 prevOffset := 0
133 for i := range decoded {
134 needInvert := suffixFormat[i].Direction == ds.DESCENDING
135
136 buf.SetInvert(needInvert)
dnj (Google) 2015/08/23 06:50:06 Just inline the condition here: buf.SetInvert(suff
iannucci 2015/08/23 18:19:42 Need this below (line 141) as well
137 decoded[i], err = serialize.ReadProperty(buf, serialize.WithoutC ontext, globalAppID, ns)
138 memoryCorruption(err)
139
140 offset := len(suffix) - buf.Len()
141 if needInvert {
142 raw[i] = invert(suffix[prevOffset:offset])
dnj (Google) 2015/08/23 06:50:06 Consider walking suffix: suffix = suffix[:offset]
iannucci 2015/08/23 18:19:42 ya. done
143 } else {
144 raw[i] = suffix[prevOffset:offset]
145 }
146 prevOffset = offset
147 }
148
149 return
150 }
151
152 func executeQuery(origQ ds.Query, ns string, isTxn bool, idx, head *memStore, cb ds.RawRunCB) error {
153 q, ok := origQ.(*queryImpl)
154 if !ok {
155 return fmt.Errorf("unsupported Query implementation: %#v", q)
dnj (Google) 2015/08/23 06:50:06 %T?
iannucci 2015/08/23 18:19:43 Done.
156 }
157
158 rq, err := q.reduce(ns, isTxn)
159 if err != nil {
160 if err == errQueryDone {
161 return nil
162 }
163 return err
164 }
165
166 strategy := pickQueryStrategy(q, rq, cb, head)
167 if strategy == nil {
168 // e.g. the normalStrategy found that there were NO entities in the current
169 // namespace.
170 return nil
171 }
172
173 dedup := map[string]struct{}{}
174
175 offset := q.offset
176 limit := q.limit
177 hasLimit := limit < 0
dnj (Google) 2015/08/23 06:50:06 There's a limit if either (a) limit < 0, or (b) li
iannucci 2015/08/23 18:19:43 :) "https://cloud.google.com/appengine/docs/go/da
178
179 cursorPrefix := []byte(nil)
180 getCursorFn := func(suffix []byte) func() (ds.Cursor, error) {
181 return func() (ds.Cursor, error) {
182 if cursorPrefix == nil {
183 buf := &bytes.Buffer{}
184 _, err := cmpbin.WriteUint(buf, uint64(len(rq.su ffixFormat)))
185 memoryCorruption(err)
186
187 for _, col := range rq.suffixFormat {
188 err := serialize.WriteIndexColumn(buf, c ol)
189 memoryCorruption(err)
190 }
191 cursorPrefix = buf.Bytes()
192 }
193 return queryCursor(bjoin(cursorPrefix, suffix)), nil
194 }
195 }
196
197 idxs, err := getIndicies(rq, idx)
198 if err == errQueryDone {
199 return nil
200 } else if err != nil {
dnj (Google) 2015/08/23 06:50:06 No need for "else".
iannucci 2015/08/23 18:19:42 Done.
201 return err
202 }
203
204 multiIterate(idxs, func(suffix []byte) bool {
205 if offset > 0 {
206 offset--
207 return true
208 }
209 if hasLimit {
210 if limit <= 0 {
211 return false
212 }
213 limit--
214 }
215
216 rawData, data := parseSuffix(ns, rq.suffixFormat, suffix)
dnj (Google) 2015/08/23 06:50:06 I don't think "data" is a good name here. Maybe "d
iannucci 2015/08/23 18:19:42 Done.
217
218 keyProp := data[len(data)-1]
219 if keyProp.Type() != ds.PTKey {
220 impossible(fmt.Errorf("decoded index row doesn't end wit h a Key: %#v", data))
dnj (Google) 2015/08/23 06:50:06 %T?
iannucci 2015/08/23 18:19:43 Nah. Changed to keyProp tho.
221 }
222 keyRaw := string(rawData[len(rawData)-1])
223 if _, ok := dedup[keyRaw]; ok {
224 // dedup'd!
225 return true
226 }
227 dedup[keyRaw] = struct{}{}
228
229 return strategy.handle(
230 rawData, data, keyProp.Value().(ds.Key),
231 getCursorFn(suffix))
232 })
233
234 return nil
235 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698