Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(183)

Side by Side Diff: server/internal/logdog/service/service.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Rebased, updated from comments. Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package service
6
7 import (
8 "errors"
9 "flag"
10 "net/http"
11 "os"
12 "os/signal"
13 "sync"
14
15 "github.com/luci/luci-go/client/authcli"
16 "github.com/luci/luci-go/common/auth"
17 log "github.com/luci/luci-go/common/logging"
18 "github.com/luci/luci-go/common/logging/gologger"
19 "github.com/luci/luci-go/common/proto/logdog/svcconfig"
20 "github.com/luci/luci-go/server/internal/logdog/config"
21 "github.com/luci/luci-go/server/internal/logdog/coordinatorClient"
22 "github.com/luci/luci-go/server/logdog/storage"
23 "github.com/luci/luci-go/server/logdog/storage/bigtable"
24 "golang.org/x/net/context"
25 "google.golang.org/cloud"
26 )
27
28 var (
29 // ErrInvalidConfig is an error that is returned when the supplied
30 // configuration is invalid.
31 ErrInvalidConfig = errors.New("invalid configuration")
32 )
33
34 // Service is a base class full of common LogDog service application parameters.
35 type Service struct {
36 context.Context
37
38 // UserAgent is the user agent string that will be used for service
39 // communication.
40 UserAgent string
41
42 // ShutdownFunc, if not nil, is a function that will be called when a sh utdown
43 // signal is received.
44 ShutdownFunc func()
45
46 // topCancelFunc is the Context cancel function for the top-level applic ation
47 // Context.
48 topCancelFunc func()
49
50 // shutdownMu protects the shutdown variables.
51 shutdownMu sync.Mutex
52 shutdownFunc func()
53 shutdownCount int32
54
55 loggingFlags log.Config
56 authFlags authcli.Flags
57 configFlags config.Flags
58
59 coordinatorURL string
60 storageCredentialJSONPath string
61
62 coord *coordinatorClient.Client
63 config *config.Manager
64 }
65
66 // New instantiates a new Service.
67 func New(c context.Context) *Service {
68 c, cancelFunc := context.WithCancel(c)
69 c = gologger.Use(c)
70
71 return &Service{
72 Context: c,
73 topCancelFunc: cancelFunc,
74 }
75 }
76
77 // AddFlags adds standard service flags to the supplied FlagSet.
78 func (s *Service) AddFlags(fs *flag.FlagSet) {
79 s.loggingFlags.AddFlags(fs)
80 s.authFlags.Register(fs, auth.Options{
81 Context: s,
82 Logger: log.Get(s),
83 })
84 s.configFlags.AddToFlagSet(fs)
85
86 fs.StringVar(&s.coordinatorURL, "coordinator-url", "",
87 "The URL of the Coordinator service to use.")
88 fs.StringVar(&s.storageCredentialJSONPath, "storage-credential-json-path ", "",
89 "If supplied, the path of a JSON credential file to load and use for storage operations.")
90 }
91
92 // Run loads the Service's base runtime and invokes the specified run function.
93 func (s *Service) Run(f func() error) error {
94 s.Context = s.loggingFlags.Set(s.Context)
95
96 // Configure our signal handler. It will listen for terminating signals and
97 // issue a shutdown signal if one is received.
98 signalC := make(chan os.Signal)
99 go func() {
100 for sig := range signalC {
101 s.Shutdown()
102 log.Warningf(log.SetField(s, "signal", sig), "Received c lose signal. Send again to terminate immediately.")
103 }
104 }()
105 signal.Notify(signalC, os.Interrupt)
106 defer func() {
107 signal.Stop(signalC)
108 close(signalC)
109 }()
110
111 // Setup our Coordinator client.
112 var err error
113 s.coord, err = s.initCoordinatorClient()
114 if err != nil {
115 log.Errorf(log.SetError(s, err), "Failed to setup Coordinator cl ient.")
116 return err
117 }
118
119 s.config, err = s.initConfig()
120 if err != nil {
121 log.Errorf(log.SetError(s, err), "Failed to setup configuration. ")
122 return err
123 }
124
125 return f()
126 }
127
128 func (s *Service) initCoordinatorClient() (*coordinatorClient.Client, error) {
129 if s.coordinatorURL == "" {
130 log.Errorf(s, "Missing Coordinator URL (-coordinator-url).")
131 return nil, ErrInvalidConfig
132 }
133
134 client, err := s.AuthenticatedClient(func(o *auth.Options) {
135 o.Scopes = coordinatorClient.ServiceScopes
136 })
137 if err != nil {
138 log.Errorf(log.SetError(s, err), "Failed to create Coordinator c lient.")
139 return nil, err
140 }
141
142 return coordinatorClient.New(coordinatorClient.Options{
143 Client: client,
144 BasePath: s.coordinatorURL,
145 UserAgent: s.UserAgent,
146 }), nil
147 }
148
149 func (s *Service) initConfig() (*config.Manager, error) {
150 rt, err := s.AuthenticatedTransport(nil)
151 if err != nil {
152 log.Errorf(log.SetError(s, err), "Failed to create config client .")
153 return nil, err
154 }
155
156 s.configFlags.RoundTripper = rt
157 o, err := s.configFlags.CoordinatorOptions(s, s.coord)
158 if err != nil {
159 log.Errorf(log.SetError(s, err), "Failed to load configuration p arameters.")
160 return nil, err
161 }
162 o.KillFunc = s.Shutdown
163
164 return config.NewManager(s, *o)
165 }
166
167 // Shutdown issues a shutdown signal to the service.
168 func (s *Service) Shutdown() {
169 s.shutdownMu.Lock()
170 defer s.shutdownMu.Unlock()
171
172 if s.shutdownCount > 0 {
173 os.Exit(1)
174 }
175 s.shutdownCount++
176
177 if f := s.shutdownFunc; f != nil {
178 f()
179 } else {
180 s.topCancelFunc()
181 }
182 }
183
184 // SetShutdownFunc sets the service shutdown function.
185 func (s *Service) SetShutdownFunc(f func()) {
186 s.shutdownMu.Lock()
187 defer s.shutdownMu.Unlock()
188 s.shutdownFunc = f
189 }
190
191 // Config returns the cached service configuration.
192 func (s *Service) Config() *svcconfig.Config {
193 return s.config.Config()
194 }
195
196 // Coordinator returns the cached Coordinator client.
197 func (s *Service) Coordinator() *coordinatorClient.Client {
198 return s.coord
199 }
200
201 // Storage instantiates the configured Storage instance.
202 func (s *Service) Storage() (storage.Storage, error) {
203 cfg := s.config.Config()
204 if cfg.GetStorage() == nil {
205 log.Errorf(s, "Missing storage configuration.")
206 return nil, ErrInvalidConfig
207 }
208
209 btcfg := cfg.GetStorage().GetBigtable()
210 if btcfg == nil {
211 log.Errorf(s, "Missing BigTable storage configuration")
212 return nil, ErrInvalidConfig
213 }
214
215 // Initialize Storage authentication.
216 a, err := s.Authenticator(func(o *auth.Options) {
217 o.Scopes = bigtable.StorageScopes
218 if s.storageCredentialJSONPath != "" {
219 o.ServiceAccountJSONPath = s.storageCredentialJSONPath
220 }
221 })
222 if err != nil {
223 log.Errorf(log.SetError(s, err), "Failed to create BigTable Auth enticator.")
224 return nil, err
225 }
226
227 bt, err := bigtable.New(s, bigtable.Options{
228 Project: btcfg.Project,
229 Zone: btcfg.Zone,
230 Cluster: btcfg.Cluster,
231 LogTable: btcfg.LogTableName,
232 ClientOptions: []cloud.ClientOption{
233 cloud.WithTokenSource(a.TokenSource()),
234 },
235 })
236 if err != nil {
237 log.Errorf(log.SetError(s, err), "Failed to create BigTable inst ance.")
238 return nil, err
239 }
240 return bt, nil
241 }
242
243 // Authenticator returns an Authenticator instance. The Authenticator is
244 // configured from a base set of Authenticator Options.
245 //
246 // An optional permutation functon can be provided to modify those Options
247 // before the Authenticator is created.
248 func (s *Service) Authenticator(f func(o *auth.Options)) (*auth.Authenticator, e rror) {
249 authOpts, err := s.authFlags.Options()
250 if err != nil {
251 log.Errorf(log.SetError(s, err), "Failed to create authenticator options.")
252 return nil, ErrInvalidConfig
253 }
254 if f != nil {
255 f(&authOpts)
256 }
257 return auth.NewAuthenticator(auth.SilentLogin, authOpts), nil
258 }
259
260 // AuthenticatedTransport returns an authenticated http.RoundTripper transport.
261 // The transport is configured from a base set of Authenticator Options.
262 //
263 // An optional permutation functon can be provided to modify those Options
264 // before the Authenticator is created.
265 func (s *Service) AuthenticatedTransport(f func(o *auth.Options)) (http.RoundTri pper, error) {
266 a, err := s.Authenticator(f)
267 if err != nil {
268 return nil, err
269 }
270 return a.Transport()
271 }
272
273 // AuthenticatedClient returns an authenticated http.Client. The Client is
274 // configured from a base set of Authenticator Options.
275 //
276 // An optional permutation functon can be provided to modify those Options
277 // before the Authenticator is created.
278 func (s *Service) AuthenticatedClient(f func(o *auth.Options)) (*http.Client, er ror) {
279 a, err := s.Authenticator(f)
280 if err != nil {
281 return nil, err
282 }
283 return a.Client()
284 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698