Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(88)

Side by Side Diff: impl/prod/raw_datastore.go

Issue 2302743002: Interface update, per-method Contexts. (Closed)
Patch Set: Lightning talk licenses. Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « impl/prod/module.go ('k') | impl/prod/raw_datastore_type_converter.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package prod 5 package prod
6 6
7 import ( 7 import (
8 ds "github.com/luci/gae/service/datastore" 8 ds "github.com/luci/gae/service/datastore"
9 "github.com/luci/luci-go/common/errors" 9 "github.com/luci/luci-go/common/errors"
10 "golang.org/x/net/context" 10 "golang.org/x/net/context"
11 "google.golang.org/appengine/datastore" 11 "google.golang.org/appengine/datastore"
12 ) 12 )
13 13
14 // useRDS adds a gae.RawDatastore implementation to context, accessible 14 // useRDS adds a gae.RawDatastore implementation to context, accessible
15 // by gae.GetDS(c) 15 // by gae.GetDS(c)
16 func useRDS(c context.Context) context.Context { 16 func useRDS(c context.Context) context.Context {
17 » return ds.SetRawFactory(c, func(ci context.Context, wantTxn bool) ds.Raw Interface { 17 » return ds.SetRawFactory(c, func(ci context.Context) ds.RawInterface {
18 » » maybeTxnCtx := AEContext(ci) 18 » » rds := rdsImpl{
19 19 » » » userCtx: ci,
20 » » if wantTxn { 20 » » » ps: getProdState(ci),
21 » » » return rdsImpl{ci, maybeTxnCtx}
22 } 21 }
23 » » aeCtx := AEContextNoTxn(ci) 22 » » rds.aeCtx = rds.ps.context(ci)
24 » » if maybeTxnCtx != aeCtx { 23 » » return &rds
25 » » » ci = context.WithValue(ci, prodContextKey, aeCtx)
26 » » }
27 » » return rdsImpl{ci, aeCtx}
28 }) 24 })
29 } 25 }
30 26
31 ////////// Datastore 27 ////////// Datastore
32 28
33 type rdsImpl struct { 29 type rdsImpl struct {
34 // userCtx is the context that has the luci/gae services and user object s in 30 // userCtx is the context that has the luci/gae services and user object s in
35 // it. 31 // it.
36 userCtx context.Context 32 userCtx context.Context
37 33
38 » // aeCtx is the context with the appengine connection information in it. 34 » // aeCtx is the AppEngine Context that will be used in method calls. Thi s is
35 » // derived from ps.
39 aeCtx context.Context 36 aeCtx context.Context
37
38 // ps is the current production state.
39 ps prodState
40 } 40 }
41 41
42 func idxCallbacker(err error, amt int, cb func(idx int, err error) error) error { 42 func idxCallbacker(err error, amt int, cb func(idx int, err error) error) error {
43 if err == nil { 43 if err == nil {
44 for i := 0; i < amt; i++ { 44 for i := 0; i < amt; i++ {
45 if err := cb(i, nil); err != nil { 45 if err := cb(i, nil); err != nil {
46 return err 46 return err
47 } 47 }
48 } 48 }
49 return nil 49 return nil
50 } 50 }
51 err = errors.Fix(err) 51 err = errors.Fix(err)
52 me, ok := err.(errors.MultiError) 52 me, ok := err.(errors.MultiError)
53 if ok { 53 if ok {
54 for i, err := range me { 54 for i, err := range me {
55 if err := cb(i, err); err != nil { 55 if err := cb(i, err); err != nil {
56 return err 56 return err
57 } 57 }
58 } 58 }
59 return nil 59 return nil
60 } 60 }
61 return err 61 return err
62 } 62 }
63 63
64 func (d rdsImpl) AllocateIDs(keys []*ds.Key, cb ds.NewKeyCB) error { 64 func (d *rdsImpl) AllocateIDs(keys []*ds.Key, cb ds.NewKeyCB) error {
65 // Map keys by entity type. 65 // Map keys by entity type.
66 entityMap := make(map[string][]int) 66 entityMap := make(map[string][]int)
67 for i, key := range keys { 67 for i, key := range keys {
68 ks := key.String() 68 ks := key.String()
69 entityMap[ks] = append(entityMap[ks], i) 69 entityMap[ks] = append(entityMap[ks], i)
70 } 70 }
71 71
72 // Allocate a set of IDs for each unique entity type. 72 // Allocate a set of IDs for each unique entity type.
73 errors := errors.NewLazyMultiError(len(keys)) 73 errors := errors.NewLazyMultiError(len(keys))
74 setErrs := func(idxs []int, err error) { 74 setErrs := func(idxs []int, err error) {
(...skipping 24 matching lines...) Expand all
99 for i, key := range keys { 99 for i, key := range keys {
100 if err := errors.GetOne(i); err != nil { 100 if err := errors.GetOne(i); err != nil {
101 cb(nil, err) 101 cb(nil, err)
102 } else { 102 } else {
103 cb(key, nil) 103 cb(key, nil)
104 } 104 }
105 } 105 }
106 return nil 106 return nil
107 } 107 }
108 108
109 func (d rdsImpl) DeleteMulti(ks []*ds.Key, cb ds.DeleteMultiCB) error { 109 func (d *rdsImpl) DeleteMulti(ks []*ds.Key, cb ds.DeleteMultiCB) error {
110 keys, err := dsMF2R(d.aeCtx, ks) 110 keys, err := dsMF2R(d.aeCtx, ks)
111 if err == nil { 111 if err == nil {
112 err = datastore.DeleteMulti(d.aeCtx, keys) 112 err = datastore.DeleteMulti(d.aeCtx, keys)
113 } 113 }
114 return idxCallbacker(err, len(ks), func(_ int, err error) error { 114 return idxCallbacker(err, len(ks), func(_ int, err error) error {
115 return cb(err) 115 return cb(err)
116 }) 116 })
117 } 117 }
118 118
119 func (d rdsImpl) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb ds.GetMul tiCB) error { 119 func (d *rdsImpl) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb ds.GetMu ltiCB) error {
120 vals := make([]datastore.PropertyLoadSaver, len(keys)) 120 vals := make([]datastore.PropertyLoadSaver, len(keys))
121 rkeys, err := dsMF2R(d.aeCtx, keys) 121 rkeys, err := dsMF2R(d.aeCtx, keys)
122 if err == nil { 122 if err == nil {
123 for i := range keys { 123 for i := range keys {
124 vals[i] = &typeFilter{d.aeCtx, ds.PropertyMap{}} 124 vals[i] = &typeFilter{d.aeCtx, ds.PropertyMap{}}
125 } 125 }
126 err = datastore.GetMulti(d.aeCtx, rkeys, vals) 126 err = datastore.GetMulti(d.aeCtx, rkeys, vals)
127 } 127 }
128 return idxCallbacker(err, len(keys), func(idx int, err error) error { 128 return idxCallbacker(err, len(keys), func(idx int, err error) error {
129 if pls := vals[idx]; pls != nil { 129 if pls := vals[idx]; pls != nil {
130 return cb(pls.(*typeFilter).pm, err) 130 return cb(pls.(*typeFilter).pm, err)
131 } 131 }
132 return cb(nil, err) 132 return cb(nil, err)
133 }) 133 })
134 } 134 }
135 135
136 func (d rdsImpl) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.NewKeyCB) error { 136 func (d *rdsImpl) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.NewKeyCB ) error {
137 rkeys, err := dsMF2R(d.aeCtx, keys) 137 rkeys, err := dsMF2R(d.aeCtx, keys)
138 if err == nil { 138 if err == nil {
139 rvals := make([]datastore.PropertyLoadSaver, len(vals)) 139 rvals := make([]datastore.PropertyLoadSaver, len(vals))
140 for i, val := range vals { 140 for i, val := range vals {
141 rvals[i] = &typeFilter{d.aeCtx, val} 141 rvals[i] = &typeFilter{d.aeCtx, val}
142 } 142 }
143 rkeys, err = datastore.PutMulti(d.aeCtx, rkeys, rvals) 143 rkeys, err = datastore.PutMulti(d.aeCtx, rkeys, rvals)
144 } 144 }
145 return idxCallbacker(err, len(keys), func(idx int, err error) error { 145 return idxCallbacker(err, len(keys), func(idx int, err error) error {
146 k := (*ds.Key)(nil) 146 k := (*ds.Key)(nil)
147 if err == nil { 147 if err == nil {
148 k = dsR2F(rkeys[idx]) 148 k = dsR2F(rkeys[idx])
149 } 149 }
150 return cb(k, err) 150 return cb(k, err)
151 }) 151 })
152 } 152 }
153 153
154 func (d rdsImpl) fixQuery(fq *ds.FinalizedQuery) (*datastore.Query, error) { 154 func (d *rdsImpl) fixQuery(fq *ds.FinalizedQuery) (*datastore.Query, error) {
155 ret := datastore.NewQuery(fq.Kind()) 155 ret := datastore.NewQuery(fq.Kind())
156 156
157 start, end := fq.Bounds() 157 start, end := fq.Bounds()
158 if start != nil { 158 if start != nil {
159 ret = ret.Start(start.(datastore.Cursor)) 159 ret = ret.Start(start.(datastore.Cursor))
160 } 160 }
161 if end != nil { 161 if end != nil {
162 ret = ret.End(end.(datastore.Cursor)) 162 ret = ret.End(end.(datastore.Cursor))
163 } 163 }
164 164
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
219 } 219 }
220 220
221 ret = ret.Project(fq.Project()...) 221 ret = ret.Project(fq.Project()...)
222 if fq.Distinct() { 222 if fq.Distinct() {
223 ret = ret.Distinct() 223 ret = ret.Distinct()
224 } 224 }
225 225
226 return ret, nil 226 return ret, nil
227 } 227 }
228 228
229 func (d rdsImpl) DecodeCursor(s string) (ds.Cursor, error) { 229 func (d *rdsImpl) DecodeCursor(s string) (ds.Cursor, error) {
230 return datastore.DecodeCursor(s) 230 return datastore.DecodeCursor(s)
231 } 231 }
232 232
233 func (d rdsImpl) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error { 233 func (d *rdsImpl) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error {
234 q, err := d.fixQuery(fq) 234 q, err := d.fixQuery(fq)
235 if err != nil { 235 if err != nil {
236 return err 236 return err
237 } 237 }
238 238
239 t := q.Run(d.aeCtx) 239 t := q.Run(d.aeCtx)
240 240
241 cfunc := func() (ds.Cursor, error) { 241 cfunc := func() (ds.Cursor, error) {
242 return t.Cursor() 242 return t.Cursor()
243 } 243 }
244 tf := typeFilter{} 244 tf := typeFilter{}
245 for { 245 for {
246 k, err := t.Next(&tf) 246 k, err := t.Next(&tf)
247 if err == datastore.Done { 247 if err == datastore.Done {
248 return nil 248 return nil
249 } 249 }
250 if err != nil { 250 if err != nil {
251 return err 251 return err
252 } 252 }
253 if err := cb(dsR2F(k), tf.pm, cfunc); err != nil { 253 if err := cb(dsR2F(k), tf.pm, cfunc); err != nil {
254 return err 254 return err
255 } 255 }
256 } 256 }
257 } 257 }
258 258
259 func (d rdsImpl) Count(fq *ds.FinalizedQuery) (int64, error) { 259 func (d *rdsImpl) Count(fq *ds.FinalizedQuery) (int64, error) {
260 q, err := d.fixQuery(fq) 260 q, err := d.fixQuery(fq)
261 if err != nil { 261 if err != nil {
262 return 0, err 262 return 0, err
263 } 263 }
264 ret, err := q.Count(d.aeCtx) 264 ret, err := q.Count(d.aeCtx)
265 return int64(ret), err 265 return int64(ret), err
266 } 266 }
267 267
268 func (d rdsImpl) RunInTransaction(f func(c context.Context) error, opts *ds.Tran sactionOptions) error { 268 func (d *rdsImpl) RunInTransaction(f func(c context.Context) error, opts *ds.Tra nsactionOptions) error {
269 ropts := (*datastore.TransactionOptions)(opts) 269 ropts := (*datastore.TransactionOptions)(opts)
270 return datastore.RunInTransaction(d.aeCtx, func(c context.Context) error { 270 return datastore.RunInTransaction(d.aeCtx, func(c context.Context) error {
271 » » return f(context.WithValue(d.userCtx, prodContextKey, c)) 271 » » // Derive a prodState with this transaction Context.
272 » » ps := d.ps
273 » » ps.ctx = c
274 » » ps.inTxn = true
275
276 » » c = withProdState(d.userCtx, ps)
277 » » return f(c)
272 }, ropts) 278 }, ropts)
273 } 279 }
274 280
275 func (d rdsImpl) Testable() ds.Testable { 281 func (d *rdsImpl) WithoutTransaction() context.Context {
282 » c := d.userCtx
283 » if d.ps.inTxn {
284 » » // We're in a transaction. Reset to non-transactional state.
285 » » ps := d.ps
286 » » ps.ctx = ps.noTxnCtx
287 » » ps.inTxn = false
288 » » c = withProdState(c, ps)
289 » }
290 » return c
291 }
292
293 func (d *rdsImpl) CurrentTransaction() ds.Transaction {
294 » if d.ps.inTxn {
295 » » // Since we don't distinguish between transactions (yet), we jus t need this
296 » » // to be non-nil.
297 » » return struct{}{}
298 » }
276 return nil 299 return nil
277 } 300 }
301
302 func (d *rdsImpl) GetTestable() ds.Testable {
303 return nil
304 }
OLDNEW
« no previous file with comments | « impl/prod/module.go ('k') | impl/prod/raw_datastore_type_converter.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698