Index: appengine/tumble/fire_tasks.go |
diff --git a/appengine/tumble/fire_tasks.go b/appengine/tumble/fire_tasks.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..6fcfe578a2a09c12f7e5864124f1f1ddb57c183a |
--- /dev/null |
+++ b/appengine/tumble/fire_tasks.go |
@@ -0,0 +1,94 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+package tumble |
+ |
+import ( |
+ "fmt" |
+ "net/http" |
+ |
+ "github.com/julienschmidt/httprouter" |
+ "github.com/luci/gae/service/taskqueue" |
+ "github.com/luci/luci-go/common/clock" |
+ "github.com/luci/luci-go/common/errors" |
+ "github.com/luci/luci-go/common/logging" |
+ "golang.org/x/net/context" |
+) |
+ |
+func fireTasks(c context.Context, shards map[uint64]struct{}) bool { |
+ if len(shards) == 0 { |
+ return true |
+ } |
+ |
+ tq := taskqueue.Get(c) |
+ |
+ cfg := GetConfig(c) |
+ |
+ trf := cfg.TemporalRoundFactor |
+ eta := clock.Now(c).UTC().Add(cfg.TemporalMinDelay + trf).Round(trf) |
+ tasks := make([]*taskqueue.Task, 0, len(shards)) |
+ |
+ for shard := range shards { |
+ tasks = append(tasks, &taskqueue.Task{ |
+ Name: fmt.Sprintf("%d_%d", eta.Unix(), shard), |
+ |
+ Path: cfg.ProcessURL(eta, shard), |
+ |
+ ETA: eta, |
+ |
+ // TODO(riannucci): Tune RetryOptions? |
+ }) |
+ } |
+ |
+ err := tq.AddMulti(tasks, cfg.Name) |
+ if err != nil { |
+ if merr, ok := err.(errors.MultiError); ok { |
+ lme := errors.NewLazyMultiError(len(merr)) |
+ for i, err := range merr { |
+ if err == taskqueue.ErrTaskAlreadyAdded { |
+ continue |
+ } |
+ lme.Assign(i, err) |
+ } |
+ err = lme.Get() |
+ } |
+ if err != nil { |
+ logging.Warningf(c, "attempted to fire tasks %v, but failed: %s", shards, err) |
+ } |
+ } |
+ return err == nil |
+} |
+ |
+// FireAllTasks fires off 1 task per shard to ensure that no tumble work |
+// languishes forever. This may not be needed in a constantly-loaded system with |
+// good tumble key distribution. |
+func FireAllTasks(c context.Context) error { |
+ num := GetConfig(c).NumShards |
+ shards := make(map[uint64]struct{}, num) |
+ for i := uint64(0); i < num; i++ { |
+ shards[i] = struct{}{} |
+ } |
+ |
+ err := error(nil) |
+ if !fireTasks(c, shards) { |
+ err = errors.New("unable to fire all tasks") |
+ } |
+ |
+ return err |
+} |
+ |
+// FireAllTasksHandler is a http handler suitable for installation into |
+// a httprouter. It expects `logging` and `luci/gae` services to be installed |
+// into the context. |
+// |
+// FireAllTasksHandler verifies that it was called within an Appengine Cron |
+// request, and then invokes the FireAllTasks function. |
+func FireAllTasksHandler(c context.Context, rw http.ResponseWriter, r *http.Request, _ httprouter.Params) { |
+ if err := FireAllTasks(c); err != nil { |
+ rw.WriteHeader(http.StatusInternalServerError) |
+ fmt.Fprintf(rw, "fire_all_tasks failed: %s", err) |
+ } else { |
+ rw.Write([]byte("ok")) |
+ } |
+} |