OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package memory | |
6 | |
7 import ( | |
8 "bytes" | |
9 "fmt" | |
10 | |
11 ds "github.com/luci/gae/service/datastore" | |
12 "github.com/luci/gae/service/datastore/serialize" | |
13 "github.com/luci/luci-go/common/cmpbin" | |
14 ) | |
15 | |
16 type queryStrategy interface { | |
17 // handle applies the strategy to the embedded user callback. | |
18 // - rawData is the slice of encoded Properties from the index row | |
19 // (correctly de-inverted). | |
20 // - data is the slice of decoded Properties from the index row | |
21 // - key is the decoded Key from the index row (the last item in rawDa ta and | |
22 // data) | |
23 // - gc is the getCursor function to be passed to the user's callback | |
24 handle(rawData [][]byte, data []ds.Property, key ds.Key, gc func() (ds.C ursor, error)) bool | |
25 } | |
26 | |
27 type projectionLookup struct { | |
28 suffixIndex int | |
29 propertyName string | |
30 } | |
31 | |
32 type projectionStrategy struct { | |
33 cb ds.RawRunCB | |
34 | |
35 project []projectionLookup | |
36 distinct map[string]struct{} | |
37 } | |
38 | |
39 func newProjectionStrategy(q *queryImpl, rq *reducedQuery, cb ds.RawRunCB) query Strategy { | |
40 projectionLookups := make([]projectionLookup, len(q.project)) | |
41 for i, prop := range q.project { | |
42 projectionLookups[i].propertyName = prop | |
43 foundIt := false | |
44 for j, col := range rq.suffixFormat { | |
45 if col.Property == prop { | |
46 projectionLookups[i].suffixIndex = j | |
47 foundIt = true | |
48 break | |
49 } | |
50 } | |
51 if !foundIt { | |
52 impossible(fmt.Errorf("planning a strategy for an unfulf illable query?")) | |
53 } | |
54 } | |
55 ret := &projectionStrategy{cb: cb, project: projectionLookups} | |
56 if q.distinct { | |
57 ret.distinct = map[string]struct{}{} | |
58 } | |
59 return ret | |
60 } | |
61 | |
62 func (s *projectionStrategy) handle(rawData [][]byte, suffixData []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool { | |
63 projectedRaw := [][]byte(nil) | |
64 if s.distinct != nil { | |
65 projectedRaw = make([][]byte, len(suffixData)) | |
66 } | |
67 pmap := make(ds.PropertyMap, len(s.project)) | |
68 for i, p := range s.project { | |
69 if s.distinct != nil { | |
70 projectedRaw[i] = rawData[p.suffixIndex] | |
71 } | |
72 pmap[p.propertyName] = []ds.Property{suffixData[p.suffixIndex]} | |
73 } | |
74 if s.distinct != nil { | |
75 rawString := string(bjoin(projectedRaw...)) | |
76 if _, ok := s.distinct[rawString]; ok { | |
77 return true | |
78 } | |
79 s.distinct[rawString] = struct{}{} | |
80 } | |
81 return s.cb(key, pmap, gc) | |
82 } | |
83 | |
84 type keysOnlyStrategy ds.RawRunCB | |
85 | |
86 func (s keysOnlyStrategy) handle(_ [][]byte, _ []ds.Property, key ds.Key, gc fun c() (ds.Cursor, error)) bool { | |
87 return ds.RawRunCB(s)(key, nil, gc) | |
88 } | |
89 | |
90 type normalStrategy struct { | |
91 cb ds.RawRunCB | |
92 | |
93 ns string | |
94 head *memCollection | |
95 } | |
96 | |
97 func newNormalStrategy(ns string, cb ds.RawRunCB, head *memStore) queryStrategy { | |
98 coll := head.GetCollection("ents:" + ns) | |
99 if coll == nil { | |
100 return nil | |
101 } | |
102 return &normalStrategy{cb, ns, coll} | |
103 } | |
104 | |
105 func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key ds.Key, g c func() (ds.Cursor, error)) bool { | |
106 rawEnt := s.head.Get(rawData[len(rawData)-1][1:]) | |
107 if rawEnt == nil { | |
108 // entity doesn't exist at head | |
109 return true | |
110 } | |
111 pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize. WithoutContext, globalAppID, s.ns) | |
112 memoryCorruption(err) | |
113 | |
114 return s.cb(key, pm, gc) | |
115 } | |
116 | |
117 func pickQueryStrategy(q *queryImpl, rq *reducedQuery, cb ds.RawRunCB, head *mem Store) queryStrategy { | |
118 if q.keysOnly { | |
119 return keysOnlyStrategy(cb) | |
120 } else if len(q.project) > 0 { | |
121 return newProjectionStrategy(q, rq, cb) | |
dnj (Google)
2015/08/23 06:50:06
No need for "else" here.
iannucci
2015/08/23 18:19:43
Done.
| |
122 } | |
123 return newNormalStrategy(rq.ns, cb, head) | |
124 } | |
125 | |
126 func parseSuffix(ns string, suffixFormat []ds.IndexColumn, suffix []byte) (raw [ ][]byte, decoded []ds.Property) { | |
127 buf := serialize.Invertible(bytes.NewBuffer(suffix)) | |
128 decoded = make([]ds.Property, len(suffixFormat)) | |
129 raw = make([][]byte, len(suffixFormat)) | |
130 | |
131 err := error(nil) | |
132 prevOffset := 0 | |
133 for i := range decoded { | |
134 needInvert := suffixFormat[i].Direction == ds.DESCENDING | |
135 | |
136 buf.SetInvert(needInvert) | |
dnj (Google)
2015/08/23 06:50:06
Just inline the condition here: buf.SetInvert(suff
iannucci
2015/08/23 18:19:42
Need this below (line 141) as well
| |
137 decoded[i], err = serialize.ReadProperty(buf, serialize.WithoutC ontext, globalAppID, ns) | |
138 memoryCorruption(err) | |
139 | |
140 offset := len(suffix) - buf.Len() | |
141 if needInvert { | |
142 raw[i] = invert(suffix[prevOffset:offset]) | |
dnj (Google)
2015/08/23 06:50:06
Consider walking suffix:
suffix = suffix[:offset]
iannucci
2015/08/23 18:19:42
ya. done
| |
143 } else { | |
144 raw[i] = suffix[prevOffset:offset] | |
145 } | |
146 prevOffset = offset | |
147 } | |
148 | |
149 return | |
150 } | |
151 | |
152 func executeQuery(origQ ds.Query, ns string, isTxn bool, idx, head *memStore, cb ds.RawRunCB) error { | |
153 q, ok := origQ.(*queryImpl) | |
154 if !ok { | |
155 return fmt.Errorf("unsupported Query implementation: %#v", q) | |
dnj (Google)
2015/08/23 06:50:06
%T?
iannucci
2015/08/23 18:19:43
Done.
| |
156 } | |
157 | |
158 rq, err := q.reduce(ns, isTxn) | |
159 if err != nil { | |
160 if err == errQueryDone { | |
161 return nil | |
162 } | |
163 return err | |
164 } | |
165 | |
166 strategy := pickQueryStrategy(q, rq, cb, head) | |
167 if strategy == nil { | |
168 // e.g. the normalStrategy found that there were NO entities in the current | |
169 // namespace. | |
170 return nil | |
171 } | |
172 | |
173 dedup := map[string]struct{}{} | |
174 | |
175 offset := q.offset | |
176 limit := q.limit | |
177 hasLimit := limit < 0 | |
dnj (Google)
2015/08/23 06:50:06
There's a limit if either (a) limit < 0, or (b) li
iannucci
2015/08/23 18:19:43
:)
"https://cloud.google.com/appengine/docs/go/da
| |
178 | |
179 cursorPrefix := []byte(nil) | |
180 getCursorFn := func(suffix []byte) func() (ds.Cursor, error) { | |
181 return func() (ds.Cursor, error) { | |
182 if cursorPrefix == nil { | |
183 buf := &bytes.Buffer{} | |
184 _, err := cmpbin.WriteUint(buf, uint64(len(rq.su ffixFormat))) | |
185 memoryCorruption(err) | |
186 | |
187 for _, col := range rq.suffixFormat { | |
188 err := serialize.WriteIndexColumn(buf, c ol) | |
189 memoryCorruption(err) | |
190 } | |
191 cursorPrefix = buf.Bytes() | |
192 } | |
193 return queryCursor(bjoin(cursorPrefix, suffix)), nil | |
194 } | |
195 } | |
196 | |
197 idxs, err := getIndicies(rq, idx) | |
198 if err == errQueryDone { | |
199 return nil | |
200 } else if err != nil { | |
dnj (Google)
2015/08/23 06:50:06
No need for "else".
iannucci
2015/08/23 18:19:42
Done.
| |
201 return err | |
202 } | |
203 | |
204 multiIterate(idxs, func(suffix []byte) bool { | |
205 if offset > 0 { | |
206 offset-- | |
207 return true | |
208 } | |
209 if hasLimit { | |
210 if limit <= 0 { | |
211 return false | |
212 } | |
213 limit-- | |
214 } | |
215 | |
216 rawData, data := parseSuffix(ns, rq.suffixFormat, suffix) | |
dnj (Google)
2015/08/23 06:50:06
I don't think "data" is a good name here. Maybe "d
iannucci
2015/08/23 18:19:42
Done.
| |
217 | |
218 keyProp := data[len(data)-1] | |
219 if keyProp.Type() != ds.PTKey { | |
220 impossible(fmt.Errorf("decoded index row doesn't end wit h a Key: %#v", data)) | |
dnj (Google)
2015/08/23 06:50:06
%T?
iannucci
2015/08/23 18:19:43
Nah. Changed to keyProp tho.
| |
221 } | |
222 keyRaw := string(rawData[len(rawData)-1]) | |
223 if _, ok := dedup[keyRaw]; ok { | |
224 // dedup'd! | |
225 return true | |
226 } | |
227 dedup[keyRaw] = struct{}{} | |
228 | |
229 return strategy.handle( | |
230 rawData, data, keyProp.Value().(ds.Key), | |
231 getCursorFn(suffix)) | |
232 }) | |
233 | |
234 return nil | |
235 } | |
OLD | NEW |