OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package prod | 5 package prod |
6 | 6 |
7 import ( | 7 import ( |
8 ds "github.com/luci/gae/service/datastore" | 8 ds "github.com/luci/gae/service/datastore" |
9 "github.com/luci/gae/service/info" | 9 "github.com/luci/gae/service/info" |
10 "github.com/luci/luci-go/common/errors" | 10 "github.com/luci/luci-go/common/errors" |
11 "golang.org/x/net/context" | 11 "golang.org/x/net/context" |
12 "google.golang.org/appengine/datastore" | 12 "google.golang.org/appengine/datastore" |
13 ) | 13 ) |
14 | 14 |
15 // useRDS adds a gae.RawDatastore implementation to context, accessible | 15 // useRDS adds a gae.RawDatastore implementation to context, accessible |
16 // by gae.GetDS(c) | 16 // by gae.GetDS(c) |
17 func useRDS(c context.Context) context.Context { | 17 func useRDS(c context.Context) context.Context { |
18 return ds.SetRawFactory(c, func(ci context.Context) ds.RawInterface { | 18 return ds.SetRawFactory(c, func(ci context.Context) ds.RawInterface { |
19 » » return rdsImpl{ci, info.Get(ci).GetNamespace()} | 19 » » return rdsImpl{ci, AEContext(ci), info.Get(ci).GetNamespace()} |
20 }) | 20 }) |
21 } | 21 } |
22 | 22 |
23 ////////// Datastore | 23 ////////// Datastore |
24 | 24 |
25 type rdsImpl struct { | 25 type rdsImpl struct { |
26 » context.Context | 26 » // userCtx is the context that has the luci/gae services and user object
s in |
| 27 » // it. |
| 28 » userCtx context.Context |
| 29 |
| 30 » // aeCtx is the context with the appengine connection information in it. |
| 31 » aeCtx context.Context |
27 | 32 |
28 ns string | 33 ns string |
29 } | 34 } |
30 | 35 |
31 func idxCallbacker(err error, amt int, cb func(idx int, err error)) error { | 36 func idxCallbacker(err error, amt int, cb func(idx int, err error)) error { |
32 if err == nil { | 37 if err == nil { |
33 for i := 0; i < amt; i++ { | 38 for i := 0; i < amt; i++ { |
34 cb(i, nil) | 39 cb(i, nil) |
35 } | 40 } |
36 return nil | 41 return nil |
37 } | 42 } |
38 err = errors.Fix(err) | 43 err = errors.Fix(err) |
39 me, ok := err.(errors.MultiError) | 44 me, ok := err.(errors.MultiError) |
40 if ok { | 45 if ok { |
41 for i, err := range me { | 46 for i, err := range me { |
42 cb(i, err) | 47 cb(i, err) |
43 } | 48 } |
44 return nil | 49 return nil |
45 } | 50 } |
46 return err | 51 return err |
47 } | 52 } |
48 | 53 |
49 func (d rdsImpl) AllocateIDs(incomplete *ds.Key, n int) (start int64, err error)
{ | 54 func (d rdsImpl) AllocateIDs(incomplete *ds.Key, n int) (start int64, err error)
{ |
50 » par, err := dsF2R(d, incomplete.Parent()) | 55 » par, err := dsF2R(d.aeCtx, incomplete.Parent()) |
51 if err != nil { | 56 if err != nil { |
52 return | 57 return |
53 } | 58 } |
54 | 59 |
55 » start, _, err = datastore.AllocateIDs(d, incomplete.Kind(), par, n) | 60 » start, _, err = datastore.AllocateIDs(d.aeCtx, incomplete.Kind(), par, n
) |
56 return | 61 return |
57 } | 62 } |
58 | 63 |
59 func (d rdsImpl) DeleteMulti(ks []*ds.Key, cb ds.DeleteMultiCB) error { | 64 func (d rdsImpl) DeleteMulti(ks []*ds.Key, cb ds.DeleteMultiCB) error { |
60 » keys, err := dsMF2R(d, ks) | 65 » keys, err := dsMF2R(d.aeCtx, ks) |
61 if err == nil { | 66 if err == nil { |
62 » » err = datastore.DeleteMulti(d, keys) | 67 » » err = datastore.DeleteMulti(d.aeCtx, keys) |
63 } | 68 } |
64 return idxCallbacker(err, len(ks), func(_ int, err error) { | 69 return idxCallbacker(err, len(ks), func(_ int, err error) { |
65 cb(err) | 70 cb(err) |
66 }) | 71 }) |
67 } | 72 } |
68 | 73 |
69 func (d rdsImpl) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb ds.GetMul
tiCB) error { | 74 func (d rdsImpl) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb ds.GetMul
tiCB) error { |
70 vals := make([]datastore.PropertyLoadSaver, len(keys)) | 75 vals := make([]datastore.PropertyLoadSaver, len(keys)) |
71 » rkeys, err := dsMF2R(d, keys) | 76 » rkeys, err := dsMF2R(d.aeCtx, keys) |
72 if err == nil { | 77 if err == nil { |
73 for i := range keys { | 78 for i := range keys { |
74 » » » vals[i] = &typeFilter{d, ds.PropertyMap{}} | 79 » » » vals[i] = &typeFilter{d.aeCtx, ds.PropertyMap{}} |
75 } | 80 } |
76 » » err = datastore.GetMulti(d, rkeys, vals) | 81 » » err = datastore.GetMulti(d.aeCtx, rkeys, vals) |
77 } | 82 } |
78 return idxCallbacker(err, len(keys), func(idx int, err error) { | 83 return idxCallbacker(err, len(keys), func(idx int, err error) { |
79 if pls := vals[idx]; pls != nil { | 84 if pls := vals[idx]; pls != nil { |
80 cb(pls.(*typeFilter).pm, err) | 85 cb(pls.(*typeFilter).pm, err) |
81 } else { | 86 } else { |
82 cb(nil, err) | 87 cb(nil, err) |
83 } | 88 } |
84 }) | 89 }) |
85 } | 90 } |
86 | 91 |
87 func (d rdsImpl) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.PutMultiC
B) error { | 92 func (d rdsImpl) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.PutMultiC
B) error { |
88 » rkeys, err := dsMF2R(d, keys) | 93 » rkeys, err := dsMF2R(d.aeCtx, keys) |
89 if err == nil { | 94 if err == nil { |
90 rvals := make([]datastore.PropertyLoadSaver, len(vals)) | 95 rvals := make([]datastore.PropertyLoadSaver, len(vals)) |
91 for i, val := range vals { | 96 for i, val := range vals { |
92 » » » rvals[i] = &typeFilter{d, val} | 97 » » » rvals[i] = &typeFilter{d.aeCtx, val} |
93 } | 98 } |
94 » » rkeys, err = datastore.PutMulti(d, rkeys, rvals) | 99 » » rkeys, err = datastore.PutMulti(d.aeCtx, rkeys, rvals) |
95 } | 100 } |
96 return idxCallbacker(err, len(keys), func(idx int, err error) { | 101 return idxCallbacker(err, len(keys), func(idx int, err error) { |
97 k := (*ds.Key)(nil) | 102 k := (*ds.Key)(nil) |
98 if err == nil { | 103 if err == nil { |
99 k = dsR2F(rkeys[idx]) | 104 k = dsR2F(rkeys[idx]) |
100 } | 105 } |
101 cb(k, err) | 106 cb(k, err) |
102 }) | 107 }) |
103 } | 108 } |
104 | 109 |
105 func (d rdsImpl) fixQuery(fq *ds.FinalizedQuery) (*datastore.Query, error) { | 110 func (d rdsImpl) fixQuery(fq *ds.FinalizedQuery) (*datastore.Query, error) { |
106 ret := datastore.NewQuery(fq.Kind()) | 111 ret := datastore.NewQuery(fq.Kind()) |
107 | 112 |
108 start, end := fq.Bounds() | 113 start, end := fq.Bounds() |
109 if start != nil { | 114 if start != nil { |
110 ret = ret.Start(start.(datastore.Cursor)) | 115 ret = ret.Start(start.(datastore.Cursor)) |
111 } | 116 } |
112 if end != nil { | 117 if end != nil { |
113 ret = ret.End(end.(datastore.Cursor)) | 118 ret = ret.End(end.(datastore.Cursor)) |
114 } | 119 } |
115 | 120 |
116 for prop, vals := range fq.EqFilters() { | 121 for prop, vals := range fq.EqFilters() { |
117 if prop == "__ancestor__" { | 122 if prop == "__ancestor__" { |
118 » » » p, err := dsF2RProp(d, vals[0]) | 123 » » » p, err := dsF2RProp(d.aeCtx, vals[0]) |
119 if err != nil { | 124 if err != nil { |
120 return nil, err | 125 return nil, err |
121 } | 126 } |
122 ret = ret.Ancestor(p.Value.(*datastore.Key)) | 127 ret = ret.Ancestor(p.Value.(*datastore.Key)) |
123 } else { | 128 } else { |
124 filt := prop + "=" | 129 filt := prop + "=" |
125 for _, v := range vals { | 130 for _, v := range vals { |
126 » » » » p, err := dsF2RProp(d, v) | 131 » » » » p, err := dsF2RProp(d.aeCtx, v) |
127 if err != nil { | 132 if err != nil { |
128 return nil, err | 133 return nil, err |
129 } | 134 } |
130 | 135 |
131 ret = ret.Filter(filt, p.Value) | 136 ret = ret.Filter(filt, p.Value) |
132 } | 137 } |
133 } | 138 } |
134 } | 139 } |
135 | 140 |
136 if lnam, lop, lprop := fq.IneqFilterLow(); lnam != "" { | 141 if lnam, lop, lprop := fq.IneqFilterLow(); lnam != "" { |
137 » » p, err := dsF2RProp(d, lprop) | 142 » » p, err := dsF2RProp(d.aeCtx, lprop) |
138 if err != nil { | 143 if err != nil { |
139 return nil, err | 144 return nil, err |
140 } | 145 } |
141 ret = ret.Filter(lnam+" "+lop, p.Value) | 146 ret = ret.Filter(lnam+" "+lop, p.Value) |
142 } | 147 } |
143 | 148 |
144 if hnam, hop, hprop := fq.IneqFilterHigh(); hnam != "" { | 149 if hnam, hop, hprop := fq.IneqFilterHigh(); hnam != "" { |
145 » » p, err := dsF2RProp(d, hprop) | 150 » » p, err := dsF2RProp(d.aeCtx, hprop) |
146 if err != nil { | 151 if err != nil { |
147 return nil, err | 152 return nil, err |
148 } | 153 } |
149 ret = ret.Filter(hnam+" "+hop, p.Value) | 154 ret = ret.Filter(hnam+" "+hop, p.Value) |
150 } | 155 } |
151 | 156 |
152 if fq.EventuallyConsistent() { | 157 if fq.EventuallyConsistent() { |
153 ret = ret.EventualConsistency() | 158 ret = ret.EventualConsistency() |
154 } | 159 } |
155 | 160 |
(...skipping 24 matching lines...) Expand all Loading... |
180 func (d rdsImpl) DecodeCursor(s string) (ds.Cursor, error) { | 185 func (d rdsImpl) DecodeCursor(s string) (ds.Cursor, error) { |
181 return datastore.DecodeCursor(s) | 186 return datastore.DecodeCursor(s) |
182 } | 187 } |
183 | 188 |
184 func (d rdsImpl) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error { | 189 func (d rdsImpl) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error { |
185 q, err := d.fixQuery(fq) | 190 q, err := d.fixQuery(fq) |
186 if err != nil { | 191 if err != nil { |
187 return err | 192 return err |
188 } | 193 } |
189 | 194 |
190 » t := q.Run(d) | 195 » t := q.Run(d.aeCtx) |
191 | 196 |
192 cfunc := func() (ds.Cursor, error) { | 197 cfunc := func() (ds.Cursor, error) { |
193 return t.Cursor() | 198 return t.Cursor() |
194 } | 199 } |
195 tf := typeFilter{} | 200 tf := typeFilter{} |
196 for { | 201 for { |
197 k, err := t.Next(&tf) | 202 k, err := t.Next(&tf) |
198 if err == datastore.Done { | 203 if err == datastore.Done { |
199 return nil | 204 return nil |
200 } | 205 } |
201 if err != nil { | 206 if err != nil { |
202 return err | 207 return err |
203 } | 208 } |
204 if !cb(dsR2F(k), tf.pm, cfunc) { | 209 if !cb(dsR2F(k), tf.pm, cfunc) { |
205 return nil | 210 return nil |
206 } | 211 } |
207 } | 212 } |
208 } | 213 } |
209 | 214 |
210 func (d rdsImpl) Count(fq *ds.FinalizedQuery) (int64, error) { | 215 func (d rdsImpl) Count(fq *ds.FinalizedQuery) (int64, error) { |
211 q, err := d.fixQuery(fq) | 216 q, err := d.fixQuery(fq) |
212 if err != nil { | 217 if err != nil { |
213 return 0, err | 218 return 0, err |
214 } | 219 } |
215 » ret, err := q.Count(d) | 220 » ret, err := q.Count(d.aeCtx) |
216 return int64(ret), err | 221 return int64(ret), err |
217 } | 222 } |
218 | 223 |
219 func (d rdsImpl) RunInTransaction(f func(c context.Context) error, opts *ds.Tran
sactionOptions) error { | 224 func (d rdsImpl) RunInTransaction(f func(c context.Context) error, opts *ds.Tran
sactionOptions) error { |
220 ropts := (*datastore.TransactionOptions)(opts) | 225 ropts := (*datastore.TransactionOptions)(opts) |
221 » return datastore.RunInTransaction(d, f, ropts) | 226 » return datastore.RunInTransaction(d.aeCtx, func(aeCtx context.Context) e
rror { |
| 227 » » return f(context.WithValue(d.userCtx, prodContextKey, aeCtx)) |
| 228 » }, ropts) |
222 } | 229 } |
223 | 230 |
224 func (d rdsImpl) Testable() ds.Testable { | 231 func (d rdsImpl) Testable() ds.Testable { |
225 return nil | 232 return nil |
226 } | 233 } |
OLD | NEW |