Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(562)

Side by Side Diff: impl/memory/datastore_data.go

Issue 1911263002: Fix memory corruption bug in impl/memory (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/gae@master
Patch Set: fix comments Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « impl/memory/datastore.go ('k') | impl/memory/datastore_index.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 "strings" 10 "strings"
11 "sync" 11 "sync"
12 "sync/atomic" 12 "sync/atomic"
13 13
14 ds "github.com/luci/gae/service/datastore" 14 ds "github.com/luci/gae/service/datastore"
15 "github.com/luci/gae/service/datastore/serialize" 15 "github.com/luci/gae/service/datastore/serialize"
16 "github.com/luci/luci-go/common/errors" 16 "github.com/luci/luci-go/common/errors"
17 "golang.org/x/net/context" 17 "golang.org/x/net/context"
18 ) 18 )
19 19
20 //////////////////////////////// dataStoreData ///////////////////////////////// 20 //////////////////////////////// dataStoreData /////////////////////////////////
21 21
22 type dataStoreData struct { 22 type dataStoreData struct {
23 rwlock sync.RWMutex 23 rwlock sync.RWMutex
24 24
25 // the 'appid' of this datastore 25 // the 'appid' of this datastore
26 aid string 26 aid string
27 27
28 // See README.md for head schema. 28 // See README.md for head schema.
29 » head *memStore 29 » head memStore
30 // if snap is nil, that means that this is always-consistent, and 30 // if snap is nil, that means that this is always-consistent, and
31 // getQuerySnaps will return (head, head) 31 // getQuerySnaps will return (head, head)
32 » snap *memStore 32 » snap memStore
33 // For testing, see SetTransactionRetryCount. 33 // For testing, see SetTransactionRetryCount.
34 txnFakeRetry int 34 txnFakeRetry int
35 // true means that queries with insufficent indexes will pause to add th em 35 // true means that queries with insufficent indexes will pause to add th em
36 // and then continue instead of failing. 36 // and then continue instead of failing.
37 autoIndex bool 37 autoIndex bool
38 // true means that all of the __...__ keys which are normally automatica lly 38 // true means that all of the __...__ keys which are normally automatica lly
39 // maintained will be omitted. This also means that Put with an incomple te 39 // maintained will be omitted. This also means that Put with an incomple te
40 // key will become an error. 40 // key will become an error.
41 disableSpecialEntities bool 41 disableSpecialEntities bool
42 } 42 }
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after
115 defer d.Unlock() 115 defer d.Unlock()
116 d.disableSpecialEntities = true 116 d.disableSpecialEntities = true
117 } 117 }
118 118
119 func (d *dataStoreData) getDisableSpecialEntities() bool { 119 func (d *dataStoreData) getDisableSpecialEntities() bool {
120 d.rwlock.RLock() 120 d.rwlock.RLock()
121 defer d.rwlock.RUnlock() 121 defer d.rwlock.RUnlock()
122 return d.disableSpecialEntities 122 return d.disableSpecialEntities
123 } 123 }
124 124
125 func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) { 125 func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head memStore) {
126 d.rwlock.RLock() 126 d.rwlock.RLock()
127 defer d.rwlock.RUnlock() 127 defer d.rwlock.RUnlock()
128 if d.snap == nil { 128 if d.snap == nil {
129 // we're 'always consistent' 129 // we're 'always consistent'
130 snap := d.head.Snapshot() 130 snap := d.head.Snapshot()
131 return snap, snap 131 return snap, snap
132 } 132 }
133 133
134 head = d.head.Snapshot() 134 head = d.head.Snapshot()
135 if consistent { 135 if consistent {
136 idx = head 136 idx = head
137 } else { 137 } else {
138 idx = d.snap 138 idx = d.snap
139 } 139 }
140 return 140 return
141 } 141 }
142 142
143 func (d *dataStoreData) takeSnapshot() *memStore { 143 func (d *dataStoreData) takeSnapshot() memStore {
144 d.rwlock.RLock() 144 d.rwlock.RLock()
145 defer d.rwlock.RUnlock() 145 defer d.rwlock.RUnlock()
146 return d.head.Snapshot() 146 return d.head.Snapshot()
147 } 147 }
148 148
149 func (d *dataStoreData) setSnapshot(snap *memStore) { 149 func (d *dataStoreData) setSnapshot(snap memStore) {
150 d.rwlock.Lock() 150 d.rwlock.Lock()
151 defer d.rwlock.Unlock() 151 defer d.rwlock.Unlock()
152 if d.snap == nil { 152 if d.snap == nil {
153 // we're 'always consistent' 153 // we're 'always consistent'
154 return 154 return
155 } 155 }
156 d.snap = snap 156 d.snap = snap
157 } 157 }
158 158
159 func (d *dataStoreData) catchupIndexes() { 159 func (d *dataStoreData) catchupIndexes() {
(...skipping 20 matching lines...) Expand all
180 } 180 }
181 181
182 func groupIDsKey(key *ds.Key) []byte { 182 func groupIDsKey(key *ds.Key) []byte {
183 return keyBytes(ds.NewKey("", "", "__entity_group_ids__", "", 1, key.Roo t())) 183 return keyBytes(ds.NewKey("", "", "__entity_group_ids__", "", 1, key.Roo t()))
184 } 184 }
185 185
186 func rootIDsKey(kind string) []byte { 186 func rootIDsKey(kind string) []byte {
187 return keyBytes(ds.NewKey("", "", "__entity_root_ids__", kind, 0, nil)) 187 return keyBytes(ds.NewKey("", "", "__entity_root_ids__", kind, 0, nil))
188 } 188 }
189 189
190 func curVersion(ents *memCollection, key []byte) int64 { 190 func curVersion(ents memCollection, key []byte) int64 {
191 if ents != nil { 191 if ents != nil {
192 if v := ents.Get(key); v != nil { 192 if v := ents.Get(key); v != nil {
193 pm, err := rpm(v) 193 pm, err := rpm(v)
194 memoryCorruption(err) 194 memoryCorruption(err)
195 195
196 pl, ok := pm["__version__"] 196 pl, ok := pm["__version__"]
197 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { 197 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt {
198 return pl[0].Value().(int64) 198 return pl[0].Value().(int64)
199 } 199 }
200 200
201 memoryCorruption(fmt.Errorf("__version__ property missin g or wrong: %v", pm)) 201 memoryCorruption(fmt.Errorf("__version__ property missin g or wrong: %v", pm))
202 } 202 }
203 } 203 }
204 return 0 204 return 0
205 } 205 }
206 206
207 func incrementLocked(ents *memCollection, key []byte, amt int) int64 { 207 func incrementLocked(ents memCollection, key []byte, amt int) int64 {
208 if amt <= 0 { 208 if amt <= 0 {
209 panic(fmt.Errorf("incrementLocked called with bad `amt`: %d", am t)) 209 panic(fmt.Errorf("incrementLocked called with bad `amt`: %d", am t))
210 } 210 }
211 ret := curVersion(ents, key) + 1 211 ret := curVersion(ents, key) + 1
212 ents.Set(key, serialize.ToBytes(ds.PropertyMap{ 212 ents.Set(key, serialize.ToBytes(ds.PropertyMap{
213 "__version__": {ds.MkPropertyNI(ret + int64(amt-1))}, 213 "__version__": {ds.MkPropertyNI(ret + int64(amt-1))},
214 })) 214 }))
215 return ret 215 return ret
216 } 216 }
217 217
218 func (d *dataStoreData) mutableEntsLocked(ns string) *memCollection {
219 coll := "ents:" + ns
220 ents := d.head.GetCollection(coll)
221 if ents == nil {
222 ents = d.head.SetCollection(coll, nil)
223 }
224 return ents
225 }
226
227 func (d *dataStoreData) allocateIDs(incomplete *ds.Key, n int) (int64, error) { 218 func (d *dataStoreData) allocateIDs(incomplete *ds.Key, n int) (int64, error) {
228 d.Lock() 219 d.Lock()
229 defer d.Unlock() 220 defer d.Unlock()
230 221
231 » ents := d.mutableEntsLocked(incomplete.Namespace()) 222 » ents := d.head.GetOrCreateCollection("ents:" + incomplete.Namespace())
232 return d.allocateIDsLocked(ents, incomplete, n) 223 return d.allocateIDsLocked(ents, incomplete, n)
233 } 224 }
234 225
235 func (d *dataStoreData) allocateIDsLocked(ents *memCollection, incomplete *ds.Ke y, n int) (int64, error) { 226 func (d *dataStoreData) allocateIDsLocked(ents memCollection, incomplete *ds.Key , n int) (int64, error) {
236 if d.disableSpecialEntities { 227 if d.disableSpecialEntities {
237 return 0, errors.New("disableSpecialEntities is true so allocate IDs is disabled") 228 return 0, errors.New("disableSpecialEntities is true so allocate IDs is disabled")
238 } 229 }
239 230
240 idKey := []byte(nil) 231 idKey := []byte(nil)
241 if incomplete.Parent() == nil { 232 if incomplete.Parent() == nil {
242 idKey = rootIDsKey(incomplete.Kind()) 233 idKey = rootIDsKey(incomplete.Kind())
243 } else { 234 } else {
244 idKey = groupIDsKey(incomplete) 235 idKey = groupIDsKey(incomplete)
245 } 236 }
246 return incrementLocked(ents, idKey, n), nil 237 return incrementLocked(ents, idKey, n), nil
247 } 238 }
248 239
249 func (d *dataStoreData) fixKeyLocked(ents *memCollection, key *ds.Key) (*ds.Key, error) { 240 func (d *dataStoreData) fixKeyLocked(ents memCollection, key *ds.Key) (*ds.Key, error) {
250 if key.Incomplete() { 241 if key.Incomplete() {
251 id, err := d.allocateIDsLocked(ents, key, 1) 242 id, err := d.allocateIDsLocked(ents, key, 1)
252 if err != nil { 243 if err != nil {
253 return key, err 244 return key, err
254 } 245 }
255 key = ds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", id , key.Parent()) 246 key = ds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", id , key.Parent())
256 } 247 }
257 return key, nil 248 return key, nil
258 } 249 }
259 250
260 func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Pu tMultiCB) error { 251 func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Pu tMultiCB) error {
261 ns := keys[0].Namespace() 252 ns := keys[0].Namespace()
262 253
263 for i, k := range keys { 254 for i, k := range keys {
264 pmap, _ := vals[i].Save(false) 255 pmap, _ := vals[i].Save(false)
265 dataBytes := serialize.ToBytes(pmap) 256 dataBytes := serialize.ToBytes(pmap)
266 257
267 k, err := func() (ret *ds.Key, err error) { 258 k, err := func() (ret *ds.Key, err error) {
268 d.Lock() 259 d.Lock()
269 defer d.Unlock() 260 defer d.Unlock()
270 261
271 » » » ents := d.mutableEntsLocked(ns) 262 » » » ents := d.head.GetOrCreateCollection("ents:" + ns)
272 263
273 ret, err = d.fixKeyLocked(ents, k) 264 ret, err = d.fixKeyLocked(ents, k)
274 if err != nil { 265 if err != nil {
275 return 266 return
276 } 267 }
277 if !d.disableSpecialEntities { 268 if !d.disableSpecialEntities {
278 incrementLocked(ents, groupMetaKey(ret), 1) 269 incrementLocked(ents, groupMetaKey(ret), 1)
279 } 270 }
280 271
281 old := ents.Get(keyBytes(ret)) 272 old := ents.Get(keyBytes(ret))
282 oldPM := ds.PropertyMap(nil) 273 oldPM := ds.PropertyMap(nil)
283 if old != nil { 274 if old != nil {
284 if oldPM, err = rpm(old); err != nil { 275 if oldPM, err = rpm(old); err != nil {
285 return 276 return
286 } 277 }
287 } 278 }
288 ents.Set(keyBytes(ret), dataBytes) 279 ents.Set(keyBytes(ret), dataBytes)
289 updateIndexes(d.head, ret, oldPM, pmap) 280 updateIndexes(d.head, ret, oldPM, pmap)
290 return 281 return
291 }() 282 }()
292 if cb != nil { 283 if cb != nil {
293 if err := cb(k, err); err != nil { 284 if err := cb(k, err); err != nil {
294 return err 285 return err
295 } 286 }
296 } 287 }
297 } 288 }
298 return nil 289 return nil
299 } 290 }
300 291
301 func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (*memCollect ion, error)) error { 292 func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (memCollecti on, error)) error {
302 ents, err := getColl() 293 ents, err := getColl()
303 if err != nil { 294 if err != nil {
304 return err 295 return err
305 } 296 }
306 if ents == nil { 297 if ents == nil {
307 for range keys { 298 for range keys {
308 cb(nil, ds.ErrNoSuchEntity) 299 cb(nil, ds.ErrNoSuchEntity)
309 } 300 }
310 return nil 301 return nil
311 } 302 }
312 303
313 for _, k := range keys { 304 for _, k := range keys {
314 pdata := ents.Get(keyBytes(k)) 305 pdata := ents.Get(keyBytes(k))
315 if pdata == nil { 306 if pdata == nil {
316 cb(nil, ds.ErrNoSuchEntity) 307 cb(nil, ds.ErrNoSuchEntity)
317 continue 308 continue
318 } 309 }
319 cb(rpm(pdata)) 310 cb(rpm(pdata))
320 } 311 }
321 return nil 312 return nil
322 } 313 }
323 314
324 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error { 315 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error {
325 » return getMultiInner(keys, cb, func() (*memCollection, error) { 316 » return getMultiInner(keys, cb, func() (memCollection, error) {
326 s := d.takeSnapshot() 317 s := d.takeSnapshot()
327 318
328 return s.GetCollection("ents:" + keys[0].Namespace()), nil 319 return s.GetCollection("ents:" + keys[0].Namespace()), nil
329 }) 320 })
330 } 321 }
331 322
332 func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error { 323 func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error {
333 ns := keys[0].Namespace() 324 ns := keys[0].Namespace()
334 325
335 hasEntsInNS := func() bool { 326 hasEntsInNS := func() bool {
336 d.Lock() 327 d.Lock()
337 defer d.Unlock() 328 defer d.Unlock()
338 » » return d.mutableEntsLocked(ns) != nil 329 » » return d.head.GetOrCreateCollection("ents:"+ns) != nil
339 }() 330 }()
340 331
341 if hasEntsInNS { 332 if hasEntsInNS {
342 for _, k := range keys { 333 for _, k := range keys {
343 err := func() error { 334 err := func() error {
344 kb := keyBytes(k) 335 kb := keyBytes(k)
345 336
346 d.Lock() 337 d.Lock()
347 defer d.Unlock() 338 defer d.Unlock()
348 339
349 » » » » ents := d.mutableEntsLocked(ns) 340 » » » » ents := d.head.GetOrCreateCollection("ents:" + n s)
350 341
351 if !d.disableSpecialEntities { 342 if !d.disableSpecialEntities {
352 incrementLocked(ents, groupMetaKey(k), 1 ) 343 incrementLocked(ents, groupMetaKey(k), 1 )
353 } 344 }
354 if old := ents.Get(kb); old != nil { 345 if old := ents.Get(kb); old != nil {
355 oldPM, err := rpm(old) 346 oldPM, err := rpm(old)
356 if err != nil { 347 if err != nil {
357 return err 348 return err
358 } 349 }
359 ents.Delete(kb) 350 ents.Delete(kb)
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after
445 436
446 type txnDataStoreData struct { 437 type txnDataStoreData struct {
447 sync.Mutex 438 sync.Mutex
448 439
449 parent *dataStoreData 440 parent *dataStoreData
450 441
451 // boolean 0 or 1, use atomic.*Int32 to access. 442 // boolean 0 or 1, use atomic.*Int32 to access.
452 closed int32 443 closed int32
453 isXG bool 444 isXG bool
454 445
455 » snap *memStore 446 » snap memStore
456 447
457 // string is the raw-bytes encoding of the entity root incl. namespace 448 // string is the raw-bytes encoding of the entity root incl. namespace
458 muts map[string][]txnMutation 449 muts map[string][]txnMutation
459 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ ing 450 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ ing
460 // length of encoded keys + values. 451 // length of encoded keys + values.
461 } 452 }
462 453
463 var _ memContextObj = (*txnDataStoreData)(nil) 454 var _ memContextObj = (*txnDataStoreData)(nil)
464 455
465 const xgEGLimit = 25 456 const xgEGLimit = 25
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after
526 return nil 517 return nil
527 } 518 }
528 519
529 func (td *txnDataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb d s.PutMultiCB) { 520 func (td *txnDataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb d s.PutMultiCB) {
530 ns := keys[0].Namespace() 521 ns := keys[0].Namespace()
531 522
532 for i, k := range keys { 523 for i, k := range keys {
533 err := func() (err error) { 524 err := func() (err error) {
534 td.parent.Lock() 525 td.parent.Lock()
535 defer td.parent.Unlock() 526 defer td.parent.Unlock()
536 » » » ents := td.parent.mutableEntsLocked(ns) 527 » » » ents := td.parent.head.GetOrCreateCollection("ents:" + n s)
537 528
538 k, err = td.parent.fixKeyLocked(ents, k) 529 k, err = td.parent.fixKeyLocked(ents, k)
539 return 530 return
540 }() 531 }()
541 if err == nil { 532 if err == nil {
542 err = td.writeMutation(false, k, vals[i]) 533 err = td.writeMutation(false, k, vals[i])
543 } 534 }
544 if cb != nil { 535 if cb != nil {
545 cb(k, err) 536 cb(k, err)
546 } 537 }
547 } 538 }
548 } 539 }
549 540
550 func (td *txnDataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error { 541 func (td *txnDataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error {
551 » return getMultiInner(keys, cb, func() (*memCollection, error) { 542 » return getMultiInner(keys, cb, func() (memCollection, error) {
552 err := error(nil) 543 err := error(nil)
553 for _, key := range keys { 544 for _, key := range keys {
554 err = td.writeMutation(true, key, nil) 545 err = td.writeMutation(true, key, nil)
555 if err != nil { 546 if err != nil {
556 return nil, err 547 return nil, err
557 } 548 }
558 } 549 }
559 return td.snap.GetCollection("ents:" + keys[0].Namespace()), nil 550 return td.snap.GetCollection("ents:" + keys[0].Namespace()), nil
560 }) 551 })
561 } 552 }
(...skipping 10 matching lines...) Expand all
572 563
573 func keyBytes(key *ds.Key) []byte { 564 func keyBytes(key *ds.Key) []byte {
574 return serialize.ToBytes(ds.MkProperty(key)) 565 return serialize.ToBytes(ds.MkProperty(key))
575 } 566 }
576 567
577 func rpm(data []byte) (ds.PropertyMap, error) { 568 func rpm(data []byte) (ds.PropertyMap, error) {
578 return serialize.ReadPropertyMap(bytes.NewBuffer(data), 569 return serialize.ReadPropertyMap(bytes.NewBuffer(data),
579 serialize.WithContext, "", "") 570 serialize.WithContext, "", "")
580 } 571 }
581 572
582 func namespaces(store *memStore) []string { 573 func namespaces(store memStore) []string {
583 var namespaces []string 574 var namespaces []string
584 for _, c := range store.GetCollectionNames() { 575 for _, c := range store.GetCollectionNames() {
585 ns, has := trimPrefix(c, "ents:") 576 ns, has := trimPrefix(c, "ents:")
586 if !has { 577 if !has {
587 if len(namespaces) > 0 { 578 if len(namespaces) > 0 {
588 break 579 break
589 } 580 }
590 continue 581 continue
591 } 582 }
592 namespaces = append(namespaces, ns) 583 namespaces = append(namespaces, ns)
593 } 584 }
594 return namespaces 585 return namespaces
595 } 586 }
596 587
597 func trimPrefix(v, p string) (string, bool) { 588 func trimPrefix(v, p string) (string, bool) {
598 if strings.HasPrefix(v, p) { 589 if strings.HasPrefix(v, p) {
599 return v[len(p):], true 590 return v[len(p):], true
600 } 591 }
601 return v, false 592 return v, false
602 } 593 }
OLDNEW
« no previous file with comments | « impl/memory/datastore.go ('k') | impl/memory/datastore_index.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698