Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1000)

Unified Diff: appengine/logdog/coordinator/service.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Code review comments, use Pub/Sub, archival staging, quality of life. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/logdog/coordinator/service.go
diff --git a/appengine/logdog/coordinator/service.go b/appengine/logdog/coordinator/service.go
index edfa808f2b4e992862bded1516611674472deeb7..ef351247cdcb4bd49dd52da035b8a2ad81cbb5b3 100644
--- a/appengine/logdog/coordinator/service.go
+++ b/appengine/logdog/coordinator/service.go
@@ -5,65 +5,176 @@
package coordinator
import (
- "errors"
+ "sync"
gaeauthClient "github.com/luci/luci-go/appengine/gaeauth/client"
"github.com/luci/luci-go/appengine/logdog/coordinator/config"
+ "github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/gcloud/gs"
+ "github.com/luci/luci-go/common/gcloud/pubsub"
log "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/proto/logdog/svcconfig"
"github.com/luci/luci-go/server/logdog/storage"
+ "github.com/luci/luci-go/server/logdog/storage/bigtable"
"golang.org/x/net/context"
+ "google.golang.org/cloud"
+ gcps "google.golang.org/cloud/pubsub"
+ "google.golang.org/grpc/metadata"
)
-// Service is the base service container for LogDog handlers and endpoints. It
-// is primarily usable as a means of consistently stubbing out various external
-// components.
-type Service struct {
- // StorageFunc is a function that generates an intermediate Storage instance
- // for use by this service. If nil, the production intermediate Storage
- // instance will be used.
+// Services is a set of support services used by Coordinator.
+//
+// Each Services instance is valid for a singel request, but can be re-used
+// throughout that request. This is advised, as the Services instance may
+// optionally cache values.
+//
+// Services methods are goroutine-safe.
+//
+// By default, a production set of services will be used. However, this can be
+// overridden for testing to mock the service layer.
+type Services interface {
dnj 2016/04/11 17:20:04 This is a reconstruction of Service that is more v
+ // Config returns the current instance and application configuration
+ // instances.
//
- // This is provided for testing purposes.
- StorageFunc func(context.Context) (storage.Storage, error)
+ // The production instance will cache the results for the duration of the
+ // request.
+ Config(context.Context) (*config.GlobalConfig, *svcconfig.Config, error)
- // GSClientFunc is a function that generates a Google Storage client instance
- // for use by this service. If nil, the production Google Storage Client will
- // be used.
- GSClientFunc func(context.Context) (gs.Client, error)
+ // Storage returns an intermediate storage instance for use by this service.
+ //
+ // The caller must close the returned instance if successful.
+ IntermediateStorage(context.Context) (storage.Storage, error)
+
+ // GSClient instantiates a Google Storage client.
+ GSClient(context.Context) (gs.Client, error)
+
+ // ArchivalPublisher returns an ArchivalPublisher instance.
+ ArchivalPublisher(context.Context) (ArchivalPublisher, error)
+}
+
+// ServiceBase is an embeddable struct that offers a production Services
+// implementation.
+//
+// Its Services member can be overridden to provide alternative implementations
+// for testing.
+type ServiceBase struct {
+ Services
+}
+
+// GetServices returns a new Services instance.
+func (s *ServiceBase) GetServices() Services {
+ if s.Services != nil {
+ return s.Services
+ }
+ return &prodServices{}
+}
+
+// Request is a Service context for a given request.
+type prodServices struct {
+ sync.Mutex
+
+ // gcfg is the cached global configuration.
+ gcfg *config.GlobalConfig
+ // cfg is the cached configuration.
+ cfg *svcconfig.Config
}
-// Storage retrieves the configured Storage instance.
-func (s *Service) Storage(c context.Context) (storage.Storage, error) {
- sf := s.StorageFunc
- if sf == nil {
- // Production: use BigTable storage.
- sf = config.GetStorage
+// Config returns the current instance and application configuration instances.
+//
+// After a success, successive calls will return a cached result.
+func (s *prodServices) Config(c context.Context) (*config.GlobalConfig, *svcconfig.Config, error) {
+ s.Lock()
+ defer s.Unlock()
+
+ // Load/cache the global config.
+ if s.gcfg == nil {
+ var err error
+ s.gcfg, err = config.LoadGlobalConfig(c)
+ if err != nil {
+ return nil, nil, err
+ }
}
- st, err := sf(c)
+ if s.cfg == nil {
+ var err error
+ s.cfg, err = s.gcfg.LoadConfig(c)
+ if err != nil {
+ return nil, nil, err
+ }
+ }
+
+ return s.gcfg, s.cfg, nil
+}
+
+func (s *prodServices) IntermediateStorage(c context.Context) (storage.Storage, error) {
+ gcfg, cfg, err := s.Config(c)
if err != nil {
- log.Errorf(log.SetError(c, err), "Failed to get Storage instance.")
return nil, err
}
- return st, nil
-}
-// GSClient instantiates a Google Storage client.
-func (s *Service) GSClient(c context.Context) (gs.Client, error) {
- f := s.GSClientFunc
- if f == nil {
- f = s.newProdGSClient
+ // Is BigTable configured?
+ if cfg.Storage == nil {
+ return nil, errors.New("no storage configuration")
+ }
+
+ bt := cfg.Storage.GetBigtable()
+ if bt == nil {
+ return nil, errors.New("no BigTable configuration")
+ }
+
+ // Validate the BigTable configuration.
+ log.Fields{
+ "project": bt.Project,
+ "zone": bt.Zone,
+ "cluster": bt.Cluster,
+ "logTableName": bt.LogTableName,
+ }.Debugf(c, "Connecting to BigTable.")
+ var merr errors.MultiError
+ if bt.Project == "" {
+ merr = append(merr, errors.New("missing project"))
+ }
+ if bt.Zone == "" {
+ merr = append(merr, errors.New("missing zone"))
+ }
+ if bt.Cluster == "" {
+ merr = append(merr, errors.New("missing cluster"))
+ }
+ if bt.LogTableName == "" {
+ merr = append(merr, errors.New("missing log table name"))
+ }
+ if len(merr) > 0 {
+ return nil, merr
+ }
+
+ // Get an Authenticator bound to the token scopes that we need for BigTable.
+ a, err := gaeauthClient.Authenticator(c, bigtable.StorageScopes, gcfg.BigTableServiceAccountJSON)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to create BigTable authenticator.")
+ return nil, errors.New("failed to create BigTable authenticator")
}
- gsc, err := f(c)
+ // Explicitly clear gRPC metadata from the Context. It is forwarded to
+ // delegate calls by default, and standard request metadata can break BigTable
+ // calls.
+ c = metadata.NewContext(c, nil)
+
+ st, err := bigtable.New(c, bigtable.Options{
+ Project: bt.Project,
+ Zone: bt.Zone,
+ Cluster: bt.Cluster,
+ LogTable: bt.LogTableName,
+ ClientOptions: []cloud.ClientOption{
+ cloud.WithTokenSource(a.TokenSource()),
+ },
+ })
if err != nil {
- log.Errorf(log.SetError(c, err), "Failed to get Google Storage client.")
+ log.WithError(err).Errorf(c, "Failed to create BigTable instance.")
return nil, err
}
- return gsc, nil
+ return st, nil
}
-func (s *Service) newProdGSClient(c context.Context) (gs.Client, error) {
+func (s *prodServices) GSClient(c context.Context) (gs.Client, error) {
// Get an Authenticator bound to the token scopes that we need for
// authenticated Cloud Storage access.
rt, err := gaeauthClient.Transport(c, gs.ReadOnlyScopes, nil)
@@ -73,3 +184,44 @@ func (s *Service) newProdGSClient(c context.Context) (gs.Client, error) {
}
return gs.NewProdClient(c, rt)
}
+
+func (s *prodServices) ArchivalPublisher(c context.Context) (ArchivalPublisher, error) {
dnj 2016/04/11 17:20:04 (Except this, this is new).
+ _, cfg, err := s.Config(c)
+ if err != nil {
+ return nil, err
+ }
+
+ fullTopic := pubsub.Topic(cfg.Coordinator.ArchiveTopic)
+ if err := fullTopic.Validate(); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "topic": fullTopic,
+ }.Errorf(c, "Failed to validate archival topic.")
+ return nil, errors.New("invalid archival topic")
+ }
+ project, topic := fullTopic.Split()
+
+ // Create an authenticated Pub/Sub client.
+ // Pub/Sub topic publishing.
+ auth, err := gaeauthClient.Authenticator(c, pubsub.PublisherScopes, nil)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to create Pub/Sub authenticator.")
+ return nil, errors.New("failed to create Pub/Sub authenticator")
+ }
+
+ client, err := auth.Client()
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to create Pub/Sub HTTP client.")
+ return nil, errors.New("failed to create Pub/Sub HTTP client")
+ }
+
+ psClient, err := gcps.NewClient(c, project, cloud.WithBaseHTTP(client))
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.")
+ return nil, errors.New("failed to create Pub/Sub client")
+ }
+
+ return &pubsubArchivalPublisher{
+ topic: psClient.Topic(topic),
+ }, nil
+}

Powered by Google App Engine
This is Rietveld 408576698