| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package cloud | 5 package cloud |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "reflect" | 9 "reflect" |
| 10 "strings" | 10 "strings" |
| 11 "time" | 11 "time" |
| 12 | 12 |
| 13 "github.com/luci/luci-go/common/errors" | 13 "github.com/luci/luci-go/common/errors" |
| 14 | 14 |
| 15 ds "github.com/luci/gae/service/datastore" |
| 16 "github.com/luci/gae/service/info" |
| 17 |
| 15 "cloud.google.com/go/datastore" | 18 "cloud.google.com/go/datastore" |
| 16 ds "github.com/luci/gae/service/datastore" | |
| 17 infoS "github.com/luci/gae/service/info" | |
| 18 | 19 |
| 19 "golang.org/x/net/context" | 20 "golang.org/x/net/context" |
| 20 ) | 21 ) |
| 21 | 22 |
| 22 type cloudDatastore struct { | 23 type cloudDatastore struct { |
| 23 client *datastore.Client | 24 client *datastore.Client |
| 24 } | 25 } |
| 25 | 26 |
| 26 func (cds *cloudDatastore) use(c context.Context) context.Context { | 27 func (cds *cloudDatastore) use(c context.Context) context.Context { |
| 27 » return ds.SetRawFactory(c, func(ic context.Context, wantTxn bool) ds.Raw
Interface { | 28 » return ds.SetRawFactory(c, func(ic context.Context) ds.RawInterface { |
| 28 » » inf := infoS.Get(ic) | 29 » » if ns := info.GetNamespace(ic); ns != "" { |
| 29 » » if ns, ok := inf.GetNamespace(); ok { | |
| 30 ic = datastore.WithNamespace(ic, ns) | 30 ic = datastore.WithNamespace(ic, ns) |
| 31 } | 31 } |
| 32 | 32 |
| 33 » » bds := boundDatastore{ | 33 » » return &boundDatastore{ |
| 34 Context: ic, | 34 Context: ic, |
| 35 cloudDatastore: cds, | 35 cloudDatastore: cds, |
| 36 » » » appID: inf.FullyQualifiedAppID(), | 36 » » » transaction: datastoreTransaction(ic), |
| 37 » » » kc: ds.GetKeyContext(ic), |
| 37 } | 38 } |
| 38 if wantTxn { | |
| 39 bds.transaction = datastoreTransaction(ic) | |
| 40 } | |
| 41 return &bds | |
| 42 }) | 39 }) |
| 43 } | 40 } |
| 44 | 41 |
| 45 // boundDatastore is a bound instance of the cloudDatastore installed in the | 42 // boundDatastore is a bound instance of the cloudDatastore installed in the |
| 46 // Context. | 43 // Context. |
| 47 type boundDatastore struct { | 44 type boundDatastore struct { |
| 45 context.Context |
| 46 |
| 48 // Context is the bound user Context. It includes the datastore namespac
e, if | 47 // Context is the bound user Context. It includes the datastore namespac
e, if |
| 49 // one is set. | 48 // one is set. |
| 50 context.Context | |
| 51 *cloudDatastore | 49 *cloudDatastore |
| 52 | 50 |
| 53 appID string | |
| 54 transaction *datastore.Transaction | 51 transaction *datastore.Transaction |
| 52 kc ds.KeyContext |
| 55 } | 53 } |
| 56 | 54 |
| 57 func (bds *boundDatastore) AllocateIDs(keys []*ds.Key, cb ds.NewKeyCB) error { | 55 func (bds *boundDatastore) AllocateIDs(keys []*ds.Key, cb ds.NewKeyCB) error { |
| 58 nativeKeys, err := bds.client.AllocateIDs(bds, bds.gaeKeysToNative(keys.
..)) | 56 nativeKeys, err := bds.client.AllocateIDs(bds, bds.gaeKeysToNative(keys.
..)) |
| 59 if err != nil { | 57 if err != nil { |
| 60 return normalizeError(err) | 58 return normalizeError(err) |
| 61 } | 59 } |
| 62 | 60 |
| 63 keys = bds.nativeKeysToGAE(nativeKeys...) | 61 keys = bds.nativeKeysToGAE(nativeKeys...) |
| 64 for _, key := range keys { | 62 for _, key := range keys { |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 112 npls = bds.mkNPLS(nil) | 110 npls = bds.mkNPLS(nil) |
| 113 } | 111 } |
| 114 nativeKey, err := it.Next(npls) | 112 nativeKey, err := it.Next(npls) |
| 115 if err != nil { | 113 if err != nil { |
| 116 if err == datastore.Done { | 114 if err == datastore.Done { |
| 117 return nil | 115 return nil |
| 118 } | 116 } |
| 119 return normalizeError(err) | 117 return normalizeError(err) |
| 120 } | 118 } |
| 121 | 119 |
| 122 » » if err := cb(bds.nativeKeysToGAE(nativeKey)[0], npls.pmap, curso
rFn); err != nil { | 120 » » var pmap ds.PropertyMap |
| 121 » » if npls != nil { |
| 122 » » » pmap = npls.pmap |
| 123 » » } |
| 124 » » if err := cb(bds.nativeKeysToGAE(nativeKey)[0], pmap, cursorFn);
err != nil { |
| 123 if err == ds.Stop { | 125 if err == ds.Stop { |
| 124 return nil | 126 return nil |
| 125 } | 127 } |
| 126 return normalizeError(err) | 128 return normalizeError(err) |
| 127 } | 129 } |
| 128 } | 130 } |
| 129 } | 131 } |
| 130 | 132 |
| 131 func (bds *boundDatastore) Count(q *ds.FinalizedQuery) (int64, error) { | 133 func (bds *boundDatastore) Count(q *ds.FinalizedQuery) (int64, error) { |
| 132 v, err := bds.client.Count(bds, bds.prepareNativeQuery(q)) | 134 v, err := bds.client.Count(bds, bds.prepareNativeQuery(q)) |
| (...skipping 26 matching lines...) Expand all Loading... |
| 159 } | 161 } |
| 160 | 162 |
| 161 func (bds *boundDatastore) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb
ds.GetMultiCB) error { | 163 func (bds *boundDatastore) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb
ds.GetMultiCB) error { |
| 162 nativeKeys := bds.gaeKeysToNative(keys...) | 164 nativeKeys := bds.gaeKeysToNative(keys...) |
| 163 nativePLS := make([]*nativePropertyLoadSaver, len(nativeKeys)) | 165 nativePLS := make([]*nativePropertyLoadSaver, len(nativeKeys)) |
| 164 for i := range nativePLS { | 166 for i := range nativePLS { |
| 165 nativePLS[i] = bds.mkNPLS(nil) | 167 nativePLS[i] = bds.mkNPLS(nil) |
| 166 } | 168 } |
| 167 | 169 |
| 168 var err error | 170 var err error |
| 169 » if tx := bds.transaction; tx != nil { | 171 » if bds.transaction != nil { |
| 170 // Transactional GetMulti. | 172 // Transactional GetMulti. |
| 171 » » err = tx.GetMulti(nativeKeys, nativePLS) | 173 » » err = bds.transaction.GetMulti(nativeKeys, nativePLS) |
| 172 } else { | 174 } else { |
| 173 // Non-transactional GetMulti. | 175 // Non-transactional GetMulti. |
| 174 err = bds.client.GetMulti(bds, nativeKeys, nativePLS) | 176 err = bds.client.GetMulti(bds, nativeKeys, nativePLS) |
| 175 } | 177 } |
| 176 | 178 |
| 177 return idxCallbacker(err, len(nativePLS), func(idx int, err error) error
{ | 179 return idxCallbacker(err, len(nativePLS), func(idx int, err error) error
{ |
| 178 return cb(nativePLS[idx].pmap, err) | 180 return cb(nativePLS[idx].pmap, err) |
| 179 }) | 181 }) |
| 180 } | 182 } |
| 181 | 183 |
| 182 func (bds *boundDatastore) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds
.NewKeyCB) error { | 184 func (bds *boundDatastore) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds
.NewKeyCB) error { |
| 183 nativeKeys := bds.gaeKeysToNative(keys...) | 185 nativeKeys := bds.gaeKeysToNative(keys...) |
| 184 nativePLS := make([]*nativePropertyLoadSaver, len(vals)) | 186 nativePLS := make([]*nativePropertyLoadSaver, len(vals)) |
| 185 for i := range nativePLS { | 187 for i := range nativePLS { |
| 186 nativePLS[i] = bds.mkNPLS(vals[i]) | 188 nativePLS[i] = bds.mkNPLS(vals[i]) |
| 187 } | 189 } |
| 188 | 190 |
| 189 var err error | 191 var err error |
| 190 » if tx := bds.transaction; tx != nil { | 192 » if bds.transaction != nil { |
| 191 // Transactional PutMulti. | 193 // Transactional PutMulti. |
| 192 // | 194 // |
| 193 // In order to simulate the presence of mid-transaction key allo
cation, we | 195 // In order to simulate the presence of mid-transaction key allo
cation, we |
| 194 // will identify any incomplete keys and allocate IDs for them.
This is | 196 // will identify any incomplete keys and allocate IDs for them.
This is |
| 195 // potentially wasteful in the event of failed or retried transa
ctions, but | 197 // potentially wasteful in the event of failed or retried transa
ctions, but |
| 196 // it is required to maintain API compatibility with the datasto
re | 198 // it is required to maintain API compatibility with the datasto
re |
| 197 // interface. | 199 // interface. |
| 198 var incompleteKeys []*datastore.Key | 200 var incompleteKeys []*datastore.Key |
| 199 var incompleteKeyMap map[int]int | 201 var incompleteKeyMap map[int]int |
| 200 for i, k := range nativeKeys { | 202 for i, k := range nativeKeys { |
| (...skipping 11 matching lines...) Expand all Loading... |
| 212 if len(incompleteKeys) > 0 { | 214 if len(incompleteKeys) > 0 { |
| 213 idKeys, err := bds.client.AllocateIDs(bds, incompleteKey
s) | 215 idKeys, err := bds.client.AllocateIDs(bds, incompleteKey
s) |
| 214 if err != nil { | 216 if err != nil { |
| 215 return err | 217 return err |
| 216 } | 218 } |
| 217 for i, idKey := range idKeys { | 219 for i, idKey := range idKeys { |
| 218 nativeKeys[incompleteKeyMap[i]] = idKey | 220 nativeKeys[incompleteKeyMap[i]] = idKey |
| 219 } | 221 } |
| 220 } | 222 } |
| 221 | 223 |
| 222 » » _, err = tx.PutMulti(nativeKeys, nativePLS) | 224 » » _, err = bds.transaction.PutMulti(nativeKeys, nativePLS) |
| 223 } else { | 225 } else { |
| 224 // Non-transactional PutMulti. | 226 // Non-transactional PutMulti. |
| 225 nativeKeys, err = bds.client.PutMulti(bds, nativeKeys, nativePLS
) | 227 nativeKeys, err = bds.client.PutMulti(bds, nativeKeys, nativePLS
) |
| 226 } | 228 } |
| 227 | 229 |
| 228 return idxCallbacker(err, len(nativeKeys), func(idx int, err error) erro
r { | 230 return idxCallbacker(err, len(nativeKeys), func(idx int, err error) erro
r { |
| 229 if err == nil { | 231 if err == nil { |
| 230 return cb(bds.nativeKeysToGAE(nativeKeys[idx])[0], nil) | 232 return cb(bds.nativeKeysToGAE(nativeKeys[idx])[0], nil) |
| 231 } | 233 } |
| 232 return cb(nil, err) | 234 return cb(nil, err) |
| 233 }) | 235 }) |
| 234 } | 236 } |
| 235 | 237 |
| 236 func (bds *boundDatastore) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) erro
r { | 238 func (bds *boundDatastore) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) erro
r { |
| 237 nativeKeys := bds.gaeKeysToNative(keys...) | 239 nativeKeys := bds.gaeKeysToNative(keys...) |
| 238 | 240 |
| 239 var err error | 241 var err error |
| 240 » if tx := bds.transaction; tx != nil { | 242 » if bds.transaction != nil { |
| 241 // Transactional DeleteMulti. | 243 // Transactional DeleteMulti. |
| 242 » » err = tx.DeleteMulti(nativeKeys) | 244 » » err = bds.transaction.DeleteMulti(nativeKeys) |
| 243 } else { | 245 } else { |
| 244 // Non-transactional DeleteMulti. | 246 // Non-transactional DeleteMulti. |
| 245 err = bds.client.DeleteMulti(bds, nativeKeys) | 247 err = bds.client.DeleteMulti(bds, nativeKeys) |
| 246 } | 248 } |
| 247 | 249 |
| 248 return idxCallbacker(err, len(nativeKeys), func(_ int, err error) error
{ | 250 return idxCallbacker(err, len(nativeKeys), func(_ int, err error) error
{ |
| 249 return cb(err) | 251 return cb(err) |
| 250 }) | 252 }) |
| 251 } | 253 } |
| 252 | 254 |
| 253 func (bds *boundDatastore) Testable() ds.Testable { | 255 func (bds *boundDatastore) WithoutTransaction() context.Context { |
| 254 » return nil | 256 » return withDatastoreTransaction(bds, nil) |
| 255 } | 257 } |
| 256 | 258 |
| 259 func (bds *boundDatastore) CurrentTransaction() ds.Transaction { return bds.tran
saction } |
| 260 |
| 261 func (bds *boundDatastore) GetTestable() ds.Testable { return nil } |
| 262 |
| 257 func (bds *boundDatastore) prepareNativeQuery(fq *ds.FinalizedQuery) *datastore.
Query { | 263 func (bds *boundDatastore) prepareNativeQuery(fq *ds.FinalizedQuery) *datastore.
Query { |
| 258 nq := datastore.NewQuery(fq.Kind()) | 264 nq := datastore.NewQuery(fq.Kind()) |
| 259 if bds.transaction != nil { | 265 if bds.transaction != nil { |
| 260 nq = nq.Transaction(bds.transaction) | 266 nq = nq.Transaction(bds.transaction) |
| 261 } | 267 } |
| 262 | 268 |
| 263 // nativeFilter translates a filter field. If the translation fails, we'
ll | 269 // nativeFilter translates a filter field. If the translation fails, we'
ll |
| 264 // pass the result through to the underlying datastore and allow it to | 270 // pass the result through to the underlying datastore and allow it to |
| 265 // reject it. | 271 // reject it. |
| 266 nativeFilter := func(prop ds.Property) interface{} { | 272 nativeFilter := func(prop ds.Property) interface{} { |
| (...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 323 if ic.Descending { | 329 if ic.Descending { |
| 324 prop = "-" + prop | 330 prop = "-" + prop |
| 325 } | 331 } |
| 326 nq = nq.Order(prop) | 332 nq = nq.Order(prop) |
| 327 } | 333 } |
| 328 | 334 |
| 329 return nq | 335 return nq |
| 330 } | 336 } |
| 331 | 337 |
| 332 func (bds *boundDatastore) mkNPLS(base ds.PropertyMap) *nativePropertyLoadSaver
{ | 338 func (bds *boundDatastore) mkNPLS(base ds.PropertyMap) *nativePropertyLoadSaver
{ |
| 333 » return &nativePropertyLoadSaver{bds: bds, pmap: clonePropertyMap(base)} | 339 » return &nativePropertyLoadSaver{ |
| 340 » » bds: bds, |
| 341 » » pmap: clonePropertyMap(base), |
| 342 » } |
| 334 } | 343 } |
| 335 | 344 |
| 336 func (bds *boundDatastore) gaePropertyToNative(name string, props []ds.Property)
(nativeProp datastore.Property, err error) { | 345 func (bds *boundDatastore) gaePropertyToNative(name string, props []ds.Property)
( |
| 346 » nativeProp datastore.Property, err error) { |
| 347 |
| 337 nativeProp.Name = name | 348 nativeProp.Name = name |
| 338 | 349 |
| 339 nativeValues := make([]interface{}, len(props)) | 350 nativeValues := make([]interface{}, len(props)) |
| 340 for i, prop := range props { | 351 for i, prop := range props { |
| 341 switch pt := prop.Type(); pt { | 352 switch pt := prop.Type(); pt { |
| 342 case ds.PTNull, ds.PTInt, ds.PTTime, ds.PTBool, ds.PTBytes, ds.P
TString, ds.PTFloat: | 353 case ds.PTNull, ds.PTInt, ds.PTTime, ds.PTBool, ds.PTBytes, ds.P
TString, ds.PTFloat: |
| 343 nativeValues[i] = prop.Value() | 354 nativeValues[i] = prop.Value() |
| 344 break | 355 break |
| 345 | 356 |
| 346 case ds.PTKey: | 357 case ds.PTKey: |
| 347 nativeValues[i] = bds.gaeKeysToNative(prop.Value().(*ds.
Key))[0] | 358 nativeValues[i] = bds.gaeKeysToNative(prop.Value().(*ds.
Key))[0] |
| 348 | 359 |
| 349 default: | 360 default: |
| 350 err = fmt.Errorf("unsupported property type at %d: %v",
i, pt) | 361 err = fmt.Errorf("unsupported property type at %d: %v",
i, pt) |
| 351 return | 362 return |
| 352 } | 363 } |
| 353 } | 364 } |
| 354 | 365 |
| 355 if len(nativeValues) == 1 { | 366 if len(nativeValues) == 1 { |
| 356 nativeProp.Value = nativeValues[0] | 367 nativeProp.Value = nativeValues[0] |
| 357 nativeProp.NoIndex = (props[0].IndexSetting() != ds.ShouldIndex) | 368 nativeProp.NoIndex = (props[0].IndexSetting() != ds.ShouldIndex) |
| 358 } else { | 369 } else { |
| 359 // We must always index list values. | 370 // We must always index list values. |
| 360 nativeProp.Value = nativeValues | 371 nativeProp.Value = nativeValues |
| 361 } | 372 } |
| 362 return | 373 return |
| 363 } | 374 } |
| 364 | 375 |
| 365 func (bds *boundDatastore) nativePropertyToGAE(nativeProp datastore.Property) (n
ame string, props []ds.Property, err error) { | 376 func (bds *boundDatastore) nativePropertyToGAE(nativeProp datastore.Property) ( |
| 377 » name string, props []ds.Property, err error) { |
| 378 |
| 366 name = nativeProp.Name | 379 name = nativeProp.Name |
| 367 | 380 |
| 368 var nativeValues []interface{} | 381 var nativeValues []interface{} |
| 369 // Slice of supported native type. Break this into a slice of datastore | 382 // Slice of supported native type. Break this into a slice of datastore |
| 370 // properties. | 383 // properties. |
| 371 // | 384 // |
| 372 // It must be an []interface{}. | 385 // It must be an []interface{}. |
| 373 if rv := reflect.ValueOf(nativeProp.Value); rv.Kind() == reflect.Slice &
& rv.Type().Elem().Kind() == reflect.Interface { | 386 if rv := reflect.ValueOf(nativeProp.Value); rv.Kind() == reflect.Slice &
& rv.Type().Elem().Kind() == reflect.Interface { |
| 374 nativeValues = rv.Interface().([]interface{}) | 387 nativeValues = rv.Interface().([]interface{}) |
| 375 } else { | 388 } else { |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 417 nativeKey = datastore.NewKey(bds, tok.Kind, tok.StringID
, tok.IntID, nativeKey) | 430 nativeKey = datastore.NewKey(bds, tok.Kind, tok.StringID
, tok.IntID, nativeKey) |
| 418 } | 431 } |
| 419 nativeKeys[i] = nativeKey | 432 nativeKeys[i] = nativeKey |
| 420 } | 433 } |
| 421 return nativeKeys | 434 return nativeKeys |
| 422 } | 435 } |
| 423 | 436 |
| 424 func (bds *boundDatastore) nativeKeysToGAE(nativeKeys ...*datastore.Key) []*ds.K
ey { | 437 func (bds *boundDatastore) nativeKeysToGAE(nativeKeys ...*datastore.Key) []*ds.K
ey { |
| 425 keys := make([]*ds.Key, len(nativeKeys)) | 438 keys := make([]*ds.Key, len(nativeKeys)) |
| 426 toks := make([]ds.KeyTok, 1) | 439 toks := make([]ds.KeyTok, 1) |
| 440 |
| 441 kc := bds.kc |
| 427 for i, nativeKey := range nativeKeys { | 442 for i, nativeKey := range nativeKeys { |
| 428 toks = toks[:0] | 443 toks = toks[:0] |
| 429 cur := nativeKey | 444 cur := nativeKey |
| 430 for { | 445 for { |
| 431 toks = append(toks, ds.KeyTok{Kind: cur.Kind(), IntID: c
ur.ID(), StringID: cur.Name()}) | 446 toks = append(toks, ds.KeyTok{Kind: cur.Kind(), IntID: c
ur.ID(), StringID: cur.Name()}) |
| 432 cur = cur.Parent() | 447 cur = cur.Parent() |
| 433 if cur == nil { | 448 if cur == nil { |
| 434 break | 449 break |
| 435 } | 450 } |
| 436 } | 451 } |
| 437 | 452 |
| 438 // Reverse "toks" so we have ancestor-to-child lineage. | 453 // Reverse "toks" so we have ancestor-to-child lineage. |
| 439 for i := 0; i < len(toks)/2; i++ { | 454 for i := 0; i < len(toks)/2; i++ { |
| 440 ri := len(toks) - i - 1 | 455 ri := len(toks) - i - 1 |
| 441 toks[i], toks[ri] = toks[ri], toks[i] | 456 toks[i], toks[ri] = toks[ri], toks[i] |
| 442 } | 457 } |
| 443 » » keys[i] = ds.NewKeyToks(bds.appID, nativeKey.Namespace(), toks) | 458 » » kc.Namespace = nativeKey.Namespace() |
| 459 » » keys[i] = kc.NewKeyToks(toks) |
| 444 } | 460 } |
| 445 return keys | 461 return keys |
| 446 } | 462 } |
| 447 | 463 |
| 448 // nativePropertyLoadSaver is a ds.PropertyMap which implements | 464 // nativePropertyLoadSaver is a ds.PropertyMap which implements |
| 449 // datastore.PropertyLoadSaver. | 465 // datastore.PropertyLoadSaver. |
| 450 // | 466 // |
| 451 // It naturally converts between native and GAE properties and values. | 467 // It naturally converts between native and GAE properties and values. |
| 452 type nativePropertyLoadSaver struct { | 468 type nativePropertyLoadSaver struct { |
| 453 bds *boundDatastore | 469 bds *boundDatastore |
| (...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 523 case datastore.ErrNoSuchEntity: | 539 case datastore.ErrNoSuchEntity: |
| 524 return ds.ErrNoSuchEntity | 540 return ds.ErrNoSuchEntity |
| 525 case datastore.ErrConcurrentTransaction: | 541 case datastore.ErrConcurrentTransaction: |
| 526 return ds.ErrConcurrentTransaction | 542 return ds.ErrConcurrentTransaction |
| 527 case datastore.ErrInvalidKey: | 543 case datastore.ErrInvalidKey: |
| 528 return ds.ErrInvalidKey | 544 return ds.ErrInvalidKey |
| 529 default: | 545 default: |
| 530 return err | 546 return err |
| 531 } | 547 } |
| 532 } | 548 } |
| OLD | NEW |