Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(134)

Side by Side Diff: appengine/tumble/config.go

Issue 1395293002: Add "tumble" distributed transaction processing service for appengine. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: use exists Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | appengine/tumble/doc.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package tumble
6
7 import (
8 "fmt"
9 "strings"
10 "time"
11
12 "github.com/julienschmidt/httprouter"
13 "github.com/luci/luci-go/appengine/middleware"
14 "golang.org/x/net/context"
15 )
16
17 // Config is the set of tweakable things for tumble. If you use something other
18 // than the defaults (e.g. unset values), you must ensure that all aspects of
19 // your application use the same config.
20 type Config struct {
21 // Name is the name of this service. This is the expected name of the
22 // configured taskqueue, as well as the prefix used for things like memc ache
23 // keys.
24 //
25 // It defaults to "tumble". It is illegal for the Name to contain '/', a nd
26 // Use will panic if it does.
27 Name string
28
29 // URLPrefix is the prefix to append for all registered routes. It's
30 // normalized to begin and end with a '/'. So "wat" would register:
31 // "/wat/{Service.Name}/fire_all_tasks"
32 // "/wat/{Service.Name}/process_shard/:shard_id/at/:timestamp"
33 //
34 // This defaults to "internal"
35 URLPrefix string
36
37 // NumShards is the number of tumble shards that will process concurrent ly.
38 // It defaults to 32.
39 NumShards uint64
40
41 // TemporalMinDelay is the minimum number of seconds to wait before the
42 // task queue entry for a given shard will run. It defaults to 1 second.
43 TemporalMinDelay time.Duration
44
45 // TemporalRoundFactor is the number of seconds to batch together in tas k
46 // queue tasks. It defaults to 4 seconds.
47 TemporalRoundFactor time.Duration
48
49 // NumGoroutines is the number of gorountines that will process in paral lel
50 // in a single shard. Each goroutine will process exactly one root entit y.
51 // It defaults to 16.
52 NumGoroutines uint64
53
54 // ProcessMaxBatchSize is the number of mutations that each processor go routine
55 // will attempt to include in each commit.
56 //
57 // It defaults to 128. A negative value means no limit.
58 ProcessMaxBatchSize int32
59 }
60
61 type key int
62
63 var defaultConfig = Config{
64 Name: "tumble",
65 URLPrefix: "/internal/",
66 NumShards: 32,
67 TemporalMinDelay: time.Second,
68 TemporalRoundFactor: 4 * time.Second,
69 NumGoroutines: 16,
70 ProcessMaxBatchSize: 128,
71 }
72
73 // DefaultConfig returns a Config with all the default values populated.
74 func DefaultConfig() Config {
75 return defaultConfig
76 }
77
78 // Use allows you to set a specific configuration in the context. This
79 // configuration can be obtained by calling GetConfig. Any zero-value fields
80 // in the Config will be replaced with its default value.
81 //
82 // This Config may be retrieved with GetConfig.
83 func Use(c context.Context, cfg Config) context.Context {
84 if cfg.Name == "" {
85 cfg.Name = defaultConfig.Name
86 }
87 if strings.Contains(cfg.Name, "/") {
88 panic(fmt.Errorf("tumble: name may not contain '/': %q", cfg.Nam e))
89 }
90 if cfg.URLPrefix == "" {
91 cfg.URLPrefix = defaultConfig.URLPrefix
92 }
93 if !strings.HasPrefix(cfg.URLPrefix, "/") {
94 cfg.URLPrefix = "/" + cfg.URLPrefix
95 }
96 if !strings.HasSuffix(cfg.URLPrefix, "/") {
97 cfg.URLPrefix = cfg.URLPrefix + "/"
98 }
99 if cfg.NumShards == 0 {
100 cfg.NumShards = defaultConfig.NumShards
101 }
102 if cfg.TemporalMinDelay == 0 {
103 cfg.TemporalMinDelay = defaultConfig.TemporalMinDelay
104 }
105 if cfg.TemporalRoundFactor == 0 {
106 cfg.TemporalRoundFactor = defaultConfig.TemporalRoundFactor
107 }
108 if cfg.NumGoroutines == 0 {
109 cfg.NumGoroutines = defaultConfig.NumGoroutines
110 }
111 if cfg.ProcessMaxBatchSize == 0 {
112 cfg.ProcessMaxBatchSize = defaultConfig.ProcessMaxBatchSize
113 }
114 return context.WithValue(c, key(0), &cfg)
115 }
116
117 // GetConfig retrieves the Config from the current context. If none has been set ,
118 // this returns a Config which has all the defaults filled out.
119 func GetConfig(c context.Context) Config {
120 if cfg, ok := c.Value(key(0)).(*Config); ok {
121 return *cfg
122 }
123 return defaultConfig
124 }
125
126 const processShardURLFormat = "/process_shard/:shard_id/at/:timestamp"
127
128 // ProcessURLPattern returns the httprouter-style URL pattern for the taskqueue
129 // process handler.
130 func (c *Config) ProcessURLPattern() string {
131 return c.URLPrefix + c.Name + processShardURLFormat
132 }
133
134 // ProcessURL creates a new url for a process shard taskqueue task, including
135 // the given timestamp and shard number.
136 func (c *Config) ProcessURL(ts time.Time, shard uint64) string {
137 return strings.NewReplacer(
138 ":shard_id", fmt.Sprint(shard),
139 ":timestamp", fmt.Sprint(ts.Unix())).Replace(c.ProcessURLPattern ())
140 }
141
142 // FireAllTasksURL returns the url intended to be hit by appengine cron to fire
143 // an instance of all the processing tasks.
144 func (c *Config) FireAllTasksURL() string {
145 return c.URLPrefix + c.Name + "/fire_all_tasks"
146 }
147
148 // InstallHandlers installs http handlers
149 func (c *Config) InstallHandlers(r *httprouter.Router) {
150 // GET so that this can be invoked from cron
151 r.GET(c.FireAllTasksURL(),
152 middleware.BaseProd(middleware.RequireCron(FireAllTasksHandler)) )
153
154 r.POST(c.ProcessURLPattern(),
155 middleware.BaseProd(middleware.RequireTaskQueue(c.Name, ProcessS hardHandler)))
156 }
OLDNEW
« no previous file with comments | « no previous file | appengine/tumble/doc.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698