Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package distributor | |
| 6 | |
| 7 import ( | |
| 8 "fmt" | |
| 9 "reflect" | |
| 10 "strings" | |
| 11 | |
| 12 "github.com/golang/protobuf/proto" | |
| 13 "github.com/luci/gae/service/info" | |
| 14 "github.com/luci/luci-go/appengine/tumble" | |
| 15 "github.com/luci/luci-go/common/api/dm/distributor" | |
| 16 "github.com/luci/luci-go/common/api/dm/service/v1" | |
| 17 "github.com/luci/luci-go/common/config" | |
| 18 "github.com/luci/luci-go/common/logging" | |
| 19 "golang.org/x/net/context" | |
| 20 ) | |
| 21 | |
| 22 var regKey = "holds a Registry" | |
|
dnj (Google)
2016/06/09 18:00:56
nit: "holds a DM Distributor Registry"
iannucci
2016/06/15 00:46:00
Done.
| |
| 23 | |
| 24 // WithRegistry adds the registry to the Context. | |
| 25 func WithRegistry(c context.Context, r Registry) context.Context { | |
| 26 if r == nil { | |
| 27 return c | |
|
dnj (Google)
2016/06/09 18:00:56
Perhaps this should panic? I think that this is cl
iannucci
2016/06/15 00:46:00
Done.
| |
| 28 } | |
| 29 return context.WithValue(c, ®Key, r) | |
| 30 } | |
| 31 | |
| 32 // GetRegistry gets the registry from the Context. This will return nil if the | |
| 33 // Context does not contain a Registry. | |
| 34 func GetRegistry(c context.Context) Registry { | |
| 35 ret, _ := c.Value(®Key).(Registry) | |
| 36 return ret | |
| 37 } | |
| 38 | |
| 39 // FinishExecutionFn is required to eliminate a circular dependency | |
| 40 // between mutate <-> distributor. Essentially this just makes a new | |
| 41 // mutate.FinishExecution. | |
| 42 // | |
| 43 // See mutate.FinishExecutionFn for the only actual implementation of this. | |
| 44 type FinishExecutionFn func(c context.Context, eid *dm.Execution_ID, rslt *TaskR esult) ([]tumble.Mutation, error) | |
| 45 | |
| 46 // Registry holds a collection of all of the available distributor types. | |
| 47 type Registry interface { | |
| 48 FinishExecution(c context.Context, eid *dm.Execution_ID, rslt *TaskResul t) ([]tumble.Mutation, error) | |
| 49 | |
| 50 // MakeDistributor builds a distributor instance that's configured with the | |
| 51 // provided config. | |
| 52 // | |
| 53 // The configuration for this distributor are obtained from luci-config at the | |
| 54 // time an Execution is started. | |
| 55 MakeDistributor(c context.Context, cfgName string) (d D, ver string, err error) | |
| 56 } | |
| 57 | |
| 58 // NewRegistry builds a new implementation of Registry configured to load | |
| 59 // configuration data from luci-config. | |
| 60 // | |
| 61 // The mapping should hold nil-ptrs of various config protos -> respective | |
| 62 // Factory. When loading from luci-config, when we see a given message type, | |
| 63 // we'll construct the distributor instance using the provided Factory. | |
| 64 func NewRegistry(mapping map[proto.Message]Factory, fFn FinishExecutionFn) Regis try { | |
| 65 ret := ®istry{fFn, make(map[reflect.Type]Factory, len(mapping))} | |
| 66 add := func(p proto.Message, factory Factory) { | |
| 67 if factory == nil { | |
| 68 panic("factory is nil") | |
| 69 } | |
| 70 if p == nil { | |
| 71 panic("proto.Message is nil") | |
| 72 } | |
| 73 | |
| 74 typ := reflect.TypeOf(p) | |
| 75 | |
| 76 if _, ok := ret.data[typ]; ok { | |
| 77 panic(fmt.Errorf("trying to register %q twice", typ)) | |
| 78 } | |
| 79 ret.data[typ] = factory | |
| 80 } | |
| 81 for p, f := range mapping { | |
| 82 add(p, f) | |
| 83 } | |
| 84 return ret | |
| 85 } | |
| 86 | |
| 87 type registry struct { | |
| 88 finishExecutionImpl FinishExecutionFn | |
| 89 data map[reflect.Type]Factory | |
| 90 } | |
| 91 | |
| 92 var _ Registry = (*registry)(nil) | |
| 93 | |
| 94 func (r *registry) SetFinishExecutionImpl(i func(context.Context, *dm.Execution_ ID, *TaskResult) ([]tumble.Mutation, error)) { | |
| 95 r.finishExecutionImpl = i | |
|
dnj (Google)
2016/06/09 18:00:55
This seems like it would need locking. But it's no
iannucci
2016/06/15 00:46:00
it was a bogus extra method
| |
| 96 } | |
| 97 | |
| 98 func (r *registry) FinishExecution(c context.Context, eid *dm.Execution_ID, rslt *TaskResult) ([]tumble.Mutation, error) { | |
| 99 return r.finishExecutionImpl(c, eid, rslt) | |
| 100 } | |
| 101 | |
| 102 func (r *registry) MakeDistributor(c context.Context, cfgName string) (d D, ver string, err error) { | |
| 103 cfg, err := loadConfig(c, cfgName) | |
| 104 if err != nil { | |
| 105 logging.Fields{"error": err, "cfg": cfgName}.Errorf(c, "Failed t o load config") | |
| 106 return | |
| 107 } | |
| 108 | |
| 109 ver = cfg.Version | |
| 110 | |
| 111 typ := reflect.TypeOf(cfg.Content) | |
| 112 | |
| 113 fn, ok := r.data[typ] | |
| 114 if !ok { | |
| 115 return nil, "", fmt.Errorf("unknown distributor type %T", cfg.Co ntent) | |
| 116 } | |
| 117 | |
| 118 d, err = fn(c, cfg) | |
| 119 return | |
| 120 } | |
| 121 | |
| 122 func getTrimmedAppID(c context.Context) string { | |
|
dnj (Google)
2016/06/09 18:00:56
Maybe this should be built into gae/info?
iannucci
2016/06/15 00:46:00
yeah probably. Done: https://chromiumcodereview.ap
| |
| 123 // custom domains show up as "foo.com:appid" | |
| 124 toks := strings.Split(info.Get(c).AppID(), ":") | |
| 125 return toks[len(toks)-1] | |
| 126 } | |
| 127 | |
| 128 // loadConfig loads the named distributor configurtaion from luci-config, | |
|
dnj (Google)
2016/06/09 18:00:55
nit: "configuration"
iannucci
2016/06/15 00:46:00
derp
| |
| 129 // possibly using the in-memory or memcache version. | |
| 130 func loadConfig(c context.Context, cfgName string) (ret *Config, err error) { | |
| 131 aid := getTrimmedAppID(c) | |
| 132 cfgSvc := config.Get(c) | |
|
dnj (Google)
2016/06/09 18:00:56
Note: You allow a nil config service in "init()",
iannucci
2016/06/15 00:46:00
was missing a fallthrough there, added.
| |
| 133 distCfgObj, err := cfgSvc.GetConfig(fmt.Sprintf("services/%s", aid), "di stributors.cfg", false) | |
| 134 if err != nil { | |
| 135 return | |
| 136 } | |
| 137 | |
| 138 cfgVersion := distCfgObj.Revision | |
| 139 distCfg := &distributor.Config{} | |
| 140 if err = proto.UnmarshalText(distCfgObj.Content, distCfg); err != nil { | |
| 141 return | |
| 142 } | |
| 143 | |
| 144 cfg, ok := distCfg.DistributorConfigs[cfgName] | |
| 145 if !ok { | |
| 146 err = fmt.Errorf("unknown distributor configuration: %q", cfgNam e) | |
| 147 return | |
| 148 } | |
| 149 if alias := cfg.GetAlias(); alias != nil { | |
| 150 cfg, ok = distCfg.DistributorConfigs[alias.OtherConfig] | |
| 151 if !ok { | |
| 152 err = fmt.Errorf("unknown distributor configuration: %q (via alias %q)", cfgName, alias.OtherConfig) | |
| 153 return | |
| 154 } | |
| 155 if cfg.GetAlias() != nil { | |
| 156 err = fmt.Errorf("too many levels of indirection for ali as %q (points to alias %q)", cfgName, alias.OtherConfig) | |
| 157 return | |
| 158 } | |
| 159 } | |
| 160 | |
| 161 dt := cfg.DistributorType | |
| 162 if dt == nil { | |
| 163 err = fmt.Errorf("blank or unrecognized distributor_type") | |
| 164 return | |
| 165 } | |
| 166 dVal := reflect.ValueOf(dt) | |
| 167 | |
| 168 // All non-nil DistributorType's have a single field which is the actual oneof | |
| 169 // value. | |
| 170 implConfig := dVal.Elem().Field(0).Interface().(proto.Message) | |
| 171 | |
| 172 inf := info.Get(c) | |
| 173 scheme := "https" | |
| 174 if inf.IsDevAppServer() { | |
| 175 scheme = "http" | |
| 176 } | |
| 177 | |
| 178 ret = &Config{ | |
| 179 scheme + "://" + inf.DefaultVersionHostname() + "/", | |
|
dnj (Google)
2016/06/09 18:00:55
I think you should use a net.URL here, then call i
iannucci
2016/06/15 00:46:00
better, changed Config to have a url.URL natively.
| |
| 180 cfgName, | |
| 181 cfgVersion, | |
| 182 implConfig, | |
| 183 } | |
| 184 return | |
| 185 } | |
| OLD | NEW |