Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3065)

Unified Diff: appengine/tumble/fire_tasks.go

Issue 1395293002: Add "tumble" distributed transaction processing service for appengine. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: use exists Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « appengine/tumble/example_test.go ('k') | appengine/tumble/fire_tasks_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: appengine/tumble/fire_tasks.go
diff --git a/appengine/tumble/fire_tasks.go b/appengine/tumble/fire_tasks.go
new file mode 100644
index 0000000000000000000000000000000000000000..6fcfe578a2a09c12f7e5864124f1f1ddb57c183a
--- /dev/null
+++ b/appengine/tumble/fire_tasks.go
@@ -0,0 +1,94 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package tumble
+
+import (
+ "fmt"
+ "net/http"
+
+ "github.com/julienschmidt/httprouter"
+ "github.com/luci/gae/service/taskqueue"
+ "github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/common/errors"
+ "github.com/luci/luci-go/common/logging"
+ "golang.org/x/net/context"
+)
+
+func fireTasks(c context.Context, shards map[uint64]struct{}) bool {
+ if len(shards) == 0 {
+ return true
+ }
+
+ tq := taskqueue.Get(c)
+
+ cfg := GetConfig(c)
+
+ trf := cfg.TemporalRoundFactor
+ eta := clock.Now(c).UTC().Add(cfg.TemporalMinDelay + trf).Round(trf)
+ tasks := make([]*taskqueue.Task, 0, len(shards))
+
+ for shard := range shards {
+ tasks = append(tasks, &taskqueue.Task{
+ Name: fmt.Sprintf("%d_%d", eta.Unix(), shard),
+
+ Path: cfg.ProcessURL(eta, shard),
+
+ ETA: eta,
+
+ // TODO(riannucci): Tune RetryOptions?
+ })
+ }
+
+ err := tq.AddMulti(tasks, cfg.Name)
+ if err != nil {
+ if merr, ok := err.(errors.MultiError); ok {
+ lme := errors.NewLazyMultiError(len(merr))
+ for i, err := range merr {
+ if err == taskqueue.ErrTaskAlreadyAdded {
+ continue
+ }
+ lme.Assign(i, err)
+ }
+ err = lme.Get()
+ }
+ if err != nil {
+ logging.Warningf(c, "attempted to fire tasks %v, but failed: %s", shards, err)
+ }
+ }
+ return err == nil
+}
+
+// FireAllTasks fires off 1 task per shard to ensure that no tumble work
+// languishes forever. This may not be needed in a constantly-loaded system with
+// good tumble key distribution.
+func FireAllTasks(c context.Context) error {
+ num := GetConfig(c).NumShards
+ shards := make(map[uint64]struct{}, num)
+ for i := uint64(0); i < num; i++ {
+ shards[i] = struct{}{}
+ }
+
+ err := error(nil)
+ if !fireTasks(c, shards) {
+ err = errors.New("unable to fire all tasks")
+ }
+
+ return err
+}
+
+// FireAllTasksHandler is a http handler suitable for installation into
+// a httprouter. It expects `logging` and `luci/gae` services to be installed
+// into the context.
+//
+// FireAllTasksHandler verifies that it was called within an Appengine Cron
+// request, and then invokes the FireAllTasks function.
+func FireAllTasksHandler(c context.Context, rw http.ResponseWriter, r *http.Request, _ httprouter.Params) {
+ if err := FireAllTasks(c); err != nil {
+ rw.WriteHeader(http.StatusInternalServerError)
+ fmt.Fprintf(rw, "fire_all_tasks failed: %s", err)
+ } else {
+ rw.Write([]byte("ok"))
+ }
+}
« no previous file with comments | « appengine/tumble/example_test.go ('k') | appengine/tumble/fire_tasks_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698