OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package cloud | |
6 | |
7 import ( | |
8 "fmt" | |
9 "reflect" | |
10 "strings" | |
11 "time" | |
12 | |
13 "github.com/luci/luci-go/common/errors" | |
14 | |
15 ds "github.com/luci/gae/service/datastore" | |
16 infoS "github.com/luci/gae/service/info" | |
17 "google.golang.org/cloud/datastore" | |
18 | |
19 "golang.org/x/net/context" | |
20 ) | |
21 | |
22 type cloudDatastore struct { | |
23 client *datastore.Client | |
24 } | |
25 | |
26 func (cds *cloudDatastore) use(c context.Context) context.Context { | |
27 return ds.SetRawFactory(c, func(ic context.Context, wantTxn bool) ds.Raw Interface { | |
28 inf := infoS.Get(ic) | |
29 if ns, ok := inf.GetNamespace(); ok { | |
30 ic = datastore.WithNamespace(ic, ns) | |
31 } | |
32 | |
33 bds := boundDatastore{ | |
34 Context: ic, | |
35 cloudDatastore: cds, | |
36 appID: inf.FullyQualifiedAppID(), | |
37 } | |
38 if wantTxn { | |
39 bds.transaction = datastoreTransaction(ic) | |
40 } | |
41 return &bds | |
42 }) | |
43 } | |
44 | |
45 // boundDatastore is a bound instance of the cloudDatastore installed in the | |
46 // Context. | |
47 type boundDatastore struct { | |
48 // Context is the bound user Context. It includes the datastore namespac e, if | |
49 // one is set. | |
50 context.Context | |
51 *cloudDatastore | |
52 | |
53 appID string | |
iannucci
2016/05/23 21:53:06
namespace too?
dnj (Google)
2016/07/01 02:25:55
I'm tracking that in Info service and applying it
| |
54 transaction *datastore.Transaction | |
55 } | |
56 | |
57 func (bds *boundDatastore) AllocateIDs(incomplete *ds.Key, n int) (int64, error) { | |
58 // AllocateIDs assumes that a contiguous ID space will be returned. The cloud | |
59 // datastore library does not offer this guarantee. | |
iannucci
2016/05/23 21:53:06
IMO, I would just change the high level AllocateID
dnj (Google)
2016/07/01 02:25:55
Done.
| |
60 // | |
61 // It is our *expectation* that the remote datastore API will return a | |
62 // contiguous set of IDs for a given entity type. We will panic if this | |
63 // expectation is violated. | |
64 keys := make([]*ds.Key, n) | |
65 for i := range keys { | |
66 keys[i] = incomplete | |
67 } | |
68 | |
69 nativeKeys, err := bds.client.AllocateIDs(bds, bds.gaeKeysToNative(keys. ..)) | |
70 if err != nil { | |
71 return -1, normalizeError(err) | |
72 } | |
73 | |
74 keys = bds.nativeKeysToGAE(nativeKeys...) | |
75 start := keys[0].IntID() | |
76 | |
77 // Assert that the allocated IDs are contiguous. | |
78 expected := start + 1 | |
79 for _, key := range keys[1:] { | |
80 if id := key.IntID(); id != expected { | |
81 panic(fmt.Errorf("non-contiugous key IDs returned (%d != %d)", id, expected)) | |
82 } | |
83 expected++ | |
84 } | |
85 | |
86 return start, nil | |
87 } | |
88 | |
89 func (bds *boundDatastore) RunInTransaction(fn func(context.Context) error, opts *ds.TransactionOptions) error { | |
90 if bds.transaction != nil { | |
91 return errors.New("nested transactions are not supported") | |
92 } | |
93 | |
94 // The cloud datastore SDK does not expose any transaction options. | |
iannucci
2016/05/23 21:53:06
well that's a bummer.
dnj (Google)
2016/07/01 02:25:55
Acknowledged.
| |
95 if opts != nil { | |
96 switch { | |
97 case opts.XG: | |
98 return errors.New("cross-group transactions are not supp orted") | |
99 case opts.Attempts != 0 && opts.Attempts != 3: | |
iannucci
2016/05/23 21:53:06
This is actually a client library responsibility.
dnj (Google)
2016/07/01 02:25:55
Oh good to know. Implemented in the same manner as
| |
100 return errors.New("setting transaction attempts is not s upported") | |
101 } | |
102 } | |
103 | |
104 _, err := bds.client.RunInTransaction(bds, func(tx *datastore.Transactio n) error { | |
105 return fn(withDatastoreTransaction(bds, tx)) | |
106 }) | |
107 return normalizeError(err) | |
108 } | |
109 | |
110 func (bds *boundDatastore) DecodeCursor(s string) (ds.Cursor, error) { | |
111 cursor, err := datastore.DecodeCursor(s) | |
112 return cursor, normalizeError(err) | |
113 } | |
114 | |
115 func (bds *boundDatastore) Run(q *ds.FinalizedQuery, cb ds.RawRunCB) error { | |
116 it := bds.client.Run(bds, bds.prepareNativeQuery(q)) | |
117 cursorFn := func() (ds.Cursor, error) { | |
118 return it.Cursor() | |
119 } | |
120 | |
121 for { | |
122 var npls *nativePropertyLoadSaver | |
123 if !q.KeysOnly() { | |
124 npls = bds.mkNPLS(nil) | |
125 } | |
126 nativeKey, err := it.Next(npls) | |
127 if err != nil { | |
128 if err == datastore.Done { | |
129 return nil | |
130 } | |
131 return normalizeError(err) | |
132 } | |
133 | |
134 if err := cb(bds.nativeKeysToGAE(nativeKey)[0], npls.pmap, curso rFn); err != nil { | |
135 if err == ds.Stop { | |
136 return nil | |
137 } | |
138 return normalizeError(err) | |
139 } | |
140 } | |
141 } | |
142 | |
143 func (bds *boundDatastore) Count(q *ds.FinalizedQuery) (int64, error) { | |
144 v, err := bds.client.Count(bds, bds.prepareNativeQuery(q)) | |
145 if err != nil { | |
146 return -1, normalizeError(err) | |
147 } | |
148 return int64(v), nil | |
149 } | |
150 | |
151 func idxCallbacker(err error, amt int, cb func(idx int, err error) error) error { | |
152 if err == nil { | |
153 for i := 0; i < amt; i++ { | |
154 if err := cb(i, nil); err != nil { | |
155 return err | |
156 } | |
157 } | |
158 return nil | |
159 } | |
160 | |
161 err = errors.Fix(err) | |
162 if me, ok := err.(errors.MultiError); ok { | |
163 for i, err := range me { | |
164 if err := cb(i, normalizeError(err)); err != nil { | |
165 return err | |
166 } | |
167 } | |
168 return nil | |
169 } | |
170 return normalizeError(err) | |
171 } | |
172 | |
173 func (bds *boundDatastore) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb ds.GetMultiCB) error { | |
174 nativeKeys := bds.gaeKeysToNative(keys...) | |
175 nativePLS := make([]*nativePropertyLoadSaver, len(nativeKeys)) | |
176 for i := range nativePLS { | |
177 nativePLS[i] = bds.mkNPLS(nil) | |
178 } | |
179 | |
180 var err error | |
181 if tx := bds.transaction; tx != nil { | |
182 // Transactional GetMulti. | |
183 err = tx.GetMulti(nativeKeys, nativePLS) | |
184 } else { | |
185 // Non-transactional GetMulti. | |
186 err = bds.client.GetMulti(bds, nativeKeys, nativePLS) | |
187 } | |
188 | |
189 return idxCallbacker(err, len(nativePLS), func(idx int, err error) error { | |
190 return cb(nativePLS[idx].pmap, err) | |
191 }) | |
192 } | |
193 | |
194 func (bds *boundDatastore) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds .PutMultiCB) error { | |
195 nativeKeys := bds.gaeKeysToNative(keys...) | |
196 nativePLS := make([]*nativePropertyLoadSaver, len(vals)) | |
197 for i := range nativePLS { | |
198 nativePLS[i] = bds.mkNPLS(vals[i]) | |
199 } | |
200 | |
201 var err error | |
202 if tx := bds.transaction; tx != nil { | |
203 // Transactional PutMulti. | |
204 // | |
205 // In order to simulate the presence of mid-transaction key allo cation, we | |
206 // will identify any incomplete keys and allocate IDs for them. This is | |
207 // potentially wasteful in the event o failed or retried transac tions, but | |
iannucci
2016/05/23 21:53:06
s/o/of
dnj (Google)
2016/07/01 02:25:55
Done.
| |
208 // it is required to maintain API compatibility with the datasto re | |
209 // interface. | |
210 var incompleteKeys []*datastore.Key | |
211 var incompleteKeyMap map[int]int | |
212 for i, k := range nativeKeys { | |
213 if k.Incomplete() { | |
214 if incompleteKeyMap == nil { | |
215 // Optimization: if there are any incomp lete keys, allocate room for | |
216 // the full range. | |
217 incompleteKeyMap = make(map[int]int, len (nativeKeys)-i) | |
218 incompleteKeys = make([]*datastore.Key, 0, len(nativeKeys)-i) | |
219 } | |
220 incompleteKeyMap[len(incompleteKeys)] = i | |
221 incompleteKeys = append(incompleteKeys, k) | |
222 } | |
223 } | |
224 if len(incompleteKeys) > 0 { | |
225 idKeys, err := bds.client.AllocateIDs(bds, incompleteKey s) | |
226 if err != nil { | |
227 return err | |
228 } | |
229 for i, idKey := range idKeys { | |
230 nativeKeys[incompleteKeyMap[i]] = idKey | |
231 } | |
232 } | |
233 | |
234 _, err = tx.PutMulti(nativeKeys, nativePLS) | |
235 } else { | |
236 // Non-transactional PutMulti. | |
237 nativeKeys, err = bds.client.PutMulti(bds, nativeKeys, nativePLS ) | |
238 } | |
239 | |
240 return idxCallbacker(err, len(nativeKeys), func(idx int, err error) erro r { | |
241 if err == nil { | |
242 return cb(bds.nativeKeysToGAE(nativeKeys[idx])[0], nil) | |
243 } | |
244 return cb(nil, err) | |
245 }) | |
246 } | |
247 | |
248 func (bds *boundDatastore) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) erro r { | |
249 nativeKeys := bds.gaeKeysToNative(keys...) | |
250 | |
251 var err error | |
252 if tx := bds.transaction; tx != nil { | |
253 // Transactional DeleteMulti. | |
254 err = tx.DeleteMulti(nativeKeys) | |
255 } else { | |
256 // Non-transactional DeleteMulti. | |
257 err = bds.client.DeleteMulti(bds, nativeKeys) | |
258 } | |
259 | |
260 return idxCallbacker(err, len(nativeKeys), func(_ int, err error) error { | |
261 return cb(err) | |
262 }) | |
263 } | |
264 | |
265 func (bds *boundDatastore) Testable() ds.Testable { | |
266 return nil | |
267 } | |
268 | |
269 func (bds *boundDatastore) prepareNativeQuery(fq *ds.FinalizedQuery) *datastore. Query { | |
270 nq := datastore.NewQuery(fq.Kind()) | |
271 if bds.transaction != nil { | |
272 nq = nq.Transaction(bds.transaction) | |
273 } | |
274 | |
275 // nativeFilter translates a filter field. If the translation fails, we' ll | |
276 // pass the result through to the underlying datastore and allow it to | |
277 // reject it. | |
278 nativeFilter := func(prop ds.Property) interface{} { | |
279 if np, err := bds.gaePropertyToNative("", []ds.Property{prop}); err == nil { | |
280 return np.Value | |
281 } | |
282 return prop.Value() | |
283 } | |
284 | |
285 // Equality filters. | |
286 for field, props := range fq.EqFilters() { | |
287 for _, prop := range props { | |
288 nq = nq.Filter(fmt.Sprintf("%s =", field), nativeFilter( prop)) | |
289 } | |
290 } | |
291 | |
292 // Inequality filters. | |
293 if ineq := fq.IneqFilterProp(); ineq != "" { | |
294 if field, op, prop := fq.IneqFilterLow(); field != "" { | |
295 nq = nq.Filter(fmt.Sprintf("%s %s", field, op), nativeFi lter(prop)) | |
296 } | |
297 | |
298 if field, op, prop := fq.IneqFilterHigh(); field != "" { | |
299 nq = nq.Filter(fmt.Sprintf("%s %s", field, op), nativeFi lter(prop)) | |
300 } | |
301 } | |
302 | |
303 start, end := fq.Bounds() | |
304 if start != nil { | |
305 nq = nq.Start(start.(datastore.Cursor)) | |
306 } | |
307 if end != nil { | |
308 nq = nq.End(end.(datastore.Cursor)) | |
309 } | |
310 | |
311 if fq.Distinct() { | |
312 nq = nq.Distinct() | |
313 } | |
314 if fq.KeysOnly() { | |
315 nq = nq.KeysOnly() | |
316 } | |
317 if limit, ok := fq.Limit(); ok { | |
318 nq = nq.Limit(int(limit)) | |
319 } | |
320 if offset, ok := fq.Offset(); ok { | |
321 nq = nq.Offset(int(offset)) | |
322 } | |
323 if proj := fq.Project(); proj != nil { | |
324 nq = nq.Project(proj...) | |
325 } | |
326 if ancestor := fq.Ancestor(); ancestor != nil { | |
327 nq = nq.Ancestor(bds.gaeKeysToNative(ancestor)[0]) | |
328 } | |
329 if fq.EventuallyConsistent() { | |
330 nq = nq.EventualConsistency() | |
331 } | |
332 | |
333 for _, ic := range fq.Orders() { | |
334 prop := ic.Property | |
335 if ic.Descending { | |
336 prop = "-" + prop | |
337 } | |
338 nq = nq.Order(prop) | |
339 } | |
340 | |
341 return nq | |
342 } | |
343 | |
344 func (bds *boundDatastore) mkNPLS(base ds.PropertyMap) *nativePropertyLoadSaver { | |
345 return &nativePropertyLoadSaver{bds: bds, pmap: clonePropertyMap(base)} | |
346 } | |
347 | |
348 func (bds *boundDatastore) gaePropertyToNative(name string, props []ds.Property) (nativeProp datastore.Property, err error) { | |
349 nativeProp.Name = name | |
350 | |
351 nativeValues := make([]interface{}, len(props)) | |
352 for i, prop := range props { | |
353 switch pt := prop.Type(); pt { | |
354 case ds.PTNull, ds.PTInt, ds.PTTime, ds.PTBool, ds.PTBytes, ds.P TString, ds.PTFloat: | |
355 nativeValues[i] = prop.Value() | |
356 break | |
357 | |
358 case ds.PTKey: | |
359 nativeValues[i] = bds.gaeKeysToNative(prop.Value().(*ds. Key))[0] | |
360 | |
361 default: | |
362 err = fmt.Errorf("unsupported property type at %d: %v", i, pt) | |
363 return | |
364 } | |
365 } | |
366 | |
367 if len(nativeValues) == 1 { | |
368 nativeProp.Value = nativeValues[0] | |
369 nativeProp.NoIndex = (props[0].IndexSetting() != ds.ShouldIndex) | |
370 } else { | |
371 // We must always index list values. | |
372 nativeProp.Value = nativeValues | |
373 } | |
374 return | |
375 } | |
376 | |
377 func (bds *boundDatastore) nativePropertyToGAE(nativeProp datastore.Property) (n ame string, props []ds.Property, err error) { | |
378 name = nativeProp.Name | |
379 | |
380 var nativeValues []interface{} | |
381 // Slice of supported native type. Break this into a slice of datastore | |
382 // properties. | |
383 // | |
384 // It must be an []interface{}. | |
385 if rv := reflect.ValueOf(nativeProp.Value); rv.Kind() == reflect.Slice & & rv.Type().Elem().Kind() == reflect.Interface { | |
386 nativeValues = rv.Interface().([]interface{}) | |
387 } else { | |
388 nativeValues = []interface{}{nativeProp.Value} | |
389 } | |
390 | |
391 if len(nativeValues) == 0 { | |
392 return | |
393 } | |
394 | |
395 props = make([]ds.Property, len(nativeValues)) | |
396 for i, nv := range nativeValues { | |
397 switch nvt := nv.(type) { | |
398 case int64, bool, string, float64, []byte: | |
399 break | |
400 | |
401 case time.Time: | |
402 // Cloud datastore library returns local time. | |
403 nv = nvt.UTC() | |
404 | |
405 case *datastore.Key: | |
406 nv = bds.nativeKeysToGAE(nvt)[0] | |
407 | |
408 default: | |
409 err = fmt.Errorf("element %d has unsupported datastore.V alue type %T", i, nv) | |
410 return | |
411 } | |
412 | |
413 indexSetting := ds.ShouldIndex | |
414 if nativeProp.NoIndex { | |
415 indexSetting = ds.NoIndex | |
416 } | |
417 props[i].SetValue(nv, indexSetting) | |
418 } | |
419 return | |
420 } | |
421 | |
422 func (bds *boundDatastore) gaeKeysToNative(keys ...*ds.Key) []*datastore.Key { | |
423 nativeKeys := make([]*datastore.Key, len(keys)) | |
424 for i, key := range keys { | |
425 _, _, toks := key.Split() | |
426 | |
427 var nativeKey *datastore.Key | |
428 for _, tok := range toks { | |
429 nativeKey = datastore.NewKey(bds, tok.Kind, tok.StringID , tok.IntID, nativeKey) | |
430 } | |
431 nativeKeys[i] = nativeKey | |
432 } | |
433 return nativeKeys | |
434 } | |
435 | |
436 func (bds *boundDatastore) nativeKeysToGAE(nativeKeys ...*datastore.Key) []*ds.K ey { | |
437 keys := make([]*ds.Key, len(nativeKeys)) | |
438 toks := make([]ds.KeyTok, 1) | |
439 for i, nativeKey := range nativeKeys { | |
440 toks = toks[:0] | |
441 cur := nativeKey | |
442 for { | |
443 toks = append(toks, ds.KeyTok{Kind: cur.Kind(), IntID: c ur.ID(), StringID: cur.Name()}) | |
444 cur = cur.Parent() | |
445 if cur == nil { | |
446 break | |
447 } | |
448 } | |
449 | |
450 // Reverse "toks" so we have ancestor-to-child lineage. | |
451 for i := 0; i < len(toks)/2; i++ { | |
452 ri := len(toks) - i - 1 | |
453 toks[i], toks[ri] = toks[ri], toks[i] | |
454 } | |
455 keys[i] = ds.NewKeyToks(bds.appID, nativeKey.Namespace(), toks) | |
456 } | |
457 return keys | |
458 } | |
459 | |
460 // nativePropertyLoadSaver is a ds.PropertyMap which implements | |
461 // datastore.PropertyLoadSaver. | |
462 // | |
463 // It naturally converts between native and GAE properties and values. | |
464 type nativePropertyLoadSaver struct { | |
465 bds *boundDatastore | |
466 pmap ds.PropertyMap | |
467 } | |
468 | |
469 var _ datastore.PropertyLoadSaver = (*nativePropertyLoadSaver)(nil) | |
470 | |
471 func (npls *nativePropertyLoadSaver) Load(props []datastore.Property) error { | |
472 if npls.pmap == nil { | |
473 // Allocate for common case: one property per property name. | |
474 npls.pmap = make(ds.PropertyMap, len(props)) | |
475 } | |
476 | |
477 for _, nativeProp := range props { | |
478 name, props, err := npls.bds.nativePropertyToGAE(nativeProp) | |
479 if err != nil { | |
480 return err | |
481 } | |
482 npls.pmap[name] = append(npls.pmap[name], props...) | |
483 } | |
484 return nil | |
485 } | |
486 | |
487 func (npls *nativePropertyLoadSaver) Save() ([]datastore.Property, error) { | |
488 if len(npls.pmap) == 0 { | |
489 return nil, nil | |
490 } | |
491 | |
492 props := make([]datastore.Property, 0, len(npls.pmap)) | |
493 for name, plist := range npls.pmap { | |
494 // Strip meta. | |
495 if strings.HasPrefix(name, "$") { | |
496 continue | |
497 } | |
498 | |
499 nativeProp, err := npls.bds.gaePropertyToNative(name, plist) | |
500 if err != nil { | |
501 return nil, err | |
502 } | |
503 props = append(props, nativeProp) | |
504 } | |
505 return props, nil | |
506 } | |
507 | |
508 var datastoreTransactionKey = "*datastore.Transaction" | |
509 | |
510 func withDatastoreTransaction(c context.Context, tx *datastore.Transaction) cont ext.Context { | |
511 return context.WithValue(c, &datastoreTransactionKey, tx) | |
512 } | |
513 | |
514 func datastoreTransaction(c context.Context) *datastore.Transaction { | |
515 if tx, ok := c.Value(&datastoreTransactionKey).(*datastore.Transaction); ok { | |
516 return tx | |
517 } | |
518 return nil | |
519 } | |
520 | |
521 func clonePropertyMap(pmap ds.PropertyMap) ds.PropertyMap { | |
522 if pmap == nil { | |
523 return nil | |
524 } | |
525 | |
526 clone := make(ds.PropertyMap, len(pmap)) | |
527 for k, props := range pmap { | |
528 clone[k] = append([]ds.Property(nil), props...) | |
529 } | |
530 return clone | |
531 } | |
532 | |
533 func normalizeError(err error) error { | |
534 switch err { | |
535 case datastore.ErrNoSuchEntity: | |
536 return ds.ErrNoSuchEntity | |
537 case datastore.ErrConcurrentTransaction: | |
538 return ds.ErrConcurrentTransaction | |
539 case datastore.ErrInvalidKey: | |
540 return ds.ErrInvalidKey | |
541 default: | |
542 return err | |
543 } | |
544 } | |
OLD | NEW |