OLD | NEW |
(Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package tumble |
| 6 |
| 7 import ( |
| 8 "fmt" |
| 9 "net/http" |
| 10 |
| 11 "github.com/julienschmidt/httprouter" |
| 12 "github.com/luci/gae/service/taskqueue" |
| 13 "github.com/luci/luci-go/common/clock" |
| 14 "github.com/luci/luci-go/common/errors" |
| 15 "github.com/luci/luci-go/common/logging" |
| 16 "golang.org/x/net/context" |
| 17 ) |
| 18 |
| 19 func fireTasks(c context.Context, shards map[uint64]struct{}) bool { |
| 20 if len(shards) == 0 { |
| 21 return true |
| 22 } |
| 23 |
| 24 tq := taskqueue.Get(c) |
| 25 |
| 26 cfg := GetConfig(c) |
| 27 |
| 28 trf := cfg.TemporalRoundFactor |
| 29 eta := clock.Now(c).UTC().Add(cfg.TemporalMinDelay + trf).Round(trf) |
| 30 tasks := make([]*taskqueue.Task, 0, len(shards)) |
| 31 |
| 32 for shard := range shards { |
| 33 tasks = append(tasks, &taskqueue.Task{ |
| 34 Name: fmt.Sprintf("%d_%d", eta.Unix(), shard), |
| 35 |
| 36 Path: cfg.ProcessURL(eta, shard), |
| 37 |
| 38 ETA: eta, |
| 39 |
| 40 // TODO(riannucci): Tune RetryOptions? |
| 41 }) |
| 42 } |
| 43 |
| 44 err := tq.AddMulti(tasks, cfg.Name) |
| 45 if err != nil { |
| 46 if merr, ok := err.(errors.MultiError); ok { |
| 47 lme := errors.NewLazyMultiError(len(merr)) |
| 48 for i, err := range merr { |
| 49 if err == taskqueue.ErrTaskAlreadyAdded { |
| 50 continue |
| 51 } |
| 52 lme.Assign(i, err) |
| 53 } |
| 54 err = lme.Get() |
| 55 } |
| 56 if err != nil { |
| 57 logging.Warningf(c, "attempted to fire tasks %v, but fai
led: %s", shards, err) |
| 58 } |
| 59 } |
| 60 return err == nil |
| 61 } |
| 62 |
| 63 // FireAllTasks fires off 1 task per shard to ensure that no tumble work |
| 64 // languishes forever. This may not be needed in a constantly-loaded system with |
| 65 // good tumble key distribution. |
| 66 func FireAllTasks(c context.Context) error { |
| 67 num := GetConfig(c).NumShards |
| 68 shards := make(map[uint64]struct{}, num) |
| 69 for i := uint64(0); i < num; i++ { |
| 70 shards[i] = struct{}{} |
| 71 } |
| 72 |
| 73 err := error(nil) |
| 74 if !fireTasks(c, shards) { |
| 75 err = errors.New("unable to fire all tasks") |
| 76 } |
| 77 |
| 78 return err |
| 79 } |
| 80 |
| 81 // FireAllTasksHandler is a http handler suitable for installation into |
| 82 // a httprouter. It expects `logging` and `luci/gae` services to be installed |
| 83 // into the context. |
| 84 // |
| 85 // FireAllTasksHandler verifies that it was called within an Appengine Cron |
| 86 // request, and then invokes the FireAllTasks function. |
| 87 func FireAllTasksHandler(c context.Context, rw http.ResponseWriter, r *http.Requ
est, _ httprouter.Params) { |
| 88 if err := FireAllTasks(c); err != nil { |
| 89 rw.WriteHeader(http.StatusInternalServerError) |
| 90 fmt.Fprintf(rw, "fire_all_tasks failed: %s", err) |
| 91 } else { |
| 92 rw.Write([]byte("ok")) |
| 93 } |
| 94 } |
OLD | NEW |