| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package prod | 5 package prod |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 ds "github.com/luci/gae/service/datastore" | 8 ds "github.com/luci/gae/service/datastore" |
| 9 "github.com/luci/luci-go/common/errors" | 9 "github.com/luci/luci-go/common/errors" |
| 10 "golang.org/x/net/context" | 10 "golang.org/x/net/context" |
| 11 "google.golang.org/appengine/datastore" | 11 "google.golang.org/appengine/datastore" |
| 12 ) | 12 ) |
| 13 | 13 |
| 14 // useRDS adds a gae.RawDatastore implementation to context, accessible | 14 // useRDS adds a gae.RawDatastore implementation to context, accessible |
| 15 // by gae.GetDS(c) | 15 // by gae.GetDS(c) |
| 16 func useRDS(c context.Context) context.Context { | 16 func useRDS(c context.Context) context.Context { |
| 17 » return ds.SetRawFactory(c, func(ci context.Context, wantTxn bool) ds.Raw
Interface { | 17 » return ds.SetRawFactory(c, func(ci context.Context) ds.RawInterface { |
| 18 » » maybeTxnCtx := AEContext(ci) | 18 » » rds := rdsImpl{ |
| 19 | 19 » » » userCtx: ci, |
| 20 » » if wantTxn { | 20 » » » ps: getProdState(ci), |
| 21 » » » return rdsImpl{ci, maybeTxnCtx} | |
| 22 } | 21 } |
| 23 » » aeCtx := AEContextNoTxn(ci) | 22 » » rds.aeCtx = rds.ps.context(ci) |
| 24 » » if maybeTxnCtx != aeCtx { | 23 » » return &rds |
| 25 » » » ci = context.WithValue(ci, prodContextKey, aeCtx) | |
| 26 » » } | |
| 27 » » return rdsImpl{ci, aeCtx} | |
| 28 }) | 24 }) |
| 29 } | 25 } |
| 30 | 26 |
| 31 ////////// Datastore | 27 ////////// Datastore |
| 32 | 28 |
| 33 type rdsImpl struct { | 29 type rdsImpl struct { |
| 34 // userCtx is the context that has the luci/gae services and user object
s in | 30 // userCtx is the context that has the luci/gae services and user object
s in |
| 35 // it. | 31 // it. |
| 36 userCtx context.Context | 32 userCtx context.Context |
| 37 | 33 |
| 38 » // aeCtx is the context with the appengine connection information in it. | 34 » // aeCtx is the AppEngine Context that will be used in method calls. Thi
s is |
| 35 » // derived from ps. |
| 39 aeCtx context.Context | 36 aeCtx context.Context |
| 37 |
| 38 // ps is the current production state. |
| 39 ps prodState |
| 40 } | 40 } |
| 41 | 41 |
| 42 func idxCallbacker(err error, amt int, cb func(idx int, err error) error) error
{ | 42 func idxCallbacker(err error, amt int, cb func(idx int, err error) error) error
{ |
| 43 if err == nil { | 43 if err == nil { |
| 44 for i := 0; i < amt; i++ { | 44 for i := 0; i < amt; i++ { |
| 45 if err := cb(i, nil); err != nil { | 45 if err := cb(i, nil); err != nil { |
| 46 return err | 46 return err |
| 47 } | 47 } |
| 48 } | 48 } |
| 49 return nil | 49 return nil |
| 50 } | 50 } |
| 51 err = errors.Fix(err) | 51 err = errors.Fix(err) |
| 52 me, ok := err.(errors.MultiError) | 52 me, ok := err.(errors.MultiError) |
| 53 if ok { | 53 if ok { |
| 54 for i, err := range me { | 54 for i, err := range me { |
| 55 if err := cb(i, err); err != nil { | 55 if err := cb(i, err); err != nil { |
| 56 return err | 56 return err |
| 57 } | 57 } |
| 58 } | 58 } |
| 59 return nil | 59 return nil |
| 60 } | 60 } |
| 61 return err | 61 return err |
| 62 } | 62 } |
| 63 | 63 |
| 64 func (d rdsImpl) AllocateIDs(keys []*ds.Key, cb ds.NewKeyCB) error { | 64 func (d *rdsImpl) AllocateIDs(keys []*ds.Key, cb ds.NewKeyCB) error { |
| 65 // Map keys by entity type. | 65 // Map keys by entity type. |
| 66 entityMap := make(map[string][]int) | 66 entityMap := make(map[string][]int) |
| 67 for i, key := range keys { | 67 for i, key := range keys { |
| 68 ks := key.String() | 68 ks := key.String() |
| 69 entityMap[ks] = append(entityMap[ks], i) | 69 entityMap[ks] = append(entityMap[ks], i) |
| 70 } | 70 } |
| 71 | 71 |
| 72 // Allocate a set of IDs for each unique entity type. | 72 // Allocate a set of IDs for each unique entity type. |
| 73 errors := errors.NewLazyMultiError(len(keys)) | 73 errors := errors.NewLazyMultiError(len(keys)) |
| 74 setErrs := func(idxs []int, err error) { | 74 setErrs := func(idxs []int, err error) { |
| (...skipping 24 matching lines...) Expand all Loading... |
| 99 for i, key := range keys { | 99 for i, key := range keys { |
| 100 if err := errors.GetOne(i); err != nil { | 100 if err := errors.GetOne(i); err != nil { |
| 101 cb(nil, err) | 101 cb(nil, err) |
| 102 } else { | 102 } else { |
| 103 cb(key, nil) | 103 cb(key, nil) |
| 104 } | 104 } |
| 105 } | 105 } |
| 106 return nil | 106 return nil |
| 107 } | 107 } |
| 108 | 108 |
| 109 func (d rdsImpl) DeleteMulti(ks []*ds.Key, cb ds.DeleteMultiCB) error { | 109 func (d *rdsImpl) DeleteMulti(ks []*ds.Key, cb ds.DeleteMultiCB) error { |
| 110 keys, err := dsMF2R(d.aeCtx, ks) | 110 keys, err := dsMF2R(d.aeCtx, ks) |
| 111 if err == nil { | 111 if err == nil { |
| 112 err = datastore.DeleteMulti(d.aeCtx, keys) | 112 err = datastore.DeleteMulti(d.aeCtx, keys) |
| 113 } | 113 } |
| 114 return idxCallbacker(err, len(ks), func(_ int, err error) error { | 114 return idxCallbacker(err, len(ks), func(_ int, err error) error { |
| 115 return cb(err) | 115 return cb(err) |
| 116 }) | 116 }) |
| 117 } | 117 } |
| 118 | 118 |
| 119 func (d rdsImpl) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb ds.GetMul
tiCB) error { | 119 func (d *rdsImpl) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb ds.GetMu
ltiCB) error { |
| 120 vals := make([]datastore.PropertyLoadSaver, len(keys)) | 120 vals := make([]datastore.PropertyLoadSaver, len(keys)) |
| 121 rkeys, err := dsMF2R(d.aeCtx, keys) | 121 rkeys, err := dsMF2R(d.aeCtx, keys) |
| 122 if err == nil { | 122 if err == nil { |
| 123 for i := range keys { | 123 for i := range keys { |
| 124 vals[i] = &typeFilter{d.aeCtx, ds.PropertyMap{}} | 124 vals[i] = &typeFilter{d.aeCtx, ds.PropertyMap{}} |
| 125 } | 125 } |
| 126 err = datastore.GetMulti(d.aeCtx, rkeys, vals) | 126 err = datastore.GetMulti(d.aeCtx, rkeys, vals) |
| 127 } | 127 } |
| 128 return idxCallbacker(err, len(keys), func(idx int, err error) error { | 128 return idxCallbacker(err, len(keys), func(idx int, err error) error { |
| 129 if pls := vals[idx]; pls != nil { | 129 if pls := vals[idx]; pls != nil { |
| 130 return cb(pls.(*typeFilter).pm, err) | 130 return cb(pls.(*typeFilter).pm, err) |
| 131 } | 131 } |
| 132 return cb(nil, err) | 132 return cb(nil, err) |
| 133 }) | 133 }) |
| 134 } | 134 } |
| 135 | 135 |
| 136 func (d rdsImpl) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.NewKeyCB)
error { | 136 func (d *rdsImpl) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.NewKeyCB
) error { |
| 137 rkeys, err := dsMF2R(d.aeCtx, keys) | 137 rkeys, err := dsMF2R(d.aeCtx, keys) |
| 138 if err == nil { | 138 if err == nil { |
| 139 rvals := make([]datastore.PropertyLoadSaver, len(vals)) | 139 rvals := make([]datastore.PropertyLoadSaver, len(vals)) |
| 140 for i, val := range vals { | 140 for i, val := range vals { |
| 141 rvals[i] = &typeFilter{d.aeCtx, val} | 141 rvals[i] = &typeFilter{d.aeCtx, val} |
| 142 } | 142 } |
| 143 rkeys, err = datastore.PutMulti(d.aeCtx, rkeys, rvals) | 143 rkeys, err = datastore.PutMulti(d.aeCtx, rkeys, rvals) |
| 144 } | 144 } |
| 145 return idxCallbacker(err, len(keys), func(idx int, err error) error { | 145 return idxCallbacker(err, len(keys), func(idx int, err error) error { |
| 146 k := (*ds.Key)(nil) | 146 k := (*ds.Key)(nil) |
| 147 if err == nil { | 147 if err == nil { |
| 148 k = dsR2F(rkeys[idx]) | 148 k = dsR2F(rkeys[idx]) |
| 149 } | 149 } |
| 150 return cb(k, err) | 150 return cb(k, err) |
| 151 }) | 151 }) |
| 152 } | 152 } |
| 153 | 153 |
| 154 func (d rdsImpl) fixQuery(fq *ds.FinalizedQuery) (*datastore.Query, error) { | 154 func (d *rdsImpl) fixQuery(fq *ds.FinalizedQuery) (*datastore.Query, error) { |
| 155 ret := datastore.NewQuery(fq.Kind()) | 155 ret := datastore.NewQuery(fq.Kind()) |
| 156 | 156 |
| 157 start, end := fq.Bounds() | 157 start, end := fq.Bounds() |
| 158 if start != nil { | 158 if start != nil { |
| 159 ret = ret.Start(start.(datastore.Cursor)) | 159 ret = ret.Start(start.(datastore.Cursor)) |
| 160 } | 160 } |
| 161 if end != nil { | 161 if end != nil { |
| 162 ret = ret.End(end.(datastore.Cursor)) | 162 ret = ret.End(end.(datastore.Cursor)) |
| 163 } | 163 } |
| 164 | 164 |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 219 } | 219 } |
| 220 | 220 |
| 221 ret = ret.Project(fq.Project()...) | 221 ret = ret.Project(fq.Project()...) |
| 222 if fq.Distinct() { | 222 if fq.Distinct() { |
| 223 ret = ret.Distinct() | 223 ret = ret.Distinct() |
| 224 } | 224 } |
| 225 | 225 |
| 226 return ret, nil | 226 return ret, nil |
| 227 } | 227 } |
| 228 | 228 |
| 229 func (d rdsImpl) DecodeCursor(s string) (ds.Cursor, error) { | 229 func (d *rdsImpl) DecodeCursor(s string) (ds.Cursor, error) { |
| 230 return datastore.DecodeCursor(s) | 230 return datastore.DecodeCursor(s) |
| 231 } | 231 } |
| 232 | 232 |
| 233 func (d rdsImpl) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error { | 233 func (d *rdsImpl) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error { |
| 234 q, err := d.fixQuery(fq) | 234 q, err := d.fixQuery(fq) |
| 235 if err != nil { | 235 if err != nil { |
| 236 return err | 236 return err |
| 237 } | 237 } |
| 238 | 238 |
| 239 t := q.Run(d.aeCtx) | 239 t := q.Run(d.aeCtx) |
| 240 | 240 |
| 241 cfunc := func() (ds.Cursor, error) { | 241 cfunc := func() (ds.Cursor, error) { |
| 242 return t.Cursor() | 242 return t.Cursor() |
| 243 } | 243 } |
| 244 tf := typeFilter{} | 244 tf := typeFilter{} |
| 245 for { | 245 for { |
| 246 k, err := t.Next(&tf) | 246 k, err := t.Next(&tf) |
| 247 if err == datastore.Done { | 247 if err == datastore.Done { |
| 248 return nil | 248 return nil |
| 249 } | 249 } |
| 250 if err != nil { | 250 if err != nil { |
| 251 return err | 251 return err |
| 252 } | 252 } |
| 253 if err := cb(dsR2F(k), tf.pm, cfunc); err != nil { | 253 if err := cb(dsR2F(k), tf.pm, cfunc); err != nil { |
| 254 return err | 254 return err |
| 255 } | 255 } |
| 256 } | 256 } |
| 257 } | 257 } |
| 258 | 258 |
| 259 func (d rdsImpl) Count(fq *ds.FinalizedQuery) (int64, error) { | 259 func (d *rdsImpl) Count(fq *ds.FinalizedQuery) (int64, error) { |
| 260 q, err := d.fixQuery(fq) | 260 q, err := d.fixQuery(fq) |
| 261 if err != nil { | 261 if err != nil { |
| 262 return 0, err | 262 return 0, err |
| 263 } | 263 } |
| 264 ret, err := q.Count(d.aeCtx) | 264 ret, err := q.Count(d.aeCtx) |
| 265 return int64(ret), err | 265 return int64(ret), err |
| 266 } | 266 } |
| 267 | 267 |
| 268 func (d rdsImpl) RunInTransaction(f func(c context.Context) error, opts *ds.Tran
sactionOptions) error { | 268 func (d *rdsImpl) RunInTransaction(f func(c context.Context) error, opts *ds.Tra
nsactionOptions) error { |
| 269 ropts := (*datastore.TransactionOptions)(opts) | 269 ropts := (*datastore.TransactionOptions)(opts) |
| 270 return datastore.RunInTransaction(d.aeCtx, func(c context.Context) error
{ | 270 return datastore.RunInTransaction(d.aeCtx, func(c context.Context) error
{ |
| 271 » » return f(context.WithValue(d.userCtx, prodContextKey, c)) | 271 » » // Derive a prodState with this transaction Context. |
| 272 » » ps := d.ps |
| 273 » » ps.ctx = c |
| 274 » » ps.inTxn = true |
| 275 |
| 276 » » c = withProdState(d.userCtx, ps) |
| 277 » » return f(c) |
| 272 }, ropts) | 278 }, ropts) |
| 273 } | 279 } |
| 274 | 280 |
| 275 func (d rdsImpl) Testable() ds.Testable { | 281 func (d *rdsImpl) WithoutTransaction() context.Context { |
| 282 » c := d.userCtx |
| 283 » if d.ps.inTxn { |
| 284 » » // We're in a transaction. Reset to non-transactional state. |
| 285 » » ps := d.ps |
| 286 » » ps.ctx = ps.noTxnCtx |
| 287 » » ps.inTxn = false |
| 288 » » c = withProdState(c, ps) |
| 289 » } |
| 290 » return c |
| 291 } |
| 292 |
| 293 func (d *rdsImpl) CurrentTransaction() ds.Transaction { |
| 294 » if d.ps.inTxn { |
| 295 » » // Since we don't distinguish between transactions (yet), we jus
t need this |
| 296 » » // to be non-nil. |
| 297 » » return struct{}{} |
| 298 » } |
| 276 return nil | 299 return nil |
| 277 } | 300 } |
| 301 |
| 302 func (d *rdsImpl) GetTestable() ds.Testable { |
| 303 return nil |
| 304 } |
| OLD | NEW |