OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "fmt" | 9 "fmt" |
10 "strings" | 10 "strings" |
11 "sync" | 11 "sync" |
12 "sync/atomic" | |
13 | 12 |
14 ds "github.com/luci/gae/service/datastore" | 13 ds "github.com/luci/gae/service/datastore" |
15 "github.com/luci/gae/service/datastore/serialize" | 14 "github.com/luci/gae/service/datastore/serialize" |
16 "github.com/luci/luci-go/common/errors" | 15 "github.com/luci/luci-go/common/errors" |
| 16 |
17 "golang.org/x/net/context" | 17 "golang.org/x/net/context" |
18 ) | 18 ) |
19 | 19 |
20 //////////////////////////////// dataStoreData ///////////////////////////////// | 20 //////////////////////////////// dataStoreData ///////////////////////////////// |
21 | 21 |
22 type dataStoreData struct { | 22 type dataStoreData struct { |
23 rwlock sync.RWMutex | 23 rwlock sync.RWMutex |
24 | 24 |
25 // the 'appid' of this datastore | 25 // the 'appid' of this datastore |
26 aid string | 26 aid string |
(...skipping 142 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
169 func (d *dataStoreData) namespaces() []string { | 169 func (d *dataStoreData) namespaces() []string { |
170 d.rwlock.Lock() | 170 d.rwlock.Lock() |
171 defer d.rwlock.Unlock() | 171 defer d.rwlock.Unlock() |
172 | 172 |
173 return namespaces(d.head) | 173 return namespaces(d.head) |
174 } | 174 } |
175 | 175 |
176 /////////////////////////// indexes(dataStoreData) //////////////////////////// | 176 /////////////////////////// indexes(dataStoreData) //////////////////////////// |
177 | 177 |
178 func groupMetaKey(key *ds.Key) []byte { | 178 func groupMetaKey(key *ds.Key) []byte { |
179 » return keyBytes(ds.NewKey("", "", "__entity_group__", "", 1, key.Root())
) | 179 » return keyBytes(ds.KeyContext{}.NewKey("__entity_group__", "", 1, key.Ro
ot())) |
180 } | 180 } |
181 | 181 |
182 func groupIDsKey(key *ds.Key) []byte { | 182 func groupIDsKey(key *ds.Key) []byte { |
183 » return keyBytes(ds.NewKey("", "", "__entity_group_ids__", "", 1, key.Roo
t())) | 183 » return keyBytes(ds.KeyContext{}.NewKey("__entity_group_ids__", "", 1, ke
y.Root())) |
184 } | 184 } |
185 | 185 |
186 func rootIDsKey(kind string) []byte { | 186 func rootIDsKey(kind string) []byte { |
187 » return keyBytes(ds.NewKey("", "", "__entity_root_ids__", kind, 0, nil)) | 187 » return keyBytes(ds.KeyContext{}.NewKey("__entity_root_ids__", kind, 0, n
il)) |
188 } | 188 } |
189 | 189 |
190 func curVersion(ents memCollection, key []byte) int64 { | 190 func curVersion(ents memCollection, key []byte) int64 { |
191 if ents != nil { | 191 if ents != nil { |
192 if v := ents.Get(key); v != nil { | 192 if v := ents.Get(key); v != nil { |
193 pm, err := rpm(v) | 193 pm, err := rpm(v) |
194 memoryCorruption(err) | 194 memoryCorruption(err) |
195 | 195 |
196 pl, ok := pm["__version__"] | 196 pl, ok := pm["__version__"] |
197 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { | 197 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { |
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
272 } | 272 } |
273 return incrementLocked(ents, idKey, n), nil | 273 return incrementLocked(ents, idKey, n), nil |
274 } | 274 } |
275 | 275 |
276 func (d *dataStoreData) fixKeyLocked(ents memCollection, key *ds.Key) (*ds.Key,
error) { | 276 func (d *dataStoreData) fixKeyLocked(ents memCollection, key *ds.Key) (*ds.Key,
error) { |
277 if key.IsIncomplete() { | 277 if key.IsIncomplete() { |
278 id, err := d.allocateIDsLocked(ents, key, 1) | 278 id, err := d.allocateIDsLocked(ents, key, 1) |
279 if err != nil { | 279 if err != nil { |
280 return key, err | 280 return key, err |
281 } | 281 } |
282 » » key = ds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", id
, key.Parent()) | 282 » » key = key.KeyContext().NewKey(key.Kind(), "", id, key.Parent()) |
283 } | 283 } |
284 return key, nil | 284 return key, nil |
285 } | 285 } |
286 | 286 |
287 func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Ne
wKeyCB) error { | 287 func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Ne
wKeyCB) error { |
288 ns := keys[0].Namespace() | 288 ns := keys[0].Namespace() |
289 | 289 |
290 for i, k := range keys { | 290 for i, k := range keys { |
291 pmap, _ := vals[i].Save(false) | 291 pmap, _ := vals[i].Save(false) |
292 dataBytes := serialize.ToBytes(pmap) | 292 dataBytes := serialize.ToBytes(pmap) |
(...skipping 112 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
405 } | 405 } |
406 | 406 |
407 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { | 407 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { |
408 // TODO(riannucci): implement with Flush/FlushRevert for persistance. | 408 // TODO(riannucci): implement with Flush/FlushRevert for persistance. |
409 | 409 |
410 txn := obj.(*txnDataStoreData) | 410 txn := obj.(*txnDataStoreData) |
411 for rk, muts := range txn.muts { | 411 for rk, muts := range txn.muts { |
412 if len(muts) == 0 { // read-only | 412 if len(muts) == 0 { // read-only |
413 continue | 413 continue |
414 } | 414 } |
415 » » prop, err := serialize.ReadProperty(bytes.NewBufferString(rk), s
erialize.WithContext, "", "") | 415 » » prop, err := serialize.ReadProperty(bytes.NewBufferString(rk), s
erialize.WithContext, ds.KeyContext{}) |
416 memoryCorruption(err) | 416 memoryCorruption(err) |
417 | 417 |
418 k := prop.Value().(*ds.Key) | 418 k := prop.Value().(*ds.Key) |
419 | 419 |
420 entKey := "ents:" + k.Namespace() | 420 entKey := "ents:" + k.Namespace() |
421 mkey := groupMetaKey(k) | 421 mkey := groupMetaKey(k) |
422 entsHead := d.head.GetCollection(entKey) | 422 entsHead := d.head.GetCollection(entKey) |
423 entsSnap := txn.snap.GetCollection(entKey) | 423 entsSnap := txn.snap.GetCollection(entKey) |
424 vHead := curVersion(entsHead, mkey) | 424 vHead := curVersion(entsHead, mkey) |
425 vSnap := curVersion(entsSnap, mkey) | 425 vSnap := curVersion(entsSnap, mkey) |
(...skipping 22 matching lines...) Expand all Loading... |
448 } | 448 } |
449 } | 449 } |
450 } | 450 } |
451 } | 451 } |
452 | 452 |
453 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj { | 453 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj { |
454 return &txnDataStoreData{ | 454 return &txnDataStoreData{ |
455 // alias to the main datastore's so that testing code can have p
rimitive | 455 // alias to the main datastore's so that testing code can have p
rimitive |
456 // access to break features inside of transactions. | 456 // access to break features inside of transactions. |
457 parent: d, | 457 parent: d, |
458 » » isXG: o != nil && o.XG, | 458 » » txn: &transactionImpl{ |
459 » » snap: d.head.Snapshot(), | 459 » » » isXG: o != nil && o.XG, |
460 » » muts: map[string][]txnMutation{}, | 460 » » }, |
| 461 » » snap: d.head.Snapshot(), |
| 462 » » muts: map[string][]txnMutation{}, |
461 } | 463 } |
462 } | 464 } |
463 | 465 |
464 func (d *dataStoreData) endTxn() {} | 466 func (d *dataStoreData) endTxn() {} |
465 | 467 |
466 /////////////////////////////// txnDataStoreData /////////////////////////////// | 468 /////////////////////////////// txnDataStoreData /////////////////////////////// |
467 | 469 |
468 type txnMutation struct { | 470 type txnMutation struct { |
469 key *ds.Key | 471 key *ds.Key |
470 data ds.PropertyMap | 472 data ds.PropertyMap |
471 } | 473 } |
472 | 474 |
473 type txnDataStoreData struct { | 475 type txnDataStoreData struct { |
474 sync.Mutex | 476 sync.Mutex |
475 | 477 |
476 parent *dataStoreData | 478 parent *dataStoreData |
477 | 479 |
478 » // boolean 0 or 1, use atomic.*Int32 to access. | 480 » txn *transactionImpl |
479 » closed int32 | |
480 » isXG bool | |
481 | 481 |
482 snap memStore | 482 snap memStore |
483 | 483 |
484 // string is the raw-bytes encoding of the entity root incl. namespace | 484 // string is the raw-bytes encoding of the entity root incl. namespace |
485 muts map[string][]txnMutation | 485 muts map[string][]txnMutation |
486 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ
ing | 486 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ
ing |
487 // length of encoded keys + values. | 487 // length of encoded keys + values. |
488 } | 488 } |
489 | 489 |
490 var _ memContextObj = (*txnDataStoreData)(nil) | 490 var _ memContextObj = (*txnDataStoreData)(nil) |
491 | 491 |
492 const xgEGLimit = 25 | 492 const xgEGLimit = 25 |
493 | 493 |
494 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } | 494 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } |
| 495 |
495 func (td *txnDataStoreData) endTxn() { | 496 func (td *txnDataStoreData) endTxn() { |
496 » if atomic.LoadInt32(&td.closed) == 1 { | 497 » if err := td.txn.close(); err != nil { |
497 » » panic("cannot end transaction twice") | 498 » » panic(err) |
498 } | 499 } |
499 atomic.StoreInt32(&td.closed, 1) | |
500 } | 500 } |
501 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { | 501 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { |
502 impossible(fmt.Errorf("cannot create a recursive transaction")) | 502 impossible(fmt.Errorf("cannot create a recursive transaction")) |
503 } | 503 } |
504 func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj { | 504 func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj { |
505 impossible(fmt.Errorf("cannot create a recursive transaction")) | 505 impossible(fmt.Errorf("cannot create a recursive transaction")) |
506 return nil | 506 return nil |
507 } | 507 } |
508 | 508 |
509 func (td *txnDataStoreData) run(f func() error) error { | 509 func (td *txnDataStoreData) run(f func() error) error { |
510 // Slightly different from the SDK... datastore and taskqueue each imple
ment | 510 // Slightly different from the SDK... datastore and taskqueue each imple
ment |
511 // this here, where in the SDK only datastore.transaction.Call does. | 511 // this here, where in the SDK only datastore.transaction.Call does. |
512 » if atomic.LoadInt32(&td.closed) == 1 { | 512 » if err := td.txn.valid(); err != nil { |
513 » » return errors.New("datastore: transaction context has expired") | 513 » » return err |
514 } | 514 } |
515 return f() | 515 return f() |
516 } | 516 } |
517 | 517 |
518 // writeMutation ensures that this transaction can support the given key/value | 518 // writeMutation ensures that this transaction can support the given key/value |
519 // mutation. | 519 // mutation. |
520 // | 520 // |
521 // if getOnly is true, don't record the actual mutation data, just ensure that | 521 // if getOnly is true, don't record the actual mutation data, just ensure that |
522 // the key is in an included entity group (or add an empty entry for tha
t | 522 // the key is in an included entity group (or add an empty entry for tha
t |
523 // group). | 523 // group). |
524 // | 524 // |
525 // if !getOnly && data == nil, this counts as a deletion instead of a Put. | 525 // if !getOnly && data == nil, this counts as a deletion instead of a Put. |
526 // | 526 // |
527 // Returns an error if this key causes the transaction to cross too many entity | 527 // Returns an error if this key causes the transaction to cross too many entity |
528 // groups. | 528 // groups. |
529 func (td *txnDataStoreData) writeMutation(getOnly bool, key *ds.Key, data ds.Pro
pertyMap) error { | 529 func (td *txnDataStoreData) writeMutation(getOnly bool, key *ds.Key, data ds.Pro
pertyMap) error { |
530 rk := string(keyBytes(key.Root())) | 530 rk := string(keyBytes(key.Root())) |
531 | 531 |
532 td.Lock() | 532 td.Lock() |
533 defer td.Unlock() | 533 defer td.Unlock() |
534 | 534 |
535 if _, ok := td.muts[rk]; !ok { | 535 if _, ok := td.muts[rk]; !ok { |
536 limit := 1 | 536 limit := 1 |
537 » » if td.isXG { | 537 » » if td.txn.isXG { |
538 limit = xgEGLimit | 538 limit = xgEGLimit |
539 } | 539 } |
540 if len(td.muts)+1 > limit { | 540 if len(td.muts)+1 > limit { |
541 msg := "cross-group transaction need to be explicitly sp
ecified (xg=True)" | 541 msg := "cross-group transaction need to be explicitly sp
ecified (xg=True)" |
542 » » » if td.isXG { | 542 » » » if td.txn.isXG { |
543 msg = "operating on too many entity groups in a
single transaction" | 543 msg = "operating on too many entity groups in a
single transaction" |
544 } | 544 } |
545 return errors.New(msg) | 545 return errors.New(msg) |
546 } | 546 } |
547 td.muts[rk] = []txnMutation{} | 547 td.muts[rk] = []txnMutation{} |
548 } | 548 } |
549 if !getOnly { | 549 if !getOnly { |
550 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) | 550 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) |
551 } | 551 } |
552 | 552 |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
596 } | 596 } |
597 return nil | 597 return nil |
598 } | 598 } |
599 | 599 |
600 func keyBytes(key *ds.Key) []byte { | 600 func keyBytes(key *ds.Key) []byte { |
601 return serialize.ToBytes(ds.MkProperty(key)) | 601 return serialize.ToBytes(ds.MkProperty(key)) |
602 } | 602 } |
603 | 603 |
604 func rpm(data []byte) (ds.PropertyMap, error) { | 604 func rpm(data []byte) (ds.PropertyMap, error) { |
605 return serialize.ReadPropertyMap(bytes.NewBuffer(data), | 605 return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
606 » » serialize.WithContext, "", "") | 606 » » serialize.WithContext, ds.KeyContext{}) |
607 } | 607 } |
608 | 608 |
609 func namespaces(store memStore) []string { | 609 func namespaces(store memStore) []string { |
610 var namespaces []string | 610 var namespaces []string |
611 for _, c := range store.GetCollectionNames() { | 611 for _, c := range store.GetCollectionNames() { |
612 ns, has := trimPrefix(c, "ents:") | 612 ns, has := trimPrefix(c, "ents:") |
613 if !has { | 613 if !has { |
614 if len(namespaces) > 0 { | 614 if len(namespaces) > 0 { |
615 break | 615 break |
616 } | 616 } |
617 continue | 617 continue |
618 } | 618 } |
619 namespaces = append(namespaces, ns) | 619 namespaces = append(namespaces, ns) |
620 } | 620 } |
621 return namespaces | 621 return namespaces |
622 } | 622 } |
623 | 623 |
624 func trimPrefix(v, p string) (string, bool) { | 624 func trimPrefix(v, p string) (string, bool) { |
625 if strings.HasPrefix(v, p) { | 625 if strings.HasPrefix(v, p) { |
626 return v[len(p):], true | 626 return v[len(p):], true |
627 } | 627 } |
628 return v, false | 628 return v, false |
629 } | 629 } |
OLD | NEW |