OLD | NEW |
---|---|
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "errors" | 8 "errors" |
9 "strings" | 9 "strings" |
10 "sync" | 10 "sync" |
11 | 11 |
12 ds "github.com/luci/gae/service/datastore" | 12 ds "github.com/luci/gae/service/datastore" |
13 "github.com/luci/luci-go/common/logging/memlogger" | 13 "github.com/luci/luci-go/common/logging/memlogger" |
14 | |
14 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
15 ) | 16 ) |
16 | 17 |
17 var serializationDeterministic = false | 18 var serializationDeterministic = false |
18 | 19 |
19 type memContextObj interface { | 20 type memContextObj interface { |
20 sync.Locker | 21 sync.Locker |
21 canApplyTxn(m memContextObj) bool | 22 canApplyTxn(m memContextObj) bool |
22 applyTxn(c context.Context, m memContextObj) | 23 applyTxn(c context.Context, m memContextObj) |
23 | 24 |
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
121 } | 122 } |
122 c = memlogger.Use(c) | 123 c = memlogger.Use(c) |
123 | 124 |
124 fqAppID := aid | 125 fqAppID := aid |
125 if parts := strings.SplitN(fqAppID, "~", 2); len(parts) == 2 { | 126 if parts := strings.SplitN(fqAppID, "~", 2); len(parts) == 2 { |
126 aid = parts[1] | 127 aid = parts[1] |
127 } | 128 } |
128 | 129 |
129 memctx := newMemContext(fqAppID) | 130 memctx := newMemContext(fqAppID) |
130 c = context.WithValue(c, memContextKey, memctx) | 131 c = context.WithValue(c, memContextKey, memctx) |
131 c = context.WithValue(c, memContextNoTxnKey, memctx) | |
132 c = useGID(c, func(mod *globalInfoData) { | 132 c = useGID(c, func(mod *globalInfoData) { |
133 mod.appID = aid | 133 mod.appID = aid |
134 mod.fqAppID = fqAppID | 134 mod.fqAppID = fqAppID |
135 }) | 135 }) |
136 return useMod(useMail(useUser(useTQ(useRDS(useMC(useGI(c))))))) | 136 return useMod(useMail(useUser(useTQ(useRDS(useMC(useGI(c))))))) |
137 } | 137 } |
138 | 138 |
139 func cur(c context.Context) (p *memContext) { | 139 func cur(c context.Context) (*memContext, bool) { |
140 » p, _ = c.Value(memContextKey).(*memContext) | 140 » if txn := c.Value(currentTxnKey); txn != nil { |
141 » return | 141 » » // We are in a Transaction. |
142 » » return txn.(*memContext), true | |
143 » } | |
144 » return c.Value(memContextKey).(*memContext), false | |
142 } | 145 } |
143 | 146 |
144 func curNoTxn(c context.Context) (p *memContext) { | 147 func inTxn(c context.Context) bool { |
dnj
2016/09/01 15:25:39
Ended up not needing this, will delete in next pat
iannucci
2016/09/16 01:01:13
sgtm
dnj
2016/09/16 05:44:42
Done.
| |
145 » p, _ = c.Value(memContextNoTxnKey).(*memContext) | 148 » _, inTxn := cur(c) |
146 » return | 149 » return inTxn |
147 } | 150 } |
148 | 151 |
149 type memContextKeyType int | 152 type memContextKeyType int |
150 | 153 |
151 var ( | 154 var ( |
152 » memContextKey memContextKeyType | 155 » memContextKey memContextKeyType |
153 » memContextNoTxnKey memContextKeyType = 1 | 156 » currentTxnKey = 1 |
154 ) | 157 ) |
155 | 158 |
156 // weird stuff | 159 // weird stuff |
157 | 160 |
158 // RunInTransaction is here because it's really a service-wide transaction, not | 161 // RunInTransaction is here because it's really a service-wide transaction, not |
159 // just in the datastore. TaskQueue behaves differently in a transaction in | 162 // just in the datastore. TaskQueue behaves differently in a transaction in |
160 // a couple ways, for example. | 163 // a couple ways, for example. |
161 // | 164 // |
162 // It really should have been appengine.Context.RunInTransaction(func(tc...)), | 165 // It really should have been appengine.Context.RunInTransaction(func(tc...)), |
163 // but because it's not, this method is on dsImpl instead to mirror the official | 166 // but because it's not, this method is on dsImpl instead to mirror the official |
164 // API. | 167 // API. |
165 // | 168 // |
166 // The fake implementation also differs from the real implementation because the | 169 // The fake implementation also differs from the real implementation because the |
167 // fake TaskQueue is NOT backed by the fake Datastore. This is done to make the | 170 // fake TaskQueue is NOT backed by the fake Datastore. This is done to make the |
168 // test-access API for TaskQueue better (instead of trying to reconstitute the | 171 // test-access API for TaskQueue better (instead of trying to reconstitute the |
169 // state of the task queue from a bunch of datastore accesses). | 172 // state of the task queue from a bunch of datastore accesses). |
170 func (d *dsImpl) RunInTransaction(f func(context.Context) error, o *ds.Transacti onOptions) error { | 173 func (d *dsImpl) RunInTransaction(f func(context.Context) error, o *ds.Transacti onOptions) error { |
171 if d.data.getDisableSpecialEntities() { | 174 if d.data.getDisableSpecialEntities() { |
172 return errors.New("special entities are disabled. no transaction s for you") | 175 return errors.New("special entities are disabled. no transaction s for you") |
173 } | 176 } |
174 | 177 |
175 // Keep in separate function for defers. | 178 // Keep in separate function for defers. |
176 loopBody := func(applyForReal bool) error { | 179 loopBody := func(applyForReal bool) error { |
177 » » curMC := cur(d.c) | 180 » » curMC, inTxn := cur(d) |
181 » » if inTxn { | |
182 » » » return errors.New("datastore: nested transactions are no t supported") | |
183 » » } | |
178 | 184 |
179 txnMC := curMC.mkTxn(o) | 185 txnMC := curMC.mkTxn(o) |
180 | 186 |
181 defer func() { | 187 defer func() { |
182 txnMC.Lock() | 188 txnMC.Lock() |
183 defer txnMC.Unlock() | 189 defer txnMC.Unlock() |
184 | 190 |
185 txnMC.endTxn() | 191 txnMC.endTxn() |
186 }() | 192 }() |
187 | 193 |
188 » » if err := f(context.WithValue(d.c, memContextKey, txnMC)); err ! = nil { | 194 » » if err := f(context.WithValue(d, currentTxnKey, txnMC)); err != nil { |
189 return err | 195 return err |
190 } | 196 } |
191 | 197 |
192 txnMC.Lock() | 198 txnMC.Lock() |
193 defer txnMC.Unlock() | 199 defer txnMC.Unlock() |
194 | 200 |
195 if applyForReal && curMC.canApplyTxn(txnMC) { | 201 if applyForReal && curMC.canApplyTxn(txnMC) { |
196 » » » curMC.applyTxn(d.c, txnMC) | 202 » » » curMC.applyTxn(d, txnMC) |
197 } else { | 203 } else { |
198 return ds.ErrConcurrentTransaction | 204 return ds.ErrConcurrentTransaction |
199 } | 205 } |
200 return nil | 206 return nil |
201 } | 207 } |
202 | 208 |
203 // From GAE docs for TransactionOptions: "If omitted, it defaults to 3." | 209 // From GAE docs for TransactionOptions: "If omitted, it defaults to 3." |
204 attempts := 3 | 210 attempts := 3 |
205 if o != nil && o.Attempts != 0 { | 211 if o != nil && o.Attempts != 0 { |
206 attempts = o.Attempts | 212 attempts = o.Attempts |
207 } | 213 } |
208 for attempt := 0; attempt < attempts; attempt++ { | 214 for attempt := 0; attempt < attempts; attempt++ { |
209 if err := loopBody(attempt >= d.data.txnFakeRetry); err != ds.Er rConcurrentTransaction { | 215 if err := loopBody(attempt >= d.data.txnFakeRetry); err != ds.Er rConcurrentTransaction { |
210 return err | 216 return err |
211 } | 217 } |
212 } | 218 } |
213 return ds.ErrConcurrentTransaction | 219 return ds.ErrConcurrentTransaction |
214 } | 220 } |
OLD | NEW |