OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package distributor | |
6 | |
7 import ( | |
8 "fmt" | |
9 "reflect" | |
10 "strings" | |
11 | |
12 "github.com/golang/protobuf/proto" | |
13 "github.com/luci/gae/service/info" | |
14 "github.com/luci/luci-go/appengine/tumble" | |
15 "github.com/luci/luci-go/common/api/dm/distributor" | |
16 "github.com/luci/luci-go/common/api/dm/service/v1" | |
17 "github.com/luci/luci-go/common/config" | |
18 "github.com/luci/luci-go/common/logging" | |
19 "golang.org/x/net/context" | |
20 ) | |
21 | |
22 var regKey = "holds a Registry" | |
dnj (Google)
2016/06/09 18:00:56
nit: "holds a DM Distributor Registry"
iannucci
2016/06/15 00:46:00
Done.
| |
23 | |
24 // WithRegistry adds the registry to the Context. | |
25 func WithRegistry(c context.Context, r Registry) context.Context { | |
26 if r == nil { | |
27 return c | |
dnj (Google)
2016/06/09 18:00:56
Perhaps this should panic? I think that this is cl
iannucci
2016/06/15 00:46:00
Done.
| |
28 } | |
29 return context.WithValue(c, ®Key, r) | |
30 } | |
31 | |
32 // GetRegistry gets the registry from the Context. This will return nil if the | |
33 // Context does not contain a Registry. | |
34 func GetRegistry(c context.Context) Registry { | |
35 ret, _ := c.Value(®Key).(Registry) | |
36 return ret | |
37 } | |
38 | |
39 // FinishExecutionFn is required to eliminate a circular dependency | |
40 // between mutate <-> distributor. Essentially this just makes a new | |
41 // mutate.FinishExecution. | |
42 // | |
43 // See mutate.FinishExecutionFn for the only actual implementation of this. | |
44 type FinishExecutionFn func(c context.Context, eid *dm.Execution_ID, rslt *TaskR esult) ([]tumble.Mutation, error) | |
45 | |
46 // Registry holds a collection of all of the available distributor types. | |
47 type Registry interface { | |
48 FinishExecution(c context.Context, eid *dm.Execution_ID, rslt *TaskResul t) ([]tumble.Mutation, error) | |
49 | |
50 // MakeDistributor builds a distributor instance that's configured with the | |
51 // provided config. | |
52 // | |
53 // The configuration for this distributor are obtained from luci-config at the | |
54 // time an Execution is started. | |
55 MakeDistributor(c context.Context, cfgName string) (d D, ver string, err error) | |
56 } | |
57 | |
58 // NewRegistry builds a new implementation of Registry configured to load | |
59 // configuration data from luci-config. | |
60 // | |
61 // The mapping should hold nil-ptrs of various config protos -> respective | |
62 // Factory. When loading from luci-config, when we see a given message type, | |
63 // we'll construct the distributor instance using the provided Factory. | |
64 func NewRegistry(mapping map[proto.Message]Factory, fFn FinishExecutionFn) Regis try { | |
65 ret := ®istry{fFn, make(map[reflect.Type]Factory, len(mapping))} | |
66 add := func(p proto.Message, factory Factory) { | |
67 if factory == nil { | |
68 panic("factory is nil") | |
69 } | |
70 if p == nil { | |
71 panic("proto.Message is nil") | |
72 } | |
73 | |
74 typ := reflect.TypeOf(p) | |
75 | |
76 if _, ok := ret.data[typ]; ok { | |
77 panic(fmt.Errorf("trying to register %q twice", typ)) | |
78 } | |
79 ret.data[typ] = factory | |
80 } | |
81 for p, f := range mapping { | |
82 add(p, f) | |
83 } | |
84 return ret | |
85 } | |
86 | |
87 type registry struct { | |
88 finishExecutionImpl FinishExecutionFn | |
89 data map[reflect.Type]Factory | |
90 } | |
91 | |
92 var _ Registry = (*registry)(nil) | |
93 | |
94 func (r *registry) SetFinishExecutionImpl(i func(context.Context, *dm.Execution_ ID, *TaskResult) ([]tumble.Mutation, error)) { | |
95 r.finishExecutionImpl = i | |
dnj (Google)
2016/06/09 18:00:55
This seems like it would need locking. But it's no
iannucci
2016/06/15 00:46:00
it was a bogus extra method
| |
96 } | |
97 | |
98 func (r *registry) FinishExecution(c context.Context, eid *dm.Execution_ID, rslt *TaskResult) ([]tumble.Mutation, error) { | |
99 return r.finishExecutionImpl(c, eid, rslt) | |
100 } | |
101 | |
102 func (r *registry) MakeDistributor(c context.Context, cfgName string) (d D, ver string, err error) { | |
103 cfg, err := loadConfig(c, cfgName) | |
104 if err != nil { | |
105 logging.Fields{"error": err, "cfg": cfgName}.Errorf(c, "Failed t o load config") | |
106 return | |
107 } | |
108 | |
109 ver = cfg.Version | |
110 | |
111 typ := reflect.TypeOf(cfg.Content) | |
112 | |
113 fn, ok := r.data[typ] | |
114 if !ok { | |
115 return nil, "", fmt.Errorf("unknown distributor type %T", cfg.Co ntent) | |
116 } | |
117 | |
118 d, err = fn(c, cfg) | |
119 return | |
120 } | |
121 | |
122 func getTrimmedAppID(c context.Context) string { | |
dnj (Google)
2016/06/09 18:00:56
Maybe this should be built into gae/info?
iannucci
2016/06/15 00:46:00
yeah probably. Done: https://chromiumcodereview.ap
| |
123 // custom domains show up as "foo.com:appid" | |
124 toks := strings.Split(info.Get(c).AppID(), ":") | |
125 return toks[len(toks)-1] | |
126 } | |
127 | |
128 // loadConfig loads the named distributor configurtaion from luci-config, | |
dnj (Google)
2016/06/09 18:00:55
nit: "configuration"
iannucci
2016/06/15 00:46:00
derp
| |
129 // possibly using the in-memory or memcache version. | |
130 func loadConfig(c context.Context, cfgName string) (ret *Config, err error) { | |
131 aid := getTrimmedAppID(c) | |
132 cfgSvc := config.Get(c) | |
dnj (Google)
2016/06/09 18:00:56
Note: You allow a nil config service in "init()",
iannucci
2016/06/15 00:46:00
was missing a fallthrough there, added.
| |
133 distCfgObj, err := cfgSvc.GetConfig(fmt.Sprintf("services/%s", aid), "di stributors.cfg", false) | |
134 if err != nil { | |
135 return | |
136 } | |
137 | |
138 cfgVersion := distCfgObj.Revision | |
139 distCfg := &distributor.Config{} | |
140 if err = proto.UnmarshalText(distCfgObj.Content, distCfg); err != nil { | |
141 return | |
142 } | |
143 | |
144 cfg, ok := distCfg.DistributorConfigs[cfgName] | |
145 if !ok { | |
146 err = fmt.Errorf("unknown distributor configuration: %q", cfgNam e) | |
147 return | |
148 } | |
149 if alias := cfg.GetAlias(); alias != nil { | |
150 cfg, ok = distCfg.DistributorConfigs[alias.OtherConfig] | |
151 if !ok { | |
152 err = fmt.Errorf("unknown distributor configuration: %q (via alias %q)", cfgName, alias.OtherConfig) | |
153 return | |
154 } | |
155 if cfg.GetAlias() != nil { | |
156 err = fmt.Errorf("too many levels of indirection for ali as %q (points to alias %q)", cfgName, alias.OtherConfig) | |
157 return | |
158 } | |
159 } | |
160 | |
161 dt := cfg.DistributorType | |
162 if dt == nil { | |
163 err = fmt.Errorf("blank or unrecognized distributor_type") | |
164 return | |
165 } | |
166 dVal := reflect.ValueOf(dt) | |
167 | |
168 // All non-nil DistributorType's have a single field which is the actual oneof | |
169 // value. | |
170 implConfig := dVal.Elem().Field(0).Interface().(proto.Message) | |
171 | |
172 inf := info.Get(c) | |
173 scheme := "https" | |
174 if inf.IsDevAppServer() { | |
175 scheme = "http" | |
176 } | |
177 | |
178 ret = &Config{ | |
179 scheme + "://" + inf.DefaultVersionHostname() + "/", | |
dnj (Google)
2016/06/09 18:00:55
I think you should use a net.URL here, then call i
iannucci
2016/06/15 00:46:00
better, changed Config to have a url.URL natively.
| |
180 cfgName, | |
181 cfgVersion, | |
182 implConfig, | |
183 } | |
184 return | |
185 } | |
OLD | NEW |