OLD | NEW |
(Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package tumble |
| 6 |
| 7 import ( |
| 8 "fmt" |
| 9 "net/http" |
| 10 |
| 11 "github.com/luci/gae/service/taskqueue" |
| 12 "github.com/luci/luci-go/common/clock" |
| 13 "github.com/luci/luci-go/common/errors" |
| 14 "github.com/luci/luci-go/common/logging" |
| 15 "golang.org/x/net/context" |
| 16 ) |
| 17 |
| 18 func fireTasks(c context.Context, shards map[uint64]struct{}) bool { |
| 19 if len(shards) == 0 { |
| 20 return true |
| 21 } |
| 22 |
| 23 tq := taskqueue.Get(c) |
| 24 |
| 25 cfg := GetConfig(c) |
| 26 |
| 27 trf := cfg.TemporalRoundFactor |
| 28 eta := clock.Now(c).UTC().Add(cfg.TemporalMinDelay + trf).Round(trf) |
| 29 tasks := make([]*taskqueue.Task, 0, len(shards)) |
| 30 |
| 31 for shard := range shards { |
| 32 tasks = append(tasks, &taskqueue.Task{ |
| 33 Name: fmt.Sprintf("%d_%d", eta.Unix(), shard), |
| 34 |
| 35 Path: cfg.ProcessURL(eta, shard), |
| 36 |
| 37 ETA: eta, |
| 38 |
| 39 // TODO(riannucci): Tune RetryOptions? |
| 40 }) |
| 41 } |
| 42 |
| 43 err := tq.AddMulti(tasks, cfg.Name) |
| 44 if err != nil { |
| 45 if merr, ok := err.(errors.MultiError); ok { |
| 46 lme := errors.NewLazyMultiError(len(merr)) |
| 47 for i, err := range merr { |
| 48 if err == taskqueue.ErrTaskAlreadyAdded { |
| 49 continue |
| 50 } |
| 51 lme.Assign(i, err) |
| 52 } |
| 53 err = lme.Get() |
| 54 } |
| 55 if err != nil { |
| 56 logging.Warningf(c, "attempted to fire tasks %v, but fai
led: %s", shards, err) |
| 57 } |
| 58 } |
| 59 return err == nil |
| 60 } |
| 61 |
| 62 // FireAllTasks fires off 1 task per shard to ensure that no tumble work |
| 63 // languishes forever. This may not be needed in a constantly-loaded system with |
| 64 // good tumble key distribution. |
| 65 func FireAllTasks(c context.Context) error { |
| 66 num := GetConfig(c).NumShards |
| 67 shards := make(map[uint64]struct{}, num) |
| 68 for i := uint64(0); i < num; i++ { |
| 69 shards[i] = struct{}{} |
| 70 } |
| 71 |
| 72 err := error(nil) |
| 73 if !fireTasks(c, shards) { |
| 74 err = errors.New("unable to fire all tasks") |
| 75 } |
| 76 |
| 77 return err |
| 78 } |
| 79 |
| 80 // FireAllTasksHandler is a http handler suitable for installation into |
| 81 // a httprouter. It expects `logging` and `luci/gae` services to be installed |
| 82 // into the context. |
| 83 // |
| 84 // FireAllTasksHandler verifies that it was called within an Appengine Cron |
| 85 // request, and then invokes the FireAllTasks function. |
| 86 func FireAllTasksHandler(c context.Context, rw http.ResponseWriter, r *http.Requ
est) { |
| 87 if r.Header.Get("X-Appengine-Cron") != "true" { |
| 88 logging.Errorf(c, "request not from cron") |
| 89 rw.WriteHeader(http.StatusUnauthorized) |
| 90 fmt.Fprintf(rw, "fire_all_tasks must be called from cron") |
| 91 return |
| 92 } |
| 93 |
| 94 if err := FireAllTasks(c); err != nil { |
| 95 rw.WriteHeader(http.StatusInternalServerError) |
| 96 fmt.Fprintf(rw, "fire_all_tasks failed: %s", err) |
| 97 } else { |
| 98 rw.Write([]byte("ok")) |
| 99 } |
| 100 } |
OLD | NEW |