OLD | NEW |
---|---|
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "fmt" | 9 "fmt" |
10 "sync" | 10 "sync" |
11 "sync/atomic" | 11 "sync/atomic" |
12 | 12 |
13 ds "github.com/luci/gae/service/datastore" | 13 ds "github.com/luci/gae/service/datastore" |
14 "github.com/luci/gae/service/datastore/dskey" | |
15 "github.com/luci/gae/service/datastore/serialize" | 14 "github.com/luci/gae/service/datastore/serialize" |
16 "github.com/luci/luci-go/common/errors" | 15 "github.com/luci/luci-go/common/errors" |
17 "golang.org/x/net/context" | 16 "golang.org/x/net/context" |
18 ) | 17 ) |
19 | 18 |
20 //////////////////////////////// dataStoreData ///////////////////////////////// | 19 //////////////////////////////// dataStoreData ///////////////////////////////// |
21 | 20 |
22 type dataStoreData struct { | 21 type dataStoreData struct { |
23 rwlock sync.RWMutex | 22 rwlock sync.RWMutex |
24 // See README.md for head schema. | 23 // See README.md for head schema. |
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
74 } | 73 } |
75 | 74 |
76 func (d *dataStoreData) catchupIndexes() { | 75 func (d *dataStoreData) catchupIndexes() { |
77 d.rwlock.Lock() | 76 d.rwlock.Lock() |
78 defer d.rwlock.Unlock() | 77 defer d.rwlock.Unlock() |
79 d.snap = d.head.Snapshot() | 78 d.snap = d.head.Snapshot() |
80 } | 79 } |
81 | 80 |
82 /////////////////////////// indexes(dataStoreData) //////////////////////////// | 81 /////////////////////////// indexes(dataStoreData) //////////////////////////// |
83 | 82 |
84 func groupMetaKey(key ds.Key) []byte { | 83 func groupMetaKey(key *ds.Key) []byte { |
85 » return keyBytes(dskey.New("", "", "__entity_group__", "", 1, dskey.Root( key))) | 84 » return keyBytes(ds.NewKey("", "", "__entity_group__", "", 1, key.Root()) ) |
86 } | 85 } |
87 | 86 |
88 func groupIDsKey(key ds.Key) []byte { | 87 func groupIDsKey(key *ds.Key) []byte { |
89 » return keyBytes(dskey.New("", "", "__entity_group_ids__", "", 1, dskey.R oot(key))) | 88 » return keyBytes(ds.NewKey("", "", "__entity_group_ids__", "", 1, key.Roo t())) |
90 } | 89 } |
91 | 90 |
92 func rootIDsKey(kind string) []byte { | 91 func rootIDsKey(kind string) []byte { |
93 » return keyBytes(dskey.New("", "", "__entity_root_ids__", kind, 0, nil)) | 92 » return keyBytes(ds.NewKey("", "", "__entity_root_ids__", kind, 0, nil)) |
94 } | 93 } |
95 | 94 |
96 func curVersion(ents *memCollection, key []byte) int64 { | 95 func curVersion(ents *memCollection, key []byte) int64 { |
97 if ents != nil { | 96 if ents != nil { |
98 if v := ents.Get(key); v != nil { | 97 if v := ents.Get(key); v != nil { |
99 pm, err := rpm(v) | 98 pm, err := rpm(v) |
100 memoryCorruption(err) | 99 memoryCorruption(err) |
101 | 100 |
102 pl, ok := pm["__version__"] | 101 pl, ok := pm["__version__"] |
103 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { | 102 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { |
104 return pl[0].Value().(int64) | 103 return pl[0].Value().(int64) |
105 } | 104 } |
106 | 105 |
107 memoryCorruption(fmt.Errorf("__version__ property missin g or wrong: %v", pm)) | 106 memoryCorruption(fmt.Errorf("__version__ property missin g or wrong: %v", pm)) |
108 } | 107 } |
109 } | 108 } |
110 return 0 | 109 return 0 |
111 } | 110 } |
112 | 111 |
113 func incrementLocked(ents *memCollection, key []byte) int64 { | 112 func incrementLocked(ents *memCollection, key []byte) int64 { |
114 ret := curVersion(ents, key) + 1 | 113 ret := curVersion(ents, key) + 1 |
115 ents.Set(key, serialize.ToBytes(ds.PropertyMap{ | 114 ents.Set(key, serialize.ToBytes(ds.PropertyMap{ |
116 "__version__": {ds.MkPropertyNI(ret)}, | 115 "__version__": {ds.MkPropertyNI(ret)}, |
117 })) | 116 })) |
118 return ret | 117 return ret |
119 } | 118 } |
120 | 119 |
121 func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) { | 120 func (d *dataStoreData) entsKeyLocked(key *ds.Key) (*memCollection, *ds.Key) { |
122 coll := "ents:" + key.Namespace() | 121 coll := "ents:" + key.Namespace() |
123 ents := d.head.GetCollection(coll) | 122 ents := d.head.GetCollection(coll) |
124 if ents == nil { | 123 if ents == nil { |
125 ents = d.head.SetCollection(coll, nil) | 124 ents = d.head.SetCollection(coll, nil) |
126 } | 125 } |
127 | 126 |
128 » if dskey.Incomplete(key) { | 127 » if key.Incomplete() { |
129 idKey := []byte(nil) | 128 idKey := []byte(nil) |
130 if key.Parent() == nil { | 129 if key.Parent() == nil { |
131 » » » idKey = rootIDsKey(key.Kind()) | 130 » » » idKey = rootIDsKey(key.Last().Kind) |
132 } else { | 131 } else { |
133 idKey = groupIDsKey(key) | 132 idKey = groupIDsKey(key) |
134 } | 133 } |
135 id := incrementLocked(ents, idKey) | 134 id := incrementLocked(ents, idKey) |
136 » » key = dskey.New(key.AppID(), key.Namespace(), key.Kind(), "", id , key.Parent()) | 135 » » key = ds.NewKey(key.AppID(), key.Namespace(), key.Last().Kind, " ", id, key.Parent()) |
137 } | 136 } |
138 | 137 |
139 return ents, key | 138 return ents, key |
140 } | 139 } |
141 | 140 |
142 func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.Put MultiCB) { | 141 func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Pu tMultiCB) { |
143 for i, k := range keys { | 142 for i, k := range keys { |
144 pmap, _ := vals[i].Save(false) | 143 pmap, _ := vals[i].Save(false) |
145 dataBytes := serialize.ToBytes(pmap) | 144 dataBytes := serialize.ToBytes(pmap) |
146 | 145 |
147 » » k, err := func() (ret ds.Key, err error) { | 146 » » k, err := func() (ret *ds.Key, err error) { |
148 d.rwlock.Lock() | 147 d.rwlock.Lock() |
149 defer d.rwlock.Unlock() | 148 defer d.rwlock.Unlock() |
150 | 149 |
151 ents, ret := d.entsKeyLocked(k) | 150 ents, ret := d.entsKeyLocked(k) |
152 incrementLocked(ents, groupMetaKey(ret)) | 151 incrementLocked(ents, groupMetaKey(ret)) |
153 | 152 |
154 old := ents.Get(keyBytes(ret)) | 153 old := ents.Get(keyBytes(ret)) |
155 oldPM := ds.PropertyMap(nil) | 154 oldPM := ds.PropertyMap(nil) |
156 if old != nil { | 155 if old != nil { |
157 if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil { | 156 if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil { |
158 return | 157 return |
159 } | 158 } |
160 } | 159 } |
161 updateIndexes(d.head, ret, oldPM, pmap) | 160 updateIndexes(d.head, ret, oldPM, pmap) |
162 ents.Set(keyBytes(ret), dataBytes) | 161 ents.Set(keyBytes(ret), dataBytes) |
163 return | 162 return |
164 }() | 163 }() |
165 if cb != nil { | 164 if cb != nil { |
166 cb(k, err) | 165 cb(k, err) |
167 } | 166 } |
168 } | 167 } |
169 } | 168 } |
170 | 169 |
171 func getMultiInner(keys []ds.Key, cb ds.GetMultiCB, getColl func() (*memCollecti on, error)) error { | 170 func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (*memCollect ion, error)) error { |
172 ents, err := getColl() | 171 ents, err := getColl() |
173 if err != nil { | 172 if err != nil { |
174 return err | 173 return err |
175 } | 174 } |
176 if ents == nil { | 175 if ents == nil { |
177 for range keys { | 176 for range keys { |
178 cb(nil, ds.ErrNoSuchEntity) | 177 cb(nil, ds.ErrNoSuchEntity) |
179 } | 178 } |
180 return nil | 179 return nil |
181 } | 180 } |
182 | 181 |
183 for _, k := range keys { | 182 for _, k := range keys { |
184 pdata := ents.Get(keyBytes(k)) | 183 pdata := ents.Get(keyBytes(k)) |
185 if pdata == nil { | 184 if pdata == nil { |
186 cb(nil, ds.ErrNoSuchEntity) | 185 cb(nil, ds.ErrNoSuchEntity) |
187 continue | 186 continue |
188 } | 187 } |
189 cb(rpmWoCtx(pdata, k.Namespace())) | 188 cb(rpmWoCtx(pdata, k.Namespace())) |
190 } | 189 } |
191 return nil | 190 return nil |
192 } | 191 } |
193 | 192 |
194 func (d *dataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { | 193 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error { |
195 getMultiInner(keys, cb, func() (*memCollection, error) { | 194 getMultiInner(keys, cb, func() (*memCollection, error) { |
196 s := d.takeSnapshot() | 195 s := d.takeSnapshot() |
197 | 196 |
198 return s.GetCollection("ents:" + keys[0].Namespace()), nil | 197 return s.GetCollection("ents:" + keys[0].Namespace()), nil |
199 }) | 198 }) |
200 return nil | 199 return nil |
201 } | 200 } |
202 | 201 |
203 func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) { | 202 func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) { |
204 toDel := make([][]byte, 0, len(keys)) | 203 toDel := make([][]byte, 0, len(keys)) |
205 for _, k := range keys { | 204 for _, k := range keys { |
206 toDel = append(toDel, keyBytes(k)) | 205 toDel = append(toDel, keyBytes(k)) |
207 } | 206 } |
208 ns := keys[0].Namespace() | 207 ns := keys[0].Namespace() |
209 | 208 |
210 d.rwlock.Lock() | 209 d.rwlock.Lock() |
211 defer d.rwlock.Unlock() | 210 defer d.rwlock.Unlock() |
212 | 211 |
213 ents := d.head.GetCollection("ents:" + ns) | 212 ents := d.head.GetCollection("ents:" + ns) |
(...skipping 24 matching lines...) Expand all Loading... | |
238 // TODO(riannucci): implement with Flush/FlushRevert for persistance. | 237 // TODO(riannucci): implement with Flush/FlushRevert for persistance. |
239 | 238 |
240 txn := obj.(*txnDataStoreData) | 239 txn := obj.(*txnDataStoreData) |
241 for rk, muts := range txn.muts { | 240 for rk, muts := range txn.muts { |
242 if len(muts) == 0 { // read-only | 241 if len(muts) == 0 { // read-only |
243 continue | 242 continue |
244 } | 243 } |
245 prop, err := serialize.ReadProperty(bytes.NewBufferString(rk), s erialize.WithContext, "", "") | 244 prop, err := serialize.ReadProperty(bytes.NewBufferString(rk), s erialize.WithContext, "", "") |
246 memoryCorruption(err) | 245 memoryCorruption(err) |
247 | 246 |
248 » » k := prop.Value().(ds.Key) | 247 » » k := prop.Value().(*ds.Key) |
249 | 248 |
250 entKey := "ents:" + k.Namespace() | 249 entKey := "ents:" + k.Namespace() |
251 mkey := groupMetaKey(k) | 250 mkey := groupMetaKey(k) |
252 entsHead := d.head.GetCollection(entKey) | 251 entsHead := d.head.GetCollection(entKey) |
253 entsSnap := txn.snap.GetCollection(entKey) | 252 entsSnap := txn.snap.GetCollection(entKey) |
254 vHead := curVersion(entsHead, mkey) | 253 vHead := curVersion(entsHead, mkey) |
255 vSnap := curVersion(entsSnap, mkey) | 254 vSnap := curVersion(entsSnap, mkey) |
256 if vHead != vSnap { | 255 if vHead != vSnap { |
257 return false | 256 return false |
258 } | 257 } |
259 } | 258 } |
260 return true | 259 return true |
261 } | 260 } |
262 | 261 |
263 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) { | 262 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) { |
264 txn := obj.(*txnDataStoreData) | 263 txn := obj.(*txnDataStoreData) |
265 for _, muts := range txn.muts { | 264 for _, muts := range txn.muts { |
266 if len(muts) == 0 { // read-only | 265 if len(muts) == 0 { // read-only |
267 continue | 266 continue |
268 } | 267 } |
269 // TODO(riannucci): refactor to do just 1 putMulti, and 1 delMul ti | 268 // TODO(riannucci): refactor to do just 1 putMulti, and 1 delMul ti |
270 for _, m := range muts { | 269 for _, m := range muts { |
271 err := error(nil) | 270 err := error(nil) |
272 k := m.key | 271 k := m.key |
273 if m.data == nil { | 272 if m.data == nil { |
274 » » » » d.delMulti([]ds.Key{k}, | 273 » » » » d.delMulti([]*ds.Key{k}, |
275 func(e error) { err = e }) | 274 func(e error) { err = e }) |
276 } else { | 275 } else { |
277 » » » » d.putMulti([]ds.Key{m.key}, []ds.PropertyMap{m.d ata}, | 276 » » » » d.putMulti([]*ds.Key{m.key}, []ds.PropertyMap{m. data}, |
278 » » » » » func(_ ds.Key, e error) { err = e }) | 277 » » » » » func(_ *ds.Key, e error) { err = e }) |
279 } | 278 } |
280 impossible(err) | 279 impossible(err) |
281 } | 280 } |
282 } | 281 } |
283 } | 282 } |
284 | 283 |
285 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj { | 284 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj { |
286 return &txnDataStoreData{ | 285 return &txnDataStoreData{ |
287 // alias to the main datastore's so that testing code can have p rimitive | 286 // alias to the main datastore's so that testing code can have p rimitive |
288 // access to break features inside of transactions. | 287 // access to break features inside of transactions. |
289 parent: d, | 288 parent: d, |
290 isXG: o != nil && o.XG, | 289 isXG: o != nil && o.XG, |
291 snap: d.head.Snapshot(), | 290 snap: d.head.Snapshot(), |
292 muts: map[string][]txnMutation{}, | 291 muts: map[string][]txnMutation{}, |
293 } | 292 } |
294 } | 293 } |
295 | 294 |
296 func (d *dataStoreData) endTxn() {} | 295 func (d *dataStoreData) endTxn() {} |
297 | 296 |
298 /////////////////////////////// txnDataStoreData /////////////////////////////// | 297 /////////////////////////////// txnDataStoreData /////////////////////////////// |
299 | 298 |
300 type txnMutation struct { | 299 type txnMutation struct { |
301 » key ds.Key | 300 » key *ds.Key |
302 data ds.PropertyMap | 301 data ds.PropertyMap |
303 } | 302 } |
304 | 303 |
305 type txnDataStoreData struct { | 304 type txnDataStoreData struct { |
306 sync.Mutex | 305 sync.Mutex |
307 | 306 |
308 parent *dataStoreData | 307 parent *dataStoreData |
309 | 308 |
310 // boolean 0 or 1, use atomic.*Int32 to access. | 309 // boolean 0 or 1, use atomic.*Int32 to access. |
311 closed int32 | 310 closed int32 |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
351 // mutation. | 350 // mutation. |
352 // | 351 // |
353 // if getOnly is true, don't record the actual mutation data, just ensure that | 352 // if getOnly is true, don't record the actual mutation data, just ensure that |
354 // the key is in an included entity group (or add an empty entry for tha t | 353 // the key is in an included entity group (or add an empty entry for tha t |
355 // group). | 354 // group). |
356 // | 355 // |
357 // if !getOnly && data == nil, this counts as a deletion instead of a Put. | 356 // if !getOnly && data == nil, this counts as a deletion instead of a Put. |
358 // | 357 // |
359 // Returns an error if this key causes the transaction to cross too many entity | 358 // Returns an error if this key causes the transaction to cross too many entity |
360 // groups. | 359 // groups. |
361 func (td *txnDataStoreData) writeMutation(getOnly bool, key ds.Key, data ds.Prop ertyMap) error { | 360 func (td *txnDataStoreData) writeMutation(getOnly bool, key *ds.Key, data ds.Pro pertyMap) error { |
362 » rk := string(keyBytes(dskey.Root(key))) | 361 » rk := string(keyBytes(key.Root())) |
363 | 362 |
364 td.Lock() | 363 td.Lock() |
365 defer td.Unlock() | 364 defer td.Unlock() |
366 | 365 |
367 if _, ok := td.muts[rk]; !ok { | 366 if _, ok := td.muts[rk]; !ok { |
368 limit := 1 | 367 limit := 1 |
369 if td.isXG { | 368 if td.isXG { |
370 limit = xgEGLimit | 369 limit = xgEGLimit |
371 } | 370 } |
372 if len(td.muts)+1 > limit { | 371 if len(td.muts)+1 > limit { |
373 msg := "cross-group transaction need to be explicitly sp ecified (xg=True)" | 372 msg := "cross-group transaction need to be explicitly sp ecified (xg=True)" |
374 if td.isXG { | 373 if td.isXG { |
375 msg = "operating on too many entity groups in a single transaction" | 374 msg = "operating on too many entity groups in a single transaction" |
376 } | 375 } |
377 return errors.New(msg) | 376 return errors.New(msg) |
378 } | 377 } |
379 td.muts[rk] = []txnMutation{} | 378 td.muts[rk] = []txnMutation{} |
380 } | 379 } |
381 if !getOnly { | 380 if !getOnly { |
382 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) | 381 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) |
383 } | 382 } |
384 | 383 |
385 return nil | 384 return nil |
386 } | 385 } |
387 | 386 |
388 func (td *txnDataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds .PutMultiCB) { | 387 func (td *txnDataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb d s.PutMultiCB) { |
389 for i, k := range keys { | 388 for i, k := range keys { |
390 func() { | 389 func() { |
391 td.parent.Lock() | 390 td.parent.Lock() |
392 defer td.parent.Unlock() | 391 defer td.parent.Unlock() |
393 _, k = td.parent.entsKeyLocked(k) | 392 _, k = td.parent.entsKeyLocked(k) |
394 }() | 393 }() |
395 err := td.writeMutation(false, k, vals[i]) | 394 err := td.writeMutation(false, k, vals[i]) |
396 if cb != nil { | 395 if cb != nil { |
397 cb(k, err) | 396 cb(k, err) |
398 } | 397 } |
399 } | 398 } |
400 } | 399 } |
401 | 400 |
402 func (td *txnDataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { | 401 func (td *txnDataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error { |
403 return getMultiInner(keys, cb, func() (*memCollection, error) { | 402 return getMultiInner(keys, cb, func() (*memCollection, error) { |
404 err := error(nil) | 403 err := error(nil) |
405 for _, key := range keys { | 404 for _, key := range keys { |
406 err = td.writeMutation(true, key, nil) | 405 err = td.writeMutation(true, key, nil) |
407 if err != nil { | 406 if err != nil { |
408 return nil, err | 407 return nil, err |
409 } | 408 } |
410 } | 409 } |
411 return td.snap.GetCollection("ents:" + keys[0].Namespace()), nil | 410 return td.snap.GetCollection("ents:" + keys[0].Namespace()), nil |
412 }) | 411 }) |
413 } | 412 } |
414 | 413 |
415 func (td *txnDataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { | 414 func (td *txnDataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error { |
416 for _, k := range keys { | 415 for _, k := range keys { |
417 err := td.writeMutation(false, k, nil) | 416 err := td.writeMutation(false, k, nil) |
418 if cb != nil { | 417 if cb != nil { |
419 cb(err) | 418 cb(err) |
420 } | 419 } |
421 } | 420 } |
422 return nil | 421 return nil |
423 } | 422 } |
424 | 423 |
425 func keyBytes(key ds.Key) []byte { | 424 func keyBytes(key *ds.Key) []byte { |
426 return serialize.ToBytes(ds.MkProperty(key)) | 425 return serialize.ToBytes(ds.MkProperty(key)) |
427 } | 426 } |
428 | 427 |
429 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { | 428 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { |
430 return serialize.ReadPropertyMap(bytes.NewBuffer(data), | 429 return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
431 serialize.WithoutContext, globalAppID, ns) | 430 serialize.WithoutContext, globalAppID, ns) |
432 } | 431 } |
433 | 432 |
434 func rpm(data []byte) (ds.PropertyMap, error) { | 433 func rpm(data []byte) (ds.PropertyMap, error) { |
435 return serialize.ReadPropertyMap(bytes.NewBuffer(data), | 434 return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
436 serialize.WithContext, "", "") | 435 serialize.WithContext, "", "") |
437 } | 436 } |
438 | |
439 type keyitem interface { | |
440 Key() ds.Key | |
441 } | |
iannucci
2015/09/18 04:31:52
stale code is stale!
| |
OLD | NEW |