Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(72)

Side by Side Diff: filter/txnBuf/ds_txn.go

Issue 1309803004: Add transaction buffer filter. (Closed) Base URL: https://github.com/luci/gae.git@add_query_support
Patch Set: Make tests faster, no lock for queries Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package txnBuf
6
7 import (
8 ds "github.com/luci/gae/service/datastore"
9 "github.com/luci/luci-go/common/errors"
10 "golang.org/x/net/context"
11 )
12
13 // ErrTransactionTooLarge is returned when applying an inner transaction would
14 // cause an outer transaction to become too large.
15 var ErrTransactionTooLarge = errors.New(
16 "applying the transaction would make the parent transaction too large")
17
18 type dsTxnBuf struct {
19 ic context.Context
20 state *txnBufState
21 }
22
23 var _ ds.RawInterface = (*dsTxnBuf)(nil)
24
25 func (d *dsTxnBuf) DecodeCursor(s string) (ds.Cursor, error) {
26 return d.state.parentDS.DecodeCursor(s)
27 }
28
29 func (d *dsTxnBuf) AllocateIDs(incomplete *ds.Key, n int) (start int64, err erro r) {
30 return d.state.parentDS.AllocateIDs(incomplete, n)
31 }
32
33 func (d *dsTxnBuf) GetMulti(keys []*ds.Key, metas ds.MultiMetaGetter, cb ds.GetM ultiCB) error {
34 data, err := d.state.getMulti(keys)
35 if err != nil {
36 return err
37 }
38
39 idxMap := []int(nil)
40 getKeys := []*ds.Key(nil)
41 getMetas := ds.MultiMetaGetter(nil)
42 lme := errors.NewLazyMultiError(len(keys))
43
44 for i, itm := range data {
45 if !itm.buffered {
46 idxMap = append(idxMap, i)
47 getKeys = append(getKeys, itm.key)
48 getMetas = append(getMetas, metas.GetSingle(i))
49 }
50 }
51
52 if len(idxMap) > 0 {
53 j := 0
54 err := d.state.parentDS.GetMulti(getKeys, getMetas, func(pm ds.P ropertyMap, err error) {
55 if err != ds.ErrNoSuchEntity {
56 i := idxMap[j]
57 if !lme.Assign(i, err) {
58 data[i].data = pm
59 }
60 }
61 j++
62 })
63 if err != nil {
64 return err
65 }
66 }
67
68 for i, itm := range data {
69 err := lme.GetOne(i)
70 if err != nil {
71 cb(nil, err)
72 } else if itm.data == nil {
73 cb(nil, ds.ErrNoSuchEntity)
74 } else {
75 cb(itm.data, nil)
76 }
77 }
78 return nil
79 }
80
81 func (d *dsTxnBuf) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.PutMult iCB) error {
82 lme := errors.NewLazyMultiError(len(keys))
83 realKeys := []*ds.Key(nil)
84 for i, key := range keys {
85 if key.Incomplete() {
86 start, err := d.AllocateIDs(key, 1)
87 if !lme.Assign(i, err) {
88 if realKeys == nil {
89 realKeys = make([]*ds.Key, len(keys))
90 copy(realKeys, keys)
91 }
92
93 aid, ns, toks := key.Split()
94 toks[len(toks)-1].IntID = start
95 realKeys[i] = ds.NewKeyToks(aid, ns, toks)
96 }
97 }
98 }
99 if err := lme.Get(); err != nil {
100 for _, e := range err.(errors.MultiError) {
101 if e == nil {
102 e = errors.New("putMulti failed because some key s were unable to AllocateIDs")
103 }
104 cb(nil, e)
105 }
106 return nil
107 }
108
109 if realKeys == nil {
110 realKeys = keys
111 }
112
113 err := d.state.putMulti(realKeys, vals)
114 if err != nil {
115 return err
116 }
117
118 for _, k := range realKeys {
119 cb(k, nil)
120 }
121 return nil
122 }
123
124 func (d *dsTxnBuf) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error {
125 err := d.state.deleteMulti(keys)
126 if err != nil {
127 return err
128 }
129
130 for range keys {
131 cb(nil)
132 }
133 return nil
134 }
135
136 func (d *dsTxnBuf) Count(fq *ds.FinalizedQuery) (count int64, err error) {
137 // Unfortunately there's no fast-path here. We literally have to run the
138 // query and count. Fortunately we can optimize to count keys if it's no t
139 // a projection query. This will save on bandwidth a bit.
140 if len(fq.Project()) == 0 && !fq.KeysOnly() {
141 fq, err = fq.Original().KeysOnly(true).Finalize()
142 if err != nil {
143 return
144 }
145 }
146 err = d.Run(fq, func(_ *ds.Key, _ ds.PropertyMap, _ ds.CursorCB) bool {
147 count++
148 return true
149 })
150 return
151 }
152
153 func (d *dsTxnBuf) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error {
154 if start, end := fq.Bounds(); start != nil || end != nil {
155 return errors.New("txnBuf filter does not support query cursors" )
156 }
157
158 limit, limitSet := fq.Limit()
159 offset, _ := fq.Offset()
160 keysOnly := fq.KeysOnly()
161
162 project := fq.Project()
163
164 d.state.Lock()
165 memDS := d.state.memDS
166 parentDS := d.state.parentDS
167 sizes := d.state.entState.dup()
168 d.state.Unlock()
169
170 return runMergedQueries(fq, sizes, memDS, parentDS, func(key *ds.Key, da ta ds.PropertyMap) bool {
171 if offset > 0 {
172 offset--
173 return true
174 }
175 if limitSet {
176 if limit == 0 {
177 return false
178 }
179 limit--
180 }
181 if keysOnly {
182 data = nil
183 } else if len(project) > 0 {
184 newData := make(ds.PropertyMap, len(project))
185 for _, p := range project {
186 newData[p] = data[p]
187 }
188 data = newData
189 }
190 return cb(key, data, nil)
191 })
192 }
193
194 func (d *dsTxnBuf) RunInTransaction(cb func(context.Context) error, opts *ds.Tra nsactionOptions) error {
195 return withTxnBuf(d.ic, cb, opts)
196 }
197
198 func (d *dsTxnBuf) Testable() ds.Testable {
199 return d.state.parentDS.Testable()
200 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698