Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package memory | |
| 6 | |
| 7 import ( | |
| 8 "errors" | |
| 9 "infra/gae/libs/wrapper" | |
| 10 goon_internal "infra/gae/libs/wrapper/memory/internal/goon" | |
| 11 "math/rand" | |
| 12 "sync" | |
| 13 "sync/atomic" | |
| 14 | |
| 15 "github.com/mjibson/goon" | |
| 16 | |
| 17 "appengine/datastore" | |
| 18 pb "appengine_internal/datastore" | |
| 19 ) | |
| 20 | |
| 21 ////////////////////////////////// knrKeeper /////////////////////////////////// | |
| 22 | |
| 23 type knrKeeper struct { | |
| 24 knrLock sync.Mutex | |
| 25 knrFunc goon.KindNameResolver | |
| 26 } | |
| 27 | |
| 28 func (k *knrKeeper) KindNameResolver() goon.KindNameResolver { | |
| 29 k.knrLock.Lock() | |
| 30 defer k.knrLock.Unlock() | |
| 31 ret := k.knrFunc | |
| 32 if ret == nil { | |
| 33 ret = goon.DefaultKindName | |
| 34 k.knrFunc = ret | |
| 35 } | |
| 36 return ret | |
| 37 } | |
| 38 | |
| 39 func (k *knrKeeper) SetKindNameResolver(knr goon.KindNameResolver) { | |
| 40 k.knrLock.Lock() | |
| 41 defer k.knrLock.Unlock() | |
| 42 if knr == nil { | |
| 43 knr = goon.DefaultKindName | |
| 44 } | |
| 45 k.knrFunc = knr | |
| 46 } | |
| 47 | |
| 48 //////////////////////////////// dataStoreData ///////////////////////////////// | |
| 49 | |
| 50 type dataStoreData struct { | |
| 51 wrapper.BrokenFeatures | |
| 52 knrKeeper | |
| 53 | |
| 54 rwlock sync.RWMutex | |
| 55 store *memStore | |
| 56 snap *memStore | |
| 57 | |
| 58 /* collections: | |
|
M-A Ruel
2015/05/28 22:42:38
FTR, I prefer to use // all the time so if I need
iannucci
2015/05/28 23:00:34
removed in favor of a comment pointing to the READ
| |
| 59 * ents:ns -> key -> value | |
| 60 * (rootkind, rootid, __entity_group__,1 ) -> {__version__: int} | |
| 61 * (rootkind, rootid, __entity_group_ids __,1) -> {__version__: int} | |
| 62 * (__entity_group_ids__,1) -> {__versio n__: int} | |
| 63 * idx -> kind,A?,[-?prop]* | |
| 64 * idx:ns:kind -> key = nil | |
| 65 * idx:ns:kind|prop -> propval|key = [prev val] | |
| 66 * idx:ns:kind|-prop -> -propval|key = [next val] | |
| 67 * idx:ns:kind|A|?prop|?prop -> A|propval|propval|key = [prev/next va l]|[prev/next val] | |
| 68 * idx:ns:kind|?prop|?prop -> propval|propval|key = [prev/next val] |[prev/next val] | |
| 69 */ | |
| 70 } | |
| 71 | |
| 72 ////////////////////////////// New(dataStoreData) ////////////////////////////// | |
| 73 | |
| 74 func newDataStoreData() *dataStoreData { | |
| 75 store := newMemStore() | |
| 76 return &dataStoreData{ | |
| 77 BrokenFeatures: wrapper.BrokenFeatures{DefaultError: newDSError( pb.Error_INTERNAL_ERROR)}, | |
| 78 store: store, | |
| 79 snap: store.Snapshot(), // empty but better than a nil pointer. | |
| 80 } | |
| 81 } | |
| 82 | |
| 83 ////////////////////////////// Locker(Datastore) /////////////////////////////// | |
| 84 | |
| 85 func (d *dataStoreData) Lock() { | |
| 86 d.rwlock.Lock() | |
| 87 } | |
| 88 | |
| 89 func (d *dataStoreData) Unlock() { | |
| 90 d.rwlock.Unlock() | |
| 91 } | |
| 92 | |
| 93 func groupMetaKey(key *datastore.Key) []byte { | |
| 94 return keyBytes(noNS, newKey("", "__entity_group__", "", 1, rootKey(key) )) | |
| 95 } | |
| 96 | |
| 97 func groupIDsKey(key *datastore.Key) []byte { | |
| 98 return keyBytes(noNS, newKey("", "__entity_group_ids__", "", 1, rootKey( key))) | |
| 99 } | |
| 100 | |
| 101 func rootIDsKey(kind string) []byte { | |
| 102 return keyBytes(noNS, newKey("", "__entity_root_ids__", kind, 0, nil)) | |
| 103 } | |
| 104 | |
| 105 func curVersion(ents *memCollection, key []byte) (int64, error) { | |
| 106 var err error | |
| 107 v := ents.Get(key) | |
| 108 num := int64(0) | |
| 109 numData := &propertyList{} | |
| 110 if v != nil { | |
| 111 err = numData.UnmarshalBinary(v) | |
| 112 num = (*numData)[0].Value.(int64) | |
| 113 } | |
| 114 return num, err | |
| 115 } | |
| 116 | |
| 117 func incrementLocked(ents *memCollection, key []byte) (int64, error) { | |
| 118 v := ents.Get(key) | |
| 119 | |
| 120 num := int64(0) | |
| 121 numData := &propertyList{} | |
| 122 if v == nil { | |
| 123 num++ | |
| 124 *numData = append(*numData, datastore.Property{Name: "__version_ _"}) | |
| 125 } else { | |
| 126 err := numData.UnmarshalBinary(v) | |
| 127 if err != nil { | |
| 128 return 0, err | |
| 129 } | |
| 130 num = (*numData)[0].Value.(int64) | |
| 131 num++ | |
|
M-A Ruel
2015/05/28 22:42:38
Remove both num++, insert before line 133:
num++
iannucci
2015/05/28 23:00:34
done
| |
| 132 } | |
| 133 (*numData)[0].Value = num | |
| 134 incData, err := numData.MarshalBinary() | |
| 135 if err != nil { | |
| 136 return 0, err | |
| 137 } | |
| 138 ents.Set(key, incData) | |
| 139 | |
| 140 return num, nil | |
| 141 } | |
| 142 | |
| 143 func (d *dataStoreData) entsKeyLocked(key *datastore.Key) (*memCollection, *data store.Key, error) { | |
| 144 coll := "ents:" + key.Namespace() | |
| 145 ents := d.store.GetCollection(coll) | |
| 146 if ents == nil { | |
| 147 ents = d.store.SetCollection(coll, nil) | |
| 148 } | |
| 149 | |
| 150 if key.Incomplete() { | |
| 151 var idKey []byte | |
| 152 if key.Parent() == nil { | |
| 153 idKey = rootIDsKey(key.Kind()) | |
| 154 } else { | |
| 155 idKey = groupIDsKey(key) | |
| 156 } | |
| 157 | |
| 158 id, err := incrementLocked(ents, idKey) | |
| 159 if err != nil { | |
| 160 return nil, nil, err | |
| 161 } | |
| 162 key = newKey(key.Namespace(), key.Kind(), "", id, key.Parent()) | |
| 163 } | |
| 164 | |
| 165 return ents, key, nil | |
| 166 } | |
| 167 | |
| 168 func putPrelim(ns string, knr goon.KindNameResolver, src interface{}) (*datastor e.Key, *propertyList, error) { | |
| 169 key := newKeyObj(ns, knr, src) | |
| 170 if !KeyCouldBeValid(ns, key, UserKeyOnly) { | |
| 171 // TODO(riannucci): different error for Put-ing to reserved Keys ? | |
| 172 return nil, nil, datastore.ErrInvalidKey | |
| 173 } | |
| 174 | |
| 175 data, err := toPL(src) | |
| 176 return key, data, err | |
| 177 } | |
| 178 | |
| 179 func (d *dataStoreData) put(ns string, src interface{}) (*datastore.Key, error) { | |
| 180 key, plData, err := putPrelim(ns, d.KindNameResolver(), src) | |
| 181 if err != nil { | |
| 182 return nil, err | |
| 183 } | |
| 184 key, err = d.putInner(key, plData) | |
| 185 if err != nil { | |
| 186 return nil, err | |
| 187 } | |
| 188 return key, goon_internal.SetStructKey(src, key, d.KindNameResolver()) | |
| 189 } | |
| 190 | |
| 191 func (d *dataStoreData) putInner(key *datastore.Key, data *propertyList) (*datas tore.Key, error) { | |
| 192 dataBytes, err := data.MarshalBinary() | |
| 193 if err != nil { | |
| 194 return nil, err | |
| 195 } | |
| 196 | |
| 197 d.rwlock.Lock() | |
| 198 defer d.Unlock() | |
|
M-A Ruel
2015/05/28 22:42:38
Stay symmetrical between the calls.
iannucci
2015/05/28 23:00:34
oops good catch.
| |
| 199 | |
| 200 ents, key, err := d.entsKeyLocked(key) | |
| 201 if err != nil { | |
| 202 return nil, err | |
| 203 } | |
| 204 | |
| 205 if _, err = incrementLocked(ents, groupMetaKey(key)); err != nil { | |
| 206 return nil, err | |
| 207 } | |
| 208 | |
| 209 ents.Set(keyBytes(noNS, key), dataBytes) | |
| 210 | |
| 211 return key, nil | |
| 212 } | |
| 213 | |
| 214 func getInner(ns string, knr goon.KindNameResolver, dst interface{}, getColl fun c(*datastore.Key) (*memCollection, error)) error { | |
| 215 key := newKeyObj(ns, knr, dst) | |
| 216 if !KeyValid(ns, key, AllowSpecialKeys) { | |
| 217 return datastore.ErrInvalidKey | |
| 218 } | |
| 219 | |
| 220 ents, err := getColl(key) | |
| 221 if err != nil { | |
| 222 return err | |
| 223 } | |
| 224 if ents == nil { | |
| 225 return datastore.ErrNoSuchEntity | |
| 226 } | |
| 227 pdata := ents.Get(keyBytes(noNS, key)) | |
| 228 if pdata == nil { | |
| 229 return datastore.ErrNoSuchEntity | |
| 230 } | |
| 231 pl := &propertyList{} | |
| 232 err = pl.UnmarshalBinary(pdata) | |
|
M-A Ruel
2015/05/28 22:42:38
merge lines 232 et 233.
iannucci
2015/05/28 23:00:34
done
| |
| 233 if err != nil { | |
| 234 return err | |
| 235 } | |
| 236 return fromPL(pl, dst) | |
| 237 } | |
| 238 | |
| 239 func (d *dataStoreData) get(ns string, dst interface{}) error { | |
| 240 return getInner(ns, d.KindNameResolver(), dst, func(*datastore.Key) (*me mCollection, error) { | |
| 241 d.rwlock.RLock() | |
|
M-A Ruel
2015/05/28 22:42:38
There will be enough concurrent calls to warrant a
iannucci
2015/05/28 23:00:35
I think there will be. Writes are heavy (esp when
| |
| 242 s := d.store.Snapshot() | |
| 243 d.rwlock.RUnlock() | |
| 244 | |
| 245 return s.GetCollection("ents:" + ns), nil | |
| 246 }) | |
| 247 } | |
| 248 | |
| 249 func (d *dataStoreData) del(ns string, key *datastore.Key) error { | |
| 250 if !KeyValid(ns, key, UserKeyOnly) { | |
| 251 return datastore.ErrInvalidKey | |
| 252 } | |
| 253 | |
| 254 keyBuf := keyBytes(noNS, key) | |
| 255 | |
| 256 d.rwlock.Lock() | |
| 257 defer d.Unlock() | |
| 258 | |
| 259 ents := d.store.GetCollection("ents:" + ns) | |
| 260 if ents == nil { | |
| 261 return nil | |
| 262 } | |
| 263 | |
| 264 _, err := incrementLocked(ents, groupMetaKey(key)) | |
| 265 if err != nil { | |
| 266 return err | |
| 267 } | |
| 268 | |
| 269 ents.Delete(keyBuf) | |
| 270 return nil | |
| 271 } | |
| 272 | |
| 273 ///////////////////////// memContextObj(dataStoreData) ///////////////////////// | |
| 274 | |
| 275 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { | |
| 276 // TODO(riannucci): implement with Flush/FlushRevert for persistance. | |
| 277 | |
| 278 txn := obj.(*txnDataStoreData) | |
| 279 for rk, muts := range txn.muts { | |
| 280 if len(muts) == 0 { // read-only | |
| 281 continue | |
| 282 } | |
| 283 k, err := keyFromByteString(withNS, rk) | |
| 284 if err != nil { | |
| 285 panic(err) | |
| 286 } | |
| 287 entKey := "ents:" + k.Namespace() | |
| 288 mkey := groupMetaKey(k) | |
| 289 entsHead := d.store.GetCollection(entKey) | |
| 290 entsSnap := txn.snap.GetCollection(entKey) | |
| 291 vHead, err := curVersion(entsHead, mkey) | |
| 292 if err != nil { | |
| 293 panic(err) | |
| 294 } | |
| 295 vSnap, err := curVersion(entsSnap, mkey) | |
| 296 if err != nil { | |
| 297 panic(err) | |
| 298 } | |
| 299 if vHead != vSnap { | |
| 300 return false | |
| 301 } | |
| 302 } | |
| 303 return true | |
| 304 } | |
| 305 | |
| 306 func (d *dataStoreData) applyTxn(r *rand.Rand, obj memContextObj) { | |
| 307 txn := obj.(*txnDataStoreData) | |
| 308 for _, muts := range txn.muts { | |
| 309 if len(muts) == 0 { // read-only | |
| 310 continue | |
| 311 } | |
| 312 for _, m := range muts { | |
| 313 var err error | |
| 314 if m.data == nil { | |
| 315 err = d.del(m.key.Namespace(), m.key) | |
| 316 } else { | |
| 317 _, err = d.putInner(m.key, m.data) | |
| 318 } | |
| 319 if err != nil { | |
| 320 panic(err) | |
| 321 } | |
| 322 } | |
| 323 } | |
| 324 } | |
| 325 | |
| 326 func (d *dataStoreData) mkTxn(o *datastore.TransactionOptions) (memContextObj, e rror) { | |
| 327 return &txnDataStoreData{ | |
| 328 // alias to the main datastore's so that testing code can have p rimitive | |
| 329 // access to break features inside of transactions. | |
| 330 BrokenFeatures: &d.BrokenFeatures, | |
| 331 parent: d, | |
| 332 knrKeeper: knrKeeper{knrFunc: d.knrFunc}, | |
| 333 isXG: o != nil && o.XG, | |
| 334 snap: d.store.Snapshot(), | |
| 335 muts: map[string][]txnMutation{}, | |
| 336 }, nil | |
| 337 } | |
| 338 | |
| 339 func (d *dataStoreData) endTxn() {} | |
| 340 | |
| 341 /////////////////////////////// txnDataStoreData /////////////////////////////// | |
| 342 type txnMutation struct { | |
| 343 key *datastore.Key | |
| 344 data *propertyList | |
| 345 } | |
| 346 | |
| 347 type txnDataStoreData struct { | |
| 348 *wrapper.BrokenFeatures | |
| 349 knrKeeper | |
| 350 sync.Mutex | |
| 351 | |
| 352 parent *dataStoreData | |
| 353 | |
| 354 // boolean 0 or 1, use atomic.*Int32 to access. | |
| 355 closed int32 | |
| 356 isXG bool | |
| 357 | |
| 358 snap *memStore | |
| 359 | |
| 360 // string is the raw-bytes encoding of the entity root incl. namespace | |
| 361 muts map[string][]txnMutation | |
| 362 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ ing | |
| 363 // length of encoded keys + values. | |
| 364 } | |
| 365 | |
| 366 const xgEGLimit = 25 | |
| 367 | |
| 368 /////////////////////// memContextObj(txnDataStoreData) //////////////////////// | |
| 369 | |
| 370 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } | |
| 371 func (td *txnDataStoreData) endTxn() { | |
| 372 if atomic.LoadInt32(&td.closed) == 1 { | |
| 373 panic("cannot end transaction twice") | |
| 374 } | |
| 375 atomic.StoreInt32(&td.closed, 1) | |
| 376 } | |
| 377 func (*txnDataStoreData) applyTxn(*rand.Rand, memContextObj) { | |
| 378 panic("txnDataStoreData cannot apply transactions") | |
| 379 } | |
| 380 func (*txnDataStoreData) mkTxn(*datastore.TransactionOptions) (memContextObj, er ror) { | |
| 381 return nil, errors.New("datastore: nested transactions are not supported ") | |
| 382 } | |
| 383 | |
| 384 /////////////////// wrapper.BrokenFeatures(txnDataStoreData) /////////////////// | |
| 385 | |
| 386 func (td *txnDataStoreData) IsBroken() error { | |
| 387 // Slightly different from the SDK... datastore and taskqueue each imple ment | |
| 388 // this here, where in the SDK only datastore.transaction.Call does. | |
| 389 if atomic.LoadInt32(&td.closed) == 1 { | |
| 390 return errors.New("datastore: transaction context has expired") | |
| 391 } | |
| 392 return td.BrokenFeatures.IsBroken() | |
| 393 } | |
| 394 | |
| 395 // writeMutation ensures that this transaction can support the given key/value | |
| 396 // mutation. | |
| 397 // | |
| 398 // if getOnly is true, don't record the actual mutation data, just ensure that | |
| 399 // the key is in an included entity group (or add an empty entry for tha t | |
| 400 // group). | |
| 401 // | |
| 402 // if !getOnly && data == nil, this counts as a deletion instead of a Put. | |
| 403 // | |
| 404 // Returns an error if this key causes the transaction to cross too many entity | |
| 405 // groups. | |
| 406 func (td *txnDataStoreData) writeMutation(getOnly bool, key *datastore.Key, data *propertyList) error { | |
| 407 rk := string(keyBytes(withNS, rootKey(key))) | |
| 408 | |
| 409 td.Lock() | |
| 410 defer td.Unlock() | |
| 411 | |
| 412 if _, ok := td.muts[rk]; !ok { | |
| 413 limit := 1 | |
| 414 if td.isXG { | |
| 415 limit = xgEGLimit | |
| 416 } | |
| 417 if len(td.muts)+1 > limit { | |
| 418 msg := "cross-group transaction need to be explicitly sp ecified (xg=True)" | |
| 419 if td.isXG { | |
| 420 msg = "operating on too many entity groups in a single transaction" | |
| 421 } | |
| 422 return newDSError(pb.Error_BAD_REQUEST, msg) | |
| 423 } | |
| 424 td.muts[rk] = []txnMutation{} | |
| 425 } | |
| 426 if !getOnly { | |
| 427 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) | |
| 428 } | |
| 429 | |
| 430 return nil | |
| 431 } | |
| 432 | |
| 433 func (td *txnDataStoreData) put(ns string, src interface{}) (*datastore.Key, err or) { | |
| 434 key, plData, err := putPrelim(ns, td.KindNameResolver(), src) | |
| 435 if err != nil { | |
| 436 return nil, err | |
| 437 } | |
| 438 | |
| 439 func() { | |
| 440 td.parent.Lock() | |
| 441 defer td.parent.Unlock() | |
| 442 _, key, err = td.parent.entsKeyLocked(key) | |
| 443 }() | |
| 444 if err != nil { | |
| 445 return nil, err | |
| 446 } | |
| 447 | |
| 448 if err = td.writeMutation(false, key, plData); err != nil { | |
| 449 return nil, err | |
| 450 } | |
| 451 | |
| 452 return key, goon_internal.SetStructKey(src, key, td.KindNameResolver()) | |
| 453 } | |
| 454 | |
| 455 func (td *txnDataStoreData) get(ns string, dst interface{}) error { | |
| 456 return getInner(ns, td.KindNameResolver(), dst, func(key *datastore.Key) (*memCollection, error) { | |
| 457 if err := td.writeMutation(true, key, nil); err != nil { | |
| 458 return nil, err | |
| 459 } | |
| 460 return td.snap.GetCollection("ents:" + ns), nil | |
| 461 }) | |
| 462 } | |
| 463 | |
| 464 func (td *txnDataStoreData) del(ns string, key *datastore.Key) error { | |
| 465 if !KeyValid(ns, key, UserKeyOnly) { | |
| 466 return datastore.ErrInvalidKey | |
| 467 } | |
| 468 return td.writeMutation(false, key, nil) | |
| 469 } | |
| OLD | NEW |