Index: appengine/tumble/config.go |
diff --git a/appengine/tumble/config.go b/appengine/tumble/config.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..d26d01321f9b8c364509bd32c29b642df2eea616 |
--- /dev/null |
+++ b/appengine/tumble/config.go |
@@ -0,0 +1,163 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+package tumble |
+ |
+import ( |
+ "fmt" |
+ "net/http" |
+ "strings" |
+ "time" |
+ |
+ "github.com/julienschmidt/httprouter" |
+ "github.com/luci/gae/impl/prod" |
+ "github.com/luci/luci-go/appengine/gaelogger" |
+ "golang.org/x/net/context" |
+) |
+ |
+// Config is the set of tweakable things for tumble. If you use something other |
+// than the defaults (e.g. unset values), you must ensure that all aspects of |
+// your application use the same config. |
+type Config struct { |
+ // Name is the name of this service. This is the expected name of the |
+ // configured taskqueue, as well as the prefix used for things like memcache |
+ // keys. |
+ // |
+ // It defaults to "tumble". It is illegal for the Name to contain '/', and |
+ // Use will panic if it does. |
+ Name string |
+ |
+ // URLPrefix is the prefix to append for all registered routes. It's |
+ // normalized to begin and end with a '/'. So "wat" would register: |
+ // "/wat/{Service.Name}/fire_all_tasks" |
+ // "/wat/{Service.Name}/process_shard/:shard_id/at/:timestamp" |
+ // |
+ // This defaults to "internal" |
+ URLPrefix string |
+ |
+ // NumShards is the number of tumble shards that will process concurrently. |
+ // It defaults to 32. |
+ NumShards uint64 |
+ |
+ // TemporalMinDelay is the minimum number of seconds to wait before the |
+ // task queue entry for a given shard will run. It defaults to 1 second. |
+ TemporalMinDelay time.Duration |
+ |
+ // TemporalRoundFactor is the number of seconds to batch together in task |
+ // queue tasks. It defaults to 4 seconds. |
+ TemporalRoundFactor time.Duration |
+ |
+ // NumGoroutines is the number of gorountines that will process in parallel |
+ // in a single shard. Each goroutine will process exactly one root entity. |
+ // It defaults to 16. |
+ NumGoroutines uint64 |
+ |
+ // ProcessMaxBatchSize is the number of mutations that each processor goroutine |
+ // will attempt to include in each commit. |
+ // |
+ // It defaults to 128. A negative value means no limit. |
+ ProcessMaxBatchSize int32 |
+} |
+ |
+type key int |
+ |
+var defaultConfig = Config{ |
+ Name: "tumble", |
+ URLPrefix: "/internal/", |
+ NumShards: 32, |
+ TemporalMinDelay: time.Second, |
+ TemporalRoundFactor: 4 * time.Second, |
+ NumGoroutines: 16, |
+ ProcessMaxBatchSize: 128, |
+} |
+ |
+// DefaultConfig returns a Config with all the default values populated. |
+func DefaultConfig() Config { |
+ return defaultConfig |
+} |
+ |
+// Use allows you to set a specific configuration in the context. This |
+// configuration can be obtained by calling GetConfig. Any zero-value fields |
+// in the Config will be replaced with its default value. |
+// |
+// This Config may be retrieved with GetConfig. |
+func Use(c context.Context, cfg Config) context.Context { |
+ if cfg.Name == "" { |
+ cfg.Name = defaultConfig.Name |
+ } |
+ if strings.Contains(cfg.Name, "/") { |
+ panic(fmt.Errorf("tumble: name may not contain '/': %q", cfg.Name)) |
+ } |
+ if cfg.URLPrefix == "" { |
+ cfg.URLPrefix = defaultConfig.URLPrefix |
+ } |
+ if !strings.HasPrefix(cfg.URLPrefix, "/") { |
+ cfg.URLPrefix = "/" + cfg.URLPrefix |
+ } |
+ if !strings.HasSuffix(cfg.URLPrefix, "/") { |
+ cfg.URLPrefix = cfg.URLPrefix + "/" |
+ } |
+ if cfg.NumShards == 0 { |
+ cfg.NumShards = defaultConfig.NumShards |
+ } |
+ if cfg.TemporalMinDelay == 0 { |
+ cfg.TemporalMinDelay = defaultConfig.TemporalMinDelay |
+ } |
+ if cfg.TemporalRoundFactor == 0 { |
+ cfg.TemporalRoundFactor = defaultConfig.TemporalRoundFactor |
+ } |
+ if cfg.NumGoroutines == 0 { |
+ cfg.NumGoroutines = defaultConfig.NumGoroutines |
+ } |
+ if cfg.ProcessMaxBatchSize == 0 { |
+ cfg.ProcessMaxBatchSize = defaultConfig.ProcessMaxBatchSize |
+ } |
+ return context.WithValue(c, key(0), &cfg) |
+} |
+ |
+// GetConfig retrieves the Config from the current context. If none has been set, |
+// this returns a Config which has all the defaults filled out. |
+func GetConfig(c context.Context) Config { |
+ if cfg, ok := c.Value(key(0)).(*Config); ok { |
+ return *cfg |
+ } |
+ return defaultConfig |
+} |
+ |
+const processShardURLFormat = "/process_shard/:shard_id/at/:timestamp" |
+ |
+// ProcessURLPattern returns the httprouter-style URL pattern for the taskqueue |
+// process handler. |
+func (c *Config) ProcessURLPattern() string { |
+ return c.URLPrefix + c.Name + processShardURLFormat |
+} |
+ |
+// ProcessURL creates a new url for a process shard taskqueue task, including |
+// the given timestamp and shard number. |
+func (c *Config) ProcessURL(ts time.Time, shard uint64) string { |
+ return strings.NewReplacer( |
+ ":shard_id", fmt.Sprint(shard), |
+ ":timestamp", fmt.Sprint(ts.Unix())).Replace(c.ProcessURLPattern()) |
+} |
+ |
+// FireAllTasksURL returns the url intended to be hit by appengine cron to fire |
+// an instance of all the processing tasks. |
+func (c *Config) FireAllTasksURL() string { |
+ return c.URLPrefix + c.Name + "/fire_all_tasks" |
+} |
+ |
+// InstallHandlers installs http handlers |
+func (c *Config) InstallHandlers(r *httprouter.Router) { |
+ // GET so that this can be invoked from cron |
+ r.GET(c.FireAllTasksURL(), |
Vadim Sh.
2015/10/12 20:05:14
check X-AppEngine-Cron header is set to "true" as
iannucci
2015/10/13 02:39:46
middlewaarrrereeerereerrere
|
+ func(rw http.ResponseWriter, r *http.Request, _ httprouter.Params) { |
+ c := gaelogger.Use(prod.UseRequest(r)) |
+ FireAllTasksHandler(c, rw, r) |
+ }) |
+ r.POST(c.ProcessURLPattern(), |
Vadim Sh.
2015/10/12 20:05:14
same here (X-AppEngine-QueueName header).
Perhaps
iannucci
2015/10/13 02:39:46
да
|
+ func(rw http.ResponseWriter, r *http.Request, p httprouter.Params) { |
+ c := gaelogger.Use(prod.UseRequest(r)) |
+ ProcessShardHandler(c, rw, r, p) |
+ }) |
+} |