OLD | NEW |
---|---|
1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package cloud | 5 package cloud |
6 | 6 |
7 import ( | 7 import ( |
8 "fmt" | 8 "fmt" |
9 "reflect" | 9 "reflect" |
10 "strings" | 10 "strings" |
11 "time" | 11 "time" |
12 | 12 |
13 "github.com/luci/luci-go/common/errors" | 13 "github.com/luci/luci-go/common/errors" |
14 | 14 |
15 ds "github.com/luci/gae/service/datastore" | |
16 "github.com/luci/gae/service/info" | |
17 | |
15 "cloud.google.com/go/datastore" | 18 "cloud.google.com/go/datastore" |
16 ds "github.com/luci/gae/service/datastore" | |
17 infoS "github.com/luci/gae/service/info" | |
18 | 19 |
19 "golang.org/x/net/context" | 20 "golang.org/x/net/context" |
20 ) | 21 ) |
21 | 22 |
22 type cloudDatastore struct { | 23 type cloudDatastore struct { |
23 client *datastore.Client | 24 client *datastore.Client |
24 } | 25 } |
25 | 26 |
26 func (cds *cloudDatastore) use(c context.Context) context.Context { | 27 func (cds *cloudDatastore) use(c context.Context) context.Context { |
27 » return ds.SetRawFactory(c, func(ic context.Context, wantTxn bool) ds.Raw Interface { | 28 » return ds.SetRawFactory(c, func(ic context.Context) ds.RawInterface { |
28 » » inf := infoS.Get(ic) | 29 » » if ns := info.GetNamespace(ic); ns != "" { |
29 » » if ns, ok := inf.GetNamespace(); ok { | |
30 ic = datastore.WithNamespace(ic, ns) | 30 ic = datastore.WithNamespace(ic, ns) |
31 } | 31 } |
32 | 32 |
33 » » bds := boundDatastore{ | 33 » » return &boundDatastore{ |
34 Context: ic, | 34 Context: ic, |
35 cloudDatastore: cds, | 35 cloudDatastore: cds, |
36 » » » appID: inf.FullyQualifiedAppID(), | 36 » » » transaction: datastoreTransaction(ic), |
37 » » » kc: ds.GetKeyContext(ic), | |
37 } | 38 } |
38 if wantTxn { | |
39 bds.transaction = datastoreTransaction(ic) | |
40 } | |
41 return &bds | |
42 }) | 39 }) |
43 } | 40 } |
44 | 41 |
45 // boundDatastore is a bound instance of the cloudDatastore installed in the | 42 // boundDatastore is a bound instance of the cloudDatastore installed in the |
46 // Context. | 43 // Context. |
47 type boundDatastore struct { | 44 type boundDatastore struct { |
45 context.Context | |
46 | |
48 // Context is the bound user Context. It includes the datastore namespac e, if | 47 // Context is the bound user Context. It includes the datastore namespac e, if |
49 // one is set. | 48 // one is set. |
50 context.Context | |
51 *cloudDatastore | 49 *cloudDatastore |
52 | 50 |
53 appID string | |
54 transaction *datastore.Transaction | 51 transaction *datastore.Transaction |
52 kc ds.KeyContext | |
55 } | 53 } |
56 | 54 |
57 func (bds *boundDatastore) AllocateIDs(keys []*ds.Key, cb ds.NewKeyCB) error { | 55 func (bds *boundDatastore) AllocateIDs(keys []*ds.Key, cb ds.NewKeyCB) error { |
58 nativeKeys, err := bds.client.AllocateIDs(bds, bds.gaeKeysToNative(keys. ..)) | 56 nativeKeys, err := bds.client.AllocateIDs(bds, bds.gaeKeysToNative(keys. ..)) |
59 if err != nil { | 57 if err != nil { |
60 return normalizeError(err) | 58 return normalizeError(err) |
61 } | 59 } |
62 | 60 |
63 keys = bds.nativeKeysToGAE(nativeKeys...) | 61 keys = bds.nativeKeysToGAE(nativeKeys...) |
64 for _, key := range keys { | 62 for _, key := range keys { |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
112 npls = bds.mkNPLS(nil) | 110 npls = bds.mkNPLS(nil) |
113 } | 111 } |
114 nativeKey, err := it.Next(npls) | 112 nativeKey, err := it.Next(npls) |
115 if err != nil { | 113 if err != nil { |
116 if err == datastore.Done { | 114 if err == datastore.Done { |
117 return nil | 115 return nil |
118 } | 116 } |
119 return normalizeError(err) | 117 return normalizeError(err) |
120 } | 118 } |
121 | 119 |
122 » » if err := cb(bds.nativeKeysToGAE(nativeKey)[0], npls.pmap, curso rFn); err != nil { | 120 » » var pmap ds.PropertyMap |
dnj
2016/09/01 15:25:39
I think this was a bug.
iannucci
2016/09/16 01:01:13
K
| |
121 » » if npls != nil { | |
122 » » » pmap = npls.pmap | |
123 » » } | |
124 » » if err := cb(bds.nativeKeysToGAE(nativeKey)[0], pmap, cursorFn); err != nil { | |
123 if err == ds.Stop { | 125 if err == ds.Stop { |
124 return nil | 126 return nil |
125 } | 127 } |
126 return normalizeError(err) | 128 return normalizeError(err) |
127 } | 129 } |
128 } | 130 } |
129 } | 131 } |
130 | 132 |
131 func (bds *boundDatastore) Count(q *ds.FinalizedQuery) (int64, error) { | 133 func (bds *boundDatastore) Count(q *ds.FinalizedQuery) (int64, error) { |
132 v, err := bds.client.Count(bds, bds.prepareNativeQuery(q)) | 134 v, err := bds.client.Count(bds, bds.prepareNativeQuery(q)) |
(...skipping 26 matching lines...) Expand all Loading... | |
159 } | 161 } |
160 | 162 |
161 func (bds *boundDatastore) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb ds.GetMultiCB) error { | 163 func (bds *boundDatastore) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb ds.GetMultiCB) error { |
162 nativeKeys := bds.gaeKeysToNative(keys...) | 164 nativeKeys := bds.gaeKeysToNative(keys...) |
163 nativePLS := make([]*nativePropertyLoadSaver, len(nativeKeys)) | 165 nativePLS := make([]*nativePropertyLoadSaver, len(nativeKeys)) |
164 for i := range nativePLS { | 166 for i := range nativePLS { |
165 nativePLS[i] = bds.mkNPLS(nil) | 167 nativePLS[i] = bds.mkNPLS(nil) |
166 } | 168 } |
167 | 169 |
168 var err error | 170 var err error |
169 » if tx := bds.transaction; tx != nil { | 171 » if bds.transaction != nil { |
170 // Transactional GetMulti. | 172 // Transactional GetMulti. |
171 » » err = tx.GetMulti(nativeKeys, nativePLS) | 173 » » err = bds.transaction.GetMulti(nativeKeys, nativePLS) |
172 } else { | 174 } else { |
173 // Non-transactional GetMulti. | 175 // Non-transactional GetMulti. |
174 err = bds.client.GetMulti(bds, nativeKeys, nativePLS) | 176 err = bds.client.GetMulti(bds, nativeKeys, nativePLS) |
175 } | 177 } |
176 | 178 |
177 return idxCallbacker(err, len(nativePLS), func(idx int, err error) error { | 179 return idxCallbacker(err, len(nativePLS), func(idx int, err error) error { |
178 return cb(nativePLS[idx].pmap, err) | 180 return cb(nativePLS[idx].pmap, err) |
179 }) | 181 }) |
180 } | 182 } |
181 | 183 |
182 func (bds *boundDatastore) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds .NewKeyCB) error { | 184 func (bds *boundDatastore) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds .NewKeyCB) error { |
183 nativeKeys := bds.gaeKeysToNative(keys...) | 185 nativeKeys := bds.gaeKeysToNative(keys...) |
184 nativePLS := make([]*nativePropertyLoadSaver, len(vals)) | 186 nativePLS := make([]*nativePropertyLoadSaver, len(vals)) |
185 for i := range nativePLS { | 187 for i := range nativePLS { |
186 nativePLS[i] = bds.mkNPLS(vals[i]) | 188 nativePLS[i] = bds.mkNPLS(vals[i]) |
187 } | 189 } |
188 | 190 |
189 var err error | 191 var err error |
190 » if tx := bds.transaction; tx != nil { | 192 » if bds.transaction != nil { |
191 // Transactional PutMulti. | 193 // Transactional PutMulti. |
192 // | 194 // |
193 // In order to simulate the presence of mid-transaction key allo cation, we | 195 // In order to simulate the presence of mid-transaction key allo cation, we |
194 // will identify any incomplete keys and allocate IDs for them. This is | 196 // will identify any incomplete keys and allocate IDs for them. This is |
195 // potentially wasteful in the event of failed or retried transa ctions, but | 197 // potentially wasteful in the event of failed or retried transa ctions, but |
196 // it is required to maintain API compatibility with the datasto re | 198 // it is required to maintain API compatibility with the datasto re |
197 // interface. | 199 // interface. |
198 var incompleteKeys []*datastore.Key | 200 var incompleteKeys []*datastore.Key |
199 var incompleteKeyMap map[int]int | 201 var incompleteKeyMap map[int]int |
200 for i, k := range nativeKeys { | 202 for i, k := range nativeKeys { |
(...skipping 11 matching lines...) Expand all Loading... | |
212 if len(incompleteKeys) > 0 { | 214 if len(incompleteKeys) > 0 { |
213 idKeys, err := bds.client.AllocateIDs(bds, incompleteKey s) | 215 idKeys, err := bds.client.AllocateIDs(bds, incompleteKey s) |
214 if err != nil { | 216 if err != nil { |
215 return err | 217 return err |
216 } | 218 } |
217 for i, idKey := range idKeys { | 219 for i, idKey := range idKeys { |
218 nativeKeys[incompleteKeyMap[i]] = idKey | 220 nativeKeys[incompleteKeyMap[i]] = idKey |
219 } | 221 } |
220 } | 222 } |
221 | 223 |
222 » » _, err = tx.PutMulti(nativeKeys, nativePLS) | 224 » » _, err = bds.transaction.PutMulti(nativeKeys, nativePLS) |
223 } else { | 225 } else { |
224 // Non-transactional PutMulti. | 226 // Non-transactional PutMulti. |
225 nativeKeys, err = bds.client.PutMulti(bds, nativeKeys, nativePLS ) | 227 nativeKeys, err = bds.client.PutMulti(bds, nativeKeys, nativePLS ) |
226 } | 228 } |
227 | 229 |
228 return idxCallbacker(err, len(nativeKeys), func(idx int, err error) erro r { | 230 return idxCallbacker(err, len(nativeKeys), func(idx int, err error) erro r { |
229 if err == nil { | 231 if err == nil { |
230 return cb(bds.nativeKeysToGAE(nativeKeys[idx])[0], nil) | 232 return cb(bds.nativeKeysToGAE(nativeKeys[idx])[0], nil) |
231 } | 233 } |
232 return cb(nil, err) | 234 return cb(nil, err) |
233 }) | 235 }) |
234 } | 236 } |
235 | 237 |
236 func (bds *boundDatastore) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) erro r { | 238 func (bds *boundDatastore) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) erro r { |
237 nativeKeys := bds.gaeKeysToNative(keys...) | 239 nativeKeys := bds.gaeKeysToNative(keys...) |
238 | 240 |
239 var err error | 241 var err error |
240 » if tx := bds.transaction; tx != nil { | 242 » if bds.transaction != nil { |
241 // Transactional DeleteMulti. | 243 // Transactional DeleteMulti. |
242 » » err = tx.DeleteMulti(nativeKeys) | 244 » » err = bds.transaction.DeleteMulti(nativeKeys) |
243 } else { | 245 } else { |
244 // Non-transactional DeleteMulti. | 246 // Non-transactional DeleteMulti. |
245 err = bds.client.DeleteMulti(bds, nativeKeys) | 247 err = bds.client.DeleteMulti(bds, nativeKeys) |
246 } | 248 } |
247 | 249 |
248 return idxCallbacker(err, len(nativeKeys), func(_ int, err error) error { | 250 return idxCallbacker(err, len(nativeKeys), func(_ int, err error) error { |
249 return cb(err) | 251 return cb(err) |
250 }) | 252 }) |
251 } | 253 } |
252 | 254 |
253 func (bds *boundDatastore) Testable() ds.Testable { | 255 func (bds *boundDatastore) WithTransaction(t ds.Transaction) context.Context { |
iannucci
2016/09/16 01:01:13
let's just make this WithoutTransaction() (in the
dnj
2016/09/16 05:44:42
Done.
| |
254 » return nil | 256 » if t != nil { |
257 » » // Manually set the Transaction. | |
258 » » return withDatastoreTransaction(bds, t.(*datastore.Transaction)) | |
259 » } | |
260 » return withDatastoreTransaction(bds, nil) | |
255 } | 261 } |
256 | 262 |
263 func (bds *boundDatastore) CurrentTransaction() ds.Transaction { return bds.tran saction } | |
264 | |
265 func (bds *boundDatastore) GetTestable() ds.Testable { return nil } | |
266 | |
257 func (bds *boundDatastore) prepareNativeQuery(fq *ds.FinalizedQuery) *datastore. Query { | 267 func (bds *boundDatastore) prepareNativeQuery(fq *ds.FinalizedQuery) *datastore. Query { |
258 nq := datastore.NewQuery(fq.Kind()) | 268 nq := datastore.NewQuery(fq.Kind()) |
259 if bds.transaction != nil { | 269 if bds.transaction != nil { |
260 nq = nq.Transaction(bds.transaction) | 270 nq = nq.Transaction(bds.transaction) |
261 } | 271 } |
262 | 272 |
263 // nativeFilter translates a filter field. If the translation fails, we' ll | 273 // nativeFilter translates a filter field. If the translation fails, we' ll |
264 // pass the result through to the underlying datastore and allow it to | 274 // pass the result through to the underlying datastore and allow it to |
265 // reject it. | 275 // reject it. |
266 nativeFilter := func(prop ds.Property) interface{} { | 276 nativeFilter := func(prop ds.Property) interface{} { |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
323 if ic.Descending { | 333 if ic.Descending { |
324 prop = "-" + prop | 334 prop = "-" + prop |
325 } | 335 } |
326 nq = nq.Order(prop) | 336 nq = nq.Order(prop) |
327 } | 337 } |
328 | 338 |
329 return nq | 339 return nq |
330 } | 340 } |
331 | 341 |
332 func (bds *boundDatastore) mkNPLS(base ds.PropertyMap) *nativePropertyLoadSaver { | 342 func (bds *boundDatastore) mkNPLS(base ds.PropertyMap) *nativePropertyLoadSaver { |
333 » return &nativePropertyLoadSaver{bds: bds, pmap: clonePropertyMap(base)} | 343 » return &nativePropertyLoadSaver{ |
344 » » bds: bds, | |
345 » » pmap: clonePropertyMap(base), | |
346 » } | |
334 } | 347 } |
335 | 348 |
336 func (bds *boundDatastore) gaePropertyToNative(name string, props []ds.Property) (nativeProp datastore.Property, err error) { | 349 func (bds *boundDatastore) gaePropertyToNative(name string, props []ds.Property) ( |
350 » nativeProp datastore.Property, err error) { | |
351 | |
337 nativeProp.Name = name | 352 nativeProp.Name = name |
338 | 353 |
339 nativeValues := make([]interface{}, len(props)) | 354 nativeValues := make([]interface{}, len(props)) |
340 for i, prop := range props { | 355 for i, prop := range props { |
341 switch pt := prop.Type(); pt { | 356 switch pt := prop.Type(); pt { |
342 case ds.PTNull, ds.PTInt, ds.PTTime, ds.PTBool, ds.PTBytes, ds.P TString, ds.PTFloat: | 357 case ds.PTNull, ds.PTInt, ds.PTTime, ds.PTBool, ds.PTBytes, ds.P TString, ds.PTFloat: |
343 nativeValues[i] = prop.Value() | 358 nativeValues[i] = prop.Value() |
344 break | 359 break |
345 | 360 |
346 case ds.PTKey: | 361 case ds.PTKey: |
347 nativeValues[i] = bds.gaeKeysToNative(prop.Value().(*ds. Key))[0] | 362 nativeValues[i] = bds.gaeKeysToNative(prop.Value().(*ds. Key))[0] |
348 | 363 |
349 default: | 364 default: |
350 err = fmt.Errorf("unsupported property type at %d: %v", i, pt) | 365 err = fmt.Errorf("unsupported property type at %d: %v", i, pt) |
351 return | 366 return |
352 } | 367 } |
353 } | 368 } |
354 | 369 |
355 if len(nativeValues) == 1 { | 370 if len(nativeValues) == 1 { |
356 nativeProp.Value = nativeValues[0] | 371 nativeProp.Value = nativeValues[0] |
357 nativeProp.NoIndex = (props[0].IndexSetting() != ds.ShouldIndex) | 372 nativeProp.NoIndex = (props[0].IndexSetting() != ds.ShouldIndex) |
358 } else { | 373 } else { |
359 // We must always index list values. | 374 // We must always index list values. |
360 nativeProp.Value = nativeValues | 375 nativeProp.Value = nativeValues |
361 } | 376 } |
362 return | 377 return |
363 } | 378 } |
364 | 379 |
365 func (bds *boundDatastore) nativePropertyToGAE(nativeProp datastore.Property) (n ame string, props []ds.Property, err error) { | 380 func (bds *boundDatastore) nativePropertyToGAE(nativeProp datastore.Property) ( |
381 » name string, props []ds.Property, err error) { | |
382 | |
366 name = nativeProp.Name | 383 name = nativeProp.Name |
367 | 384 |
368 var nativeValues []interface{} | 385 var nativeValues []interface{} |
369 // Slice of supported native type. Break this into a slice of datastore | 386 // Slice of supported native type. Break this into a slice of datastore |
370 // properties. | 387 // properties. |
371 // | 388 // |
372 // It must be an []interface{}. | 389 // It must be an []interface{}. |
373 if rv := reflect.ValueOf(nativeProp.Value); rv.Kind() == reflect.Slice & & rv.Type().Elem().Kind() == reflect.Interface { | 390 if rv := reflect.ValueOf(nativeProp.Value); rv.Kind() == reflect.Slice & & rv.Type().Elem().Kind() == reflect.Interface { |
374 nativeValues = rv.Interface().([]interface{}) | 391 nativeValues = rv.Interface().([]interface{}) |
375 } else { | 392 } else { |
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
417 nativeKey = datastore.NewKey(bds, tok.Kind, tok.StringID , tok.IntID, nativeKey) | 434 nativeKey = datastore.NewKey(bds, tok.Kind, tok.StringID , tok.IntID, nativeKey) |
418 } | 435 } |
419 nativeKeys[i] = nativeKey | 436 nativeKeys[i] = nativeKey |
420 } | 437 } |
421 return nativeKeys | 438 return nativeKeys |
422 } | 439 } |
423 | 440 |
424 func (bds *boundDatastore) nativeKeysToGAE(nativeKeys ...*datastore.Key) []*ds.K ey { | 441 func (bds *boundDatastore) nativeKeysToGAE(nativeKeys ...*datastore.Key) []*ds.K ey { |
425 keys := make([]*ds.Key, len(nativeKeys)) | 442 keys := make([]*ds.Key, len(nativeKeys)) |
426 toks := make([]ds.KeyTok, 1) | 443 toks := make([]ds.KeyTok, 1) |
444 | |
445 kc := bds.kc | |
427 for i, nativeKey := range nativeKeys { | 446 for i, nativeKey := range nativeKeys { |
428 toks = toks[:0] | 447 toks = toks[:0] |
429 cur := nativeKey | 448 cur := nativeKey |
430 for { | 449 for { |
431 toks = append(toks, ds.KeyTok{Kind: cur.Kind(), IntID: c ur.ID(), StringID: cur.Name()}) | 450 toks = append(toks, ds.KeyTok{Kind: cur.Kind(), IntID: c ur.ID(), StringID: cur.Name()}) |
432 cur = cur.Parent() | 451 cur = cur.Parent() |
433 if cur == nil { | 452 if cur == nil { |
434 break | 453 break |
435 } | 454 } |
436 } | 455 } |
437 | 456 |
438 // Reverse "toks" so we have ancestor-to-child lineage. | 457 // Reverse "toks" so we have ancestor-to-child lineage. |
439 for i := 0; i < len(toks)/2; i++ { | 458 for i := 0; i < len(toks)/2; i++ { |
440 ri := len(toks) - i - 1 | 459 ri := len(toks) - i - 1 |
441 toks[i], toks[ri] = toks[ri], toks[i] | 460 toks[i], toks[ri] = toks[ri], toks[i] |
442 } | 461 } |
443 » » keys[i] = ds.NewKeyToks(bds.appID, nativeKey.Namespace(), toks) | 462 » » kc.Namespace = nativeKey.Namespace() |
463 » » keys[i] = kc.NewKeyToks(toks) | |
444 } | 464 } |
445 return keys | 465 return keys |
446 } | 466 } |
447 | 467 |
448 // nativePropertyLoadSaver is a ds.PropertyMap which implements | 468 // nativePropertyLoadSaver is a ds.PropertyMap which implements |
449 // datastore.PropertyLoadSaver. | 469 // datastore.PropertyLoadSaver. |
450 // | 470 // |
451 // It naturally converts between native and GAE properties and values. | 471 // It naturally converts between native and GAE properties and values. |
452 type nativePropertyLoadSaver struct { | 472 type nativePropertyLoadSaver struct { |
453 bds *boundDatastore | 473 bds *boundDatastore |
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
523 case datastore.ErrNoSuchEntity: | 543 case datastore.ErrNoSuchEntity: |
524 return ds.ErrNoSuchEntity | 544 return ds.ErrNoSuchEntity |
525 case datastore.ErrConcurrentTransaction: | 545 case datastore.ErrConcurrentTransaction: |
526 return ds.ErrConcurrentTransaction | 546 return ds.ErrConcurrentTransaction |
527 case datastore.ErrInvalidKey: | 547 case datastore.ErrInvalidKey: |
528 return ds.ErrInvalidKey | 548 return ds.ErrInvalidKey |
529 default: | 549 default: |
530 return err | 550 return err |
531 } | 551 } |
532 } | 552 } |
OLD | NEW |