OLD | NEW |
(Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package tumble |
| 6 |
| 7 import ( |
| 8 "fmt" |
| 9 "strings" |
| 10 "time" |
| 11 |
| 12 "github.com/julienschmidt/httprouter" |
| 13 "github.com/luci/luci-go/appengine/middleware" |
| 14 "golang.org/x/net/context" |
| 15 ) |
| 16 |
| 17 // Config is the set of tweakable things for tumble. If you use something other |
| 18 // than the defaults (e.g. unset values), you must ensure that all aspects of |
| 19 // your application use the same config. |
| 20 type Config struct { |
| 21 // Name is the name of this service. This is the expected name of the |
| 22 // configured taskqueue, as well as the prefix used for things like memc
ache |
| 23 // keys. |
| 24 // |
| 25 // It defaults to "tumble". It is illegal for the Name to contain '/', a
nd |
| 26 // Use will panic if it does. |
| 27 Name string |
| 28 |
| 29 // URLPrefix is the prefix to append for all registered routes. It's |
| 30 // normalized to begin and end with a '/'. So "wat" would register: |
| 31 // "/wat/{Service.Name}/fire_all_tasks" |
| 32 // "/wat/{Service.Name}/process_shard/:shard_id/at/:timestamp" |
| 33 // |
| 34 // This defaults to "internal" |
| 35 URLPrefix string |
| 36 |
| 37 // NumShards is the number of tumble shards that will process concurrent
ly. |
| 38 // It defaults to 32. |
| 39 NumShards uint64 |
| 40 |
| 41 // TemporalMinDelay is the minimum number of seconds to wait before the |
| 42 // task queue entry for a given shard will run. It defaults to 1 second. |
| 43 TemporalMinDelay time.Duration |
| 44 |
| 45 // TemporalRoundFactor is the number of seconds to batch together in tas
k |
| 46 // queue tasks. It defaults to 4 seconds. |
| 47 TemporalRoundFactor time.Duration |
| 48 |
| 49 // NumGoroutines is the number of gorountines that will process in paral
lel |
| 50 // in a single shard. Each goroutine will process exactly one root entit
y. |
| 51 // It defaults to 16. |
| 52 NumGoroutines uint64 |
| 53 |
| 54 // ProcessMaxBatchSize is the number of mutations that each processor go
routine |
| 55 // will attempt to include in each commit. |
| 56 // |
| 57 // It defaults to 128. A negative value means no limit. |
| 58 ProcessMaxBatchSize int32 |
| 59 } |
| 60 |
| 61 type key int |
| 62 |
| 63 var defaultConfig = Config{ |
| 64 Name: "tumble", |
| 65 URLPrefix: "/internal/", |
| 66 NumShards: 32, |
| 67 TemporalMinDelay: time.Second, |
| 68 TemporalRoundFactor: 4 * time.Second, |
| 69 NumGoroutines: 16, |
| 70 ProcessMaxBatchSize: 128, |
| 71 } |
| 72 |
| 73 // DefaultConfig returns a Config with all the default values populated. |
| 74 func DefaultConfig() Config { |
| 75 return defaultConfig |
| 76 } |
| 77 |
| 78 // Use allows you to set a specific configuration in the context. This |
| 79 // configuration can be obtained by calling GetConfig. Any zero-value fields |
| 80 // in the Config will be replaced with its default value. |
| 81 // |
| 82 // This Config may be retrieved with GetConfig. |
| 83 func Use(c context.Context, cfg Config) context.Context { |
| 84 if cfg.Name == "" { |
| 85 cfg.Name = defaultConfig.Name |
| 86 } |
| 87 if strings.Contains(cfg.Name, "/") { |
| 88 panic(fmt.Errorf("tumble: name may not contain '/': %q", cfg.Nam
e)) |
| 89 } |
| 90 if cfg.URLPrefix == "" { |
| 91 cfg.URLPrefix = defaultConfig.URLPrefix |
| 92 } |
| 93 if !strings.HasPrefix(cfg.URLPrefix, "/") { |
| 94 cfg.URLPrefix = "/" + cfg.URLPrefix |
| 95 } |
| 96 if !strings.HasSuffix(cfg.URLPrefix, "/") { |
| 97 cfg.URLPrefix = cfg.URLPrefix + "/" |
| 98 } |
| 99 if cfg.NumShards == 0 { |
| 100 cfg.NumShards = defaultConfig.NumShards |
| 101 } |
| 102 if cfg.TemporalMinDelay == 0 { |
| 103 cfg.TemporalMinDelay = defaultConfig.TemporalMinDelay |
| 104 } |
| 105 if cfg.TemporalRoundFactor == 0 { |
| 106 cfg.TemporalRoundFactor = defaultConfig.TemporalRoundFactor |
| 107 } |
| 108 if cfg.NumGoroutines == 0 { |
| 109 cfg.NumGoroutines = defaultConfig.NumGoroutines |
| 110 } |
| 111 if cfg.ProcessMaxBatchSize == 0 { |
| 112 cfg.ProcessMaxBatchSize = defaultConfig.ProcessMaxBatchSize |
| 113 } |
| 114 return context.WithValue(c, key(0), &cfg) |
| 115 } |
| 116 |
| 117 // GetConfig retrieves the Config from the current context. If none has been set
, |
| 118 // this returns a Config which has all the defaults filled out. |
| 119 func GetConfig(c context.Context) Config { |
| 120 if cfg, ok := c.Value(key(0)).(*Config); ok { |
| 121 return *cfg |
| 122 } |
| 123 return defaultConfig |
| 124 } |
| 125 |
| 126 const processShardURLFormat = "/process_shard/:shard_id/at/:timestamp" |
| 127 |
| 128 // ProcessURLPattern returns the httprouter-style URL pattern for the taskqueue |
| 129 // process handler. |
| 130 func (c *Config) ProcessURLPattern() string { |
| 131 return c.URLPrefix + c.Name + processShardURLFormat |
| 132 } |
| 133 |
| 134 // ProcessURL creates a new url for a process shard taskqueue task, including |
| 135 // the given timestamp and shard number. |
| 136 func (c *Config) ProcessURL(ts time.Time, shard uint64) string { |
| 137 return strings.NewReplacer( |
| 138 ":shard_id", fmt.Sprint(shard), |
| 139 ":timestamp", fmt.Sprint(ts.Unix())).Replace(c.ProcessURLPattern
()) |
| 140 } |
| 141 |
| 142 // FireAllTasksURL returns the url intended to be hit by appengine cron to fire |
| 143 // an instance of all the processing tasks. |
| 144 func (c *Config) FireAllTasksURL() string { |
| 145 return c.URLPrefix + c.Name + "/fire_all_tasks" |
| 146 } |
| 147 |
| 148 // InstallHandlers installs http handlers |
| 149 func (c *Config) InstallHandlers(r *httprouter.Router) { |
| 150 // GET so that this can be invoked from cron |
| 151 r.GET(c.FireAllTasksURL(), |
| 152 middleware.BaseProd(middleware.RequireCron(FireAllTasksHandler))
) |
| 153 |
| 154 r.POST(c.ProcessURLPattern(), |
| 155 middleware.BaseProd(middleware.RequireTaskQueue(c.Name, ProcessS
hardHandler))) |
| 156 } |
OLD | NEW |