Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(680)

Side by Side Diff: appengine/tumble/process.go

Issue 1395293002: Add "tumble" distributed transaction processing service for appengine. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: use exists Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/tumble/model_mutation.go ('k') | appengine/tumble/tumble.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package tumble
6
7 import (
8 "bytes"
9 "fmt"
10 "math"
11 "net/http"
12 "strconv"
13 "sync/atomic"
14 "time"
15
16 "github.com/julienschmidt/httprouter"
17 "github.com/luci/gae/filter/txnBuf"
18 "github.com/luci/gae/service/datastore"
19 "github.com/luci/gae/service/datastore/serialize"
20 "github.com/luci/gae/service/memcache"
21 "github.com/luci/luci-go/appengine/memlock"
22 "github.com/luci/luci-go/common/clock"
23 "github.com/luci/luci-go/common/errors"
24 "github.com/luci/luci-go/common/logging"
25 "github.com/luci/luci-go/common/parallel"
26 "github.com/luci/luci-go/common/stringset"
27 "golang.org/x/net/context"
28 )
29
30 // expandedShardBounds returns the boundary of the expandedShard query that
31 // currently corresponds to this shard number. If Shard is < 0 or > NumShards
32 // (the currently configured number of shards), this will return a low > high.
33 // Otherwise low < high.
34 func expandedShardBounds(c context.Context, shard uint64) (low, high int64) {
35 cfg := GetConfig(c)
36
37 if shard < 0 || uint64(shard) >= cfg.NumShards {
38 logging.Warningf(c, "Invalid shard: %d", shard)
39 // return inverted bounds
40 return 0, -1
41 }
42
43 expandedShardsPerShard := int64(math.MaxUint64 / cfg.NumShards)
44 low = math.MinInt64 + (int64(shard) * expandedShardsPerShard)
45 if uint64(shard) == cfg.NumShards-1 {
46 high = math.MaxInt64
47 } else {
48 high = low + expandedShardsPerShard
49 }
50 return
51 }
52
53 var dustSettleTimeout = 2 * time.Second
54
55 // ProcessShardHandler is a http handler suitable for installation into
56 // a httprouter. It expects `logging` and `luci/gae` services to be installed
57 // into the context.
58 //
59 // ProcessShardHandler verifies that its being run as a taskqueue task and that
60 // the following parameters exist and are well-formed:
61 // * timestamp: decimal-encoded UNIX/UTC timestamp in seconds.
62 // * shard_id: decimal-encoded shard identifier.
63 //
64 // ProcessShardHandler then invokes ProcessShard with the parsed parameters.
65 func ProcessShardHandler(c context.Context, rw http.ResponseWriter, r *http.Requ est, p httprouter.Params) {
66 tstampStr := p.ByName("timestamp")
67 sidStr := p.ByName("shard_id")
68
69 tstamp, err := strconv.ParseInt(tstampStr, 10, 64)
70 if err != nil {
71 logging.Errorf(c, "bad timestamp %q", tstampStr)
72 rw.WriteHeader(http.StatusNotFound)
73 fmt.Fprintf(rw, "bad timestamp")
74 return
75 }
76
77 sid, err := strconv.ParseUint(sidStr, 10, 64)
78 if err != nil {
79 logging.Errorf(c, "bad shardID %q", tstampStr)
80 rw.WriteHeader(http.StatusNotFound)
81 fmt.Fprintf(rw, "bad shardID")
82 return
83 }
84
85 err = ProcessShard(c, time.Unix(tstamp, 0).UTC(), sid)
86 if err != nil {
87 rw.WriteHeader(http.StatusInternalServerError)
88 fmt.Fprintf(rw, "error: %s", err)
89 } else {
90 rw.Write([]byte("ok"))
91 }
92 }
93
// ProcessShard is the tumble backend endpoint. This accepts a shard number which
// is expected to be < GlobalConfig.NumShards.
//
// It drains all pending mutations in this shard's keyspace range: under a
// memcache-based mutex it repeatedly queries for distinct TargetRoots with
// pending mutations and processes each root in parallel (via processRoot),
// looping until a round processes zero items. Returns nil if the shard is
// invalid, if another processor has already advanced past `timestamp`, or if
// the lock could not be obtained (another worker owns the shard).
func ProcessShard(c context.Context, timestamp time.Time, shard uint64) error {
	low, high := expandedShardBounds(c, shard)
	if low > high {
		// Invalid shard number; nothing to do.
		return nil
	}

	l := logging.Get(logging.SetField(c, "shard", shard))

	cfg := GetConfig(c)

	// lockKey is shared by all tasks for this shard; clientID is unique per
	// (timestamp, shard) task so memlock can tell owners apart.
	lockKey := fmt.Sprintf("%s.%d.lock", cfg.Name, shard)
	clientID := fmt.Sprintf("%d_%d", timestamp.Unix(), shard)

	// this last key allows buffered tasks to early exit if some other shard
	// processor has already processed past this task's target timestamp.
	lastKey := fmt.Sprintf("%s.%d.last", cfg.Name, shard)
	mc := memcache.Get(c)
	lastItm, err := mc.Get(lastKey)
	if err != nil {
		if err != memcache.ErrCacheMiss {
			// Cache miss is expected (first run / eviction); anything else
			// is worth a warning, but processing proceeds regardless.
			l.Warningf("couldn't obtain last timestamp: %s", err)
		}
	} else {
		val := lastItm.Value()
		last, err := serialize.ReadTime(bytes.NewBuffer(val))
		if err != nil {
			l.Warningf("could not decode timestamp %v: %s", val, err)
		} else {
			// Round up so tasks within the same rounding window as the last
			// successful pass are also skipped.
			last = last.Add(cfg.TemporalRoundFactor)
			if timestamp.After(last) {
				l.Infof("early exit, %s > %s", timestamp, last)
				return nil
			}
		}
	}
	// Clear any memcache error so it can't leak into the return value below.
	err = nil

	// Projection query for the distinct roots that currently have pending
	// mutations in this shard's ExpandedShard range.
	q := datastore.NewQuery("tumble.Mutation").
		Gte("ExpandedShard", low).Lte("ExpandedShard", high).
		Project("TargetRoot").Distinct(true).
		Limit(cfg.ProcessMaxBatchSize)

	// banSets persists across rounds: per-root sets of mutation keys already
	// processed, so index lag doesn't cause re-processing.
	banSets := map[string]stringset.Set{}

	// Bounds the number of processRoot goroutines running concurrently.
	limitSemaphore := make(chan struct{}, cfg.NumGoroutines)

	for try := 0; try < 2; try++ {
		err = memlock.TryWithLock(c, lockKey, clientID, func(c context.Context) error {
			l.Infof("Got lock (try %d)", try)

			// Keep draining rounds until a round processes nothing.
			for {
				processCounters := []*int64{}
				err := parallel.FanOutIn(func(ch chan<- func() error) {
					err := datastore.Get(c).Run(q, func(pm datastore.PropertyMap, _ datastore.CursorCB) bool {
						root := pm["TargetRoot"][0].Value().(*datastore.Key)
						encRoot := root.Encode()

						// TODO(riannucci): make banSets remove keys from the banSet which
						// weren't hit. Once they stop showing up, they'll never show up
						// again.

						bs := banSets[encRoot]
						if bs == nil {
							bs = stringset.New(0)
							banSets[encRoot] = bs
						}
						// Each root gets its own counter; summed after the
						// fan-in to decide whether to loop again.
						counter := new(int64)
						processCounters = append(processCounters, counter)

						ch <- func() error {
							// Acquire a slot; released when the work is done.
							limitSemaphore <- struct{}{}
							defer func() {
								<-limitSemaphore
							}()
							return processRoot(c, root, bs, counter)
						}

						// memlock cancels c when the lock is lost; stop
						// dispatching more roots in that case.
						select {
						case <-c.Done():
							l.Warningf("Lost lock!")
							return false
						default:
							return true
						}
					})
					if err != nil {
						l.Errorf("Failure to query: %s", err)
						// Propagate the query error through the fan-in.
						ch <- func() error {
							return err
						}
					}
				})
				if err != nil {
					return err
				}
				numProcessed := int64(0)
				for _, n := range processCounters {
					numProcessed += *n
				}
				l.Infof("cumulatively processed %d items", numProcessed)
				if numProcessed == 0 {
					break
				}

				// Record progress so queued tasks targeting older timestamps
				// can early-exit (best effort; failure only warns).
				err = mc.Set(mc.NewItem(lastKey).SetValue(serialize.ToBytes(clock.Now(c))))
				if err != nil {
					l.Warningf("could not update last process memcache key %s: %s", lastKey, err)
				}

				// Let eventually-consistent indexes catch up before the next
				// round's query.
				clock.Sleep(c, dustSettleTimeout)
			}
			return nil
		})
		if err != memlock.ErrFailedToLock {
			break
		}
		l.Infof("Couldn't obtain lock (try %d) (sleeping 2s)", try+1)
		clock.Sleep(c, time.Second*2)
	}
	if err == memlock.ErrFailedToLock {
		// Another worker holds the shard; that's fine, not an error.
		l.Infof("Couldn't obtain lock (giving up): %s", err)
		err = nil
	}
	return err
}
221
222 func getBatchByRoot(c context.Context, root *datastore.Key, banSet stringset.Set ) ([]*realMutation, error) {
223 cfg := GetConfig(c)
224 ds := datastore.Get(c)
225 q := datastore.NewQuery("tumble.Mutation").Eq("TargetRoot", root)
226 toFetch := make([]*realMutation, 0, cfg.ProcessMaxBatchSize)
227 err := ds.Run(q, func(k *datastore.Key, _ datastore.CursorCB) bool {
228 if !banSet.Has(k.Encode()) {
229 toFetch = append(toFetch, &realMutation{
230 ID: k.StringID(),
231 Parent: k.Parent(),
232 })
233 }
234 return len(toFetch) < cap(toFetch)
235 })
236 return toFetch, err
237 }
238
// loadFilteredMutations fetches the full entities for rms and decodes each
// into its Mutation implementation. Entries that have vanished from datastore
// (ErrNoSuchEntity — already processed by someone else) or that fail to
// decode are silently skipped. Returns the datastore keys and decoded
// mutations, index-aligned with each other (not with rms).
func loadFilteredMutations(c context.Context, rms []*realMutation) ([]*datastore.Key, []Mutation, error) {
	ds := datastore.Get(c)

	mutKeys := make([]*datastore.Key, 0, len(rms))
	muts := make([]Mutation, 0, len(rms))
	err := ds.GetMulti(rms)
	// A MultiError carries per-entity results; any other non-nil error is a
	// wholesale failure.
	me, ok := err.(errors.MultiError)
	if !ok && err != nil {
		return nil, nil, err
	}

	for i, rm := range rms {
		// Per-entity error for this index (nil when GetMulti fully succeeded).
		err = nil
		if me != nil {
			err = me[i]
		}
		if err == nil {
			if rm.Version != getAppVersion(c) {
				// Mutation was written by a different code deployment; its
				// gob payload may decode against stale types.
				logging.Fields{
					"mut_version": rm.Version,
					"cur_version": getAppVersion(c),
				}.Warningf(c, "loading mutation with different code version")
			}
			// NOTE(review): this `:=` shadows the loop's err inside the if
			// block — harmless here since the outer err is nil on this path.
			m, err := rm.GetMutation()
			if err != nil {
				l := logging.Errorf
				_ = l
				logging.Errorf(c, "couldn't load mutation: %s", err)
				continue
			}
			muts = append(muts, m)
			mutKeys = append(mutKeys, ds.KeyForObj(rm))
		} else if err != datastore.ErrNoSuchEntity {
			// Unexpected per-entity failure: surface the whole MultiError so
			// the caller sees every entity's outcome.
			return nil, nil, me
		}
	}

	return mutKeys, muts, nil
}
276
// processRoot drains one batch of mutations for a single entity-group root:
// it loads the pending batch (minus banSet), rolls each mutation forward in a
// single buffered transaction, fires tasks for any shards that received new
// mutations, then deletes the completed mutations (outside the transaction)
// and records them in banSet. The number of completed mutations is stored
// into *counter for the caller's progress accounting.
func processRoot(c context.Context, root *datastore.Key, banSet stringset.Set, counter *int64) error {
	l := logging.Get(c)

	toFetch, err := getBatchByRoot(c, root, banSet)
	if err != nil || len(toFetch) == 0 {
		return err
	}

	mutKeys, muts, err := loadFilteredMutations(c, toFetch)
	if err != nil {
		return err
	}

	// NOTE(review): lock loss (context cancellation) is only logged here —
	// processing continues into the transaction regardless. Presumably
	// intentional (the work is idempotent via banSet/deletes) — confirm.
	select {
	case <-c.Done():
		l.Warningf("Lost lock during processRoot")
	default:
	}

	allShards := map[uint64]struct{}{}

	toDel := make([]*datastore.Key, 0, len(muts))
	numMuts := uint64(0)
	err = datastore.Get(txnBuf.FilterRDS(c)).RunInTransaction(func(c context.Context) error {
		// Reset accumulators: the transaction body may run more than once on
		// contention. NOTE(review): allShards is NOT reset on retry, so it
		// may retain shards from an aborted attempt — verify this is benign
		// (fireTasks for an extra shard looks harmless).
		toDel = toDel[:0]
		numMuts = 0

		for i, m := range muts {
			shards, numNewMuts, err := enterTransactionInternal(c, root, m.RollForward)
			if err != nil {
				// Skip a failing mutation; it stays in datastore and will be
				// retried on a later pass.
				l.Errorf("Executing decoded gob(%T) failed: %q: %+v", m, err, m)
				continue
			}
			toDel = append(toDel, mutKeys[i])
			numMuts += uint64(numNewMuts)
			for shard := range shards {
				allShards[shard] = struct{}{}
			}
		}

		return nil
	}, nil)
	if err != nil {
		l.Errorf("failed running transaction: %s", err)
		return err
	}
	// Kick off processing for every shard that gained new mutations.
	fireTasks(c, allShards)
	l.Infof("successfully processed %d mutations, adding %d more", len(toDel), numMuts)

	if len(toDel) > 0 {
		// Store (not Add): counter is dedicated to this root for this pass.
		atomic.StoreInt64(counter, int64(len(toDel)))

		// Ban the processed keys so index lag can't hand them back to
		// getBatchByRoot before the deletes are visible.
		for _, k := range toDel {
			banSet.Add(k.Encode())
		}
		// Best-effort delete; failures only warn because banSet already
		// prevents re-processing this pass.
		if err := datastore.Get(c).DeleteMulti(toDel); err != nil {
			l.Warningf("error deleting finished mutations: %s", err)
		}
	}

	return nil
}
OLDNEW
« no previous file with comments | « appengine/tumble/model_mutation.go ('k') | appengine/tumble/tumble.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698