Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(277)

Side by Side Diff: appengine/cmd/dm/distributor/registry.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: self review Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package distributor
6
7 import (
8 "fmt"
9 "reflect"
10 "strings"
11
12 "github.com/golang/protobuf/proto"
13 "github.com/luci/gae/service/info"
14 "github.com/luci/luci-go/appengine/tumble"
15 "github.com/luci/luci-go/common/api/dm/distributor"
16 "github.com/luci/luci-go/common/api/dm/service/v1"
17 "github.com/luci/luci-go/common/config"
18 "github.com/luci/luci-go/common/logging"
19 "golang.org/x/net/context"
20 )
21
22 var regKey = "holds a Registry"
dnj (Google) 2016/06/09 18:00:56 nit: "holds a DM Distributor Registry"
iannucci 2016/06/15 00:46:00 Done.
23
24 // WithRegistry adds the registry to the Context.
25 func WithRegistry(c context.Context, r Registry) context.Context {
26 if r == nil {
27 return c
dnj (Google) 2016/06/09 18:00:56 Perhaps this should panic? I think that this is cl
iannucci 2016/06/15 00:46:00 Done.
28 }
29 return context.WithValue(c, &regKey, r)
30 }
31
32 // GetRegistry gets the registry from the Context. This will return nil if the
33 // Context does not contain a Registry.
34 func GetRegistry(c context.Context) Registry {
35 ret, _ := c.Value(&regKey).(Registry)
36 return ret
37 }
38
39 // FinishExecutionFn is required to eliminate a circular dependency
40 // between mutate <-> distributor. Essentially this just makes a new
41 // mutate.FinishExecution.
42 //
43 // See mutate.FinishExecutionFn for the only actual implementation of this.
44 type FinishExecutionFn func(c context.Context, eid *dm.Execution_ID, rslt *TaskR esult) ([]tumble.Mutation, error)
45
46 // Registry holds a collection of all of the available distributor types.
47 type Registry interface {
48 FinishExecution(c context.Context, eid *dm.Execution_ID, rslt *TaskResul t) ([]tumble.Mutation, error)
49
50 // MakeDistributor builds a distributor instance that's configured with the
51 // provided config.
52 //
53 // The configuration for this distributor are obtained from luci-config at the
54 // time an Execution is started.
55 MakeDistributor(c context.Context, cfgName string) (d D, ver string, err error)
56 }
57
58 // NewRegistry builds a new implementation of Registry configured to load
59 // configuration data from luci-config.
60 //
61 // The mapping should hold nil-ptrs of various config protos -> respective
62 // Factory. When loading from luci-config, when we see a given message type,
63 // we'll construct the distributor instance using the provided Factory.
64 func NewRegistry(mapping map[proto.Message]Factory, fFn FinishExecutionFn) Regis try {
65 ret := &registry{fFn, make(map[reflect.Type]Factory, len(mapping))}
66 add := func(p proto.Message, factory Factory) {
67 if factory == nil {
68 panic("factory is nil")
69 }
70 if p == nil {
71 panic("proto.Message is nil")
72 }
73
74 typ := reflect.TypeOf(p)
75
76 if _, ok := ret.data[typ]; ok {
77 panic(fmt.Errorf("trying to register %q twice", typ))
78 }
79 ret.data[typ] = factory
80 }
81 for p, f := range mapping {
82 add(p, f)
83 }
84 return ret
85 }
86
87 type registry struct {
88 finishExecutionImpl FinishExecutionFn
89 data map[reflect.Type]Factory
90 }
91
92 var _ Registry = (*registry)(nil)
93
94 func (r *registry) SetFinishExecutionImpl(i func(context.Context, *dm.Execution_ ID, *TaskResult) ([]tumble.Mutation, error)) {
95 r.finishExecutionImpl = i
dnj (Google) 2016/06/09 18:00:55 This seems like it would need locking. But it's no
iannucci 2016/06/15 00:46:00 it was a bogus extra method
96 }
97
98 func (r *registry) FinishExecution(c context.Context, eid *dm.Execution_ID, rslt *TaskResult) ([]tumble.Mutation, error) {
99 return r.finishExecutionImpl(c, eid, rslt)
100 }
101
102 func (r *registry) MakeDistributor(c context.Context, cfgName string) (d D, ver string, err error) {
103 cfg, err := loadConfig(c, cfgName)
104 if err != nil {
105 logging.Fields{"error": err, "cfg": cfgName}.Errorf(c, "Failed t o load config")
106 return
107 }
108
109 ver = cfg.Version
110
111 typ := reflect.TypeOf(cfg.Content)
112
113 fn, ok := r.data[typ]
114 if !ok {
115 return nil, "", fmt.Errorf("unknown distributor type %T", cfg.Co ntent)
116 }
117
118 d, err = fn(c, cfg)
119 return
120 }
121
122 func getTrimmedAppID(c context.Context) string {
dnj (Google) 2016/06/09 18:00:56 Maybe this should be built into gae/info?
iannucci 2016/06/15 00:46:00 yeah probably. Done: https://chromiumcodereview.ap
123 // custom domains show up as "foo.com:appid"
124 toks := strings.Split(info.Get(c).AppID(), ":")
125 return toks[len(toks)-1]
126 }
127
128 // loadConfig loads the named distributor configurtaion from luci-config,
dnj (Google) 2016/06/09 18:00:55 nit: "configuration"
iannucci 2016/06/15 00:46:00 derp
129 // possibly using the in-memory or memcache version.
130 func loadConfig(c context.Context, cfgName string) (ret *Config, err error) {
131 aid := getTrimmedAppID(c)
132 cfgSvc := config.Get(c)
dnj (Google) 2016/06/09 18:00:56 Note: You allow a nil config service in "init()",
iannucci 2016/06/15 00:46:00 was missing a fallthrough there, added.
133 distCfgObj, err := cfgSvc.GetConfig(fmt.Sprintf("services/%s", aid), "di stributors.cfg", false)
134 if err != nil {
135 return
136 }
137
138 cfgVersion := distCfgObj.Revision
139 distCfg := &distributor.Config{}
140 if err = proto.UnmarshalText(distCfgObj.Content, distCfg); err != nil {
141 return
142 }
143
144 cfg, ok := distCfg.DistributorConfigs[cfgName]
145 if !ok {
146 err = fmt.Errorf("unknown distributor configuration: %q", cfgNam e)
147 return
148 }
149 if alias := cfg.GetAlias(); alias != nil {
150 cfg, ok = distCfg.DistributorConfigs[alias.OtherConfig]
151 if !ok {
152 err = fmt.Errorf("unknown distributor configuration: %q (via alias %q)", cfgName, alias.OtherConfig)
153 return
154 }
155 if cfg.GetAlias() != nil {
156 err = fmt.Errorf("too many levels of indirection for ali as %q (points to alias %q)", cfgName, alias.OtherConfig)
157 return
158 }
159 }
160
161 dt := cfg.DistributorType
162 if dt == nil {
163 err = fmt.Errorf("blank or unrecognized distributor_type")
164 return
165 }
166 dVal := reflect.ValueOf(dt)
167
168 // All non-nil DistributorType's have a single field which is the actual oneof
169 // value.
170 implConfig := dVal.Elem().Field(0).Interface().(proto.Message)
171
172 inf := info.Get(c)
173 scheme := "https"
174 if inf.IsDevAppServer() {
175 scheme = "http"
176 }
177
178 ret = &Config{
179 scheme + "://" + inf.DefaultVersionHostname() + "/",
dnj (Google) 2016/06/09 18:00:55 I think you should use a net.URL here, then call i
iannucci 2016/06/15 00:46:00 better, changed Config to have a url.URL natively.
180 cfgName,
181 cfgVersion,
182 implConfig,
183 }
184 return
185 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698