Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(39)

Side by Side Diff: impl/memory/datastore_data.go

Issue 1302813003: impl/memory: Implement Queries (Closed) Base URL: https://github.com/luci/gae.git@add_multi_iterator
Patch Set: minor fixes Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 "sync" 10 "sync"
11 "sync/atomic" 11 "sync/atomic"
12 12
13 ds "github.com/luci/gae/service/datastore" 13 ds "github.com/luci/gae/service/datastore"
14 "github.com/luci/gae/service/datastore/dskey" 14 "github.com/luci/gae/service/datastore/dskey"
15 "github.com/luci/gae/service/datastore/serialize" 15 "github.com/luci/gae/service/datastore/serialize"
16 "github.com/luci/luci-go/common/errors" 16 "github.com/luci/luci-go/common/errors"
17 "golang.org/x/net/context" 17 "golang.org/x/net/context"
18 ) 18 )
19 19
20 //////////////////////////////// dataStoreData ///////////////////////////////// 20 //////////////////////////////// dataStoreData /////////////////////////////////
21 21
22 type dataStoreData struct { 22 type dataStoreData struct {
23 rwlock sync.RWMutex 23 rwlock sync.RWMutex
24 » // See README.md for store schema. 24 » // See README.md for head schema.
25 » store *memStore 25 » head *memStore
26 » snap *memStore 26 » snap *memStore
27 } 27 }
28 28
29 var ( 29 var (
30 _ = memContextObj((*dataStoreData)(nil)) 30 _ = memContextObj((*dataStoreData)(nil))
31 _ = sync.Locker((*dataStoreData)(nil)) 31 _ = sync.Locker((*dataStoreData)(nil))
32 ) 32 )
33 33
34 func newDataStoreData() *dataStoreData { 34 func newDataStoreData() *dataStoreData {
35 » store := newMemStore() 35 » head := newMemStore()
36 return &dataStoreData{ 36 return &dataStoreData{
37 » » store: store, 37 » » head: head,
38 » » snap: store.Snapshot(), // empty but better than a nil pointer. 38 » » snap: head.Snapshot(), // empty but better than a nil pointer.
39 } 39 }
40 } 40 }
41 41
42 func (d *dataStoreData) Lock() { 42 func (d *dataStoreData) Lock() {
43 d.rwlock.Lock() 43 d.rwlock.Lock()
44 } 44 }
45 45
46 func (d *dataStoreData) Unlock() { 46 func (d *dataStoreData) Unlock() {
47 d.rwlock.Unlock() 47 d.rwlock.Unlock()
48 } 48 }
49 49
50 func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) { 50 func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) {
51 d.rwlock.RLock() 51 d.rwlock.RLock()
52 defer d.rwlock.RUnlock() 52 defer d.rwlock.RUnlock()
53 » head = d.store.Snapshot() 53 » head = d.head.Snapshot()
54 if consistent { 54 if consistent {
55 idx = head 55 idx = head
56 } else { 56 } else {
57 idx = d.snap 57 idx = d.snap
58 } 58 }
59 return 59 return
60 } 60 }
61 61
62 func (d *dataStoreData) takeSnapshot() *memStore { 62 func (d *dataStoreData) takeSnapshot() *memStore {
63 d.rwlock.RLock() 63 d.rwlock.RLock()
64 defer d.rwlock.RUnlock() 64 defer d.rwlock.RUnlock()
65 » return d.store.Snapshot() 65 » return d.head.Snapshot()
66 } 66 }
67 67
68 func (d *dataStoreData) setSnapshot(snap *memStore) { 68 func (d *dataStoreData) setSnapshot(snap *memStore) {
69 d.rwlock.Lock() 69 d.rwlock.Lock()
70 defer d.rwlock.Unlock() 70 defer d.rwlock.Unlock()
71 d.snap = snap 71 d.snap = snap
72 } 72 }
73 73
74 func (d *dataStoreData) catchupIndexes() { 74 func (d *dataStoreData) catchupIndexes() {
75 d.rwlock.Lock() 75 d.rwlock.Lock()
76 defer d.rwlock.Unlock() 76 defer d.rwlock.Unlock()
77 » d.snap = d.store.Snapshot() 77 » d.snap = d.head.Snapshot()
78 } 78 }
79 79
80 /////////////////////////// indicies(dataStoreData) //////////////////////////// 80 /////////////////////////// indicies(dataStoreData) ////////////////////////////
81 81
82 func groupMetaKey(key ds.Key) []byte { 82 func groupMetaKey(key ds.Key) []byte {
83 » return keyBytes(serialize.WithoutContext, 83 » return keyBytes(dskey.New("", "", "__entity_group__", "", 1, dskey.Root( key)))
84 » » dskey.New("", "", "__entity_group__", "", 1, dskey.Root(key)))
85 } 84 }
86 85
87 func groupIDsKey(key ds.Key) []byte { 86 func groupIDsKey(key ds.Key) []byte {
88 » return keyBytes(serialize.WithoutContext, 87 » return keyBytes(dskey.New("", "", "__entity_group_ids__", "", 1, dskey.R oot(key)))
89 » » dskey.New("", "", "__entity_group_ids__", "", 1, dskey.Root(key) ))
90 } 88 }
91 89
92 func rootIDsKey(kind string) []byte { 90 func rootIDsKey(kind string) []byte {
93 » return keyBytes(serialize.WithoutContext, 91 » return keyBytes(dskey.New("", "", "__entity_root_ids__", kind, 0, nil))
94 » » dskey.New("", "", "__entity_root_ids__", kind, 0, nil))
95 } 92 }
96 93
97 func curVersion(ents *memCollection, key []byte) int64 { 94 func curVersion(ents *memCollection, key []byte) int64 {
98 if ents != nil { 95 if ents != nil {
99 if v := ents.Get(key); v != nil { 96 if v := ents.Get(key); v != nil {
100 pm, err := rpm(v) 97 pm, err := rpm(v)
101 » » » if err != nil { 98 » » » memoryCorruption(err)
102 » » » » panic(err) // memory corruption 99
103 » » » }
104 pl, ok := pm["__version__"] 100 pl, ok := pm["__version__"]
105 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { 101 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt {
106 return pl[0].Value().(int64) 102 return pl[0].Value().(int64)
107 } 103 }
108 » » » panic(fmt.Errorf("__version__ property missing or wrong: %v", pm)) 104
105 » » » memoryCorruption(fmt.Errorf("__version__ property missin g or wrong: %v", pm))
109 } 106 }
110 } 107 }
111 return 0 108 return 0
112 } 109 }
113 110
114 func incrementLocked(ents *memCollection, key []byte) int64 { 111 func incrementLocked(ents *memCollection, key []byte) int64 {
115 ret := curVersion(ents, key) + 1 112 ret := curVersion(ents, key) + 1
116 » buf := &bytes.Buffer{} 113 » ents.Set(key, serialize.ToBytes(ds.PropertyMap{
117 » serialize.WritePropertyMap(buf, serialize.WithContext, ds.PropertyMap{ 114 » » "__version__": {ds.MkPropertyNI(ret)},
118 » » "__version__": {ds.MkPropertyNI(ret)}}) 115 » }))
119 » ents.Set(key, buf.Bytes())
120 return ret 116 return ret
121 } 117 }
122 118
123 func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) { 119 func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) {
124 coll := "ents:" + key.Namespace() 120 coll := "ents:" + key.Namespace()
125 » ents := d.store.GetCollection(coll) 121 » ents := d.head.GetCollection(coll)
126 if ents == nil { 122 if ents == nil {
127 » » ents = d.store.SetCollection(coll, nil) 123 » » ents = d.head.SetCollection(coll, nil)
128 } 124 }
129 125
130 if dskey.Incomplete(key) { 126 if dskey.Incomplete(key) {
131 idKey := []byte(nil) 127 idKey := []byte(nil)
132 if key.Parent() == nil { 128 if key.Parent() == nil {
133 idKey = rootIDsKey(key.Kind()) 129 idKey = rootIDsKey(key.Kind())
134 } else { 130 } else {
135 idKey = groupIDsKey(key) 131 idKey = groupIDsKey(key)
136 } 132 }
137 id := incrementLocked(ents, idKey) 133 id := incrementLocked(ents, idKey)
138 key = dskey.New(key.AppID(), key.Namespace(), key.Kind(), "", id , key.Parent()) 134 key = dskey.New(key.AppID(), key.Namespace(), key.Kind(), "", id , key.Parent())
139 } 135 }
140 136
141 return ents, key 137 return ents, key
142 } 138 }
143 139
144 func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.Put MultiCB) { 140 func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.Put MultiCB) {
145 for i, k := range keys { 141 for i, k := range keys {
146 buf := &bytes.Buffer{}
147 pmap, _ := vals[i].Save(false) 142 pmap, _ := vals[i].Save(false)
148 » » serialize.WritePropertyMap(buf, serialize.WithoutContext, pmap) 143 » » dataBytes := serialize.ToBytes(pmap)
149 » » dataBytes := buf.Bytes()
150 144
151 k, err := func() (ret ds.Key, err error) { 145 k, err := func() (ret ds.Key, err error) {
152 d.rwlock.Lock() 146 d.rwlock.Lock()
153 defer d.rwlock.Unlock() 147 defer d.rwlock.Unlock()
154 148
155 ents, ret := d.entsKeyLocked(k) 149 ents, ret := d.entsKeyLocked(k)
156 incrementLocked(ents, groupMetaKey(ret)) 150 incrementLocked(ents, groupMetaKey(ret))
157 151
158 » » » old := ents.Get(keyBytes(serialize.WithoutContext, ret)) 152 » » » old := ents.Get(keyBytes(ret))
159 oldPM := ds.PropertyMap(nil) 153 oldPM := ds.PropertyMap(nil)
160 if old != nil { 154 if old != nil {
161 if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil { 155 if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil {
162 return 156 return
163 } 157 }
164 } 158 }
165 » » » updateIndicies(d.store, ret, oldPM, pmap) 159 » » » updateIndicies(d.head, ret, oldPM, pmap)
166 » » » ents.Set(keyBytes(serialize.WithoutContext, ret), dataBy tes) 160 » » » ents.Set(keyBytes(ret), dataBytes)
167 return 161 return
168 }() 162 }()
169 if cb != nil { 163 if cb != nil {
170 cb(k, err) 164 cb(k, err)
171 } 165 }
172 } 166 }
173 } 167 }
174 168
175 func getMultiInner(keys []ds.Key, cb ds.GetMultiCB, getColl func() (*memCollecti on, error)) error { 169 func getMultiInner(keys []ds.Key, cb ds.GetMultiCB, getColl func() (*memCollecti on, error)) error {
176 ents, err := getColl() 170 ents, err := getColl()
177 if err != nil { 171 if err != nil {
178 return err 172 return err
179 } 173 }
180 if ents == nil { 174 if ents == nil {
181 for range keys { 175 for range keys {
182 cb(nil, ds.ErrNoSuchEntity) 176 cb(nil, ds.ErrNoSuchEntity)
183 } 177 }
184 return nil 178 return nil
185 } 179 }
186 180
187 for _, k := range keys { 181 for _, k := range keys {
188 » » pdata := ents.Get(keyBytes(serialize.WithoutContext, k)) 182 » » pdata := ents.Get(keyBytes(k))
189 if pdata == nil { 183 if pdata == nil {
190 cb(nil, ds.ErrNoSuchEntity) 184 cb(nil, ds.ErrNoSuchEntity)
191 continue 185 continue
192 } 186 }
193 cb(rpmWoCtx(pdata, k.Namespace())) 187 cb(rpmWoCtx(pdata, k.Namespace()))
194 } 188 }
195 return nil 189 return nil
196 } 190 }
197 191
198 func (d *dataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { 192 func (d *dataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error {
199 getMultiInner(keys, cb, func() (*memCollection, error) { 193 getMultiInner(keys, cb, func() (*memCollection, error) {
200 s := d.takeSnapshot() 194 s := d.takeSnapshot()
201 195
202 return s.GetCollection("ents:" + keys[0].Namespace()), nil 196 return s.GetCollection("ents:" + keys[0].Namespace()), nil
203 }) 197 })
204 return nil 198 return nil
205 } 199 }
206 200
207 func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) { 201 func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) {
208 toDel := make([][]byte, 0, len(keys)) 202 toDel := make([][]byte, 0, len(keys))
209 for _, k := range keys { 203 for _, k := range keys {
210 » » toDel = append(toDel, keyBytes(serialize.WithoutContext, k)) 204 » » toDel = append(toDel, keyBytes(k))
211 } 205 }
212 ns := keys[0].Namespace() 206 ns := keys[0].Namespace()
213 207
214 d.rwlock.Lock() 208 d.rwlock.Lock()
215 defer d.rwlock.Unlock() 209 defer d.rwlock.Unlock()
216 210
217 » ents := d.store.GetCollection("ents:" + ns) 211 » ents := d.head.GetCollection("ents:" + ns)
218 212
219 for i, k := range keys { 213 for i, k := range keys {
220 if ents != nil { 214 if ents != nil {
221 incrementLocked(ents, groupMetaKey(k)) 215 incrementLocked(ents, groupMetaKey(k))
222 kb := toDel[i] 216 kb := toDel[i]
223 if old := ents.Get(kb); old != nil { 217 if old := ents.Get(kb); old != nil {
224 oldPM, err := rpmWoCtx(old, ns) 218 oldPM, err := rpmWoCtx(old, ns)
225 if err != nil { 219 if err != nil {
226 if cb != nil { 220 if cb != nil {
227 cb(err) 221 cb(err)
228 } 222 }
229 continue 223 continue
230 } 224 }
231 » » » » updateIndicies(d.store, k, oldPM, nil) 225 » » » » updateIndicies(d.head, k, oldPM, nil)
232 ents.Delete(kb) 226 ents.Delete(kb)
233 } 227 }
234 } 228 }
235 if cb != nil { 229 if cb != nil {
236 cb(nil) 230 cb(nil)
237 } 231 }
238 } 232 }
239 } 233 }
240 234
241 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { 235 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool {
242 // TODO(riannucci): implement with Flush/FlushRevert for persistance. 236 // TODO(riannucci): implement with Flush/FlushRevert for persistance.
243 237
244 txn := obj.(*txnDataStoreData) 238 txn := obj.(*txnDataStoreData)
245 for rk, muts := range txn.muts { 239 for rk, muts := range txn.muts {
246 if len(muts) == 0 { // read-only 240 if len(muts) == 0 { // read-only
247 continue 241 continue
248 } 242 }
249 » » k, err := serialize.ReadKey(bytes.NewBufferString(rk), serialize .WithContext, "", "") 243 » » prop, err := serialize.ReadProperty(bytes.NewBufferString(rk), s erialize.WithContext, "", "")
250 » » if err != nil { 244 » » memoryCorruption(err)
251 » » » panic(err) 245
252 » » } 246 » » k := prop.Value().(ds.Key)
253 247
254 entKey := "ents:" + k.Namespace() 248 entKey := "ents:" + k.Namespace()
255 mkey := groupMetaKey(k) 249 mkey := groupMetaKey(k)
256 » » entsHead := d.store.GetCollection(entKey) 250 » » entsHead := d.head.GetCollection(entKey)
257 entsSnap := txn.snap.GetCollection(entKey) 251 entsSnap := txn.snap.GetCollection(entKey)
258 vHead := curVersion(entsHead, mkey) 252 vHead := curVersion(entsHead, mkey)
259 vSnap := curVersion(entsSnap, mkey) 253 vSnap := curVersion(entsSnap, mkey)
260 if vHead != vSnap { 254 if vHead != vSnap {
261 return false 255 return false
262 } 256 }
263 } 257 }
264 return true 258 return true
265 } 259 }
266 260
267 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) { 261 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) {
268 txn := obj.(*txnDataStoreData) 262 txn := obj.(*txnDataStoreData)
269 for _, muts := range txn.muts { 263 for _, muts := range txn.muts {
270 if len(muts) == 0 { // read-only 264 if len(muts) == 0 { // read-only
271 continue 265 continue
272 } 266 }
273 // TODO(riannucci): refactor to do just 1 putMulti, and 1 delMul ti 267 // TODO(riannucci): refactor to do just 1 putMulti, and 1 delMul ti
274 for _, m := range muts { 268 for _, m := range muts {
275 err := error(nil) 269 err := error(nil)
276 k := m.key 270 k := m.key
277 if m.data == nil { 271 if m.data == nil {
278 d.delMulti([]ds.Key{k}, 272 d.delMulti([]ds.Key{k},
279 func(e error) { err = e }) 273 func(e error) { err = e })
280 } else { 274 } else {
281 d.putMulti([]ds.Key{m.key}, []ds.PropertyMap{m.d ata}, 275 d.putMulti([]ds.Key{m.key}, []ds.PropertyMap{m.d ata},
282 func(_ ds.Key, e error) { err = e }) 276 func(_ ds.Key, e error) { err = e })
283 } 277 }
284 » » » err = errors.SingleError(err) 278 » » » impossible(err)
285 » » » if err != nil {
286 » » » » panic(err)
287 » » » }
288 } 279 }
289 } 280 }
290 } 281 }
291 282
292 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj { 283 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj {
293 return &txnDataStoreData{ 284 return &txnDataStoreData{
294 // alias to the main datastore's so that testing code can have p rimitive 285 // alias to the main datastore's so that testing code can have p rimitive
295 // access to break features inside of transactions. 286 // access to break features inside of transactions.
296 parent: d, 287 parent: d,
297 isXG: o != nil && o.XG, 288 isXG: o != nil && o.XG,
298 » » snap: d.store.Snapshot(), 289 » » snap: d.head.Snapshot(),
299 muts: map[string][]txnMutation{}, 290 muts: map[string][]txnMutation{},
300 } 291 }
301 } 292 }
302 293
303 func (d *dataStoreData) endTxn() {} 294 func (d *dataStoreData) endTxn() {}
304 295
305 /////////////////////////////// txnDataStoreData /////////////////////////////// 296 /////////////////////////////// txnDataStoreData ///////////////////////////////
306 297
307 type txnMutation struct { 298 type txnMutation struct {
308 key ds.Key 299 key ds.Key
(...skipping 22 matching lines...) Expand all
331 const xgEGLimit = 25 322 const xgEGLimit = 25
332 323
333 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } 324 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false }
334 func (td *txnDataStoreData) endTxn() { 325 func (td *txnDataStoreData) endTxn() {
335 if atomic.LoadInt32(&td.closed) == 1 { 326 if atomic.LoadInt32(&td.closed) == 1 {
336 panic("cannot end transaction twice") 327 panic("cannot end transaction twice")
337 } 328 }
338 atomic.StoreInt32(&td.closed, 1) 329 atomic.StoreInt32(&td.closed, 1)
339 } 330 }
340 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { 331 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) {
341 » panic("txnDataStoreData cannot apply transactions") 332 » impossible(fmt.Errorf("cannot creat a recursive transaction"))
dnj (Google) 2015/08/28 17:54:21 "create"
iannucci 2015/08/28 19:48:55 done
342 } 333 }
343 func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj { 334 func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj {
344 » panic("impossible") 335 » impossible(fmt.Errorf("cannot creat a recursive transaction"))
dnj (Google) 2015/08/28 17:54:21 "create"
iannucci 2015/08/28 19:48:55 done
336 » return nil
345 } 337 }
346 338
347 func (td *txnDataStoreData) run(f func() error) error { 339 func (td *txnDataStoreData) run(f func() error) error {
348 // Slightly different from the SDK... datastore and taskqueue each imple ment 340 // Slightly different from the SDK... datastore and taskqueue each imple ment
349 // this here, where in the SDK only datastore.transaction.Call does. 341 // this here, where in the SDK only datastore.transaction.Call does.
350 if atomic.LoadInt32(&td.closed) == 1 { 342 if atomic.LoadInt32(&td.closed) == 1 {
351 return errors.New("datastore: transaction context has expired") 343 return errors.New("datastore: transaction context has expired")
352 } 344 }
353 return f() 345 return f()
354 } 346 }
355 347
356 // writeMutation ensures that this transaction can support the given key/value 348 // writeMutation ensures that this transaction can support the given key/value
357 // mutation. 349 // mutation.
358 // 350 //
359 // if getOnly is true, don't record the actual mutation data, just ensure that 351 // if getOnly is true, don't record the actual mutation data, just ensure that
360 // the key is in an included entity group (or add an empty entry for tha t 352 // the key is in an included entity group (or add an empty entry for tha t
361 // group). 353 // group).
362 // 354 //
363 // if !getOnly && data == nil, this counts as a deletion instead of a Put. 355 // if !getOnly && data == nil, this counts as a deletion instead of a Put.
364 // 356 //
365 // Returns an error if this key causes the transaction to cross too many entity 357 // Returns an error if this key causes the transaction to cross too many entity
366 // groups. 358 // groups.
367 func (td *txnDataStoreData) writeMutation(getOnly bool, key ds.Key, data ds.Prop ertyMap) error { 359 func (td *txnDataStoreData) writeMutation(getOnly bool, key ds.Key, data ds.Prop ertyMap) error {
368 » rk := string(keyBytes(serialize.WithContext, dskey.Root(key))) 360 » rk := string(keyBytes(dskey.Root(key)))
369 361
370 td.Lock() 362 td.Lock()
371 defer td.Unlock() 363 defer td.Unlock()
372 364
373 if _, ok := td.muts[rk]; !ok { 365 if _, ok := td.muts[rk]; !ok {
374 limit := 1 366 limit := 1
375 if td.isXG { 367 if td.isXG {
376 limit = xgEGLimit 368 limit = xgEGLimit
377 } 369 }
378 if len(td.muts)+1 > limit { 370 if len(td.muts)+1 > limit {
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
421 func (td *txnDataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { 413 func (td *txnDataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) error {
422 for _, k := range keys { 414 for _, k := range keys {
423 err := td.writeMutation(false, k, nil) 415 err := td.writeMutation(false, k, nil)
424 if cb != nil { 416 if cb != nil {
425 cb(err) 417 cb(err)
426 } 418 }
427 } 419 }
428 return nil 420 return nil
429 } 421 }
430 422
431 func keyBytes(ctx serialize.KeyContext, key ds.Key) []byte { 423 func keyBytes(key ds.Key) []byte {
432 » buf := &bytes.Buffer{} 424 » return serialize.ToBytes(ds.MkProperty(key))
433 » serialize.WriteKey(buf, ctx, key)
434 » return buf.Bytes()
435 } 425 }
436 426
437 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { 427 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) {
438 return serialize.ReadPropertyMap(bytes.NewBuffer(data), 428 return serialize.ReadPropertyMap(bytes.NewBuffer(data),
439 serialize.WithoutContext, globalAppID, ns) 429 serialize.WithoutContext, globalAppID, ns)
440 } 430 }
441 431
442 func rpm(data []byte) (ds.PropertyMap, error) { 432 func rpm(data []byte) (ds.PropertyMap, error) {
443 return serialize.ReadPropertyMap(bytes.NewBuffer(data), 433 return serialize.ReadPropertyMap(bytes.NewBuffer(data),
444 serialize.WithContext, "", "") 434 serialize.WithContext, "", "")
445 } 435 }
446 436
447 type keyitem interface { 437 type keyitem interface {
448 Key() ds.Key 438 Key() ds.Key
449 } 439 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698