| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "fmt" | 9 "fmt" |
| 10 "sync" | 10 "sync" |
| 11 "sync/atomic" | 11 "sync/atomic" |
| 12 | 12 |
| 13 ds "github.com/luci/gae/service/datastore" | 13 ds "github.com/luci/gae/service/datastore" |
| 14 "github.com/luci/gae/service/datastore/dskey" |
| 15 "github.com/luci/gae/service/datastore/serialize" |
| 14 "github.com/luci/luci-go/common/errors" | 16 "github.com/luci/luci-go/common/errors" |
| 15 "golang.org/x/net/context" | 17 "golang.org/x/net/context" |
| 16 ) | 18 ) |
| 17 | 19 |
| 18 //////////////////////////////// dataStoreData ///////////////////////////////// | 20 //////////////////////////////// dataStoreData ///////////////////////////////// |
| 19 | 21 |
| 20 type dataStoreData struct { | 22 type dataStoreData struct { |
| 21 rwlock sync.RWMutex | 23 rwlock sync.RWMutex |
| 22 // See README.md for store schema. | 24 // See README.md for store schema. |
| 23 store *memStore | 25 store *memStore |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 71 | 73 |
| 72 func (d *dataStoreData) catchupIndexes() { | 74 func (d *dataStoreData) catchupIndexes() { |
| 73 d.rwlock.Lock() | 75 d.rwlock.Lock() |
| 74 defer d.rwlock.Unlock() | 76 defer d.rwlock.Unlock() |
| 75 d.snap = d.store.Snapshot() | 77 d.snap = d.store.Snapshot() |
| 76 } | 78 } |
| 77 | 79 |
| 78 /////////////////////////// indicies(dataStoreData) //////////////////////////// | 80 /////////////////////////// indicies(dataStoreData) //////////////////////////// |
| 79 | 81 |
| 80 func groupMetaKey(key ds.Key) []byte { | 82 func groupMetaKey(key ds.Key) []byte { |
| 81 » return keyBytes(ds.WithoutContext, | 83 » return keyBytes(serialize.WithoutContext, |
| 82 » » ds.NewKey("", "", "__entity_group__", "", 1, ds.KeyRoot(key))) | 84 » » dskey.New("", "", "__entity_group__", "", 1, dskey.Root(key))) |
| 83 } | 85 } |
| 84 | 86 |
| 85 func groupIDsKey(key ds.Key) []byte { | 87 func groupIDsKey(key ds.Key) []byte { |
| 86 » return keyBytes(ds.WithoutContext, | 88 » return keyBytes(serialize.WithoutContext, |
| 87 » » ds.NewKey("", "", "__entity_group_ids__", "", 1, ds.KeyRoot(key)
)) | 89 » » dskey.New("", "", "__entity_group_ids__", "", 1, dskey.Root(key)
)) |
| 88 } | 90 } |
| 89 | 91 |
| 90 func rootIDsKey(kind string) []byte { | 92 func rootIDsKey(kind string) []byte { |
| 91 » return keyBytes(ds.WithoutContext, | 93 » return keyBytes(serialize.WithoutContext, |
| 92 » » ds.NewKey("", "", "__entity_root_ids__", kind, 0, nil)) | 94 » » dskey.New("", "", "__entity_root_ids__", kind, 0, nil)) |
| 93 } | 95 } |
| 94 | 96 |
| 95 func curVersion(ents *memCollection, key []byte) int64 { | 97 func curVersion(ents *memCollection, key []byte) int64 { |
| 96 if ents != nil { | 98 if ents != nil { |
| 97 if v := ents.Get(key); v != nil { | 99 if v := ents.Get(key); v != nil { |
| 98 pm, err := rpm(v) | 100 pm, err := rpm(v) |
| 99 if err != nil { | 101 if err != nil { |
| 100 panic(err) // memory corruption | 102 panic(err) // memory corruption |
| 101 } | 103 } |
| 102 pl, ok := pm["__version__"] | 104 pl, ok := pm["__version__"] |
| 103 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { | 105 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { |
| 104 return pl[0].Value().(int64) | 106 return pl[0].Value().(int64) |
| 105 } | 107 } |
| 106 panic(fmt.Errorf("__version__ property missing or wrong:
%v", pm)) | 108 panic(fmt.Errorf("__version__ property missing or wrong:
%v", pm)) |
| 107 } | 109 } |
| 108 } | 110 } |
| 109 return 0 | 111 return 0 |
| 110 } | 112 } |
| 111 | 113 |
| 112 func incrementLocked(ents *memCollection, key []byte) int64 { | 114 func incrementLocked(ents *memCollection, key []byte) int64 { |
| 113 ret := curVersion(ents, key) + 1 | 115 ret := curVersion(ents, key) + 1 |
| 114 buf := &bytes.Buffer{} | 116 buf := &bytes.Buffer{} |
| 115 » ds.PropertyMap{"__version__": {ds.MkPropertyNI(ret)}}.Write( | 117 » serialize.WritePropertyMap(buf, serialize.WithContext, ds.PropertyMap{ |
| 116 » » buf, ds.WithContext) | 118 » » "__version__": {ds.MkPropertyNI(ret)}}) |
| 117 ents.Set(key, buf.Bytes()) | 119 ents.Set(key, buf.Bytes()) |
| 118 return ret | 120 return ret |
| 119 } | 121 } |
| 120 | 122 |
| 121 func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) { | 123 func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) { |
| 122 coll := "ents:" + key.Namespace() | 124 coll := "ents:" + key.Namespace() |
| 123 ents := d.store.GetCollection(coll) | 125 ents := d.store.GetCollection(coll) |
| 124 if ents == nil { | 126 if ents == nil { |
| 125 ents = d.store.SetCollection(coll, nil) | 127 ents = d.store.SetCollection(coll, nil) |
| 126 } | 128 } |
| 127 | 129 |
| 128 » if ds.KeyIncomplete(key) { | 130 » if dskey.Incomplete(key) { |
| 129 idKey := []byte(nil) | 131 idKey := []byte(nil) |
| 130 if key.Parent() == nil { | 132 if key.Parent() == nil { |
| 131 idKey = rootIDsKey(key.Kind()) | 133 idKey = rootIDsKey(key.Kind()) |
| 132 } else { | 134 } else { |
| 133 idKey = groupIDsKey(key) | 135 idKey = groupIDsKey(key) |
| 134 } | 136 } |
| 135 id := incrementLocked(ents, idKey) | 137 id := incrementLocked(ents, idKey) |
| 136 » » key = ds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", id
, key.Parent()) | 138 » » key = dskey.New(key.AppID(), key.Namespace(), key.Kind(), "", id
, key.Parent()) |
| 137 } | 139 } |
| 138 | 140 |
| 139 return ents, key | 141 return ents, key |
| 140 } | 142 } |
| 141 | 143 |
| 142 func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.Put
MultiCB) { | 144 func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.Put
MultiCB) { |
| 143 for i, k := range keys { | 145 for i, k := range keys { |
| 144 buf := &bytes.Buffer{} | 146 buf := &bytes.Buffer{} |
| 145 pmap, _ := vals[i].Save(false) | 147 pmap, _ := vals[i].Save(false) |
| 146 » » pmap.Write(buf, ds.WithoutContext) | 148 » » serialize.WritePropertyMap(buf, serialize.WithoutContext, pmap) |
| 147 dataBytes := buf.Bytes() | 149 dataBytes := buf.Bytes() |
| 148 | 150 |
| 149 k, err := func() (ret ds.Key, err error) { | 151 k, err := func() (ret ds.Key, err error) { |
| 150 d.rwlock.Lock() | 152 d.rwlock.Lock() |
| 151 defer d.rwlock.Unlock() | 153 defer d.rwlock.Unlock() |
| 152 | 154 |
| 153 ents, ret := d.entsKeyLocked(k) | 155 ents, ret := d.entsKeyLocked(k) |
| 154 incrementLocked(ents, groupMetaKey(ret)) | 156 incrementLocked(ents, groupMetaKey(ret)) |
| 155 | 157 |
| 156 » » » old := ents.Get(keyBytes(ds.WithoutContext, ret)) | 158 » » » old := ents.Get(keyBytes(serialize.WithoutContext, ret)) |
| 157 oldPM := ds.PropertyMap(nil) | 159 oldPM := ds.PropertyMap(nil) |
| 158 if old != nil { | 160 if old != nil { |
| 159 if oldPM, err = rpmWoCtx(old, ret.Namespace());
err != nil { | 161 if oldPM, err = rpmWoCtx(old, ret.Namespace());
err != nil { |
| 160 return | 162 return |
| 161 } | 163 } |
| 162 } | 164 } |
| 163 updateIndicies(d.store, ret, oldPM, pmap) | 165 updateIndicies(d.store, ret, oldPM, pmap) |
| 164 » » » ents.Set(keyBytes(ds.WithoutContext, ret), dataBytes) | 166 » » » ents.Set(keyBytes(serialize.WithoutContext, ret), dataBy
tes) |
| 165 return | 167 return |
| 166 }() | 168 }() |
| 167 if cb != nil { | 169 if cb != nil { |
| 168 cb(k, err) | 170 cb(k, err) |
| 169 } | 171 } |
| 170 } | 172 } |
| 171 } | 173 } |
| 172 | 174 |
| 173 func getMultiInner(keys []ds.Key, cb ds.GetMultiCB, getColl func() (*memCollecti
on, error)) error { | 175 func getMultiInner(keys []ds.Key, cb ds.GetMultiCB, getColl func() (*memCollecti
on, error)) error { |
| 174 ents, err := getColl() | 176 ents, err := getColl() |
| 175 if err != nil { | 177 if err != nil { |
| 176 return err | 178 return err |
| 177 } | 179 } |
| 178 if ents == nil { | 180 if ents == nil { |
| 179 for range keys { | 181 for range keys { |
| 180 cb(nil, ds.ErrNoSuchEntity) | 182 cb(nil, ds.ErrNoSuchEntity) |
| 181 } | 183 } |
| 182 return nil | 184 return nil |
| 183 } | 185 } |
| 184 | 186 |
| 185 for _, k := range keys { | 187 for _, k := range keys { |
| 186 » » pdata := ents.Get(keyBytes(ds.WithoutContext, k)) | 188 » » pdata := ents.Get(keyBytes(serialize.WithoutContext, k)) |
| 187 if pdata == nil { | 189 if pdata == nil { |
| 188 cb(nil, ds.ErrNoSuchEntity) | 190 cb(nil, ds.ErrNoSuchEntity) |
| 189 continue | 191 continue |
| 190 } | 192 } |
| 191 cb(rpmWoCtx(pdata, k.Namespace())) | 193 cb(rpmWoCtx(pdata, k.Namespace())) |
| 192 } | 194 } |
| 193 return nil | 195 return nil |
| 194 } | 196 } |
| 195 | 197 |
| 196 func (d *dataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { | 198 func (d *dataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { |
| 197 getMultiInner(keys, cb, func() (*memCollection, error) { | 199 getMultiInner(keys, cb, func() (*memCollection, error) { |
| 198 s := d.takeSnapshot() | 200 s := d.takeSnapshot() |
| 199 | 201 |
| 200 return s.GetCollection("ents:" + keys[0].Namespace()), nil | 202 return s.GetCollection("ents:" + keys[0].Namespace()), nil |
| 201 }) | 203 }) |
| 202 return nil | 204 return nil |
| 203 } | 205 } |
| 204 | 206 |
| 205 func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) { | 207 func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) { |
| 206 toDel := make([][]byte, 0, len(keys)) | 208 toDel := make([][]byte, 0, len(keys)) |
| 207 for _, k := range keys { | 209 for _, k := range keys { |
| 208 » » toDel = append(toDel, keyBytes(ds.WithoutContext, k)) | 210 » » toDel = append(toDel, keyBytes(serialize.WithoutContext, k)) |
| 209 } | 211 } |
| 210 ns := keys[0].Namespace() | 212 ns := keys[0].Namespace() |
| 211 | 213 |
| 212 d.rwlock.Lock() | 214 d.rwlock.Lock() |
| 213 defer d.rwlock.Unlock() | 215 defer d.rwlock.Unlock() |
| 214 | 216 |
| 215 ents := d.store.GetCollection("ents:" + ns) | 217 ents := d.store.GetCollection("ents:" + ns) |
| 216 | 218 |
| 217 for i, k := range keys { | 219 for i, k := range keys { |
| 218 if ents != nil { | 220 if ents != nil { |
| (...skipping 18 matching lines...) Expand all Loading... |
| 237 } | 239 } |
| 238 | 240 |
| 239 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { | 241 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { |
| 240 // TODO(riannucci): implement with Flush/FlushRevert for persistance. | 242 // TODO(riannucci): implement with Flush/FlushRevert for persistance. |
| 241 | 243 |
| 242 txn := obj.(*txnDataStoreData) | 244 txn := obj.(*txnDataStoreData) |
| 243 for rk, muts := range txn.muts { | 245 for rk, muts := range txn.muts { |
| 244 if len(muts) == 0 { // read-only | 246 if len(muts) == 0 { // read-only |
| 245 continue | 247 continue |
| 246 } | 248 } |
| 247 » » k, err := ds.ReadKey(bytes.NewBufferString(rk), ds.WithContext,
"", "") | 249 » » k, err := serialize.ReadKey(bytes.NewBufferString(rk), serialize
.WithContext, "", "") |
| 248 if err != nil { | 250 if err != nil { |
| 249 panic(err) | 251 panic(err) |
| 250 } | 252 } |
| 251 | 253 |
| 252 entKey := "ents:" + k.Namespace() | 254 entKey := "ents:" + k.Namespace() |
| 253 mkey := groupMetaKey(k) | 255 mkey := groupMetaKey(k) |
| 254 entsHead := d.store.GetCollection(entKey) | 256 entsHead := d.store.GetCollection(entKey) |
| 255 entsSnap := txn.snap.GetCollection(entKey) | 257 entsSnap := txn.snap.GetCollection(entKey) |
| 256 vHead := curVersion(entsHead, mkey) | 258 vHead := curVersion(entsHead, mkey) |
| 257 vSnap := curVersion(entsSnap, mkey) | 259 vSnap := curVersion(entsSnap, mkey) |
| (...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 356 // | 358 // |
| 357 // if getOnly is true, don't record the actual mutation data, just ensure that | 359 // if getOnly is true, don't record the actual mutation data, just ensure that |
| 358 // the key is in an included entity group (or add an empty entry for tha
t | 360 // the key is in an included entity group (or add an empty entry for tha
t |
| 359 // group). | 361 // group). |
| 360 // | 362 // |
| 361 // if !getOnly && data == nil, this counts as a deletion instead of a Put. | 363 // if !getOnly && data == nil, this counts as a deletion instead of a Put. |
| 362 // | 364 // |
| 363 // Returns an error if this key causes the transaction to cross too many entity | 365 // Returns an error if this key causes the transaction to cross too many entity |
| 364 // groups. | 366 // groups. |
| 365 func (td *txnDataStoreData) writeMutation(getOnly bool, key ds.Key, data ds.Prop
ertyMap) error { | 367 func (td *txnDataStoreData) writeMutation(getOnly bool, key ds.Key, data ds.Prop
ertyMap) error { |
| 366 » rk := string(keyBytes(ds.WithContext, ds.KeyRoot(key))) | 368 » rk := string(keyBytes(serialize.WithContext, dskey.Root(key))) |
| 367 | 369 |
| 368 td.Lock() | 370 td.Lock() |
| 369 defer td.Unlock() | 371 defer td.Unlock() |
| 370 | 372 |
| 371 if _, ok := td.muts[rk]; !ok { | 373 if _, ok := td.muts[rk]; !ok { |
| 372 limit := 1 | 374 limit := 1 |
| 373 if td.isXG { | 375 if td.isXG { |
| 374 limit = xgEGLimit | 376 limit = xgEGLimit |
| 375 } | 377 } |
| 376 if len(td.muts)+1 > limit { | 378 if len(td.muts)+1 > limit { |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 419 func (td *txnDataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { | 421 func (td *txnDataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { |
| 420 for _, k := range keys { | 422 for _, k := range keys { |
| 421 err := td.writeMutation(false, k, nil) | 423 err := td.writeMutation(false, k, nil) |
| 422 if cb != nil { | 424 if cb != nil { |
| 423 cb(err) | 425 cb(err) |
| 424 } | 426 } |
| 425 } | 427 } |
| 426 return nil | 428 return nil |
| 427 } | 429 } |
| 428 | 430 |
| 429 func keyBytes(ctx ds.KeyContext, key ds.Key) []byte { | 431 func keyBytes(ctx serialize.KeyContext, key ds.Key) []byte { |
| 430 buf := &bytes.Buffer{} | 432 buf := &bytes.Buffer{} |
| 431 » ds.WriteKey(buf, ctx, key) | 433 » serialize.WriteKey(buf, ctx, key) |
| 432 return buf.Bytes() | 434 return buf.Bytes() |
| 433 } | 435 } |
| 434 | 436 |
| 435 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { | 437 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { |
| 436 » ret := ds.PropertyMap{} | 438 » return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
| 437 » err := ret.Read(bytes.NewBuffer(data), ds.WithoutContext, globalAppID, n
s) | 439 » » serialize.WithoutContext, globalAppID, ns) |
| 438 » return ret, err | |
| 439 } | 440 } |
| 440 | 441 |
| 441 func rpm(data []byte) (ds.PropertyMap, error) { | 442 func rpm(data []byte) (ds.PropertyMap, error) { |
| 442 » ret := ds.PropertyMap{} | 443 » return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
| 443 » err := ret.Read(bytes.NewBuffer(data), ds.WithContext, "", "") | 444 » » serialize.WithContext, "", "") |
| 444 » return ret, err | |
| 445 } | 445 } |
| 446 | 446 |
| 447 type keyitem interface { | 447 type keyitem interface { |
| 448 Key() ds.Key | 448 Key() ds.Key |
| 449 } | 449 } |
| OLD | NEW |