Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(29)

Side by Side Diff: server/internal/logdog/service/service.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Comments, rebase. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package service
6
7 import (
8 "errors"
9 "flag"
10 "net/http"
11 "os"
12 "os/signal"
13 "sync"
14
15 "github.com/luci/luci-go/client/authcli"
16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
17 "github.com/luci/luci-go/common/auth"
18 log "github.com/luci/luci-go/common/logging"
19 "github.com/luci/luci-go/common/logging/gologger"
20 "github.com/luci/luci-go/common/proto/logdog/svcconfig"
21 "github.com/luci/luci-go/common/prpc"
22 "github.com/luci/luci-go/server/internal/logdog/config"
23 "github.com/luci/luci-go/server/internal/logdog/retryServicesClient"
24 "github.com/luci/luci-go/server/logdog/storage"
25 "github.com/luci/luci-go/server/logdog/storage/bigtable"
26 "golang.org/x/net/context"
27 "google.golang.org/cloud"
28 )
29
30 var (
31 // ErrInvalidConfig is an error that is returned when the supplied
32 // configuration is invalid.
33 ErrInvalidConfig = errors.New("invalid configuration")
34
35 // CoordinatorScopes is the set of OAuth2 scopes to use for the Coordina tor
36 // client.
37 CoordinatorScopes = []string{
38 auth.OAuthScopeEmail,
39 }
40 )
41
42 // Service is a base class full of common LogDog service application parameters.
43 type Service struct {
44 context.Context
45
46 // UserAgent is the user agent string that will be used for service
47 // communication.
48 UserAgent string
49
50 // ShutdownFunc, if not nil, is a function that will be called when a sh utdown
51 // signal is received.
52 ShutdownFunc func()
53
54 // topCancelFunc is the Context cancel function for the top-level applic ation
55 // Context.
56 topCancelFunc func()
57
58 // shutdownMu protects the shutdown variables.
59 shutdownMu sync.Mutex
60 shutdownFunc func()
61 shutdownCount int32
62
63 loggingFlags log.Config
64 authFlags authcli.Flags
65 configFlags config.Flags
66
67 coordinatorHost string
68 coordinatorInsecure bool
69 storageCredentialJSONPath string
70
71 coord services.ServicesClient
72 config *config.Manager
73 }
74
75 // New instantiates a new Service.
76 func New(c context.Context) *Service {
77 c, cancelFunc := context.WithCancel(c)
78 c = gologger.Use(c)
79
80 return &Service{
81 Context: c,
82 topCancelFunc: cancelFunc,
83 }
84 }
85
86 // AddFlags adds standard service flags to the supplied FlagSet.
87 func (s *Service) AddFlags(fs *flag.FlagSet) {
88 s.loggingFlags.AddFlags(fs)
89 s.authFlags.Register(fs, auth.Options{
90 Context: s,
91 Logger: log.Get(s),
92 })
93 s.configFlags.AddToFlagSet(fs)
94
95 fs.StringVar(&s.coordinatorHost, "coordinator", "",
96 "The Coordinator service's [host][:port].")
97 fs.BoolVar(&s.coordinatorInsecure, "coordinator-insecure", false,
98 "Connect to Coordinator over HTTP (instead of HTTPS).")
99 fs.StringVar(&s.storageCredentialJSONPath, "storage-credential-json-path ", "",
100 "If supplied, the path of a JSON credential file to load and use for storage operations.")
101 }
102
103 // Run loads the Service's base runtime and invokes the specified run function.
104 func (s *Service) Run(f func() error) error {
105 s.Context = s.loggingFlags.Set(s.Context)
106
107 // Configure our signal handler. It will listen for terminating signals and
108 // issue a shutdown signal if one is received.
109 signalC := make(chan os.Signal)
110 go func() {
111 for sig := range signalC {
112 s.Shutdown()
113 log.Warningf(log.SetField(s, "signal", sig), "Received c lose signal. Send again to terminate immediately.")
114 }
115 }()
116 signal.Notify(signalC, os.Interrupt)
117 defer func() {
118 signal.Stop(signalC)
119 close(signalC)
120 }()
121
122 // Initialize our Client instantiations.
123 var err error
124 s.coord, err = s.initCoordinatorClient()
125 if err != nil {
126 log.Errorf(log.SetError(s, err), "Failed to setup Coordinator cl ient.")
127 return err
128 }
129
130 s.config, err = s.initConfig()
131 if err != nil {
132 log.Errorf(log.SetError(s, err), "Failed to setup configuration. ")
133 return err
134 }
135
136 return f()
137 }
138
139 func (s *Service) initCoordinatorClient() (services.ServicesClient, error) {
140 if s.coordinatorHost == "" {
141 log.Errorf(s, "Missing Coordinator URL (-coordinator).")
142 return nil, ErrInvalidConfig
143 }
144
145 httpClient, err := s.AuthenticatedClient(func(o *auth.Options) {
146 o.Scopes = CoordinatorScopes
147 })
148 if err != nil {
149 log.Errorf(log.SetError(s, err), "Failed to create authenticated client.")
150 return nil, err
151 }
152
153 prpcClient := prpc.Client{
154 C: httpClient,
155 Host: s.coordinatorHost,
156 Options: prpc.DefaultOptions(),
157 }
158 if s.coordinatorInsecure {
159 prpcClient.Options.Insecure = true
160 }
161 sc := services.NewServicesPRPCClient(&prpcClient)
162
163 // Wrap the resulting client in a retry harness.
164 return retryServicesClient.New(sc, nil), nil
165 }
166
167 func (s *Service) initConfig() (*config.Manager, error) {
168 rt, err := s.AuthenticatedTransport(nil)
169 if err != nil {
170 log.Errorf(log.SetError(s, err), "Failed to create config client .")
171 return nil, err
172 }
173
174 s.configFlags.RoundTripper = rt
175 o, err := s.configFlags.CoordinatorOptions(s, s.coord)
176 if err != nil {
177 log.Errorf(log.SetError(s, err), "Failed to load configuration p arameters.")
178 return nil, err
179 }
180 o.KillFunc = s.Shutdown
181
182 return config.NewManager(s, *o)
183 }
184
185 // Shutdown issues a shutdown signal to the service.
186 func (s *Service) Shutdown() {
187 s.shutdownMu.Lock()
188 defer s.shutdownMu.Unlock()
189
190 if s.shutdownCount > 0 {
191 os.Exit(1)
192 }
193 s.shutdownCount++
194
195 if f := s.shutdownFunc; f != nil {
196 f()
197 } else {
198 s.topCancelFunc()
199 }
200 }
201
202 // SetShutdownFunc sets the service shutdown function.
203 func (s *Service) SetShutdownFunc(f func()) {
204 s.shutdownMu.Lock()
205 defer s.shutdownMu.Unlock()
206 s.shutdownFunc = f
207 }
208
209 // Config returns the cached service configuration.
210 func (s *Service) Config() *svcconfig.Config {
211 return s.config.Config()
212 }
213
214 // Coordinator returns the cached Coordinator client.
215 func (s *Service) Coordinator() services.ServicesClient {
216 return s.coord
217 }
218
219 // Storage instantiates the configured Storage instance.
220 func (s *Service) Storage() (storage.Storage, error) {
221 cfg := s.config.Config()
222 if cfg.GetStorage() == nil {
223 log.Errorf(s, "Missing storage configuration.")
224 return nil, ErrInvalidConfig
225 }
226
227 btcfg := cfg.GetStorage().GetBigtable()
228 if btcfg == nil {
229 log.Errorf(s, "Missing BigTable storage configuration")
230 return nil, ErrInvalidConfig
231 }
232
233 // Initialize Storage authentication.
234 a, err := s.Authenticator(func(o *auth.Options) {
235 o.Scopes = bigtable.StorageScopes
236 if s.storageCredentialJSONPath != "" {
237 o.ServiceAccountJSONPath = s.storageCredentialJSONPath
238 }
239 })
240 if err != nil {
241 log.Errorf(log.SetError(s, err), "Failed to create BigTable Auth enticator.")
242 return nil, err
243 }
244
245 bt, err := bigtable.New(s, bigtable.Options{
246 Project: btcfg.Project,
247 Zone: btcfg.Zone,
248 Cluster: btcfg.Cluster,
249 LogTable: btcfg.LogTableName,
250 ClientOptions: []cloud.ClientOption{
251 cloud.WithTokenSource(a.TokenSource()),
252 },
253 })
254 if err != nil {
255 log.Errorf(log.SetError(s, err), "Failed to create BigTable inst ance.")
256 return nil, err
257 }
258 return bt, nil
259 }
260
261 // Authenticator returns an Authenticator instance. The Authenticator is
262 // configured from a base set of Authenticator Options.
263 //
264 // An optional permutation functon can be provided to modify those Options
265 // before the Authenticator is created.
266 func (s *Service) Authenticator(f func(o *auth.Options)) (*auth.Authenticator, e rror) {
267 authOpts, err := s.authFlags.Options()
268 if err != nil {
269 log.Errorf(log.SetError(s, err), "Failed to create authenticator options.")
270 return nil, ErrInvalidConfig
271 }
272 if f != nil {
273 f(&authOpts)
274 }
275 return auth.NewAuthenticator(auth.SilentLogin, authOpts), nil
276 }
277
278 // AuthenticatedTransport returns an authenticated http.RoundTripper transport.
279 // The transport is configured from a base set of Authenticator Options.
280 //
281 // An optional permutation functon can be provided to modify those Options
282 // before the Authenticator is created.
283 func (s *Service) AuthenticatedTransport(f func(o *auth.Options)) (http.RoundTri pper, error) {
284 a, err := s.Authenticator(f)
285 if err != nil {
286 return nil, err
287 }
288 return a.Transport()
289 }
290
291 // AuthenticatedClient returns an authenticated http.Client. The Client is
292 // configured from a base set of Authenticator Options.
293 //
294 // An optional permutation functon can be provided to modify those Options
295 // before the Authenticator is created.
296 func (s *Service) AuthenticatedClient(f func(o *auth.Options)) (*http.Client, er ror) {
297 a, err := s.Authenticator(f)
298 if err != nil {
299 return nil, err
300 }
301 return a.Client()
302 }
OLDNEW
« no previous file with comments | « server/internal/logdog/retryServicesClient/client.go ('k') | server/logdog/storage/bigtable/bigtable.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698