Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(307)

Side by Side Diff: appengine/tumble/fire_tasks.go

Issue 1395293002: Add "tumble" distributed transaction processing service for appengine. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package tumble
6
7 import (
8 "fmt"
9 "net/http"
10
11 "github.com/luci/gae/service/taskqueue"
12 "github.com/luci/luci-go/common/clock"
13 "github.com/luci/luci-go/common/errors"
14 "github.com/luci/luci-go/common/logging"
15 "golang.org/x/net/context"
16 )
17
18 func fireTasks(c context.Context, shards map[uint64]struct{}) bool {
19 if len(shards) == 0 {
20 return true
21 }
22
23 tq := taskqueue.Get(c)
24
25 cfg := GetConfig(c)
26
27 trf := cfg.TemporalRoundFactor
28 eta := clock.Now(c).UTC().Add(cfg.TemporalMinDelay + trf).Round(trf)
29 tasks := make([]*taskqueue.Task, 0, len(shards))
30
31 for shard := range shards {
32 tasks = append(tasks, &taskqueue.Task{
33 Name: fmt.Sprintf("%d_%d", eta.Unix(), shard),
34
35 Path: cfg.ProcessURL(eta, shard),
36
37 ETA: eta,
38
39 // TODO(riannucci): Tune RetryOptions?
40 })
41 }
42
43 err := tq.AddMulti(tasks, cfg.Name)
44 if err != nil {
45 if merr, ok := err.(errors.MultiError); ok {
46 lme := errors.NewLazyMultiError(len(merr))
47 for i, err := range merr {
48 if err == taskqueue.ErrTaskAlreadyAdded {
49 continue
50 }
51 lme.Assign(i, err)
52 }
53 err = lme.Get()
54 }
55 if err != nil {
56 logging.Warningf(c, "attempted to fire tasks %v, but fai led: %s", shards, err)
57 }
58 }
59 return err == nil
60 }
61
62 // FireAllTasks fires off 1 task per shard to ensure that no tumble work
63 // languishes forever. This may not be needed in a constantly-loaded system with
64 // good tumble key distribution.
65 func FireAllTasks(c context.Context) error {
66 num := GetConfig(c).NumShards
67 shards := make(map[uint64]struct{}, num)
68 for i := uint64(0); i < num; i++ {
69 shards[i] = struct{}{}
70 }
71
72 err := error(nil)
73 if !fireTasks(c, shards) {
74 err = errors.New("unable to fire all tasks")
75 }
76
77 return err
78 }
79
80 // FireAllTasksHandler is a http handler suitable for installation into
81 // a httprouter. It expects `logging` and `luci/gae` services to be installed
82 // into the context.
83 //
84 // FireAllTasksHandler verifies that it was called within an Appengine Cron
85 // request, and then invokes the FireAllTasks function.
86 func FireAllTasksHandler(c context.Context, rw http.ResponseWriter, r *http.Requ est) {
87 if r.Header.Get("X-Appengine-Cron") != "true" {
88 logging.Errorf(c, "request not from cron")
89 rw.WriteHeader(http.StatusUnauthorized)
90 fmt.Fprintf(rw, "fire_all_tasks must be called from cron")
91 return
92 }
93
94 if err := FireAllTasks(c); err != nil {
95 rw.WriteHeader(http.StatusInternalServerError)
96 fmt.Fprintf(rw, "fire_all_tasks failed: %s", err)
97 } else {
98 rw.Write([]byte("ok"))
99 }
100 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698