Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(337)

Side by Side Diff: filter/txnBuf/ds_txn.go

Issue 1309803004: Add transaction buffer filter. (Closed) Base URL: https://github.com/luci/gae.git@add_query_support
Patch Set: change pcg timeout and codereview server to crcr.as.com Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « filter/txnBuf/ds.go ('k') | filter/txnBuf/query_merger.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package txnBuf
6
7 import (
8 ds "github.com/luci/gae/service/datastore"
9 "github.com/luci/luci-go/common/errors"
10 "golang.org/x/net/context"
11 )
12
13 // ErrTransactionTooLarge is returned when applying an inner transaction would
14 // cause an outer transaction to become too large.
15 var ErrTransactionTooLarge = errors.New(
16 "applying the transaction would make the parent transaction too large")
17
18 // ErrTooManyRoots is returned when executing an operation which would cause
19 // the transaction to exceed it's allotted number of entity groups.
20 var ErrTooManyRoots = errors.New(
21 "operating on too many entity groups in nested transaction")
22
23 type dsTxnBuf struct {
24 ic context.Context
25 state *txnBufState
26 }
27
28 var _ ds.RawInterface = (*dsTxnBuf)(nil)
29
30 func (d *dsTxnBuf) DecodeCursor(s string) (ds.Cursor, error) {
31 return d.state.parentDS.DecodeCursor(s)
32 }
33
34 func (d *dsTxnBuf) AllocateIDs(incomplete *ds.Key, n int) (start int64, err erro r) {
35 return d.state.parentDS.AllocateIDs(incomplete, n)
36 }
37
38 func (d *dsTxnBuf) GetMulti(keys []*ds.Key, metas ds.MultiMetaGetter, cb ds.GetM ultiCB) error {
39 data, err := d.state.getMulti(keys)
40 if err != nil {
41 return err
42 }
43
44 idxMap := []int(nil)
45 getKeys := []*ds.Key(nil)
46 getMetas := ds.MultiMetaGetter(nil)
47 lme := errors.NewLazyMultiError(len(keys))
48
49 for i, itm := range data {
50 if !itm.buffered {
51 idxMap = append(idxMap, i)
52 getKeys = append(getKeys, itm.key)
53 getMetas = append(getMetas, metas.GetSingle(i))
54 }
55 }
56
57 if len(idxMap) > 0 {
58 j := 0
59 err := d.state.parentDS.GetMulti(getKeys, getMetas, func(pm ds.P ropertyMap, err error) {
60 if err != ds.ErrNoSuchEntity {
61 i := idxMap[j]
62 if !lme.Assign(i, err) {
63 data[i].data = pm
64 }
65 }
66 j++
67 })
68 if err != nil {
69 return err
70 }
71 }
72
73 for i, itm := range data {
74 err := lme.GetOne(i)
75 if err != nil {
76 cb(nil, err)
77 } else if itm.data == nil {
78 cb(nil, ds.ErrNoSuchEntity)
79 } else {
80 cb(itm.data, nil)
81 }
82 }
83 return nil
84 }
85
86 func (d *dsTxnBuf) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.PutMult iCB) error {
87 lme := errors.NewLazyMultiError(len(keys))
88 realKeys := []*ds.Key(nil)
89 for i, key := range keys {
90 if key.Incomplete() {
91 start, err := d.AllocateIDs(key, 1)
92 if !lme.Assign(i, err) {
93 if realKeys == nil {
94 realKeys = make([]*ds.Key, len(keys))
95 copy(realKeys, keys)
96 }
97
98 aid, ns, toks := key.Split()
99 toks[len(toks)-1].IntID = start
100 realKeys[i] = ds.NewKeyToks(aid, ns, toks)
101 }
102 }
103 }
104 if err := lme.Get(); err != nil {
105 for _, e := range err.(errors.MultiError) {
106 if e == nil {
107 e = errors.New("putMulti failed because some key s were unable to AllocateIDs")
108 }
109 cb(nil, e)
110 }
111 return nil
112 }
113
114 if realKeys == nil {
115 realKeys = keys
116 }
117
118 err := d.state.putMulti(realKeys, vals)
119 if err != nil {
120 return err
121 }
122
123 for _, k := range realKeys {
124 cb(k, nil)
125 }
126 return nil
127 }
128
129 func (d *dsTxnBuf) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error {
130 if err := d.state.deleteMulti(keys); err != nil {
131 return err
132 }
133
134 for range keys {
135 cb(nil)
136 }
137 return nil
138 }
139
140 func (d *dsTxnBuf) Count(fq *ds.FinalizedQuery) (count int64, err error) {
141 // Unfortunately there's no fast-path here. We literally have to run the
142 // query and count. Fortunately we can optimize to count keys if it's no t
143 // a projection query. This will save on bandwidth a bit.
144 if len(fq.Project()) == 0 && !fq.KeysOnly() {
145 fq, err = fq.Original().KeysOnly(true).Finalize()
146 if err != nil {
147 return
148 }
149 }
150 err = d.Run(fq, func(_ *ds.Key, _ ds.PropertyMap, _ ds.CursorCB) bool {
151 count++
152 return true
153 })
154 return
155 }
156
157 func (d *dsTxnBuf) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error {
158 if start, end := fq.Bounds(); start != nil || end != nil {
159 return errors.New("txnBuf filter does not support query cursors" )
160 }
161
162 limit, limitSet := fq.Limit()
163 offset, _ := fq.Offset()
164 keysOnly := fq.KeysOnly()
165
166 project := fq.Project()
167
168 d.state.Lock()
169 memDS := d.state.memDS
170 parentDS := d.state.parentDS
171 sizes := d.state.entState.dup()
172 d.state.Unlock()
173
174 return runMergedQueries(fq, sizes, memDS, parentDS, func(key *ds.Key, da ta ds.PropertyMap) bool {
175 if offset > 0 {
176 offset--
177 return true
178 }
179 if limitSet {
180 if limit == 0 {
181 return false
182 }
183 limit--
184 }
185 if keysOnly {
186 data = nil
187 } else if len(project) > 0 {
188 newData := make(ds.PropertyMap, len(project))
189 for _, p := range project {
190 newData[p] = data[p]
191 }
192 data = newData
193 }
194 return cb(key, data, nil)
195 })
196 }
197
198 func (d *dsTxnBuf) RunInTransaction(cb func(context.Context) error, opts *ds.Tra nsactionOptions) error {
199 return withTxnBuf(d.ic, cb, opts)
200 }
201
202 func (d *dsTxnBuf) Testable() ds.Testable {
203 return d.state.parentDS.Testable()
204 }
OLDNEW
« no previous file with comments | « filter/txnBuf/ds.go ('k') | filter/txnBuf/query_merger.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698