| Index: appengine/logdog/coordinator/service.go
|
| diff --git a/appengine/logdog/coordinator/service.go b/appengine/logdog/coordinator/service.go
|
| index edfa808f2b4e992862bded1516611674472deeb7..ef351247cdcb4bd49dd52da035b8a2ad81cbb5b3 100644
|
| --- a/appengine/logdog/coordinator/service.go
|
| +++ b/appengine/logdog/coordinator/service.go
|
| @@ -5,65 +5,176 @@
|
| package coordinator
|
|
|
| import (
|
| - "errors"
|
| + "sync"
|
|
|
| gaeauthClient "github.com/luci/luci-go/appengine/gaeauth/client"
|
| "github.com/luci/luci-go/appengine/logdog/coordinator/config"
|
| + "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/gcloud/gs"
|
| + "github.com/luci/luci-go/common/gcloud/pubsub"
|
| log "github.com/luci/luci-go/common/logging"
|
| + "github.com/luci/luci-go/common/proto/logdog/svcconfig"
|
| "github.com/luci/luci-go/server/logdog/storage"
|
| + "github.com/luci/luci-go/server/logdog/storage/bigtable"
|
| "golang.org/x/net/context"
|
| + "google.golang.org/cloud"
|
| + gcps "google.golang.org/cloud/pubsub"
|
| + "google.golang.org/grpc/metadata"
|
| )
|
|
|
| -// Service is the base service container for LogDog handlers and endpoints. It
|
| -// is primarily usable as a means of consistently stubbing out various external
|
| -// components.
|
| -type Service struct {
|
| - // StorageFunc is a function that generates an intermediate Storage instance
|
| - // for use by this service. If nil, the production intermediate Storage
|
| - // instance will be used.
|
| +// Services is a set of support services used by Coordinator.
|
| +//
|
| +// Each Services instance is valid for a single request, but can be re-used
|
| +// throughout that request. This is advised, as the Services instance may
|
| +// optionally cache values.
|
| +//
|
| +// Services methods are goroutine-safe.
|
| +//
|
| +// By default, a production set of services will be used. However, this can be
|
| +// overridden for testing to mock the service layer.
|
| +type Services interface {
|
| + // Config returns the current instance and application configuration
|
| + // instances.
|
| //
|
| - // This is provided for testing purposes.
|
| - StorageFunc func(context.Context) (storage.Storage, error)
|
| + // The production instance will cache the results for the duration of the
|
| + // request.
|
| + Config(context.Context) (*config.GlobalConfig, *svcconfig.Config, error)
|
|
|
| - // GSClientFunc is a function that generates a Google Storage client instance
|
| - // for use by this service. If nil, the production Google Storage Client will
|
| - // be used.
|
| - GSClientFunc func(context.Context) (gs.Client, error)
|
| +	// IntermediateStorage returns an intermediate storage instance for use by this service.
|
| + //
|
| + // The caller must close the returned instance if successful.
|
| + IntermediateStorage(context.Context) (storage.Storage, error)
|
| +
|
| + // GSClient instantiates a Google Storage client.
|
| + GSClient(context.Context) (gs.Client, error)
|
| +
|
| + // ArchivalPublisher returns an ArchivalPublisher instance.
|
| + ArchivalPublisher(context.Context) (ArchivalPublisher, error)
|
| +}
|
| +
|
| +// ServiceBase is an embeddable struct that offers a production Services
|
| +// implementation.
|
| +//
|
| +// Its Services member can be overridden to provide alternative implementations
|
| +// for testing.
|
| +type ServiceBase struct {
|
| + Services
|
| +}
|
| +
|
| +// GetServices returns a new Services instance.
|
| +func (s *ServiceBase) GetServices() Services {
|
| + if s.Services != nil {
|
| + return s.Services
|
| + }
|
| + return &prodServices{}
|
| +}
|
| +
|
| +// prodServices is the production implementation of the Services interface.
|
| +type prodServices struct {
|
| + sync.Mutex
|
| +
|
| + // gcfg is the cached global configuration.
|
| + gcfg *config.GlobalConfig
|
| + // cfg is the cached configuration.
|
| + cfg *svcconfig.Config
|
| }
|
|
|
| -// Storage retrieves the configured Storage instance.
|
| -func (s *Service) Storage(c context.Context) (storage.Storage, error) {
|
| - sf := s.StorageFunc
|
| - if sf == nil {
|
| - // Production: use BigTable storage.
|
| - sf = config.GetStorage
|
| +// Config returns the current instance and application configuration instances.
|
| +//
|
| +// After a success, successive calls will return a cached result.
|
| +func (s *prodServices) Config(c context.Context) (*config.GlobalConfig, *svcconfig.Config, error) {
|
| + s.Lock()
|
| + defer s.Unlock()
|
| +
|
| + // Load/cache the global config.
|
| + if s.gcfg == nil {
|
| + var err error
|
| + s.gcfg, err = config.LoadGlobalConfig(c)
|
| + if err != nil {
|
| + return nil, nil, err
|
| + }
|
| }
|
|
|
| - st, err := sf(c)
|
| + if s.cfg == nil {
|
| + var err error
|
| + s.cfg, err = s.gcfg.LoadConfig(c)
|
| + if err != nil {
|
| + return nil, nil, err
|
| + }
|
| + }
|
| +
|
| + return s.gcfg, s.cfg, nil
|
| +}
|
| +
|
| +func (s *prodServices) IntermediateStorage(c context.Context) (storage.Storage, error) {
|
| + gcfg, cfg, err := s.Config(c)
|
| if err != nil {
|
| - log.Errorf(log.SetError(c, err), "Failed to get Storage instance.")
|
| return nil, err
|
| }
|
| - return st, nil
|
| -}
|
|
|
| -// GSClient instantiates a Google Storage client.
|
| -func (s *Service) GSClient(c context.Context) (gs.Client, error) {
|
| - f := s.GSClientFunc
|
| - if f == nil {
|
| - f = s.newProdGSClient
|
| + // Is BigTable configured?
|
| + if cfg.Storage == nil {
|
| + return nil, errors.New("no storage configuration")
|
| + }
|
| +
|
| + bt := cfg.Storage.GetBigtable()
|
| + if bt == nil {
|
| + return nil, errors.New("no BigTable configuration")
|
| + }
|
| +
|
| + // Validate the BigTable configuration.
|
| + log.Fields{
|
| + "project": bt.Project,
|
| + "zone": bt.Zone,
|
| + "cluster": bt.Cluster,
|
| + "logTableName": bt.LogTableName,
|
| + }.Debugf(c, "Connecting to BigTable.")
|
| + var merr errors.MultiError
|
| + if bt.Project == "" {
|
| + merr = append(merr, errors.New("missing project"))
|
| + }
|
| + if bt.Zone == "" {
|
| + merr = append(merr, errors.New("missing zone"))
|
| + }
|
| + if bt.Cluster == "" {
|
| + merr = append(merr, errors.New("missing cluster"))
|
| + }
|
| + if bt.LogTableName == "" {
|
| + merr = append(merr, errors.New("missing log table name"))
|
| + }
|
| + if len(merr) > 0 {
|
| + return nil, merr
|
| + }
|
| +
|
| + // Get an Authenticator bound to the token scopes that we need for BigTable.
|
| + a, err := gaeauthClient.Authenticator(c, bigtable.StorageScopes, gcfg.BigTableServiceAccountJSON)
|
| + if err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to create BigTable authenticator.")
|
| + return nil, errors.New("failed to create BigTable authenticator")
|
| }
|
|
|
| - gsc, err := f(c)
|
| + // Explicitly clear gRPC metadata from the Context. It is forwarded to
|
| + // delegate calls by default, and standard request metadata can break BigTable
|
| + // calls.
|
| + c = metadata.NewContext(c, nil)
|
| +
|
| + st, err := bigtable.New(c, bigtable.Options{
|
| + Project: bt.Project,
|
| + Zone: bt.Zone,
|
| + Cluster: bt.Cluster,
|
| + LogTable: bt.LogTableName,
|
| + ClientOptions: []cloud.ClientOption{
|
| + cloud.WithTokenSource(a.TokenSource()),
|
| + },
|
| + })
|
| if err != nil {
|
| - log.Errorf(log.SetError(c, err), "Failed to get Google Storage client.")
|
| + log.WithError(err).Errorf(c, "Failed to create BigTable instance.")
|
| return nil, err
|
| }
|
| - return gsc, nil
|
| + return st, nil
|
| }
|
|
|
| -func (s *Service) newProdGSClient(c context.Context) (gs.Client, error) {
|
| +func (s *prodServices) GSClient(c context.Context) (gs.Client, error) {
|
| // Get an Authenticator bound to the token scopes that we need for
|
| // authenticated Cloud Storage access.
|
| rt, err := gaeauthClient.Transport(c, gs.ReadOnlyScopes, nil)
|
| @@ -73,3 +184,44 @@ func (s *Service) newProdGSClient(c context.Context) (gs.Client, error) {
|
| }
|
| return gs.NewProdClient(c, rt)
|
| }
|
| +
|
| +func (s *prodServices) ArchivalPublisher(c context.Context) (ArchivalPublisher, error) {
|
| + _, cfg, err := s.Config(c)
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| +
|
| + fullTopic := pubsub.Topic(cfg.Coordinator.ArchiveTopic)
|
| + if err := fullTopic.Validate(); err != nil {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "topic": fullTopic,
|
| + }.Errorf(c, "Failed to validate archival topic.")
|
| + return nil, errors.New("invalid archival topic")
|
| + }
|
| + project, topic := fullTopic.Split()
|
| +
|
| + // Create an authenticated Pub/Sub client.
|
| +	// It will be used for Pub/Sub topic publishing.
|
| + auth, err := gaeauthClient.Authenticator(c, pubsub.PublisherScopes, nil)
|
| + if err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to create Pub/Sub authenticator.")
|
| + return nil, errors.New("failed to create Pub/Sub authenticator")
|
| + }
|
| +
|
| + client, err := auth.Client()
|
| + if err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to create Pub/Sub HTTP client.")
|
| + return nil, errors.New("failed to create Pub/Sub HTTP client")
|
| + }
|
| +
|
| + psClient, err := gcps.NewClient(c, project, cloud.WithBaseHTTP(client))
|
| + if err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.")
|
| + return nil, errors.New("failed to create Pub/Sub client")
|
| + }
|
| +
|
| + return &pubsubArchivalPublisher{
|
| + topic: psClient.Topic(topic),
|
| + }, nil
|
| +}
|
|
|