OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package tumble | |
6 | |
7 import ( | |
8 "fmt" | |
9 "net/http" | |
10 "strings" | |
11 "time" | |
12 | |
13 "github.com/julienschmidt/httprouter" | |
14 "github.com/luci/gae/impl/prod" | |
15 "github.com/luci/luci-go/appengine/gaelogger" | |
16 "golang.org/x/net/context" | |
17 ) | |
18 | |
19 // Config is the set of tweakable things for tumble. If you use something other | |
20 // than the defaults (e.g. unset values), you must ensure that all aspects of | |
21 // your application use the same config. | |
22 type Config struct { | |
23 // Name is the name of this service. This is the expected name of the | |
24 // configured taskqueue, as well as the prefix used for things like memc ache | |
25 // keys. | |
26 // | |
27 // It defaults to "tumble". It is illegal for the Name to contain '/', a nd | |
28 // Use will panic if it does. | |
29 Name string | |
30 | |
31 // URLPrefix is the prefix to append for all registered routes. It's | |
32 // normalized to begin and end with a '/'. So "wat" would register: | |
33 // "/wat/{Service.Name}/fire_all_tasks" | |
34 // "/wat/{Service.Name}/process_shard/:shard_id/at/:timestamp" | |
35 // | |
36 // This defaults to "internal" | |
37 URLPrefix string | |
38 | |
39 // NumShards is the number of tumble shards that will process concurrent ly. | |
40 // It defaults to 32. | |
41 NumShards uint64 | |
42 | |
43 // TemporalMinDelay is the minimum number of seconds to wait before the | |
44 // task queue entry for a given shard will run. It defaults to 1 second. | |
45 TemporalMinDelay time.Duration | |
46 | |
47 // TemporalRoundFactor is the number of seconds to batch together in tas k | |
48 // queue tasks. It defaults to 4 seconds. | |
49 TemporalRoundFactor time.Duration | |
50 | |
51 // NumGoroutines is the number of gorountines that will process in paral lel | |
52 // in a single shard. Each goroutine will process exactly one root entit y. | |
53 // It defaults to 16. | |
54 NumGoroutines uint64 | |
55 | |
56 // ProcessMaxBatchSize is the number of mutations that each processor go routine | |
57 // will attempt to include in each commit. | |
58 // | |
59 // It defaults to 128. A negative value means no limit. | |
60 ProcessMaxBatchSize int32 | |
61 } | |
62 | |
63 type key int | |
64 | |
65 var defaultConfig = Config{ | |
66 Name: "tumble", | |
67 URLPrefix: "/internal/", | |
68 NumShards: 32, | |
69 TemporalMinDelay: time.Second, | |
70 TemporalRoundFactor: 4 * time.Second, | |
71 NumGoroutines: 16, | |
72 ProcessMaxBatchSize: 128, | |
73 } | |
74 | |
75 // DefaultConfig returns a Config with all the default values populated. | |
76 func DefaultConfig() Config { | |
77 return defaultConfig | |
78 } | |
79 | |
80 // Use allows you to set a specific configuration in the context. This | |
81 // configuration can be obtained by calling GetConfig. Any zero-value fields | |
82 // in the Config will be replaced with its default value. | |
83 // | |
84 // This Config may be retrieved with GetConfig. | |
85 func Use(c context.Context, cfg Config) context.Context { | |
86 if cfg.Name == "" { | |
87 cfg.Name = defaultConfig.Name | |
88 } | |
89 if strings.Contains(cfg.Name, "/") { | |
90 panic(fmt.Errorf("tumble: name may not contain '/': %q", cfg.Nam e)) | |
91 } | |
92 if cfg.URLPrefix == "" { | |
93 cfg.URLPrefix = defaultConfig.URLPrefix | |
94 } | |
95 if !strings.HasPrefix(cfg.URLPrefix, "/") { | |
96 cfg.URLPrefix = "/" + cfg.URLPrefix | |
97 } | |
98 if !strings.HasSuffix(cfg.URLPrefix, "/") { | |
99 cfg.URLPrefix = cfg.URLPrefix + "/" | |
100 } | |
101 if cfg.NumShards == 0 { | |
102 cfg.NumShards = defaultConfig.NumShards | |
103 } | |
104 if cfg.TemporalMinDelay == 0 { | |
105 cfg.TemporalMinDelay = defaultConfig.TemporalMinDelay | |
106 } | |
107 if cfg.TemporalRoundFactor == 0 { | |
108 cfg.TemporalRoundFactor = defaultConfig.TemporalRoundFactor | |
109 } | |
110 if cfg.NumGoroutines == 0 { | |
111 cfg.NumGoroutines = defaultConfig.NumGoroutines | |
112 } | |
113 if cfg.ProcessMaxBatchSize == 0 { | |
114 cfg.ProcessMaxBatchSize = defaultConfig.ProcessMaxBatchSize | |
115 } | |
116 return context.WithValue(c, key(0), &cfg) | |
117 } | |
118 | |
119 // GetConfig retrieves the Config from the current context. If none has been set , | |
120 // this returns a Config which has all the defaults filled out. | |
121 func GetConfig(c context.Context) Config { | |
122 if cfg, ok := c.Value(key(0)).(*Config); ok { | |
123 return *cfg | |
124 } | |
125 return defaultConfig | |
126 } | |
127 | |
128 const processShardURLFormat = "/process_shard/:shard_id/at/:timestamp" | |
129 | |
130 // ProcessURLPattern returns the httprouter-style URL pattern for the taskqueue | |
131 // process handler. | |
132 func (c *Config) ProcessURLPattern() string { | |
133 return c.URLPrefix + c.Name + processShardURLFormat | |
134 } | |
135 | |
136 // ProcessURL creates a new url for a process shard taskqueue task, including | |
137 // the given timestamp and shard number. | |
138 func (c *Config) ProcessURL(ts time.Time, shard uint64) string { | |
139 return strings.NewReplacer( | |
140 ":shard_id", fmt.Sprint(shard), | |
141 ":timestamp", fmt.Sprint(ts.Unix())).Replace(c.ProcessURLPattern ()) | |
142 } | |
143 | |
144 // FireAllTasksURL returns the url intended to be hit by appengine cron to fire | |
145 // an instance of all the processing tasks. | |
146 func (c *Config) FireAllTasksURL() string { | |
147 return c.URLPrefix + c.Name + "/fire_all_tasks" | |
148 } | |
149 | |
150 // InstallHandlers installs http handlers | |
151 func (c *Config) InstallHandlers(r *httprouter.Router) { | |
152 // GET so that this can be invoked from cron | |
153 r.GET(c.FireAllTasksURL(), | |
Vadim Sh.
2015/10/12 20:05:14
check X-AppEngine-Cron header is set to "true" as
iannucci
2015/10/13 02:39:46
middlewaarrrereeerereerrere
| |
154 func(rw http.ResponseWriter, r *http.Request, _ httprouter.Param s) { | |
155 c := gaelogger.Use(prod.UseRequest(r)) | |
156 FireAllTasksHandler(c, rw, r) | |
157 }) | |
158 r.POST(c.ProcessURLPattern(), | |
Vadim Sh.
2015/10/12 20:05:14
same here (X-AppEngine-QueueName header).
Perhaps
iannucci
2015/10/13 02:39:46
да
| |
159 func(rw http.ResponseWriter, r *http.Request, p httprouter.Param s) { | |
160 c := gaelogger.Use(prod.UseRequest(r)) | |
161 ProcessShardHandler(c, rw, r, p) | |
162 }) | |
163 } | |
OLD | NEW |