| OLD | NEW | 
|---|
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 | 
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. | 
| 4 | 4 | 
| 5 package prod | 5 package prod | 
| 6 | 6 | 
| 7 import ( | 7 import ( | 
| 8         ds "github.com/luci/gae/service/datastore" | 8         ds "github.com/luci/gae/service/datastore" | 
| 9         "github.com/luci/luci-go/common/errors" | 9         "github.com/luci/luci-go/common/errors" | 
| 10         "golang.org/x/net/context" | 10         "golang.org/x/net/context" | 
| 11         "google.golang.org/appengine/datastore" | 11         "google.golang.org/appengine/datastore" | 
| 12 ) | 12 ) | 
| 13 | 13 | 
| 14 // useRDS adds a gae.RawDatastore implementation to context, accessible | 14 // useRDS adds a gae.RawDatastore implementation to context, accessible | 
| 15 // by gae.GetDS(c) | 15 // by gae.GetDS(c) | 
| 16 func useRDS(c context.Context) context.Context { | 16 func useRDS(c context.Context) context.Context { | 
| 17 »       return ds.SetRawFactory(c, func(ci context.Context, wantTxn bool) ds.Raw
     Interface { | 17 »       return ds.SetRawFactory(c, func(ci context.Context) ds.RawInterface { | 
| 18 »       »       maybeTxnCtx := AEContext(ci) | 18 »       »       rds := rdsImpl{ | 
| 19 | 19 »       »       »       userCtx: ci, | 
| 20 »       »       if wantTxn { | 20 »       »       »       ps:      getProdState(ci), | 
| 21 »       »       »       return rdsImpl{ci, maybeTxnCtx} |  | 
| 22                 } | 21                 } | 
| 23 »       »       aeCtx := AEContextNoTxn(ci) | 22 »       »       rds.aeCtx = rds.ps.context(ci) | 
| 24 »       »       if maybeTxnCtx != aeCtx { | 23 »       »       return &rds | 
| 25 »       »       »       ci = context.WithValue(ci, prodContextKey, aeCtx) |  | 
| 26 »       »       } |  | 
| 27 »       »       return rdsImpl{ci, aeCtx} |  | 
| 28         }) | 24         }) | 
| 29 } | 25 } | 
| 30 | 26 | 
| 31 ////////// Datastore | 27 ////////// Datastore | 
| 32 | 28 | 
| 33 type rdsImpl struct { | 29 type rdsImpl struct { | 
| 34         // userCtx is the context that has the luci/gae services and user object
     s in | 30         // userCtx is the context that has the luci/gae services and user object
     s in | 
| 35         // it. | 31         // it. | 
| 36         userCtx context.Context | 32         userCtx context.Context | 
| 37 | 33 | 
| 38 »       // aeCtx is the context with the appengine connection information in it. | 34 »       // aeCtx is the AppEngine Context that will be used in method calls. Thi
     s is | 
|  | 35 »       // derived from ps. | 
| 39         aeCtx context.Context | 36         aeCtx context.Context | 
|  | 37 | 
|  | 38         // ps is the current production state. | 
|  | 39         ps prodState | 
| 40 } | 40 } | 
| 41 | 41 | 
| 42 func idxCallbacker(err error, amt int, cb func(idx int, err error) error) error 
     { | 42 func idxCallbacker(err error, amt int, cb func(idx int, err error) error) error 
     { | 
| 43         if err == nil { | 43         if err == nil { | 
| 44                 for i := 0; i < amt; i++ { | 44                 for i := 0; i < amt; i++ { | 
| 45                         if err := cb(i, nil); err != nil { | 45                         if err := cb(i, nil); err != nil { | 
| 46                                 return err | 46                                 return err | 
| 47                         } | 47                         } | 
| 48                 } | 48                 } | 
| 49                 return nil | 49                 return nil | 
| 50         } | 50         } | 
| 51         err = errors.Fix(err) | 51         err = errors.Fix(err) | 
| 52         me, ok := err.(errors.MultiError) | 52         me, ok := err.(errors.MultiError) | 
| 53         if ok { | 53         if ok { | 
| 54                 for i, err := range me { | 54                 for i, err := range me { | 
| 55                         if err := cb(i, err); err != nil { | 55                         if err := cb(i, err); err != nil { | 
| 56                                 return err | 56                                 return err | 
| 57                         } | 57                         } | 
| 58                 } | 58                 } | 
| 59                 return nil | 59                 return nil | 
| 60         } | 60         } | 
| 61         return err | 61         return err | 
| 62 } | 62 } | 
| 63 | 63 | 
| 64 func (d rdsImpl) AllocateIDs(keys []*ds.Key, cb ds.NewKeyCB) error { | 64 func (d *rdsImpl) AllocateIDs(keys []*ds.Key, cb ds.NewKeyCB) error { | 
| 65         // Map keys by entity type. | 65         // Map keys by entity type. | 
| 66         entityMap := make(map[string][]int) | 66         entityMap := make(map[string][]int) | 
| 67         for i, key := range keys { | 67         for i, key := range keys { | 
| 68                 ks := key.String() | 68                 ks := key.String() | 
| 69                 entityMap[ks] = append(entityMap[ks], i) | 69                 entityMap[ks] = append(entityMap[ks], i) | 
| 70         } | 70         } | 
| 71 | 71 | 
| 72         // Allocate a set of IDs for each unique entity type. | 72         // Allocate a set of IDs for each unique entity type. | 
| 73         errors := errors.NewLazyMultiError(len(keys)) | 73         errors := errors.NewLazyMultiError(len(keys)) | 
| 74         setErrs := func(idxs []int, err error) { | 74         setErrs := func(idxs []int, err error) { | 
| (...skipping 24 matching lines...) Expand all  Loading... | 
| 99         for i, key := range keys { | 99         for i, key := range keys { | 
| 100                 if err := errors.GetOne(i); err != nil { | 100                 if err := errors.GetOne(i); err != nil { | 
| 101                         cb(nil, err) | 101                         cb(nil, err) | 
| 102                 } else { | 102                 } else { | 
| 103                         cb(key, nil) | 103                         cb(key, nil) | 
| 104                 } | 104                 } | 
| 105         } | 105         } | 
| 106         return nil | 106         return nil | 
| 107 } | 107 } | 
| 108 | 108 | 
| 109 func (d rdsImpl) DeleteMulti(ks []*ds.Key, cb ds.DeleteMultiCB) error { | 109 func (d *rdsImpl) DeleteMulti(ks []*ds.Key, cb ds.DeleteMultiCB) error { | 
| 110         keys, err := dsMF2R(d.aeCtx, ks) | 110         keys, err := dsMF2R(d.aeCtx, ks) | 
| 111         if err == nil { | 111         if err == nil { | 
| 112                 err = datastore.DeleteMulti(d.aeCtx, keys) | 112                 err = datastore.DeleteMulti(d.aeCtx, keys) | 
| 113         } | 113         } | 
| 114         return idxCallbacker(err, len(ks), func(_ int, err error) error { | 114         return idxCallbacker(err, len(ks), func(_ int, err error) error { | 
| 115                 return cb(err) | 115                 return cb(err) | 
| 116         }) | 116         }) | 
| 117 } | 117 } | 
| 118 | 118 | 
| 119 func (d rdsImpl) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb ds.GetMul
     tiCB) error { | 119 func (d *rdsImpl) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb ds.GetMu
     ltiCB) error { | 
| 120         vals := make([]datastore.PropertyLoadSaver, len(keys)) | 120         vals := make([]datastore.PropertyLoadSaver, len(keys)) | 
| 121         rkeys, err := dsMF2R(d.aeCtx, keys) | 121         rkeys, err := dsMF2R(d.aeCtx, keys) | 
| 122         if err == nil { | 122         if err == nil { | 
| 123                 for i := range keys { | 123                 for i := range keys { | 
| 124                         vals[i] = &typeFilter{d.aeCtx, ds.PropertyMap{}} | 124                         vals[i] = &typeFilter{d.aeCtx, ds.PropertyMap{}} | 
| 125                 } | 125                 } | 
| 126                 err = datastore.GetMulti(d.aeCtx, rkeys, vals) | 126                 err = datastore.GetMulti(d.aeCtx, rkeys, vals) | 
| 127         } | 127         } | 
| 128         return idxCallbacker(err, len(keys), func(idx int, err error) error { | 128         return idxCallbacker(err, len(keys), func(idx int, err error) error { | 
| 129                 if pls := vals[idx]; pls != nil { | 129                 if pls := vals[idx]; pls != nil { | 
| 130                         return cb(pls.(*typeFilter).pm, err) | 130                         return cb(pls.(*typeFilter).pm, err) | 
| 131                 } | 131                 } | 
| 132                 return cb(nil, err) | 132                 return cb(nil, err) | 
| 133         }) | 133         }) | 
| 134 } | 134 } | 
| 135 | 135 | 
| 136 func (d rdsImpl) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.NewKeyCB)
      error { | 136 func (d *rdsImpl) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.NewKeyCB
     ) error { | 
| 137         rkeys, err := dsMF2R(d.aeCtx, keys) | 137         rkeys, err := dsMF2R(d.aeCtx, keys) | 
| 138         if err == nil { | 138         if err == nil { | 
| 139                 rvals := make([]datastore.PropertyLoadSaver, len(vals)) | 139                 rvals := make([]datastore.PropertyLoadSaver, len(vals)) | 
| 140                 for i, val := range vals { | 140                 for i, val := range vals { | 
| 141                         rvals[i] = &typeFilter{d.aeCtx, val} | 141                         rvals[i] = &typeFilter{d.aeCtx, val} | 
| 142                 } | 142                 } | 
| 143                 rkeys, err = datastore.PutMulti(d.aeCtx, rkeys, rvals) | 143                 rkeys, err = datastore.PutMulti(d.aeCtx, rkeys, rvals) | 
| 144         } | 144         } | 
| 145         return idxCallbacker(err, len(keys), func(idx int, err error) error { | 145         return idxCallbacker(err, len(keys), func(idx int, err error) error { | 
| 146                 k := (*ds.Key)(nil) | 146                 k := (*ds.Key)(nil) | 
| 147                 if err == nil { | 147                 if err == nil { | 
| 148                         k = dsR2F(rkeys[idx]) | 148                         k = dsR2F(rkeys[idx]) | 
| 149                 } | 149                 } | 
| 150                 return cb(k, err) | 150                 return cb(k, err) | 
| 151         }) | 151         }) | 
| 152 } | 152 } | 
| 153 | 153 | 
| 154 func (d rdsImpl) fixQuery(fq *ds.FinalizedQuery) (*datastore.Query, error) { | 154 func (d *rdsImpl) fixQuery(fq *ds.FinalizedQuery) (*datastore.Query, error) { | 
| 155         ret := datastore.NewQuery(fq.Kind()) | 155         ret := datastore.NewQuery(fq.Kind()) | 
| 156 | 156 | 
| 157         start, end := fq.Bounds() | 157         start, end := fq.Bounds() | 
| 158         if start != nil { | 158         if start != nil { | 
| 159                 ret = ret.Start(start.(datastore.Cursor)) | 159                 ret = ret.Start(start.(datastore.Cursor)) | 
| 160         } | 160         } | 
| 161         if end != nil { | 161         if end != nil { | 
| 162                 ret = ret.End(end.(datastore.Cursor)) | 162                 ret = ret.End(end.(datastore.Cursor)) | 
| 163         } | 163         } | 
| 164 | 164 | 
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 219         } | 219         } | 
| 220 | 220 | 
| 221         ret = ret.Project(fq.Project()...) | 221         ret = ret.Project(fq.Project()...) | 
| 222         if fq.Distinct() { | 222         if fq.Distinct() { | 
| 223                 ret = ret.Distinct() | 223                 ret = ret.Distinct() | 
| 224         } | 224         } | 
| 225 | 225 | 
| 226         return ret, nil | 226         return ret, nil | 
| 227 } | 227 } | 
| 228 | 228 | 
| 229 func (d rdsImpl) DecodeCursor(s string) (ds.Cursor, error) { | 229 func (d *rdsImpl) DecodeCursor(s string) (ds.Cursor, error) { | 
| 230         return datastore.DecodeCursor(s) | 230         return datastore.DecodeCursor(s) | 
| 231 } | 231 } | 
| 232 | 232 | 
| 233 func (d rdsImpl) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error { | 233 func (d *rdsImpl) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error { | 
| 234         q, err := d.fixQuery(fq) | 234         q, err := d.fixQuery(fq) | 
| 235         if err != nil { | 235         if err != nil { | 
| 236                 return err | 236                 return err | 
| 237         } | 237         } | 
| 238 | 238 | 
| 239         t := q.Run(d.aeCtx) | 239         t := q.Run(d.aeCtx) | 
| 240 | 240 | 
| 241         cfunc := func() (ds.Cursor, error) { | 241         cfunc := func() (ds.Cursor, error) { | 
| 242                 return t.Cursor() | 242                 return t.Cursor() | 
| 243         } | 243         } | 
| 244         tf := typeFilter{} | 244         tf := typeFilter{} | 
| 245         for { | 245         for { | 
| 246                 k, err := t.Next(&tf) | 246                 k, err := t.Next(&tf) | 
| 247                 if err == datastore.Done { | 247                 if err == datastore.Done { | 
| 248                         return nil | 248                         return nil | 
| 249                 } | 249                 } | 
| 250                 if err != nil { | 250                 if err != nil { | 
| 251                         return err | 251                         return err | 
| 252                 } | 252                 } | 
| 253                 if err := cb(dsR2F(k), tf.pm, cfunc); err != nil { | 253                 if err := cb(dsR2F(k), tf.pm, cfunc); err != nil { | 
| 254                         return err | 254                         return err | 
| 255                 } | 255                 } | 
| 256         } | 256         } | 
| 257 } | 257 } | 
| 258 | 258 | 
| 259 func (d rdsImpl) Count(fq *ds.FinalizedQuery) (int64, error) { | 259 func (d *rdsImpl) Count(fq *ds.FinalizedQuery) (int64, error) { | 
| 260         q, err := d.fixQuery(fq) | 260         q, err := d.fixQuery(fq) | 
| 261         if err != nil { | 261         if err != nil { | 
| 262                 return 0, err | 262                 return 0, err | 
| 263         } | 263         } | 
| 264         ret, err := q.Count(d.aeCtx) | 264         ret, err := q.Count(d.aeCtx) | 
| 265         return int64(ret), err | 265         return int64(ret), err | 
| 266 } | 266 } | 
| 267 | 267 | 
| 268 func (d rdsImpl) RunInTransaction(f func(c context.Context) error, opts *ds.Tran
     sactionOptions) error { | 268 func (d *rdsImpl) RunInTransaction(f func(c context.Context) error, opts *ds.Tra
     nsactionOptions) error { | 
| 269         ropts := (*datastore.TransactionOptions)(opts) | 269         ropts := (*datastore.TransactionOptions)(opts) | 
| 270         return datastore.RunInTransaction(d.aeCtx, func(c context.Context) error
      { | 270         return datastore.RunInTransaction(d.aeCtx, func(c context.Context) error
      { | 
| 271 »       »       return f(context.WithValue(d.userCtx, prodContextKey, c)) | 271 »       »       // Derive a prodState with this transaction Context. | 
|  | 272 »       »       ps := d.ps | 
|  | 273 »       »       ps.ctx = c | 
|  | 274 »       »       ps.inTxn = true | 
|  | 275 | 
|  | 276 »       »       c = withProdState(d.userCtx, ps) | 
|  | 277 »       »       return f(c) | 
| 272         }, ropts) | 278         }, ropts) | 
| 273 } | 279 } | 
| 274 | 280 | 
| 275 func (d rdsImpl) Testable() ds.Testable { | 281 func (d *rdsImpl) WithoutTransaction() context.Context { | 
|  | 282 »       c := d.userCtx | 
|  | 283 »       if d.ps.inTxn { | 
|  | 284 »       »       // We're in a transaction. Reset to non-transactional state. | 
|  | 285 »       »       ps := d.ps | 
|  | 286 »       »       ps.ctx = ps.noTxnCtx | 
|  | 287 »       »       ps.inTxn = false | 
|  | 288 »       »       c = withProdState(c, ps) | 
|  | 289 »       } | 
|  | 290 »       return c | 
|  | 291 } | 
|  | 292 | 
|  | 293 func (d *rdsImpl) CurrentTransaction() ds.Transaction { | 
|  | 294 »       if d.ps.inTxn { | 
|  | 295 »       »       // Since we don't distinguish between transactions (yet), we jus
     t need this | 
|  | 296 »       »       // to be non-nil. | 
|  | 297 »       »       return struct{}{} | 
|  | 298 »       } | 
| 276         return nil | 299         return nil | 
| 277 } | 300 } | 
|  | 301 | 
|  | 302 func (d *rdsImpl) GetTestable() ds.Testable { | 
|  | 303         return nil | 
|  | 304 } | 
| OLD | NEW | 
|---|