Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "fmt" | 9 "fmt" |
| 10 "sync" | 10 "sync" |
| 11 "sync/atomic" | 11 "sync/atomic" |
| 12 | 12 |
| 13 ds "github.com/luci/gae/service/datastore" | 13 ds "github.com/luci/gae/service/datastore" |
| 14 "github.com/luci/gae/service/datastore/dskey" | 14 "github.com/luci/gae/service/datastore/dskey" |
| 15 "github.com/luci/gae/service/datastore/serialize" | 15 "github.com/luci/gae/service/datastore/serialize" |
| 16 "github.com/luci/luci-go/common/errors" | 16 "github.com/luci/luci-go/common/errors" |
| 17 "golang.org/x/net/context" | 17 "golang.org/x/net/context" |
| 18 ) | 18 ) |
| 19 | 19 |
| 20 //////////////////////////////// dataStoreData ///////////////////////////////// | 20 //////////////////////////////// dataStoreData ///////////////////////////////// |
| 21 | 21 |
| 22 type dataStoreData struct { | 22 type dataStoreData struct { |
| 23 rwlock sync.RWMutex | 23 rwlock sync.RWMutex |
| 24 » // See README.md for store schema. | 24 » // See README.md for head schema. |
| 25 » store *memStore | 25 » head *memStore |
| 26 » snap *memStore | 26 » snap *memStore |
| 27 } | 27 } |
| 28 | 28 |
| 29 var ( | 29 var ( |
| 30 _ = memContextObj((*dataStoreData)(nil)) | 30 _ = memContextObj((*dataStoreData)(nil)) |
| 31 _ = sync.Locker((*dataStoreData)(nil)) | 31 _ = sync.Locker((*dataStoreData)(nil)) |
| 32 ) | 32 ) |
| 33 | 33 |
| 34 func newDataStoreData() *dataStoreData { | 34 func newDataStoreData() *dataStoreData { |
| 35 » store := newMemStore() | 35 » head := newMemStore() |
| 36 return &dataStoreData{ | 36 return &dataStoreData{ |
| 37 » » store: store, | 37 » » head: head, |
| 38 » » snap: store.Snapshot(), // empty but better than a nil pointer. | 38 » » snap: head.Snapshot(), // empty but better than a nil pointer. |
| 39 } | 39 } |
| 40 } | 40 } |
| 41 | 41 |
| 42 func (d *dataStoreData) Lock() { | 42 func (d *dataStoreData) Lock() { |
| 43 d.rwlock.Lock() | 43 d.rwlock.Lock() |
| 44 } | 44 } |
| 45 | 45 |
| 46 func (d *dataStoreData) Unlock() { | 46 func (d *dataStoreData) Unlock() { |
| 47 d.rwlock.Unlock() | 47 d.rwlock.Unlock() |
| 48 } | 48 } |
| 49 | 49 |
| 50 func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) { | 50 func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) { |
| 51 d.rwlock.RLock() | 51 d.rwlock.RLock() |
| 52 defer d.rwlock.RUnlock() | 52 defer d.rwlock.RUnlock() |
| 53 » head = d.store.Snapshot() | 53 » head = d.head.Snapshot() |
| 54 if consistent { | 54 if consistent { |
| 55 idx = head | 55 idx = head |
| 56 } else { | 56 } else { |
| 57 idx = d.snap | 57 idx = d.snap |
| 58 } | 58 } |
| 59 return | 59 return |
| 60 } | 60 } |
| 61 | 61 |
| 62 func (d *dataStoreData) takeSnapshot() *memStore { | 62 func (d *dataStoreData) takeSnapshot() *memStore { |
| 63 d.rwlock.RLock() | 63 d.rwlock.RLock() |
| 64 defer d.rwlock.RUnlock() | 64 defer d.rwlock.RUnlock() |
| 65 » return d.store.Snapshot() | 65 » return d.head.Snapshot() |
| 66 } | 66 } |
| 67 | 67 |
| 68 func (d *dataStoreData) setSnapshot(snap *memStore) { | 68 func (d *dataStoreData) setSnapshot(snap *memStore) { |
| 69 d.rwlock.Lock() | 69 d.rwlock.Lock() |
| 70 defer d.rwlock.Unlock() | 70 defer d.rwlock.Unlock() |
| 71 d.snap = snap | 71 d.snap = snap |
| 72 } | 72 } |
| 73 | 73 |
| 74 func (d *dataStoreData) catchupIndexes() { | 74 func (d *dataStoreData) catchupIndexes() { |
| 75 d.rwlock.Lock() | 75 d.rwlock.Lock() |
| 76 defer d.rwlock.Unlock() | 76 defer d.rwlock.Unlock() |
| 77 » d.snap = d.store.Snapshot() | 77 » d.snap = d.head.Snapshot() |
| 78 } | 78 } |
| 79 | 79 |
| 80 /////////////////////////// indicies(dataStoreData) //////////////////////////// | 80 /////////////////////////// indicies(dataStoreData) //////////////////////////// |
| 81 | 81 |
| 82 func groupMetaKey(key ds.Key) []byte { | 82 func groupMetaKey(key ds.Key) []byte { |
| 83 » return keyBytes(serialize.WithoutContext, | 83 » return keyBytes(dskey.New("", "", "__entity_group__", "", 1, dskey.Root( key))) |
| 84 » » dskey.New("", "", "__entity_group__", "", 1, dskey.Root(key))) | |
| 85 } | 84 } |
| 86 | 85 |
| 87 func groupIDsKey(key ds.Key) []byte { | 86 func groupIDsKey(key ds.Key) []byte { |
| 88 » return keyBytes(serialize.WithoutContext, | 87 » return keyBytes(dskey.New("", "", "__entity_group_ids__", "", 1, dskey.R oot(key))) |
| 89 » » dskey.New("", "", "__entity_group_ids__", "", 1, dskey.Root(key) )) | |
| 90 } | 88 } |
| 91 | 89 |
| 92 func rootIDsKey(kind string) []byte { | 90 func rootIDsKey(kind string) []byte { |
| 93 » return keyBytes(serialize.WithoutContext, | 91 » return keyBytes(dskey.New("", "", "__entity_root_ids__", kind, 0, nil)) |
| 94 » » dskey.New("", "", "__entity_root_ids__", kind, 0, nil)) | |
| 95 } | 92 } |
| 96 | 93 |
| 97 func curVersion(ents *memCollection, key []byte) int64 { | 94 func curVersion(ents *memCollection, key []byte) int64 { |
| 98 if ents != nil { | 95 if ents != nil { |
| 99 if v := ents.Get(key); v != nil { | 96 if v := ents.Get(key); v != nil { |
| 100 pm, err := rpm(v) | 97 pm, err := rpm(v) |
| 101 » » » if err != nil { | 98 » » » memoryCorruption(err) |
| 102 » » » » panic(err) // memory corruption | 99 |
| 103 » » » } | |
| 104 pl, ok := pm["__version__"] | 100 pl, ok := pm["__version__"] |
| 105 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { | 101 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { |
| 106 return pl[0].Value().(int64) | 102 return pl[0].Value().(int64) |
| 107 } | 103 } |
| 108 » » » panic(fmt.Errorf("__version__ property missing or wrong: %v", pm)) | 104 |
| 105 » » » memoryCorruption(fmt.Errorf("__version__ property missin g or wrong: %v", pm)) | |
| 109 } | 106 } |
| 110 } | 107 } |
| 111 return 0 | 108 return 0 |
| 112 } | 109 } |
| 113 | 110 |
| 114 func incrementLocked(ents *memCollection, key []byte) int64 { | 111 func incrementLocked(ents *memCollection, key []byte) int64 { |
| 115 ret := curVersion(ents, key) + 1 | 112 ret := curVersion(ents, key) + 1 |
| 116 » buf := &bytes.Buffer{} | 113 » ents.Set(key, serialize.ToBytes(ds.PropertyMap{ |
| 117 » serialize.WritePropertyMap(buf, serialize.WithContext, ds.PropertyMap{ | 114 » » "__version__": {ds.MkPropertyNI(ret)}, |
| 118 » » "__version__": {ds.MkPropertyNI(ret)}}) | 115 » })) |
| 119 » ents.Set(key, buf.Bytes()) | |
| 120 return ret | 116 return ret |
| 121 } | 117 } |
| 122 | 118 |
| 123 func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) { | 119 func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) { |
| 124 coll := "ents:" + key.Namespace() | 120 coll := "ents:" + key.Namespace() |
| 125 » ents := d.store.GetCollection(coll) | 121 » ents := d.head.GetCollection(coll) |
| 126 if ents == nil { | 122 if ents == nil { |
| 127 » » ents = d.store.SetCollection(coll, nil) | 123 » » ents = d.head.SetCollection(coll, nil) |
| 128 } | 124 } |
| 129 | 125 |
| 130 if dskey.Incomplete(key) { | 126 if dskey.Incomplete(key) { |
| 131 idKey := []byte(nil) | 127 idKey := []byte(nil) |
| 132 if key.Parent() == nil { | 128 if key.Parent() == nil { |
| 133 idKey = rootIDsKey(key.Kind()) | 129 idKey = rootIDsKey(key.Kind()) |
| 134 } else { | 130 } else { |
| 135 idKey = groupIDsKey(key) | 131 idKey = groupIDsKey(key) |
| 136 } | 132 } |
| 137 id := incrementLocked(ents, idKey) | 133 id := incrementLocked(ents, idKey) |
| 138 key = dskey.New(key.AppID(), key.Namespace(), key.Kind(), "", id , key.Parent()) | 134 key = dskey.New(key.AppID(), key.Namespace(), key.Kind(), "", id , key.Parent()) |
| 139 } | 135 } |
| 140 | 136 |
| 141 return ents, key | 137 return ents, key |
| 142 } | 138 } |
| 143 | 139 |
| 144 func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.Put MultiCB) { | 140 func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.Put MultiCB) { |
| 145 for i, k := range keys { | 141 for i, k := range keys { |
| 146 buf := &bytes.Buffer{} | |
| 147 pmap, _ := vals[i].Save(false) | 142 pmap, _ := vals[i].Save(false) |
| 148 » » serialize.WritePropertyMap(buf, serialize.WithoutContext, pmap) | 143 » » dataBytes := serialize.ToBytes(pmap) |
| 149 » » dataBytes := buf.Bytes() | |
| 150 | 144 |
| 151 k, err := func() (ret ds.Key, err error) { | 145 k, err := func() (ret ds.Key, err error) { |
| 152 d.rwlock.Lock() | 146 d.rwlock.Lock() |
| 153 defer d.rwlock.Unlock() | 147 defer d.rwlock.Unlock() |
| 154 | 148 |
| 155 ents, ret := d.entsKeyLocked(k) | 149 ents, ret := d.entsKeyLocked(k) |
| 156 incrementLocked(ents, groupMetaKey(ret)) | 150 incrementLocked(ents, groupMetaKey(ret)) |
| 157 | 151 |
| 158 » » » old := ents.Get(keyBytes(serialize.WithoutContext, ret)) | 152 » » » old := ents.Get(keyBytes(ret)) |
| 159 oldPM := ds.PropertyMap(nil) | 153 oldPM := ds.PropertyMap(nil) |
| 160 if old != nil { | 154 if old != nil { |
| 161 if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil { | 155 if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil { |
| 162 return | 156 return |
| 163 } | 157 } |
| 164 } | 158 } |
| 165 » » » updateIndicies(d.store, ret, oldPM, pmap) | 159 » » » updateIndicies(d.head, ret, oldPM, pmap) |
| 166 » » » ents.Set(keyBytes(serialize.WithoutContext, ret), dataBy tes) | 160 » » » ents.Set(keyBytes(ret), dataBytes) |
| 167 return | 161 return |
| 168 }() | 162 }() |
| 169 if cb != nil { | 163 if cb != nil { |
| 170 cb(k, err) | 164 cb(k, err) |
| 171 } | 165 } |
| 172 } | 166 } |
| 173 } | 167 } |
| 174 | 168 |
| 175 func getMultiInner(keys []ds.Key, cb ds.GetMultiCB, getColl func() (*memCollecti on, error)) error { | 169 func getMultiInner(keys []ds.Key, cb ds.GetMultiCB, getColl func() (*memCollecti on, error)) error { |
| 176 ents, err := getColl() | 170 ents, err := getColl() |
| 177 if err != nil { | 171 if err != nil { |
| 178 return err | 172 return err |
| 179 } | 173 } |
| 180 if ents == nil { | 174 if ents == nil { |
| 181 for range keys { | 175 for range keys { |
| 182 cb(nil, ds.ErrNoSuchEntity) | 176 cb(nil, ds.ErrNoSuchEntity) |
| 183 } | 177 } |
| 184 return nil | 178 return nil |
| 185 } | 179 } |
| 186 | 180 |
| 187 for _, k := range keys { | 181 for _, k := range keys { |
| 188 » » pdata := ents.Get(keyBytes(serialize.WithoutContext, k)) | 182 » » pdata := ents.Get(keyBytes(k)) |
| 189 if pdata == nil { | 183 if pdata == nil { |
| 190 cb(nil, ds.ErrNoSuchEntity) | 184 cb(nil, ds.ErrNoSuchEntity) |
| 191 continue | 185 continue |
| 192 } | 186 } |
| 193 cb(rpmWoCtx(pdata, k.Namespace())) | 187 cb(rpmWoCtx(pdata, k.Namespace())) |
| 194 } | 188 } |
| 195 return nil | 189 return nil |
| 196 } | 190 } |
| 197 | 191 |
| 198 func (d *dataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { | 192 func (d *dataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { |
| 199 getMultiInner(keys, cb, func() (*memCollection, error) { | 193 getMultiInner(keys, cb, func() (*memCollection, error) { |
| 200 s := d.takeSnapshot() | 194 s := d.takeSnapshot() |
| 201 | 195 |
| 202 return s.GetCollection("ents:" + keys[0].Namespace()), nil | 196 return s.GetCollection("ents:" + keys[0].Namespace()), nil |
| 203 }) | 197 }) |
| 204 return nil | 198 return nil |
| 205 } | 199 } |
| 206 | 200 |
| 207 func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) { | 201 func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) { |
| 208 toDel := make([][]byte, 0, len(keys)) | 202 toDel := make([][]byte, 0, len(keys)) |
| 209 for _, k := range keys { | 203 for _, k := range keys { |
| 210 » » toDel = append(toDel, keyBytes(serialize.WithoutContext, k)) | 204 » » toDel = append(toDel, keyBytes(k)) |
| 211 } | 205 } |
| 212 ns := keys[0].Namespace() | 206 ns := keys[0].Namespace() |
| 213 | 207 |
| 214 d.rwlock.Lock() | 208 d.rwlock.Lock() |
| 215 defer d.rwlock.Unlock() | 209 defer d.rwlock.Unlock() |
| 216 | 210 |
| 217 » ents := d.store.GetCollection("ents:" + ns) | 211 » ents := d.head.GetCollection("ents:" + ns) |
| 218 | 212 |
| 219 for i, k := range keys { | 213 for i, k := range keys { |
| 220 if ents != nil { | 214 if ents != nil { |
| 221 incrementLocked(ents, groupMetaKey(k)) | 215 incrementLocked(ents, groupMetaKey(k)) |
| 222 kb := toDel[i] | 216 kb := toDel[i] |
| 223 if old := ents.Get(kb); old != nil { | 217 if old := ents.Get(kb); old != nil { |
| 224 oldPM, err := rpmWoCtx(old, ns) | 218 oldPM, err := rpmWoCtx(old, ns) |
| 225 if err != nil { | 219 if err != nil { |
| 226 if cb != nil { | 220 if cb != nil { |
| 227 cb(err) | 221 cb(err) |
| 228 } | 222 } |
| 229 continue | 223 continue |
| 230 } | 224 } |
| 231 » » » » updateIndicies(d.store, k, oldPM, nil) | 225 » » » » updateIndicies(d.head, k, oldPM, nil) |
| 232 ents.Delete(kb) | 226 ents.Delete(kb) |
| 233 } | 227 } |
| 234 } | 228 } |
| 235 if cb != nil { | 229 if cb != nil { |
| 236 cb(nil) | 230 cb(nil) |
| 237 } | 231 } |
| 238 } | 232 } |
| 239 } | 233 } |
| 240 | 234 |
| 241 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { | 235 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { |
| 242 // TODO(riannucci): implement with Flush/FlushRevert for persistance. | 236 // TODO(riannucci): implement with Flush/FlushRevert for persistance. |
| 243 | 237 |
| 244 txn := obj.(*txnDataStoreData) | 238 txn := obj.(*txnDataStoreData) |
| 245 for rk, muts := range txn.muts { | 239 for rk, muts := range txn.muts { |
| 246 if len(muts) == 0 { // read-only | 240 if len(muts) == 0 { // read-only |
| 247 continue | 241 continue |
| 248 } | 242 } |
| 249 » » k, err := serialize.ReadKey(bytes.NewBufferString(rk), serialize .WithContext, "", "") | 243 » » prop, err := serialize.ReadProperty(bytes.NewBufferString(rk), s erialize.WithContext, "", "") |
| 250 » » if err != nil { | 244 » » memoryCorruption(err) |
| 251 » » » panic(err) | 245 |
| 252 » » } | 246 » » k := prop.Value().(ds.Key) |
| 253 | 247 |
| 254 entKey := "ents:" + k.Namespace() | 248 entKey := "ents:" + k.Namespace() |
| 255 mkey := groupMetaKey(k) | 249 mkey := groupMetaKey(k) |
| 256 » » entsHead := d.store.GetCollection(entKey) | 250 » » entsHead := d.head.GetCollection(entKey) |
| 257 entsSnap := txn.snap.GetCollection(entKey) | 251 entsSnap := txn.snap.GetCollection(entKey) |
| 258 vHead := curVersion(entsHead, mkey) | 252 vHead := curVersion(entsHead, mkey) |
| 259 vSnap := curVersion(entsSnap, mkey) | 253 vSnap := curVersion(entsSnap, mkey) |
| 260 if vHead != vSnap { | 254 if vHead != vSnap { |
| 261 return false | 255 return false |
| 262 } | 256 } |
| 263 } | 257 } |
| 264 return true | 258 return true |
| 265 } | 259 } |
| 266 | 260 |
| 267 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) { | 261 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) { |
| 268 txn := obj.(*txnDataStoreData) | 262 txn := obj.(*txnDataStoreData) |
| 269 for _, muts := range txn.muts { | 263 for _, muts := range txn.muts { |
| 270 if len(muts) == 0 { // read-only | 264 if len(muts) == 0 { // read-only |
| 271 continue | 265 continue |
| 272 } | 266 } |
| 273 // TODO(riannucci): refactor to do just 1 putMulti, and 1 delMul ti | 267 // TODO(riannucci): refactor to do just 1 putMulti, and 1 delMul ti |
| 274 for _, m := range muts { | 268 for _, m := range muts { |
| 275 err := error(nil) | 269 err := error(nil) |
| 276 k := m.key | 270 k := m.key |
| 277 if m.data == nil { | 271 if m.data == nil { |
| 278 d.delMulti([]ds.Key{k}, | 272 d.delMulti([]ds.Key{k}, |
| 279 func(e error) { err = e }) | 273 func(e error) { err = e }) |
| 280 } else { | 274 } else { |
| 281 d.putMulti([]ds.Key{m.key}, []ds.PropertyMap{m.d ata}, | 275 d.putMulti([]ds.Key{m.key}, []ds.PropertyMap{m.d ata}, |
| 282 func(_ ds.Key, e error) { err = e }) | 276 func(_ ds.Key, e error) { err = e }) |
| 283 } | 277 } |
| 284 » » » err = errors.SingleError(err) | 278 » » » impossible(err) |
| 285 » » » if err != nil { | |
| 286 » » » » panic(err) | |
| 287 » » » } | |
| 288 } | 279 } |
| 289 } | 280 } |
| 290 } | 281 } |
| 291 | 282 |
| 292 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj { | 283 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj { |
| 293 return &txnDataStoreData{ | 284 return &txnDataStoreData{ |
| 294 // alias to the main datastore's so that testing code can have p rimitive | 285 // alias to the main datastore's so that testing code can have p rimitive |
| 295 // access to break features inside of transactions. | 286 // access to break features inside of transactions. |
| 296 parent: d, | 287 parent: d, |
| 297 isXG: o != nil && o.XG, | 288 isXG: o != nil && o.XG, |
| 298 » » snap: d.store.Snapshot(), | 289 » » snap: d.head.Snapshot(), |
| 299 muts: map[string][]txnMutation{}, | 290 muts: map[string][]txnMutation{}, |
| 300 } | 291 } |
| 301 } | 292 } |
| 302 | 293 |
| 303 func (d *dataStoreData) endTxn() {} | 294 func (d *dataStoreData) endTxn() {} |
| 304 | 295 |
| 305 /////////////////////////////// txnDataStoreData /////////////////////////////// | 296 /////////////////////////////// txnDataStoreData /////////////////////////////// |
| 306 | 297 |
| 307 type txnMutation struct { | 298 type txnMutation struct { |
| 308 key ds.Key | 299 key ds.Key |
| (...skipping 22 matching lines...) Expand all Loading... | |
| 331 const xgEGLimit = 25 | 322 const xgEGLimit = 25 |
| 332 | 323 |
| 333 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } | 324 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } |
| 334 func (td *txnDataStoreData) endTxn() { | 325 func (td *txnDataStoreData) endTxn() { |
| 335 if atomic.LoadInt32(&td.closed) == 1 { | 326 if atomic.LoadInt32(&td.closed) == 1 { |
| 336 panic("cannot end transaction twice") | 327 panic("cannot end transaction twice") |
| 337 } | 328 } |
| 338 atomic.StoreInt32(&td.closed, 1) | 329 atomic.StoreInt32(&td.closed, 1) |
| 339 } | 330 } |
| 340 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { | 331 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { |
| 341 » panic("txnDataStoreData cannot apply transactions") | 332 » impossible(fmt.Errorf("cannot creat a recursive transaction")) |
|
dnj (Google)
2015/08/28 17:54:21
"create"
iannucci
2015/08/28 19:48:55
done
| |
| 342 } | 333 } |
| 343 func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj { | 334 func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj { |
| 344 » panic("impossible") | 335 » impossible(fmt.Errorf("cannot creat a recursive transaction")) |
|
dnj (Google)
2015/08/28 17:54:21
"create"
iannucci
2015/08/28 19:48:55
done
| |
| 336 » return nil | |
| 345 } | 337 } |
| 346 | 338 |
| 347 func (td *txnDataStoreData) run(f func() error) error { | 339 func (td *txnDataStoreData) run(f func() error) error { |
| 348 // Slightly different from the SDK... datastore and taskqueue each imple ment | 340 // Slightly different from the SDK... datastore and taskqueue each imple ment |
| 349 // this here, where in the SDK only datastore.transaction.Call does. | 341 // this here, where in the SDK only datastore.transaction.Call does. |
| 350 if atomic.LoadInt32(&td.closed) == 1 { | 342 if atomic.LoadInt32(&td.closed) == 1 { |
| 351 return errors.New("datastore: transaction context has expired") | 343 return errors.New("datastore: transaction context has expired") |
| 352 } | 344 } |
| 353 return f() | 345 return f() |
| 354 } | 346 } |
| 355 | 347 |
| 356 // writeMutation ensures that this transaction can support the given key/value | 348 // writeMutation ensures that this transaction can support the given key/value |
| 357 // mutation. | 349 // mutation. |
| 358 // | 350 // |
| 359 // if getOnly is true, don't record the actual mutation data, just ensure that | 351 // if getOnly is true, don't record the actual mutation data, just ensure that |
| 360 // the key is in an included entity group (or add an empty entry for tha t | 352 // the key is in an included entity group (or add an empty entry for tha t |
| 361 // group). | 353 // group). |
| 362 // | 354 // |
| 363 // if !getOnly && data == nil, this counts as a deletion instead of a Put. | 355 // if !getOnly && data == nil, this counts as a deletion instead of a Put. |
| 364 // | 356 // |
| 365 // Returns an error if this key causes the transaction to cross too many entity | 357 // Returns an error if this key causes the transaction to cross too many entity |
| 366 // groups. | 358 // groups. |
| 367 func (td *txnDataStoreData) writeMutation(getOnly bool, key ds.Key, data ds.Prop ertyMap) error { | 359 func (td *txnDataStoreData) writeMutation(getOnly bool, key ds.Key, data ds.Prop ertyMap) error { |
| 368 » rk := string(keyBytes(serialize.WithContext, dskey.Root(key))) | 360 » rk := string(keyBytes(dskey.Root(key))) |
| 369 | 361 |
| 370 td.Lock() | 362 td.Lock() |
| 371 defer td.Unlock() | 363 defer td.Unlock() |
| 372 | 364 |
| 373 if _, ok := td.muts[rk]; !ok { | 365 if _, ok := td.muts[rk]; !ok { |
| 374 limit := 1 | 366 limit := 1 |
| 375 if td.isXG { | 367 if td.isXG { |
| 376 limit = xgEGLimit | 368 limit = xgEGLimit |
| 377 } | 369 } |
| 378 if len(td.muts)+1 > limit { | 370 if len(td.muts)+1 > limit { |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 421 func (td *txnDataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { | 413 func (td *txnDataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { |
| 422 for _, k := range keys { | 414 for _, k := range keys { |
| 423 err := td.writeMutation(false, k, nil) | 415 err := td.writeMutation(false, k, nil) |
| 424 if cb != nil { | 416 if cb != nil { |
| 425 cb(err) | 417 cb(err) |
| 426 } | 418 } |
| 427 } | 419 } |
| 428 return nil | 420 return nil |
| 429 } | 421 } |
| 430 | 422 |
| 431 func keyBytes(ctx serialize.KeyContext, key ds.Key) []byte { | 423 func keyBytes(key ds.Key) []byte { |
| 432 » buf := &bytes.Buffer{} | 424 » return serialize.ToBytes(ds.MkProperty(key)) |
| 433 » serialize.WriteKey(buf, ctx, key) | |
| 434 » return buf.Bytes() | |
| 435 } | 425 } |
| 436 | 426 |
| 437 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { | 427 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { |
| 438 return serialize.ReadPropertyMap(bytes.NewBuffer(data), | 428 return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
| 439 serialize.WithoutContext, globalAppID, ns) | 429 serialize.WithoutContext, globalAppID, ns) |
| 440 } | 430 } |
| 441 | 431 |
| 442 func rpm(data []byte) (ds.PropertyMap, error) { | 432 func rpm(data []byte) (ds.PropertyMap, error) { |
| 443 return serialize.ReadPropertyMap(bytes.NewBuffer(data), | 433 return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
| 444 serialize.WithContext, "", "") | 434 serialize.WithContext, "", "") |
| 445 } | 435 } |
| 446 | 436 |
| 447 type keyitem interface { | 437 type keyitem interface { |
| 448 Key() ds.Key | 438 Key() ds.Key |
| 449 } | 439 } |
| OLD | NEW |