OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package tumble | |
6 | |
7 import ( | |
8 "bytes" | |
9 "fmt" | |
10 "math" | |
11 "net/http" | |
12 "strconv" | |
13 "sync/atomic" | |
14 "time" | |
15 | |
16 "github.com/julienschmidt/httprouter" | |
17 "github.com/luci/gae/filter/txnBuf" | |
18 "github.com/luci/gae/service/datastore" | |
19 "github.com/luci/gae/service/datastore/serialize" | |
20 "github.com/luci/gae/service/memcache" | |
21 "github.com/luci/luci-go/appengine/memlock" | |
22 "github.com/luci/luci-go/common/clock" | |
23 "github.com/luci/luci-go/common/errors" | |
24 "github.com/luci/luci-go/common/logging" | |
25 "github.com/luci/luci-go/common/parallel" | |
26 "github.com/luci/luci-go/common/stringset" | |
27 "golang.org/x/net/context" | |
28 ) | |
29 | |
30 // expandedShardBounds returns the boundaries of the expandedShard query that | |
31 // currently corresponds to this shard number. If shard >= cfg.NumShards (the | |
32 // currently configured number of shards), this will return low > high. | |
33 // Otherwise low < high. | |
34 func expandedShardBounds(c context.Context, shard uint64) (low, high int64) { | |
35 cfg := GetConfig(c) | |
36 | |
37 if shard >= cfg.NumShards { | |
38 logging.Warningf(c, "Invalid shard: %d", shard) | |
39 // return inverted bounds | |
40 return 0, -1 | |
41 } | |
42 | |
43 expandedShardsPerShard := int64(math.MaxUint64 / cfg.NumShards) | |
44 low = math.MinInt64 + (int64(shard) * expandedShardsPerShard) | |
45 if shard == cfg.NumShards-1 { | |
46 high = math.MaxInt64 | |
47 } else { | |
48 high = low + expandedShardsPerShard | |
49 } | |
50 return | |
51 } | |
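For reference, a hypothetical worked example (not part of the reviewed change) of how these bounds partition the int64 keyspace, assuming cfg.NumShards == 4; shard 1 would own the second quarter of the range:

    package main

    import (
        "fmt"
        "math"
    )

    func main() {
        const numShards = 4
        // Mirrors expandedShardsPerShard above: the width of each shard's
        // slice of the int64 space.
        span := int64(math.MaxUint64 / numShards)
        low := int64(math.MinInt64) + 1*span // lower bound for shard 1
        high := low + span                   // upper bound (the last shard uses MaxInt64)
        fmt.Println(low, high)
    }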
52 | |
53 var dustSettleTimeout = 2 * time.Second | |
54 | |
55 // ProcessShardHandler is a http handler suitable for installation into | |
56 // a httprouter. It expects `logging` and `luci/gae` services to be installed | |
57 // into the context. | |
58 // | |
59 // ProcessShardHandler verifies that it's being run as a taskqueue task and that | |
60 // the following parameters exist and are well-formed: | |
61 // * timestamp: decimal-encoded UNIX/UTC timestamp in seconds. | |
62 // * shard_id: decimal-encoded shard identifier. | |
63 // | |
64 // ProcessShardHandler then invokes ProcessShard with the parsed parameters. | |
65 func ProcessShardHandler(c context.Context, rw http.ResponseWriter, r *http.Request, p httprouter.Params) { | |
66 if r.Header.Get("X-AppEngine-QueueName") == "" { | |
Vadim Sh.
2015/10/12 20:05:14
ah.. you do it here... still, consider a middlewar
| |
67 rw.WriteHeader(http.StatusUnauthorized) | |
68 fmt.Fprintf(rw, "process_task must be called from taskqueue") | |
69 return | |
70 } | |
71 | |
72 tstampStr := p.ByName("timestamp") | |
73 sidStr := p.ByName("shard_id") | |
74 | |
75 tstamp, err := strconv.ParseInt(tstampStr, 10, 64) | |
76 if err != nil { | |
77 logging.Errorf(c, "bad timestamp %q", tstampStr) | |
78 rw.WriteHeader(http.StatusNotFound) | |
79 fmt.Fprintf(rw, "bad timestamp") | |
80 return | |
81 } | |
82 | |
83 sid, err := strconv.ParseUint(sidStr, 10, 64) | |
84 if err != nil { | |
85 logging.Errorf(c, "bad shardID %q", sidStr) | |
86 rw.WriteHeader(http.StatusNotFound) | |
87 fmt.Fprintf(rw, "bad shardID") | |
88 return | |
89 } | |
90 | |
91 err = ProcessShard(c, time.Unix(tstamp, 0).UTC(), sid) | |
92 if err != nil { | |
Vadim Sh.
2015/10/12 20:05:14
in luci-cron I use errors.IsTransient (and errors.
iannucci
2015/10/13 02:39:46
The error here is purely advisory; cron and/or tas
| |
93 rw.WriteHeader(http.StatusInternalServerError) | |
94 fmt.Fprintf(rw, "error: %s", err) | |
95 } else { | |
96 rw.Write([]byte("ok")) | |
97 } | |
98 } | |
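Regarding the reviewer's suggestion above to hoist the taskqueue check into a middleware: a minimal sketch of what that might look like, assuming a contextual handler signature like the one ProcessShardHandler uses (the `handler` type and `requireTaskQueue` name are hypothetical, not part of this CL):

    package tumble

    import (
        "fmt"
        "net/http"

        "github.com/julienschmidt/httprouter"
        "golang.org/x/net/context"
    )

    // handler mirrors the contextual signature of ProcessShardHandler; the exact
    // wiring into the router is an assumption for illustration.
    type handler func(context.Context, http.ResponseWriter, *http.Request, httprouter.Params)

    // requireTaskQueue rejects requests that did not originate from the App Engine
    // task queue service before invoking the wrapped handler, so each handler does
    // not have to repeat the header check.
    func requireTaskQueue(h handler) handler {
        return func(c context.Context, rw http.ResponseWriter, r *http.Request, p httprouter.Params) {
            // App Engine strips X-AppEngine-* headers from external requests, so an
            // empty value means the request did not come from the task queue.
            if r.Header.Get("X-AppEngine-QueueName") == "" {
                rw.WriteHeader(http.StatusUnauthorized)
                fmt.Fprintf(rw, "must be called from taskqueue")
                return
            }
            h(c, rw, r, p)
        }
    }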
99 | |
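On the transient-vs-permanent error discussion above: a sketch (an assumption, not what this CL does) of how the handler's tail could use errors.IsTransient so that only retryable failures produce a 5xx, while permanent failures are acknowledged and dropped by the task queue. The writeShardResult helper is hypothetical:

    package tumble

    import (
        "fmt"
        "net/http"

        "github.com/luci/luci-go/common/errors"
    )

    // writeShardResult reports the outcome of a shard run: transient failures
    // return a 500 so the task queue retries the task, while non-transient
    // failures are acknowledged with a 2xx so they are not retried forever.
    func writeShardResult(rw http.ResponseWriter, err error) {
        if err == nil {
            rw.Write([]byte("ok"))
            return
        }
        if errors.IsTransient(err) {
            rw.WriteHeader(http.StatusInternalServerError) // taskqueue will retry
        } else {
            rw.WriteHeader(http.StatusAccepted) // permanent failure; do not retry
        }
        fmt.Fprintf(rw, "error: %s", err)
    }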
100 // ProcessShard is the tumble backend endpoint. This accepts a shard number which | |
101 // is expected to be < GlobalConfig.NumShards. | |
102 func ProcessShard(c context.Context, timestamp time.Time, shard uint64) error { | |
103 low, high := expandedShardBounds(c, shard) | |
104 if low > high { | |
105 return nil | |
106 } | |
107 | |
108 l := logging.Get(logging.SetField(c, "shard", shard)) | |
109 | |
110 cfg := GetConfig(c) | |
111 | |
112 lockKey := fmt.Sprintf("%s.%d.lock", cfg.Name, shard) | |
113 clientID := fmt.Sprintf("%d_%d", timestamp.Unix(), shard) | |
114 | |
115 // this last key allows buffered tasks to exit early if some other shard | |
116 // processor has already processed past this task's target timestamp. | |
117 lastKey := fmt.Sprintf("%s.%d.last", cfg.Name, shard) | |
118 mc := memcache.Get(c) | |
119 lastItm, err := mc.Get(lastKey) | |
120 if err != nil { | |
121 if err != memcache.ErrCacheMiss { | |
122 l.Warningf("couldn't obtain last timestamp: %s", err) | |
123 } | |
124 } else { | |
125 val := lastItm.Value() | |
126 last, err := serialize.ReadTime(bytes.NewBuffer(val)) | |
127 if err != nil { | |
128 l.Warningf("could not decode timestamp %v: %s", val, err) | |
129 } else { | |
130 last = last.Add(cfg.TemporalRoundFactor) | |
131 if timestamp.After(last) { | |
132 l.Infof("early exit, %s > %s", timestamp, last) | |
133 return nil | |
134 } | |
135 } | |
136 } | |
137 err = nil | |
138 | |
139 q := datastore.NewQuery("tumble.Mutation"). | |
140 Gte("ExpandedShard", low).Lte("ExpandedShard", high). | |
141 Project("TargetRoot").Distinct(true). | |
142 Limit(cfg.ProcessMaxBatchSize) | |
143 | |
144 banSets := map[string]stringset.Set{} | |
145 | |
146 limitSemaphore := make(chan struct{}, cfg.NumGoroutines) | |
147 | |
148 for try := 0; try < 2; try++ { | |
149 err = memlock.TryWithLock(c, lockKey, clientID, func(c context.Context) error { | |
150 l.Infof("Got lock (try %d)", try) | |
151 | |
152 for { | |
153 processCounters := []*int64{} | |
154 err := parallel.FanOutIn(func(ch chan<- func() error) { | |
155 err := datastore.Get(c).Run(q, func(pm datastore.PropertyMap, _ datastore.CursorCB) bool { | |
156 root := pm["TargetRoot"][0].Value().(*datastore.Key) | |
157 encRoot := root.Encode() | |
158 | |
159 // TODO(riannucci): make banSets remove keys from the banSet which | |
160 // weren't hit. Once they stop showing up, they'll never show up | |
161 // again. | |
162 | |
163 bs := banSets[encRoot] | |
164 if bs == nil { | |
165 bs = stringset.New(0) | |
166 banSets[encRoot] = bs | |
167 } | |
168 counter := new(int64) | |
169 processCounters = append(processCounters, counter) | |
170 | |
171 ch <- func() error { | |
172 limitSemaphore <- struct{}{} | |
173 defer func() { | |
174 <-limitSemaphore | |
175 }() | |
176 return processRoot(c, root, bs, counter) | |
177 } | |
178 | |
179 select { | |
180 case <-c.Done(): | |
181 l.Warningf("Lost lock!") | |
182 return false | |
183 default: | |
184 return true | |
185 } | |
186 }) | |
187 if err != nil { | |
188 l.Errorf("Failure to query: %s", err) | |
189 ch <- func() error { | |
190 return err | |
191 } | |
192 } | |
193 }) | |
194 if err != nil { | |
195 return err | |
196 } | |
197 numProcessed := int64(0) | |
198 for _, n := range processCounters { | |
199 numProcessed += *n | |
200 } | |
201 l.Infof("cumulatively processed %d items", numProcessed) | |
202 if numProcessed == 0 { | |
203 break | |
204 } | |
205 | |
206 err = mc.Set(mc.NewItem(lastKey).SetValue(serialize.ToBytes(clock.Now(c)))) | |
207 if err != nil { | |
208 l.Warningf("could not update last process memcache key %s: %s", lastKey, err) | |
209 } | |
210 | |
211 clock.Sleep(c, dustSettleTimeout) | |
212 } | |
213 return nil | |
214 }) | |
215 if err != memlock.ErrFailedToLock { | |
216 break | |
217 } | |
218 l.Infof("Couldn't obtain lock (try %d) (sleeping 2s)", try+1) | |
219 clock.Sleep(c, time.Second*2) | |
220 } | |
221 if err == memlock.ErrFailedToLock { | |
222 l.Infof("Couldn't obtain lock (giving up): %s", err) | |
223 err = nil | |
224 } | |
225 return err | |
226 } | |
227 | |
228 func getBatchByRoot(c context.Context, root *datastore.Key, banSet stringset.Set) ([]*realMutation, error) { | |
229 cfg := GetConfig(c) | |
230 ds := datastore.Get(c) | |
231 q := datastore.NewQuery("tumble.Mutation").Eq("TargetRoot", root) | |
232 toFetch := make([]*realMutation, 0, cfg.ProcessMaxBatchSize) | |
233 err := ds.Run(q, func(k *datastore.Key, _ datastore.CursorCB) bool { | |
234 if !banSet.Has(k.Encode()) { | |
235 toFetch = append(toFetch, &realMutation{ | |
236 ID: k.StringID(), | |
237 Parent: k.Parent(), | |
238 }) | |
239 } | |
240 return len(toFetch) < cap(toFetch) | |
241 }) | |
242 return toFetch, err | |
243 } | |
244 | |
245 func loadFilteredMutations(c context.Context, rms []*realMutation) ([]*datastore.Key, []Mutation, error) { | |
246 ds := datastore.Get(c) | |
247 | |
248 mutKeys := make([]*datastore.Key, 0, len(rms)) | |
249 muts := make([]Mutation, 0, len(rms)) | |
250 err := ds.GetMulti(rms) | |
251 me, ok := err.(errors.MultiError) | |
252 if !ok && err != nil { | |
253 return nil, nil, err | |
254 } | |
255 | |
256 for i, rm := range rms { | |
257 err = nil | |
258 if me != nil { | |
259 err = me[i] | |
260 } | |
261 if err == nil { | |
262 m, err := rm.GetMutation() | |
263 if err != nil { | |
264 logging.Errorf(c, "couldn't load mutation: %s", err) | |
265 continue | |
266 } | |
267 muts = append(muts, m) | |
268 mutKeys = append(mutKeys, ds.KeyForObj(rm)) | |
269 } else if err != datastore.ErrNoSuchEntity { | |
270 return nil, nil, me | |
271 } | |
272 } | |
273 | |
274 return mutKeys, muts, nil | |
275 } | |
276 | |
277 func processRoot(c context.Context, root *datastore.Key, banSet stringset.Set, counter *int64) error { | |
278 l := logging.Get(c) | |
279 | |
280 toFetch, err := getBatchByRoot(c, root, banSet) | |
281 if err != nil || len(toFetch) == 0 { | |
282 return err | |
283 } | |
284 | |
285 mutKeys, muts, err := loadFilteredMutations(c, toFetch) | |
286 if err != nil { | |
287 return err | |
288 } | |
289 | |
290 select { | |
291 case <-c.Done(): | |
292 l.Warningf("Lost lock during processRoot") | |
293 default: | |
294 } | |
295 | |
296 allShards := map[uint64]struct{}{} | |
297 | |
298 toDel := make([]*datastore.Key, 0, len(muts)) | |
299 numMuts := uint64(0) | |
300 err = datastore.Get(txnBuf.FilterRDS(c)).RunInTransaction(func(c context.Context) error { | |
301 toDel = toDel[:0] | |
302 numMuts = 0 | |
303 | |
304 for i, m := range muts { | |
305 shards, numNewMuts, err := enterTransactionInternal(c, root, m.RollForward) | |
306 if err != nil { | |
307 l.Errorf("Executing decoded gob(%T) failed: %q: %+v", m, err, m) | |
308 continue | |
309 } | |
310 toDel = append(toDel, mutKeys[i]) | |
311 numMuts += uint64(numNewMuts) | |
312 for shard := range shards { | |
313 allShards[shard] = struct{}{} | |
314 } | |
315 } | |
316 | |
317 return nil | |
318 }, nil) | |
319 if err != nil { | |
320 l.Errorf("failed running transaction: %s", err) | |
321 return err | |
322 } | |
323 fireTasks(c, allShards) | |
324 l.Infof("successfully processed %d mutations, adding %d more", len(toDel ), numMuts) | |
325 | |
326 if len(toDel) > 0 { | |
327 atomic.StoreInt64(counter, int64(len(toDel))) | |
328 | |
329 for _, k := range toDel { | |
330 banSet.Add(k.Encode()) | |
331 } | |
332 if err := datastore.Get(c).DeleteMulti(toDel); err != nil { | |
333 l.Warningf("error deleting finished mutations: %s", err) | |
334 } | |
335 } | |
336 | |
337 return nil | |
338 } | |