Chromium Code Reviews| Index: appengine/logdog/coordinator/service.go |
| diff --git a/appengine/logdog/coordinator/service.go b/appengine/logdog/coordinator/service.go |
| index edfa808f2b4e992862bded1516611674472deeb7..ef351247cdcb4bd49dd52da035b8a2ad81cbb5b3 100644 |
| --- a/appengine/logdog/coordinator/service.go |
| +++ b/appengine/logdog/coordinator/service.go |
| @@ -5,65 +5,176 @@ |
| package coordinator |
| import ( |
| - "errors" |
| + "sync" |
| gaeauthClient "github.com/luci/luci-go/appengine/gaeauth/client" |
| "github.com/luci/luci-go/appengine/logdog/coordinator/config" |
| + "github.com/luci/luci-go/common/errors" |
| "github.com/luci/luci-go/common/gcloud/gs" |
| + "github.com/luci/luci-go/common/gcloud/pubsub" |
| log "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/common/proto/logdog/svcconfig" |
| "github.com/luci/luci-go/server/logdog/storage" |
| + "github.com/luci/luci-go/server/logdog/storage/bigtable" |
| "golang.org/x/net/context" |
| + "google.golang.org/cloud" |
| + gcps "google.golang.org/cloud/pubsub" |
| + "google.golang.org/grpc/metadata" |
| ) |
| -// Service is the base service container for LogDog handlers and endpoints. It |
| -// is primarily usable as a means of consistently stubbing out various external |
| -// components. |
| -type Service struct { |
| - // StorageFunc is a function that generates an intermediate Storage instance |
| - // for use by this service. If nil, the production intermediate Storage |
| - // instance will be used. |
| +// Services is a set of support services used by Coordinator. |
| +// |
| +// Each Services instance is valid for a singel request, but can be re-used |
| +// throughout that request. This is advised, as the Services instance may |
| +// optionally cache values. |
| +// |
| +// Services methods are goroutine-safe. |
| +// |
| +// By default, a production set of services will be used. However, this can be |
| +// overridden for testing to mock the service layer. |
| +type Services interface { |
|
dnj
2016/04/11 17:20:04
This is a reconstruction of Service that is more v
|
| + // Config returns the current instance and application configuration |
| + // instances. |
| // |
| - // This is provided for testing purposes. |
| - StorageFunc func(context.Context) (storage.Storage, error) |
| + // The production instance will cache the results for the duration of the |
| + // request. |
| + Config(context.Context) (*config.GlobalConfig, *svcconfig.Config, error) |
| - // GSClientFunc is a function that generates a Google Storage client instance |
| - // for use by this service. If nil, the production Google Storage Client will |
| - // be used. |
| - GSClientFunc func(context.Context) (gs.Client, error) |
| + // Storage returns an intermediate storage instance for use by this service. |
| + // |
| + // The caller must close the returned instance if successful. |
| + IntermediateStorage(context.Context) (storage.Storage, error) |
| + |
| + // GSClient instantiates a Google Storage client. |
| + GSClient(context.Context) (gs.Client, error) |
| + |
| + // ArchivalPublisher returns an ArchivalPublisher instance. |
| + ArchivalPublisher(context.Context) (ArchivalPublisher, error) |
| +} |
| + |
| +// ServiceBase is an embeddable struct that offers a production Services |
| +// implementation. |
| +// |
| +// Its Services member can be overridden to provide alternative implementations |
| +// for testing. |
| +type ServiceBase struct { |
| + Services |
| +} |
| + |
| +// GetServices returns a new Services instance. |
| +func (s *ServiceBase) GetServices() Services { |
| + if s.Services != nil { |
| + return s.Services |
| + } |
| + return &prodServices{} |
| +} |
| + |
| +// Request is a Service context for a given request. |
| +type prodServices struct { |
| + sync.Mutex |
| + |
| + // gcfg is the cached global configuration. |
| + gcfg *config.GlobalConfig |
| + // cfg is the cached configuration. |
| + cfg *svcconfig.Config |
| } |
| -// Storage retrieves the configured Storage instance. |
| -func (s *Service) Storage(c context.Context) (storage.Storage, error) { |
| - sf := s.StorageFunc |
| - if sf == nil { |
| - // Production: use BigTable storage. |
| - sf = config.GetStorage |
| +// Config returns the current instance and application configuration instances. |
| +// |
| +// After a success, successive calls will return a cached result. |
| +func (s *prodServices) Config(c context.Context) (*config.GlobalConfig, *svcconfig.Config, error) { |
| + s.Lock() |
| + defer s.Unlock() |
| + |
| + // Load/cache the global config. |
| + if s.gcfg == nil { |
| + var err error |
| + s.gcfg, err = config.LoadGlobalConfig(c) |
| + if err != nil { |
| + return nil, nil, err |
| + } |
| } |
| - st, err := sf(c) |
| + if s.cfg == nil { |
| + var err error |
| + s.cfg, err = s.gcfg.LoadConfig(c) |
| + if err != nil { |
| + return nil, nil, err |
| + } |
| + } |
| + |
| + return s.gcfg, s.cfg, nil |
| +} |
| + |
| +func (s *prodServices) IntermediateStorage(c context.Context) (storage.Storage, error) { |
| + gcfg, cfg, err := s.Config(c) |
| if err != nil { |
| - log.Errorf(log.SetError(c, err), "Failed to get Storage instance.") |
| return nil, err |
| } |
| - return st, nil |
| -} |
| -// GSClient instantiates a Google Storage client. |
| -func (s *Service) GSClient(c context.Context) (gs.Client, error) { |
| - f := s.GSClientFunc |
| - if f == nil { |
| - f = s.newProdGSClient |
| + // Is BigTable configured? |
| + if cfg.Storage == nil { |
| + return nil, errors.New("no storage configuration") |
| + } |
| + |
| + bt := cfg.Storage.GetBigtable() |
| + if bt == nil { |
| + return nil, errors.New("no BigTable configuration") |
| + } |
| + |
| + // Validate the BigTable configuration. |
| + log.Fields{ |
| + "project": bt.Project, |
| + "zone": bt.Zone, |
| + "cluster": bt.Cluster, |
| + "logTableName": bt.LogTableName, |
| + }.Debugf(c, "Connecting to BigTable.") |
| + var merr errors.MultiError |
| + if bt.Project == "" { |
| + merr = append(merr, errors.New("missing project")) |
| + } |
| + if bt.Zone == "" { |
| + merr = append(merr, errors.New("missing zone")) |
| + } |
| + if bt.Cluster == "" { |
| + merr = append(merr, errors.New("missing cluster")) |
| + } |
| + if bt.LogTableName == "" { |
| + merr = append(merr, errors.New("missing log table name")) |
| + } |
| + if len(merr) > 0 { |
| + return nil, merr |
| + } |
| + |
| + // Get an Authenticator bound to the token scopes that we need for BigTable. |
| + a, err := gaeauthClient.Authenticator(c, bigtable.StorageScopes, gcfg.BigTableServiceAccountJSON) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to create BigTable authenticator.") |
| + return nil, errors.New("failed to create BigTable authenticator") |
| } |
| - gsc, err := f(c) |
| + // Explicitly clear gRPC metadata from the Context. It is forwarded to |
| + // delegate calls by default, and standard request metadata can break BigTable |
| + // calls. |
| + c = metadata.NewContext(c, nil) |
| + |
| + st, err := bigtable.New(c, bigtable.Options{ |
| + Project: bt.Project, |
| + Zone: bt.Zone, |
| + Cluster: bt.Cluster, |
| + LogTable: bt.LogTableName, |
| + ClientOptions: []cloud.ClientOption{ |
| + cloud.WithTokenSource(a.TokenSource()), |
| + }, |
| + }) |
| if err != nil { |
| - log.Errorf(log.SetError(c, err), "Failed to get Google Storage client.") |
| + log.WithError(err).Errorf(c, "Failed to create BigTable instance.") |
| return nil, err |
| } |
| - return gsc, nil |
| + return st, nil |
| } |
| -func (s *Service) newProdGSClient(c context.Context) (gs.Client, error) { |
| +func (s *prodServices) GSClient(c context.Context) (gs.Client, error) { |
| // Get an Authenticator bound to the token scopes that we need for |
| // authenticated Cloud Storage access. |
| rt, err := gaeauthClient.Transport(c, gs.ReadOnlyScopes, nil) |
| @@ -73,3 +184,44 @@ func (s *Service) newProdGSClient(c context.Context) (gs.Client, error) { |
| } |
| return gs.NewProdClient(c, rt) |
| } |
| + |
| +func (s *prodServices) ArchivalPublisher(c context.Context) (ArchivalPublisher, error) { |
|
dnj
2016/04/11 17:20:04
(Except this, this is new).
|
| + _, cfg, err := s.Config(c) |
| + if err != nil { |
| + return nil, err |
| + } |
| + |
| + fullTopic := pubsub.Topic(cfg.Coordinator.ArchiveTopic) |
| + if err := fullTopic.Validate(); err != nil { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "topic": fullTopic, |
| + }.Errorf(c, "Failed to validate archival topic.") |
| + return nil, errors.New("invalid archival topic") |
| + } |
| + project, topic := fullTopic.Split() |
| + |
| + // Create an authenticated Pub/Sub client. |
| + // Pub/Sub topic publishing. |
| + auth, err := gaeauthClient.Authenticator(c, pubsub.PublisherScopes, nil) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to create Pub/Sub authenticator.") |
| + return nil, errors.New("failed to create Pub/Sub authenticator") |
| + } |
| + |
| + client, err := auth.Client() |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to create Pub/Sub HTTP client.") |
| + return nil, errors.New("failed to create Pub/Sub HTTP client") |
| + } |
| + |
| + psClient, err := gcps.NewClient(c, project, cloud.WithBaseHTTP(client)) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") |
| + return nil, errors.New("failed to create Pub/Sub client") |
| + } |
| + |
| + return &pubsubArchivalPublisher{ |
| + topic: psClient.Topic(topic), |
| + }, nil |
| +} |