Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(255)

Side by Side Diff: appengine/cmd/dm/distributor/registry.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: fix imports and make dummy.go a real file Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 package distributor
6
7 import (
8 "errors"
9 "fmt"
10 "net/url"
11 "reflect"
12
13 "github.com/golang/protobuf/proto"
14 "github.com/luci/gae/service/info"
15 "github.com/luci/luci-go/appengine/tumble"
16 "github.com/luci/luci-go/common/api/dm/distributor"
17 "github.com/luci/luci-go/common/api/dm/service/v1"
18 "github.com/luci/luci-go/common/config"
19 "github.com/luci/luci-go/common/logging"
20 "golang.org/x/net/context"
21 )
22
23 var regKey = "holds a DM Distributor Registry"
24
25 // WithRegistry adds the registry to the Context.
26 func WithRegistry(c context.Context, r Registry) context.Context {
27 if r == nil {
28 panic(errors.New("you may not use WithRegistry on a nil Registry "))
29 }
30 return context.WithValue(c, &regKey, r)
31 }
32
33 // GetRegistry gets the registry from the Context. This will return nil if the
34 // Context does not contain a Registry.
35 func GetRegistry(c context.Context) Registry {
36 ret, _ := c.Value(&regKey).(Registry)
37 return ret
38 }
39
40 // FinishExecutionFn is required to eliminate a circular dependency
41 // between mutate <-> distributor. Essentially this just makes a new
42 // mutate.FinishExecution.
43 //
44 // See mutate.FinishExecutionFn for the only actual implementation of this.
45 type FinishExecutionFn func(c context.Context, eid *dm.Execution_ID, rslt *TaskR esult) ([]tumble.Mutation, error)
46
47 // Registry holds a collection of all of the available distributor types.
48 type Registry interface {
49 FinishExecution(c context.Context, eid *dm.Execution_ID, rslt *TaskResul t) ([]tumble.Mutation, error)
50
51 // MakeDistributor builds a distributor instance that's configured with the
52 // provided config.
53 //
54 // The configuration for this distributor are obtained from luci-config at the
55 // time an Execution is started.
56 MakeDistributor(c context.Context, cfgName string) (d D, ver string, err error)
57 }
58
59 // NewRegistry builds a new implementation of Registry configured to load
60 // configuration data from luci-config.
61 //
62 // The mapping should hold nil-ptrs of various config protos -> respective
63 // Factory. When loading from luci-config, when we see a given message type,
64 // we'll construct the distributor instance using the provided Factory.
65 func NewRegistry(mapping map[proto.Message]Factory, fFn FinishExecutionFn) Regis try {
66 ret := &registry{fFn, make(map[reflect.Type]Factory, len(mapping))}
67 add := func(p proto.Message, factory Factory) {
68 if factory == nil {
69 panic("factory is nil")
70 }
71 if p == nil {
72 panic("proto.Message is nil")
73 }
74
75 typ := reflect.TypeOf(p)
76
77 if _, ok := ret.data[typ]; ok {
78 panic(fmt.Errorf("trying to register %q twice", typ))
79 }
80 ret.data[typ] = factory
81 }
82 for p, f := range mapping {
83 add(p, f)
84 }
85 return ret
86 }
87
88 type registry struct {
89 finishExecutionImpl FinishExecutionFn
90 data map[reflect.Type]Factory
91 }
92
93 var _ Registry = (*registry)(nil)
94
95 func (r *registry) FinishExecution(c context.Context, eid *dm.Execution_ID, rslt *TaskResult) ([]tumble.Mutation, error) {
96 return r.finishExecutionImpl(c, eid, rslt)
97 }
98
99 func (r *registry) MakeDistributor(c context.Context, cfgName string) (d D, ver string, err error) {
100 cfg, err := loadConfig(c, cfgName)
101 if err != nil {
102 logging.Fields{"error": err, "cfg": cfgName}.Errorf(c, "Failed t o load config")
103 return
104 }
105
106 ver = cfg.Version
107
108 typ := reflect.TypeOf(cfg.Content)
109
110 fn, ok := r.data[typ]
111 if !ok {
112 return nil, "", fmt.Errorf("unknown distributor type %T", cfg.Co ntent)
113 }
114
115 d, err = fn(c, cfg)
116 return
117 }
118
119 // loadConfig loads the named distributor configuration from luci-config,
120 // possibly using the in-memory or memcache version.
121 func loadConfig(c context.Context, cfgName string) (ret *Config, err error) {
122 aid := info.Get(c).TrimmedAppID()
123 cfgSvc := config.Get(c)
124 distCfgObj, err := cfgSvc.GetConfig(fmt.Sprintf("services/%s", aid), "di stributors.cfg", false)
125 if err != nil {
126 return
127 }
128
129 cfgVersion := distCfgObj.Revision
130 distCfg := &distributor.Config{}
131 if err = proto.UnmarshalText(distCfgObj.Content, distCfg); err != nil {
132 return
133 }
134
135 cfg, ok := distCfg.DistributorConfigs[cfgName]
136 if !ok {
137 err = fmt.Errorf("unknown distributor configuration: %q", cfgNam e)
138 return
139 }
140 if alias := cfg.GetAlias(); alias != nil {
141 cfg, ok = distCfg.DistributorConfigs[alias.OtherConfig]
142 if !ok {
143 err = fmt.Errorf("unknown distributor configuration: %q (via alias %q)", cfgName, alias.OtherConfig)
144 return
145 }
146 if cfg.GetAlias() != nil {
147 err = fmt.Errorf("too many levels of indirection for ali as %q (points to alias %q)", cfgName, alias.OtherConfig)
148 return
149 }
150 }
151
152 dt := cfg.DistributorType
153 if dt == nil {
154 err = fmt.Errorf("blank or unrecognized distributor_type")
155 return
156 }
157 dVal := reflect.ValueOf(dt)
158
159 // All non-nil DistributorType's have a single field which is the actual oneof
160 // value.
161 implConfig := dVal.Elem().Field(0).Interface().(proto.Message)
162
163 inf := info.Get(c)
164 scheme := "https"
165 if inf.IsDevAppServer() {
166 scheme = "http"
167 }
168
169 ret = &Config{
170 &url.URL{
171 Scheme: scheme,
172 Host: inf.DefaultVersionHostname(),
173 },
174 cfgName,
175 cfgVersion,
176 implConfig,
177 }
178 return
179 }
OLDNEW
« no previous file with comments | « appengine/cmd/dm/distributor/pubsub.go ('k') | appengine/cmd/dm/distributor/task_description.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698