Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(27)

Side by Side Diff: impl/prod/raw_datastore.go

Issue 1490243004: Make appengine connection explicitly owned by impl/prod. (Closed) Base URL: https://github.com/luci/gae.git@master
Patch Set: fix other services too Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package prod 5 package prod
6 6
7 import ( 7 import (
8 ds "github.com/luci/gae/service/datastore" 8 ds "github.com/luci/gae/service/datastore"
9 "github.com/luci/gae/service/info" 9 "github.com/luci/gae/service/info"
10 "github.com/luci/luci-go/common/errors" 10 "github.com/luci/luci-go/common/errors"
11 "golang.org/x/net/context" 11 "golang.org/x/net/context"
12 "google.golang.org/appengine/datastore" 12 "google.golang.org/appengine/datastore"
13 ) 13 )
14 14
15 // useRDS adds a gae.RawDatastore implementation to context, accessible 15 // useRDS adds a gae.RawDatastore implementation to context, accessible
16 // by gae.GetDS(c) 16 // by gae.GetDS(c)
17 func useRDS(c context.Context) context.Context { 17 func useRDS(c context.Context) context.Context {
18 return ds.SetRawFactory(c, func(ci context.Context) ds.RawInterface { 18 return ds.SetRawFactory(c, func(ci context.Context) ds.RawInterface {
19 » » return rdsImpl{ci, info.Get(ci).GetNamespace()} 19 » » return rdsImpl{ci, AEContext(ci), info.Get(ci).GetNamespace()}
20 }) 20 })
21 } 21 }
22 22
23 ////////// Datastore 23 ////////// Datastore
24 24
25 type rdsImpl struct { 25 type rdsImpl struct {
26 » context.Context 26 » // userCtx is the context that has the luci/gae services and user object s in
27 » // it.
28 » userCtx context.Context
29
30 » // aeCtx is the context with the appengine connection information in it.
31 » aeCtx context.Context
27 32
28 ns string 33 ns string
29 } 34 }
30 35
31 func idxCallbacker(err error, amt int, cb func(idx int, err error)) error { 36 func idxCallbacker(err error, amt int, cb func(idx int, err error)) error {
32 if err == nil { 37 if err == nil {
33 for i := 0; i < amt; i++ { 38 for i := 0; i < amt; i++ {
34 cb(i, nil) 39 cb(i, nil)
35 } 40 }
36 return nil 41 return nil
37 } 42 }
38 err = errors.Fix(err) 43 err = errors.Fix(err)
39 me, ok := err.(errors.MultiError) 44 me, ok := err.(errors.MultiError)
40 if ok { 45 if ok {
41 for i, err := range me { 46 for i, err := range me {
42 cb(i, err) 47 cb(i, err)
43 } 48 }
44 return nil 49 return nil
45 } 50 }
46 return err 51 return err
47 } 52 }
48 53
49 func (d rdsImpl) AllocateIDs(incomplete *ds.Key, n int) (start int64, err error) { 54 func (d rdsImpl) AllocateIDs(incomplete *ds.Key, n int) (start int64, err error) {
50 » par, err := dsF2R(d, incomplete.Parent()) 55 » par, err := dsF2R(d.aeCtx, incomplete.Parent())
51 if err != nil { 56 if err != nil {
52 return 57 return
53 } 58 }
54 59
55 » start, _, err = datastore.AllocateIDs(d, incomplete.Kind(), par, n) 60 » start, _, err = datastore.AllocateIDs(d.aeCtx, incomplete.Kind(), par, n )
56 return 61 return
57 } 62 }
58 63
59 func (d rdsImpl) DeleteMulti(ks []*ds.Key, cb ds.DeleteMultiCB) error { 64 func (d rdsImpl) DeleteMulti(ks []*ds.Key, cb ds.DeleteMultiCB) error {
60 » keys, err := dsMF2R(d, ks) 65 » keys, err := dsMF2R(d.aeCtx, ks)
61 if err == nil { 66 if err == nil {
62 » » err = datastore.DeleteMulti(d, keys) 67 » » err = datastore.DeleteMulti(d.aeCtx, keys)
63 } 68 }
64 return idxCallbacker(err, len(ks), func(_ int, err error) { 69 return idxCallbacker(err, len(ks), func(_ int, err error) {
65 cb(err) 70 cb(err)
66 }) 71 })
67 } 72 }
68 73
69 func (d rdsImpl) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb ds.GetMul tiCB) error { 74 func (d rdsImpl) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb ds.GetMul tiCB) error {
70 vals := make([]datastore.PropertyLoadSaver, len(keys)) 75 vals := make([]datastore.PropertyLoadSaver, len(keys))
71 » rkeys, err := dsMF2R(d, keys) 76 » rkeys, err := dsMF2R(d.aeCtx, keys)
72 if err == nil { 77 if err == nil {
73 for i := range keys { 78 for i := range keys {
74 » » » vals[i] = &typeFilter{d, ds.PropertyMap{}} 79 » » » vals[i] = &typeFilter{d.aeCtx, ds.PropertyMap{}}
75 } 80 }
76 » » err = datastore.GetMulti(d, rkeys, vals) 81 » » err = datastore.GetMulti(d.aeCtx, rkeys, vals)
77 } 82 }
78 return idxCallbacker(err, len(keys), func(idx int, err error) { 83 return idxCallbacker(err, len(keys), func(idx int, err error) {
79 if pls := vals[idx]; pls != nil { 84 if pls := vals[idx]; pls != nil {
80 cb(pls.(*typeFilter).pm, err) 85 cb(pls.(*typeFilter).pm, err)
81 } else { 86 } else {
82 cb(nil, err) 87 cb(nil, err)
83 } 88 }
84 }) 89 })
85 } 90 }
86 91
87 func (d rdsImpl) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.PutMultiC B) error { 92 func (d rdsImpl) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.PutMultiC B) error {
88 » rkeys, err := dsMF2R(d, keys) 93 » rkeys, err := dsMF2R(d.aeCtx, keys)
89 if err == nil { 94 if err == nil {
90 rvals := make([]datastore.PropertyLoadSaver, len(vals)) 95 rvals := make([]datastore.PropertyLoadSaver, len(vals))
91 for i, val := range vals { 96 for i, val := range vals {
92 » » » rvals[i] = &typeFilter{d, val} 97 » » » rvals[i] = &typeFilter{d.aeCtx, val}
93 } 98 }
94 » » rkeys, err = datastore.PutMulti(d, rkeys, rvals) 99 » » rkeys, err = datastore.PutMulti(d.aeCtx, rkeys, rvals)
95 } 100 }
96 return idxCallbacker(err, len(keys), func(idx int, err error) { 101 return idxCallbacker(err, len(keys), func(idx int, err error) {
97 k := (*ds.Key)(nil) 102 k := (*ds.Key)(nil)
98 if err == nil { 103 if err == nil {
99 k = dsR2F(rkeys[idx]) 104 k = dsR2F(rkeys[idx])
100 } 105 }
101 cb(k, err) 106 cb(k, err)
102 }) 107 })
103 } 108 }
104 109
105 func (d rdsImpl) fixQuery(fq *ds.FinalizedQuery) (*datastore.Query, error) { 110 func (d rdsImpl) fixQuery(fq *ds.FinalizedQuery) (*datastore.Query, error) {
106 ret := datastore.NewQuery(fq.Kind()) 111 ret := datastore.NewQuery(fq.Kind())
107 112
108 start, end := fq.Bounds() 113 start, end := fq.Bounds()
109 if start != nil { 114 if start != nil {
110 ret = ret.Start(start.(datastore.Cursor)) 115 ret = ret.Start(start.(datastore.Cursor))
111 } 116 }
112 if end != nil { 117 if end != nil {
113 ret = ret.End(end.(datastore.Cursor)) 118 ret = ret.End(end.(datastore.Cursor))
114 } 119 }
115 120
116 for prop, vals := range fq.EqFilters() { 121 for prop, vals := range fq.EqFilters() {
117 if prop == "__ancestor__" { 122 if prop == "__ancestor__" {
118 » » » p, err := dsF2RProp(d, vals[0]) 123 » » » p, err := dsF2RProp(d.aeCtx, vals[0])
119 if err != nil { 124 if err != nil {
120 return nil, err 125 return nil, err
121 } 126 }
122 ret = ret.Ancestor(p.Value.(*datastore.Key)) 127 ret = ret.Ancestor(p.Value.(*datastore.Key))
123 } else { 128 } else {
124 filt := prop + "=" 129 filt := prop + "="
125 for _, v := range vals { 130 for _, v := range vals {
126 » » » » p, err := dsF2RProp(d, v) 131 » » » » p, err := dsF2RProp(d.aeCtx, v)
127 if err != nil { 132 if err != nil {
128 return nil, err 133 return nil, err
129 } 134 }
130 135
131 ret = ret.Filter(filt, p.Value) 136 ret = ret.Filter(filt, p.Value)
132 } 137 }
133 } 138 }
134 } 139 }
135 140
136 if lnam, lop, lprop := fq.IneqFilterLow(); lnam != "" { 141 if lnam, lop, lprop := fq.IneqFilterLow(); lnam != "" {
137 » » p, err := dsF2RProp(d, lprop) 142 » » p, err := dsF2RProp(d.aeCtx, lprop)
138 if err != nil { 143 if err != nil {
139 return nil, err 144 return nil, err
140 } 145 }
141 ret = ret.Filter(lnam+" "+lop, p.Value) 146 ret = ret.Filter(lnam+" "+lop, p.Value)
142 } 147 }
143 148
144 if hnam, hop, hprop := fq.IneqFilterHigh(); hnam != "" { 149 if hnam, hop, hprop := fq.IneqFilterHigh(); hnam != "" {
145 » » p, err := dsF2RProp(d, hprop) 150 » » p, err := dsF2RProp(d.aeCtx, hprop)
146 if err != nil { 151 if err != nil {
147 return nil, err 152 return nil, err
148 } 153 }
149 ret = ret.Filter(hnam+" "+hop, p.Value) 154 ret = ret.Filter(hnam+" "+hop, p.Value)
150 } 155 }
151 156
152 if fq.EventuallyConsistent() { 157 if fq.EventuallyConsistent() {
153 ret = ret.EventualConsistency() 158 ret = ret.EventualConsistency()
154 } 159 }
155 160
(...skipping 24 matching lines...) Expand all
180 func (d rdsImpl) DecodeCursor(s string) (ds.Cursor, error) { 185 func (d rdsImpl) DecodeCursor(s string) (ds.Cursor, error) {
181 return datastore.DecodeCursor(s) 186 return datastore.DecodeCursor(s)
182 } 187 }
183 188
184 func (d rdsImpl) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error { 189 func (d rdsImpl) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error {
185 q, err := d.fixQuery(fq) 190 q, err := d.fixQuery(fq)
186 if err != nil { 191 if err != nil {
187 return err 192 return err
188 } 193 }
189 194
190 » t := q.Run(d) 195 » t := q.Run(d.aeCtx)
191 196
192 cfunc := func() (ds.Cursor, error) { 197 cfunc := func() (ds.Cursor, error) {
193 return t.Cursor() 198 return t.Cursor()
194 } 199 }
195 tf := typeFilter{} 200 tf := typeFilter{}
196 for { 201 for {
197 k, err := t.Next(&tf) 202 k, err := t.Next(&tf)
198 if err == datastore.Done { 203 if err == datastore.Done {
199 return nil 204 return nil
200 } 205 }
201 if err != nil { 206 if err != nil {
202 return err 207 return err
203 } 208 }
204 if !cb(dsR2F(k), tf.pm, cfunc) { 209 if !cb(dsR2F(k), tf.pm, cfunc) {
205 return nil 210 return nil
206 } 211 }
207 } 212 }
208 } 213 }
209 214
210 func (d rdsImpl) Count(fq *ds.FinalizedQuery) (int64, error) { 215 func (d rdsImpl) Count(fq *ds.FinalizedQuery) (int64, error) {
211 q, err := d.fixQuery(fq) 216 q, err := d.fixQuery(fq)
212 if err != nil { 217 if err != nil {
213 return 0, err 218 return 0, err
214 } 219 }
215 » ret, err := q.Count(d) 220 » ret, err := q.Count(d.aeCtx)
216 return int64(ret), err 221 return int64(ret), err
217 } 222 }
218 223
219 func (d rdsImpl) RunInTransaction(f func(c context.Context) error, opts *ds.Tran sactionOptions) error { 224 func (d rdsImpl) RunInTransaction(f func(c context.Context) error, opts *ds.Tran sactionOptions) error {
220 ropts := (*datastore.TransactionOptions)(opts) 225 ropts := (*datastore.TransactionOptions)(opts)
221 » return datastore.RunInTransaction(d, f, ropts) 226 » return datastore.RunInTransaction(d.aeCtx, func(aeCtx context.Context) e rror {
227 » » return f(context.WithValue(d.userCtx, prodContextKey, aeCtx))
228 » }, ropts)
222 } 229 }
223 230
224 func (d rdsImpl) Testable() ds.Testable { 231 func (d rdsImpl) Testable() ds.Testable {
225 return nil 232 return nil
226 } 233 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698