Index: appengine/cmd/dm/distributor/registry.go |
diff --git a/appengine/cmd/dm/distributor/registry.go b/appengine/cmd/dm/distributor/registry.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..a74879a3d3463bee67f89944ef5e5f109a7bfdaf |
--- /dev/null |
+++ b/appengine/cmd/dm/distributor/registry.go |
@@ -0,0 +1,185 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+package distributor |
+ |
+import ( |
+ "fmt" |
+ "reflect" |
+ "strings" |
+ |
+ "github.com/golang/protobuf/proto" |
+ "github.com/luci/gae/service/info" |
+ "github.com/luci/luci-go/appengine/tumble" |
+ "github.com/luci/luci-go/common/api/dm/distributor" |
+ "github.com/luci/luci-go/common/api/dm/service/v1" |
+ "github.com/luci/luci-go/common/config" |
+ "github.com/luci/luci-go/common/logging" |
+ "golang.org/x/net/context" |
+) |
+ |
+var regKey = "holds a Registry" |
dnj (Google)
2016/06/09 18:00:56
nit: "holds a DM Distributor Registry"
iannucci
2016/06/15 00:46:00
Done.
|
+ |
+// WithRegistry adds the registry to the Context. |
+func WithRegistry(c context.Context, r Registry) context.Context { |
+ if r == nil { |
+ return c |
dnj (Google)
2016/06/09 18:00:56
Perhaps this should panic? I think that this is cl
iannucci
2016/06/15 00:46:00
Done.
|
+ } |
+ return context.WithValue(c, ®Key, r) |
+} |
+ |
+// GetRegistry gets the registry from the Context. This will return nil if the |
+// Context does not contain a Registry. |
+func GetRegistry(c context.Context) Registry { |
+ ret, _ := c.Value(®Key).(Registry) |
+ return ret |
+} |
+ |
+// FinishExecutionFn is required to eliminate a circular dependency |
+// between mutate <-> distributor. Essentially this just makes a new |
+// mutate.FinishExecution. |
+// |
+// See mutate.FinishExecutionFn for the only actual implementation of this. |
+type FinishExecutionFn func(c context.Context, eid *dm.Execution_ID, rslt *TaskResult) ([]tumble.Mutation, error) |
+ |
+// Registry holds a collection of all of the available distributor types. |
+type Registry interface { |
+ FinishExecution(c context.Context, eid *dm.Execution_ID, rslt *TaskResult) ([]tumble.Mutation, error) |
+ |
+ // MakeDistributor builds a distributor instance that's configured with the |
+ // provided config. |
+ // |
+ // The configuration for this distributor are obtained from luci-config at the |
+ // time an Execution is started. |
+ MakeDistributor(c context.Context, cfgName string) (d D, ver string, err error) |
+} |
+ |
+// NewRegistry builds a new implementation of Registry configured to load |
+// configuration data from luci-config. |
+// |
+// The mapping should hold nil-ptrs of various config protos -> respective |
+// Factory. When loading from luci-config, when we see a given message type, |
+// we'll construct the distributor instance using the provided Factory. |
+func NewRegistry(mapping map[proto.Message]Factory, fFn FinishExecutionFn) Registry { |
+ ret := ®istry{fFn, make(map[reflect.Type]Factory, len(mapping))} |
+ add := func(p proto.Message, factory Factory) { |
+ if factory == nil { |
+ panic("factory is nil") |
+ } |
+ if p == nil { |
+ panic("proto.Message is nil") |
+ } |
+ |
+ typ := reflect.TypeOf(p) |
+ |
+ if _, ok := ret.data[typ]; ok { |
+ panic(fmt.Errorf("trying to register %q twice", typ)) |
+ } |
+ ret.data[typ] = factory |
+ } |
+ for p, f := range mapping { |
+ add(p, f) |
+ } |
+ return ret |
+} |
+ |
+type registry struct { |
+ finishExecutionImpl FinishExecutionFn |
+ data map[reflect.Type]Factory |
+} |
+ |
+var _ Registry = (*registry)(nil) |
+ |
+func (r *registry) SetFinishExecutionImpl(i func(context.Context, *dm.Execution_ID, *TaskResult) ([]tumble.Mutation, error)) { |
+ r.finishExecutionImpl = i |
dnj (Google)
2016/06/09 18:00:55
This seems like it would need locking. But it's no
iannucci
2016/06/15 00:46:00
it was a bogus extra method
|
+} |
+ |
+func (r *registry) FinishExecution(c context.Context, eid *dm.Execution_ID, rslt *TaskResult) ([]tumble.Mutation, error) { |
+ return r.finishExecutionImpl(c, eid, rslt) |
+} |
+ |
+func (r *registry) MakeDistributor(c context.Context, cfgName string) (d D, ver string, err error) { |
+ cfg, err := loadConfig(c, cfgName) |
+ if err != nil { |
+ logging.Fields{"error": err, "cfg": cfgName}.Errorf(c, "Failed to load config") |
+ return |
+ } |
+ |
+ ver = cfg.Version |
+ |
+ typ := reflect.TypeOf(cfg.Content) |
+ |
+ fn, ok := r.data[typ] |
+ if !ok { |
+ return nil, "", fmt.Errorf("unknown distributor type %T", cfg.Content) |
+ } |
+ |
+ d, err = fn(c, cfg) |
+ return |
+} |
+ |
+func getTrimmedAppID(c context.Context) string { |
dnj (Google)
2016/06/09 18:00:56
Maybe this should be built into gae/info?
iannucci
2016/06/15 00:46:00
yeah probably. Done: https://chromiumcodereview.ap
|
+ // custom domains show up as "foo.com:appid" |
+ toks := strings.Split(info.Get(c).AppID(), ":") |
+ return toks[len(toks)-1] |
+} |
+ |
+// loadConfig loads the named distributor configurtaion from luci-config, |
dnj (Google)
2016/06/09 18:00:55
nit: "configuration"
iannucci
2016/06/15 00:46:00
derp
|
+// possibly using the in-memory or memcache version. |
+func loadConfig(c context.Context, cfgName string) (ret *Config, err error) { |
+ aid := getTrimmedAppID(c) |
+ cfgSvc := config.Get(c) |
dnj (Google)
2016/06/09 18:00:56
Note: You allow a nil config service in "init()",
iannucci
2016/06/15 00:46:00
was missing a fallthrough there, added.
|
+ distCfgObj, err := cfgSvc.GetConfig(fmt.Sprintf("services/%s", aid), "distributors.cfg", false) |
+ if err != nil { |
+ return |
+ } |
+ |
+ cfgVersion := distCfgObj.Revision |
+ distCfg := &distributor.Config{} |
+ if err = proto.UnmarshalText(distCfgObj.Content, distCfg); err != nil { |
+ return |
+ } |
+ |
+ cfg, ok := distCfg.DistributorConfigs[cfgName] |
+ if !ok { |
+ err = fmt.Errorf("unknown distributor configuration: %q", cfgName) |
+ return |
+ } |
+ if alias := cfg.GetAlias(); alias != nil { |
+ cfg, ok = distCfg.DistributorConfigs[alias.OtherConfig] |
+ if !ok { |
+ err = fmt.Errorf("unknown distributor configuration: %q (via alias %q)", cfgName, alias.OtherConfig) |
+ return |
+ } |
+ if cfg.GetAlias() != nil { |
+ err = fmt.Errorf("too many levels of indirection for alias %q (points to alias %q)", cfgName, alias.OtherConfig) |
+ return |
+ } |
+ } |
+ |
+ dt := cfg.DistributorType |
+ if dt == nil { |
+ err = fmt.Errorf("blank or unrecognized distributor_type") |
+ return |
+ } |
+ dVal := reflect.ValueOf(dt) |
+ |
+ // All non-nil DistributorType's have a single field which is the actual oneof |
+ // value. |
+ implConfig := dVal.Elem().Field(0).Interface().(proto.Message) |
+ |
+ inf := info.Get(c) |
+ scheme := "https" |
+ if inf.IsDevAppServer() { |
+ scheme = "http" |
+ } |
+ |
+ ret = &Config{ |
+ scheme + "://" + inf.DefaultVersionHostname() + "/", |
dnj (Google)
2016/06/09 18:00:55
I think you should use a net.URL here, then call i
iannucci
2016/06/15 00:46:00
better, changed Config to have a url.URL natively.
|
+ cfgName, |
+ cfgVersion, |
+ implConfig, |
+ } |
+ return |
+} |