OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "fmt" | 9 "fmt" |
10 "sync" | 10 "sync" |
11 "sync/atomic" | 11 "sync/atomic" |
12 | 12 |
13 ds "github.com/luci/gae/service/datastore" | 13 ds "github.com/luci/gae/service/datastore" |
| 14 "github.com/luci/gae/service/datastore/dskey" |
| 15 "github.com/luci/gae/service/datastore/serialize" |
14 "github.com/luci/luci-go/common/errors" | 16 "github.com/luci/luci-go/common/errors" |
15 "golang.org/x/net/context" | 17 "golang.org/x/net/context" |
16 ) | 18 ) |
17 | 19 |
18 //////////////////////////////// dataStoreData ///////////////////////////////// | 20 //////////////////////////////// dataStoreData ///////////////////////////////// |
19 | 21 |
20 type dataStoreData struct { | 22 type dataStoreData struct { |
21 rwlock sync.RWMutex | 23 rwlock sync.RWMutex |
22 // See README.md for store schema. | 24 // See README.md for store schema. |
23 store *memStore | 25 store *memStore |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
71 | 73 |
72 func (d *dataStoreData) catchupIndexes() { | 74 func (d *dataStoreData) catchupIndexes() { |
73 d.rwlock.Lock() | 75 d.rwlock.Lock() |
74 defer d.rwlock.Unlock() | 76 defer d.rwlock.Unlock() |
75 d.snap = d.store.Snapshot() | 77 d.snap = d.store.Snapshot() |
76 } | 78 } |
77 | 79 |
78 /////////////////////////// indicies(dataStoreData) //////////////////////////// | 80 /////////////////////////// indicies(dataStoreData) //////////////////////////// |
79 | 81 |
80 func groupMetaKey(key ds.Key) []byte { | 82 func groupMetaKey(key ds.Key) []byte { |
81 » return keyBytes(ds.WithoutContext, | 83 » return keyBytes(serialize.WithoutContext, |
82 » » ds.NewKey("", "", "__entity_group__", "", 1, ds.KeyRoot(key))) | 84 » » dskey.New("", "", "__entity_group__", "", 1, dskey.Root(key))) |
83 } | 85 } |
84 | 86 |
85 func groupIDsKey(key ds.Key) []byte { | 87 func groupIDsKey(key ds.Key) []byte { |
86 » return keyBytes(ds.WithoutContext, | 88 » return keyBytes(serialize.WithoutContext, |
87 » » ds.NewKey("", "", "__entity_group_ids__", "", 1, ds.KeyRoot(key)
)) | 89 » » dskey.New("", "", "__entity_group_ids__", "", 1, dskey.Root(key)
)) |
88 } | 90 } |
89 | 91 |
90 func rootIDsKey(kind string) []byte { | 92 func rootIDsKey(kind string) []byte { |
91 » return keyBytes(ds.WithoutContext, | 93 » return keyBytes(serialize.WithoutContext, |
92 » » ds.NewKey("", "", "__entity_root_ids__", kind, 0, nil)) | 94 » » dskey.New("", "", "__entity_root_ids__", kind, 0, nil)) |
93 } | 95 } |
94 | 96 |
95 func curVersion(ents *memCollection, key []byte) int64 { | 97 func curVersion(ents *memCollection, key []byte) int64 { |
96 if ents != nil { | 98 if ents != nil { |
97 if v := ents.Get(key); v != nil { | 99 if v := ents.Get(key); v != nil { |
98 pm, err := rpm(v) | 100 pm, err := rpm(v) |
99 if err != nil { | 101 if err != nil { |
100 panic(err) // memory corruption | 102 panic(err) // memory corruption |
101 } | 103 } |
102 pl, ok := pm["__version__"] | 104 pl, ok := pm["__version__"] |
103 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { | 105 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { |
104 return pl[0].Value().(int64) | 106 return pl[0].Value().(int64) |
105 } | 107 } |
106 panic(fmt.Errorf("__version__ property missing or wrong:
%v", pm)) | 108 panic(fmt.Errorf("__version__ property missing or wrong:
%v", pm)) |
107 } | 109 } |
108 } | 110 } |
109 return 0 | 111 return 0 |
110 } | 112 } |
111 | 113 |
112 func incrementLocked(ents *memCollection, key []byte) int64 { | 114 func incrementLocked(ents *memCollection, key []byte) int64 { |
113 ret := curVersion(ents, key) + 1 | 115 ret := curVersion(ents, key) + 1 |
114 buf := &bytes.Buffer{} | 116 buf := &bytes.Buffer{} |
115 » ds.PropertyMap{"__version__": {ds.MkPropertyNI(ret)}}.Write( | 117 » serialize.WritePropertyMap(buf, serialize.WithContext, ds.PropertyMap{ |
116 » » buf, ds.WithContext) | 118 » » "__version__": {ds.MkPropertyNI(ret)}}) |
117 ents.Set(key, buf.Bytes()) | 119 ents.Set(key, buf.Bytes()) |
118 return ret | 120 return ret |
119 } | 121 } |
120 | 122 |
121 func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) { | 123 func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) { |
122 coll := "ents:" + key.Namespace() | 124 coll := "ents:" + key.Namespace() |
123 ents := d.store.GetCollection(coll) | 125 ents := d.store.GetCollection(coll) |
124 if ents == nil { | 126 if ents == nil { |
125 ents = d.store.SetCollection(coll, nil) | 127 ents = d.store.SetCollection(coll, nil) |
126 } | 128 } |
127 | 129 |
128 » if ds.KeyIncomplete(key) { | 130 » if dskey.Incomplete(key) { |
129 idKey := []byte(nil) | 131 idKey := []byte(nil) |
130 if key.Parent() == nil { | 132 if key.Parent() == nil { |
131 idKey = rootIDsKey(key.Kind()) | 133 idKey = rootIDsKey(key.Kind()) |
132 } else { | 134 } else { |
133 idKey = groupIDsKey(key) | 135 idKey = groupIDsKey(key) |
134 } | 136 } |
135 id := incrementLocked(ents, idKey) | 137 id := incrementLocked(ents, idKey) |
136 » » key = ds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", id
, key.Parent()) | 138 » » key = dskey.New(key.AppID(), key.Namespace(), key.Kind(), "", id
, key.Parent()) |
137 } | 139 } |
138 | 140 |
139 return ents, key | 141 return ents, key |
140 } | 142 } |
141 | 143 |
142 func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.Put
MultiCB) { | 144 func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.Put
MultiCB) { |
143 for i, k := range keys { | 145 for i, k := range keys { |
144 buf := &bytes.Buffer{} | 146 buf := &bytes.Buffer{} |
145 pmap, _ := vals[i].Save(false) | 147 pmap, _ := vals[i].Save(false) |
146 » » pmap.Write(buf, ds.WithoutContext) | 148 » » serialize.WritePropertyMap(buf, serialize.WithoutContext, pmap) |
147 dataBytes := buf.Bytes() | 149 dataBytes := buf.Bytes() |
148 | 150 |
149 k, err := func() (ret ds.Key, err error) { | 151 k, err := func() (ret ds.Key, err error) { |
150 d.rwlock.Lock() | 152 d.rwlock.Lock() |
151 defer d.rwlock.Unlock() | 153 defer d.rwlock.Unlock() |
152 | 154 |
153 ents, ret := d.entsKeyLocked(k) | 155 ents, ret := d.entsKeyLocked(k) |
154 incrementLocked(ents, groupMetaKey(ret)) | 156 incrementLocked(ents, groupMetaKey(ret)) |
155 | 157 |
156 » » » old := ents.Get(keyBytes(ds.WithoutContext, ret)) | 158 » » » old := ents.Get(keyBytes(serialize.WithoutContext, ret)) |
157 oldPM := ds.PropertyMap(nil) | 159 oldPM := ds.PropertyMap(nil) |
158 if old != nil { | 160 if old != nil { |
159 if oldPM, err = rpmWoCtx(old, ret.Namespace());
err != nil { | 161 if oldPM, err = rpmWoCtx(old, ret.Namespace());
err != nil { |
160 return | 162 return |
161 } | 163 } |
162 } | 164 } |
163 updateIndicies(d.store, ret, oldPM, pmap) | 165 updateIndicies(d.store, ret, oldPM, pmap) |
164 » » » ents.Set(keyBytes(ds.WithoutContext, ret), dataBytes) | 166 » » » ents.Set(keyBytes(serialize.WithoutContext, ret), dataBy
tes) |
165 return | 167 return |
166 }() | 168 }() |
167 if cb != nil { | 169 if cb != nil { |
168 cb(k, err) | 170 cb(k, err) |
169 } | 171 } |
170 } | 172 } |
171 } | 173 } |
172 | 174 |
173 func getMultiInner(keys []ds.Key, cb ds.GetMultiCB, getColl func() (*memCollecti
on, error)) error { | 175 func getMultiInner(keys []ds.Key, cb ds.GetMultiCB, getColl func() (*memCollecti
on, error)) error { |
174 ents, err := getColl() | 176 ents, err := getColl() |
175 if err != nil { | 177 if err != nil { |
176 return err | 178 return err |
177 } | 179 } |
178 if ents == nil { | 180 if ents == nil { |
179 for range keys { | 181 for range keys { |
180 cb(nil, ds.ErrNoSuchEntity) | 182 cb(nil, ds.ErrNoSuchEntity) |
181 } | 183 } |
182 return nil | 184 return nil |
183 } | 185 } |
184 | 186 |
185 for _, k := range keys { | 187 for _, k := range keys { |
186 » » pdata := ents.Get(keyBytes(ds.WithoutContext, k)) | 188 » » pdata := ents.Get(keyBytes(serialize.WithoutContext, k)) |
187 if pdata == nil { | 189 if pdata == nil { |
188 cb(nil, ds.ErrNoSuchEntity) | 190 cb(nil, ds.ErrNoSuchEntity) |
189 continue | 191 continue |
190 } | 192 } |
191 cb(rpmWoCtx(pdata, k.Namespace())) | 193 cb(rpmWoCtx(pdata, k.Namespace())) |
192 } | 194 } |
193 return nil | 195 return nil |
194 } | 196 } |
195 | 197 |
196 func (d *dataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { | 198 func (d *dataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { |
197 getMultiInner(keys, cb, func() (*memCollection, error) { | 199 getMultiInner(keys, cb, func() (*memCollection, error) { |
198 s := d.takeSnapshot() | 200 s := d.takeSnapshot() |
199 | 201 |
200 return s.GetCollection("ents:" + keys[0].Namespace()), nil | 202 return s.GetCollection("ents:" + keys[0].Namespace()), nil |
201 }) | 203 }) |
202 return nil | 204 return nil |
203 } | 205 } |
204 | 206 |
205 func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) { | 207 func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) { |
206 toDel := make([][]byte, 0, len(keys)) | 208 toDel := make([][]byte, 0, len(keys)) |
207 for _, k := range keys { | 209 for _, k := range keys { |
208 » » toDel = append(toDel, keyBytes(ds.WithoutContext, k)) | 210 » » toDel = append(toDel, keyBytes(serialize.WithoutContext, k)) |
209 } | 211 } |
210 ns := keys[0].Namespace() | 212 ns := keys[0].Namespace() |
211 | 213 |
212 d.rwlock.Lock() | 214 d.rwlock.Lock() |
213 defer d.rwlock.Unlock() | 215 defer d.rwlock.Unlock() |
214 | 216 |
215 ents := d.store.GetCollection("ents:" + ns) | 217 ents := d.store.GetCollection("ents:" + ns) |
216 | 218 |
217 for i, k := range keys { | 219 for i, k := range keys { |
218 if ents != nil { | 220 if ents != nil { |
(...skipping 18 matching lines...) Expand all Loading... |
237 } | 239 } |
238 | 240 |
239 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { | 241 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { |
240 // TODO(riannucci): implement with Flush/FlushRevert for persistance. | 242 // TODO(riannucci): implement with Flush/FlushRevert for persistance. |
241 | 243 |
242 txn := obj.(*txnDataStoreData) | 244 txn := obj.(*txnDataStoreData) |
243 for rk, muts := range txn.muts { | 245 for rk, muts := range txn.muts { |
244 if len(muts) == 0 { // read-only | 246 if len(muts) == 0 { // read-only |
245 continue | 247 continue |
246 } | 248 } |
247 » » k, err := ds.ReadKey(bytes.NewBufferString(rk), ds.WithContext,
"", "") | 249 » » k, err := serialize.ReadKey(bytes.NewBufferString(rk), serialize
.WithContext, "", "") |
248 if err != nil { | 250 if err != nil { |
249 panic(err) | 251 panic(err) |
250 } | 252 } |
251 | 253 |
252 entKey := "ents:" + k.Namespace() | 254 entKey := "ents:" + k.Namespace() |
253 mkey := groupMetaKey(k) | 255 mkey := groupMetaKey(k) |
254 entsHead := d.store.GetCollection(entKey) | 256 entsHead := d.store.GetCollection(entKey) |
255 entsSnap := txn.snap.GetCollection(entKey) | 257 entsSnap := txn.snap.GetCollection(entKey) |
256 vHead := curVersion(entsHead, mkey) | 258 vHead := curVersion(entsHead, mkey) |
257 vSnap := curVersion(entsSnap, mkey) | 259 vSnap := curVersion(entsSnap, mkey) |
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
356 // | 358 // |
357 // if getOnly is true, don't record the actual mutation data, just ensure that | 359 // if getOnly is true, don't record the actual mutation data, just ensure that |
358 // the key is in an included entity group (or add an empty entry for tha
t | 360 // the key is in an included entity group (or add an empty entry for tha
t |
359 // group). | 361 // group). |
360 // | 362 // |
361 // if !getOnly && data == nil, this counts as a deletion instead of a Put. | 363 // if !getOnly && data == nil, this counts as a deletion instead of a Put. |
362 // | 364 // |
363 // Returns an error if this key causes the transaction to cross too many entity | 365 // Returns an error if this key causes the transaction to cross too many entity |
364 // groups. | 366 // groups. |
365 func (td *txnDataStoreData) writeMutation(getOnly bool, key ds.Key, data ds.Prop
ertyMap) error { | 367 func (td *txnDataStoreData) writeMutation(getOnly bool, key ds.Key, data ds.Prop
ertyMap) error { |
366 » rk := string(keyBytes(ds.WithContext, ds.KeyRoot(key))) | 368 » rk := string(keyBytes(serialize.WithContext, dskey.Root(key))) |
367 | 369 |
368 td.Lock() | 370 td.Lock() |
369 defer td.Unlock() | 371 defer td.Unlock() |
370 | 372 |
371 if _, ok := td.muts[rk]; !ok { | 373 if _, ok := td.muts[rk]; !ok { |
372 limit := 1 | 374 limit := 1 |
373 if td.isXG { | 375 if td.isXG { |
374 limit = xgEGLimit | 376 limit = xgEGLimit |
375 } | 377 } |
376 if len(td.muts)+1 > limit { | 378 if len(td.muts)+1 > limit { |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
419 func (td *txnDataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { | 421 func (td *txnDataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { |
420 for _, k := range keys { | 422 for _, k := range keys { |
421 err := td.writeMutation(false, k, nil) | 423 err := td.writeMutation(false, k, nil) |
422 if cb != nil { | 424 if cb != nil { |
423 cb(err) | 425 cb(err) |
424 } | 426 } |
425 } | 427 } |
426 return nil | 428 return nil |
427 } | 429 } |
428 | 430 |
429 func keyBytes(ctx ds.KeyContext, key ds.Key) []byte { | 431 func keyBytes(ctx serialize.KeyContext, key ds.Key) []byte { |
430 buf := &bytes.Buffer{} | 432 buf := &bytes.Buffer{} |
431 » ds.WriteKey(buf, ctx, key) | 433 » serialize.WriteKey(buf, ctx, key) |
432 return buf.Bytes() | 434 return buf.Bytes() |
433 } | 435 } |
434 | 436 |
435 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { | 437 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { |
436 » ret := ds.PropertyMap{} | 438 » return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
437 » err := ret.Read(bytes.NewBuffer(data), ds.WithoutContext, globalAppID, n
s) | 439 » » serialize.WithoutContext, globalAppID, ns) |
438 » return ret, err | |
439 } | 440 } |
440 | 441 |
441 func rpm(data []byte) (ds.PropertyMap, error) { | 442 func rpm(data []byte) (ds.PropertyMap, error) { |
442 » ret := ds.PropertyMap{} | 443 » return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
443 » err := ret.Read(bytes.NewBuffer(data), ds.WithContext, "", "") | 444 » » serialize.WithContext, "", "") |
444 » return ret, err | |
445 } | 445 } |
446 | 446 |
447 type keyitem interface { | 447 type keyitem interface { |
448 Key() ds.Key | 448 Key() ds.Key |
449 } | 449 } |
OLD | NEW |