Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1116)

Unified Diff: appengine/cmd/dm/distributor/registry.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: fix imports and make dummy.go a real file Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « appengine/cmd/dm/distributor/pubsub.go ('k') | appengine/cmd/dm/distributor/task_description.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: appengine/cmd/dm/distributor/registry.go
diff --git a/appengine/cmd/dm/distributor/registry.go b/appengine/cmd/dm/distributor/registry.go
new file mode 100644
index 0000000000000000000000000000000000000000..86a4a18216dd73dca7797e3967bb53bab2a893cf
--- /dev/null
+++ b/appengine/cmd/dm/distributor/registry.go
@@ -0,0 +1,179 @@
+// Copyright 2016 The LUCI Authors. All rights reserved.
+// Use of this source code is governed under the Apache License, Version 2.0
+// that can be found in the LICENSE file.
+
+package distributor
+
+import (
+ "errors"
+ "fmt"
+ "net/url"
+ "reflect"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/luci/gae/service/info"
+ "github.com/luci/luci-go/appengine/tumble"
+ "github.com/luci/luci-go/common/api/dm/distributor"
+ "github.com/luci/luci-go/common/api/dm/service/v1"
+ "github.com/luci/luci-go/common/config"
+ "github.com/luci/luci-go/common/logging"
+ "golang.org/x/net/context"
+)
+
+var regKey = "holds a DM Distributor Registry"
+
+// WithRegistry adds the registry to the Context.
+func WithRegistry(c context.Context, r Registry) context.Context {
+ if r == nil {
+ panic(errors.New("you may not use WithRegistry on a nil Registry"))
+ }
+ return context.WithValue(c, &regKey, r)
+}
+
+// GetRegistry gets the registry from the Context. This will return nil if the
+// Context does not contain a Registry.
+func GetRegistry(c context.Context) Registry {
+ ret, _ := c.Value(&regKey).(Registry)
+ return ret
+}
+
+// FinishExecutionFn is required to eliminate a circular dependency
+// between mutate <-> distributor. Essentially this just makes a new
+// mutate.FinishExecution.
+//
+// See mutate.FinishExecutionFn for the only actual implementation of this.
+type FinishExecutionFn func(c context.Context, eid *dm.Execution_ID, rslt *TaskResult) ([]tumble.Mutation, error)
+
+// Registry holds a collection of all of the available distributor types.
+type Registry interface {
+ FinishExecution(c context.Context, eid *dm.Execution_ID, rslt *TaskResult) ([]tumble.Mutation, error)
+
+ // MakeDistributor builds a distributor instance that's configured with the
+ // provided config.
+ //
+ // The configuration for this distributor are obtained from luci-config at the
+ // time an Execution is started.
+ MakeDistributor(c context.Context, cfgName string) (d D, ver string, err error)
+}
+
+// NewRegistry builds a new implementation of Registry configured to load
+// configuration data from luci-config.
+//
+// The mapping should hold nil-ptrs of various config protos -> respective
+// Factory. When loading from luci-config, when we see a given message type,
+// we'll construct the distributor instance using the provided Factory.
+func NewRegistry(mapping map[proto.Message]Factory, fFn FinishExecutionFn) Registry {
+ ret := &registry{fFn, make(map[reflect.Type]Factory, len(mapping))}
+ add := func(p proto.Message, factory Factory) {
+ if factory == nil {
+ panic("factory is nil")
+ }
+ if p == nil {
+ panic("proto.Message is nil")
+ }
+
+ typ := reflect.TypeOf(p)
+
+ if _, ok := ret.data[typ]; ok {
+ panic(fmt.Errorf("trying to register %q twice", typ))
+ }
+ ret.data[typ] = factory
+ }
+ for p, f := range mapping {
+ add(p, f)
+ }
+ return ret
+}
+
+type registry struct {
+ finishExecutionImpl FinishExecutionFn
+ data map[reflect.Type]Factory
+}
+
+var _ Registry = (*registry)(nil)
+
+func (r *registry) FinishExecution(c context.Context, eid *dm.Execution_ID, rslt *TaskResult) ([]tumble.Mutation, error) {
+ return r.finishExecutionImpl(c, eid, rslt)
+}
+
+func (r *registry) MakeDistributor(c context.Context, cfgName string) (d D, ver string, err error) {
+ cfg, err := loadConfig(c, cfgName)
+ if err != nil {
+ logging.Fields{"error": err, "cfg": cfgName}.Errorf(c, "Failed to load config")
+ return
+ }
+
+ ver = cfg.Version
+
+ typ := reflect.TypeOf(cfg.Content)
+
+ fn, ok := r.data[typ]
+ if !ok {
+ return nil, "", fmt.Errorf("unknown distributor type %T", cfg.Content)
+ }
+
+ d, err = fn(c, cfg)
+ return
+}
+
+// loadConfig loads the named distributor configuration from luci-config,
+// possibly using the in-memory or memcache version.
+func loadConfig(c context.Context, cfgName string) (ret *Config, err error) {
+ aid := info.Get(c).TrimmedAppID()
+ cfgSvc := config.Get(c)
+ distCfgObj, err := cfgSvc.GetConfig(fmt.Sprintf("services/%s", aid), "distributors.cfg", false)
+ if err != nil {
+ return
+ }
+
+ cfgVersion := distCfgObj.Revision
+ distCfg := &distributor.Config{}
+ if err = proto.UnmarshalText(distCfgObj.Content, distCfg); err != nil {
+ return
+ }
+
+ cfg, ok := distCfg.DistributorConfigs[cfgName]
+ if !ok {
+ err = fmt.Errorf("unknown distributor configuration: %q", cfgName)
+ return
+ }
+ if alias := cfg.GetAlias(); alias != nil {
+ cfg, ok = distCfg.DistributorConfigs[alias.OtherConfig]
+ if !ok {
+ err = fmt.Errorf("unknown distributor configuration: %q (via alias %q)", cfgName, alias.OtherConfig)
+ return
+ }
+ if cfg.GetAlias() != nil {
+ err = fmt.Errorf("too many levels of indirection for alias %q (points to alias %q)", cfgName, alias.OtherConfig)
+ return
+ }
+ }
+
+ dt := cfg.DistributorType
+ if dt == nil {
+ err = fmt.Errorf("blank or unrecognized distributor_type")
+ return
+ }
+ dVal := reflect.ValueOf(dt)
+
+ // All non-nil DistributorType's have a single field which is the actual oneof
+ // value.
+ implConfig := dVal.Elem().Field(0).Interface().(proto.Message)
+
+ inf := info.Get(c)
+ scheme := "https"
+ if inf.IsDevAppServer() {
+ scheme = "http"
+ }
+
+ ret = &Config{
+ &url.URL{
+ Scheme: scheme,
+ Host: inf.DefaultVersionHostname(),
+ },
+ cfgName,
+ cfgVersion,
+ implConfig,
+ }
+ return
+}
« no previous file with comments | « appengine/cmd/dm/distributor/pubsub.go ('k') | appengine/cmd/dm/distributor/task_description.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698