Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(64)

Side by Side Diff: appengine/tumble/config.go

Issue 1395293002: Add "tumble" distributed transaction processing service for appengine. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | appengine/tumble/doc.go » ('j') | appengine/tumble/doc.go » ('J')
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package tumble
6
7 import (
8 "fmt"
9 "net/http"
10 "strings"
11 "time"
12
13 "github.com/julienschmidt/httprouter"
14 "github.com/luci/gae/impl/prod"
15 "github.com/luci/luci-go/appengine/gaelogger"
16 "golang.org/x/net/context"
17 )
18
19 // Config is the set of tweakable things for tumble. If you use something other
20 // than the defaults (e.g. unset values), you must ensure that all aspects of
21 // your application use the same config.
22 type Config struct {
23 // Name is the name of this service. This is the expected name of the
24 // configured taskqueue, as well as the prefix used for things like memc ache
25 // keys.
26 //
27 // It defaults to "tumble". It is illegal for the Name to contain '/', a nd
28 // Use will panic if it does.
29 Name string
30
31 // URLPrefix is the prefix to append for all registered routes. It's
32 // normalized to begin and end with a '/'. So "wat" would register:
33 // "/wat/{Service.Name}/fire_all_tasks"
34 // "/wat/{Service.Name}/process_shard/:shard_id/at/:timestamp"
35 //
36 // This defaults to "internal"
37 URLPrefix string
38
39 // NumShards is the number of tumble shards that will process concurrent ly.
40 // It defaults to 32.
41 NumShards uint64
42
43 // TemporalMinDelay is the minimum number of seconds to wait before the
44 // task queue entry for a given shard will run. It defaults to 1 second.
45 TemporalMinDelay time.Duration
46
47 // TemporalRoundFactor is the number of seconds to batch together in tas k
48 // queue tasks. It defaults to 4 seconds.
49 TemporalRoundFactor time.Duration
50
51 // NumGoroutines is the number of gorountines that will process in paral lel
52 // in a single shard. Each goroutine will process exactly one root entit y.
53 // It defaults to 16.
54 NumGoroutines uint64
55
56 // ProcessMaxBatchSize is the number of mutations that each processor go routine
57 // will attempt to include in each commit.
58 //
59 // It defaults to 128. A negative value means no limit.
60 ProcessMaxBatchSize int32
61 }
62
63 type key int
64
65 var defaultConfig = Config{
66 Name: "tumble",
67 URLPrefix: "/internal/",
68 NumShards: 32,
69 TemporalMinDelay: time.Second,
70 TemporalRoundFactor: 4 * time.Second,
71 NumGoroutines: 16,
72 ProcessMaxBatchSize: 128,
73 }
74
75 // DefaultConfig returns a Config with all the default values populated.
76 func DefaultConfig() Config {
77 return defaultConfig
78 }
79
80 // Use allows you to set a specific configuration in the context. This
81 // configuration can be obtained by calling GetConfig. Any zero-value fields
82 // in the Config will be replaced with its default value.
83 //
84 // This Config may be retrieved with GetConfig.
85 func Use(c context.Context, cfg Config) context.Context {
86 if cfg.Name == "" {
87 cfg.Name = defaultConfig.Name
88 }
89 if strings.Contains(cfg.Name, "/") {
90 panic(fmt.Errorf("tumble: name may not contain '/': %q", cfg.Nam e))
91 }
92 if cfg.URLPrefix == "" {
93 cfg.URLPrefix = defaultConfig.URLPrefix
94 }
95 if !strings.HasPrefix(cfg.URLPrefix, "/") {
96 cfg.URLPrefix = "/" + cfg.URLPrefix
97 }
98 if !strings.HasSuffix(cfg.URLPrefix, "/") {
99 cfg.URLPrefix = cfg.URLPrefix + "/"
100 }
101 if cfg.NumShards == 0 {
102 cfg.NumShards = defaultConfig.NumShards
103 }
104 if cfg.TemporalMinDelay == 0 {
105 cfg.TemporalMinDelay = defaultConfig.TemporalMinDelay
106 }
107 if cfg.TemporalRoundFactor == 0 {
108 cfg.TemporalRoundFactor = defaultConfig.TemporalRoundFactor
109 }
110 if cfg.NumGoroutines == 0 {
111 cfg.NumGoroutines = defaultConfig.NumGoroutines
112 }
113 if cfg.ProcessMaxBatchSize == 0 {
114 cfg.ProcessMaxBatchSize = defaultConfig.ProcessMaxBatchSize
115 }
116 return context.WithValue(c, key(0), &cfg)
117 }
118
119 // GetConfig retrieves the Config from the current context. If none has been set ,
120 // this returns a Config which has all the defaults filled out.
121 func GetConfig(c context.Context) Config {
122 if cfg, ok := c.Value(key(0)).(*Config); ok {
123 return *cfg
124 }
125 return defaultConfig
126 }
127
128 const processShardURLFormat = "/process_shard/:shard_id/at/:timestamp"
129
130 // ProcessURLPattern returns the httprouter-style URL pattern for the taskqueue
131 // process handler.
132 func (c *Config) ProcessURLPattern() string {
133 return c.URLPrefix + c.Name + processShardURLFormat
134 }
135
136 // ProcessURL creates a new url for a process shard taskqueue task, including
137 // the given timestamp and shard number.
138 func (c *Config) ProcessURL(ts time.Time, shard uint64) string {
139 return strings.NewReplacer(
140 ":shard_id", fmt.Sprint(shard),
141 ":timestamp", fmt.Sprint(ts.Unix())).Replace(c.ProcessURLPattern ())
142 }
143
144 // FireAllTasksURL returns the url intended to be hit by appengine cron to fire
145 // an instance of all the processing tasks.
146 func (c *Config) FireAllTasksURL() string {
147 return c.URLPrefix + c.Name + "/fire_all_tasks"
148 }
149
150 // InstallHandlers installs http handlers
151 func (c *Config) InstallHandlers(r *httprouter.Router) {
152 // GET so that this can be invoked from cron
153 r.GET(c.FireAllTasksURL(),
Vadim Sh. 2015/10/12 20:05:14 check X-AppEngine-Cron header is set to "true" as
iannucci 2015/10/13 02:39:46 middlewaarrrereeerereerrere
154 func(rw http.ResponseWriter, r *http.Request, _ httprouter.Param s) {
155 c := gaelogger.Use(prod.UseRequest(r))
156 FireAllTasksHandler(c, rw, r)
157 })
158 r.POST(c.ProcessURLPattern(),
Vadim Sh. 2015/10/12 20:05:14 same here (X-AppEngine-QueueName header). Perhaps
iannucci 2015/10/13 02:39:46 да
159 func(rw http.ResponseWriter, r *http.Request, p httprouter.Param s) {
160 c := gaelogger.Use(prod.UseRequest(r))
161 ProcessShardHandler(c, rw, r, p)
162 })
163 }
OLDNEW
« no previous file with comments | « no previous file | appengine/tumble/doc.go » ('j') | appengine/tumble/doc.go » ('J')

Powered by Google App Engine
This is Rietveld 408576698