| Index: server/cmd/logdog_archivist/main.go
|
| diff --git a/server/cmd/logdog_archivist/main.go b/server/cmd/logdog_archivist/main.go
|
| index ecb0609cb3af8d158d3908b72d6a7fabacadf045..a008454d246b62501c11fffdef0ad73e1de72f5a 100644
|
| --- a/server/cmd/logdog_archivist/main.go
|
| +++ b/server/cmd/logdog_archivist/main.go
|
| @@ -5,21 +5,33 @@
|
| package main
|
|
|
| import (
|
| + "io"
|
| + "time"
|
| +
|
| "github.com/luci/luci-go/common/auth"
|
| "github.com/luci/luci-go/common/clock"
|
| "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/gcloud/gs"
|
| + "github.com/luci/luci-go/common/gcloud/pubsub"
|
| log "github.com/luci/luci-go/common/logging"
|
| + "github.com/luci/luci-go/common/parallel"
|
| "github.com/luci/luci-go/server/internal/logdog/archivist"
|
| "github.com/luci/luci-go/server/internal/logdog/service"
|
| - "github.com/luci/luci-go/server/taskqueueClient"
|
| "golang.org/x/net/context"
|
| + "google.golang.org/cloud"
|
| + gcps "google.golang.org/cloud/pubsub"
|
| )
|
|
|
| var (
|
| errInvalidConfig = errors.New("invalid configuration")
|
| )
|
|
|
| +const (
|
| + // subscriptionErrorDelay is how long the pull loop sleeps before retrying
|
| + // after the subscription iterator returns an error other than io.EOF,
|
| + // context.Canceled, or context.DeadlineExceeded (those end the loop).
|
| + subscriptionErrorDelay = 10 * time.Second
|
| +)
|
| +
|
| // application is the Archivist application state.
|
| type application struct {
|
| service.Service
|
| @@ -33,42 +45,78 @@ func (a *application) runArchivist(c context.Context) error {
|
| switch {
|
| case coordCfg == nil:
|
| fallthrough
|
| - case coordCfg.Project == "":
|
| - return errors.New("missing coordinator project name")
|
| - case coordCfg.ArchiveTaskQueue == "":
|
| - return errors.New("missing archive task queue name")
|
|
|
| case acfg == nil:
|
| return errors.New("missing Archivist configuration")
|
| case acfg.GsBase == "":
|
| return errors.New("missing archive GS bucket")
|
| + case acfg.GsStagingBase == "":
|
| + return errors.New("missing archive staging GS bucket")
|
| }
|
|
|
| - // Construct and validate our GS base.
|
| + // Construct and validate our GS bases.
|
| gsBase := gs.Path(acfg.GsBase)
|
| if gsBase.Bucket() == "" {
|
| log.Fields{
|
| - "gsBase": acfg.GsBase,
|
| + "value": gsBase,
|
| }.Errorf(c, "Google Storage base does not include a bucket name.")
|
| return errors.New("invalid Google Storage base")
|
| }
|
|
|
| - // Initialize task queue client.
|
| - tqClient, err := a.AuthenticatedClient(func(o *auth.Options) {
|
| - o.Scopes = taskqueueClient.Scopes
|
| + gsStagingBase := gs.Path(acfg.GsStagingBase)
|
| + if gsStagingBase.Bucket() == "" {
|
| + log.Fields{
|
| + "value": gsStagingBase,
|
| + }.Errorf(c, "Google Storage staging base does not include a bucket name.")
|
| + return errors.New("invalid Google Storage staging base")
|
| + }
|
| +
|
| + // Initialize Pub/Sub client.
|
| + //
|
| + // We will initialize both an authenticated Client instance and an
|
| + // authenticated Context, since we need the latter for raw ACK deadline
|
| + // updates.
|
| + taskSub := pubsub.Subscription(acfg.Subscription)
|
| + if err := taskSub.Validate(); err != nil {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "value": taskSub,
|
| + }.Errorf(c, "Task subscription did not validate.")
|
| + return errors.New("invalid task subscription name")
|
| + }
|
| + psProject, psSubscriptionName := taskSub.Split()
|
| +
|
| + psAuth, err := a.Authenticator(func(o *auth.Options) {
|
| + o.Scopes = pubsub.SubscriberScopes
|
| })
|
| if err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to get task queue client.")
|
| + log.WithError(err).Errorf(c, "Failed to get Pub/Sub authenticator.")
|
| + return err
|
| + }
|
| +
|
| + // Pub/Sub: HTTP Client => Context
|
| + psHTTPClient, err := psAuth.Client()
|
| + if err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to create authenticated Pub/Sub transport.")
|
| + return err
|
| + }
|
| + psContext := cloud.WithContext(c, psProject, psHTTPClient)
|
| +
|
| + // Pub/Sub: TokenSource => Client
|
| + psClient, err := gcps.NewClient(c, psProject, cloud.WithTokenSource(psAuth.TokenSource()))
|
| + if err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.")
|
| return err
|
| }
|
| + sub := psClient.Subscription(psSubscriptionName)
|
|
|
| // Initialize our Storage.
|
| - s, err := a.IntermediateStorage(c)
|
| + st, err := a.IntermediateStorage(c)
|
| if err != nil {
|
| log.WithError(err).Errorf(c, "Failed to get storage instance.")
|
| return err
|
| }
|
| - defer s.Close()
|
| + defer st.Close()
|
|
|
| // Initialize our Google Storage client.
|
| gsClient, err := a.GSClient(c)
|
| @@ -78,64 +126,91 @@ func (a *application) runArchivist(c context.Context) error {
|
| }
|
| defer gsClient.Close()
|
|
|
| - // Application shutdown will now operate by cancelling the Collector's
|
| - // shutdown Context.
|
| - shutdownCtx, shutdownFunc := context.WithCancel(c)
|
| - a.SetShutdownFunc(shutdownFunc)
|
| -
|
| ar := archivist.Archivist{
|
| Service: a.Coordinator(),
|
| - Storage: s,
|
| + Storage: st,
|
| GSClient: gsClient,
|
|
|
| GSBase: gsBase,
|
| + GSStagingBase: gsStagingBase,
|
| StreamIndexRange: int(acfg.StreamIndexRange),
|
| PrefixIndexRange: int(acfg.PrefixIndexRange),
|
| ByteRange: int(acfg.ByteRange),
|
| }
|
|
|
| - tqOpts := taskqueueClient.Options{
|
| - Project: coordCfg.Project,
|
| - Queue: coordCfg.ArchiveTaskQueue,
|
| - Client: tqClient,
|
| - UserAgent: "LogDog Archivist",
|
| - Tasks: int(acfg.Tasks),
|
| + tasks := int(acfg.Tasks)
|
| + if tasks <= 0 {
|
| + tasks = 1
|
| }
|
|
|
| log.Fields{
|
| - "project": tqOpts.Project,
|
| - "queue": tqOpts.Queue,
|
| - }.Infof(c, "Pulling tasks from task queue.")
|
| - taskqueueClient.RunTasks(shutdownCtx, tqOpts, func(c context.Context, t taskqueueClient.Task) bool {
|
| - c = log.SetField(c, "taskID", t.ID)
|
| -
|
| - startTime := clock.Now(c)
|
| - err := ar.ArchiveTask(c, t.Payload)
|
| - duration := clock.Now(c).Sub(startTime)
|
| -
|
| - switch {
|
| - case errors.IsTransient(err):
|
| - // Do not consume
|
| - log.Fields{
|
| - log.ErrorKey: err,
|
| - "duration": duration,
|
| - }.Warningf(c, "TRANSIENT error processing task.")
|
| - return false
|
| -
|
| - case err == nil:
|
| - log.Fields{
|
| - "duration": duration,
|
| - }.Infof(c, "Task successfully processed; deleting.")
|
| - return true
|
| -
|
| - default:
|
| - log.Fields{
|
| - log.ErrorKey: err,
|
| - "duration": duration,
|
| - }.Errorf(c, "Non-transient error processing task; deleting.")
|
| - return true
|
| + "subscription": taskSub,
|
| + "tasks": tasks,
|
| + }.Infof(c, "Pulling tasks from Pub/Sub subscription.")
|
| + it, err := sub.Pull(c, gcps.MaxExtension(pubsub.MaxACKDeadline), gcps.MaxPrefetch(tasks))
|
| + if err != nil {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "subscription": taskSub,
|
| + }.Errorf(c, "Failed to create Pub/Sub subscription iterator.")
|
| + // Without an iterator there is nothing to pull from; returning here also
|
| + // keeps the deferred Stop and the pull loop from using an invalid iterator.
|
| + return err
|
| + }
|
| + defer it.Stop()
|
| +
|
| + // Application shutdown will now operate by stopping the Iterator.
|
| + a.SetShutdownFunc(it.Stop)
|
| +
|
| + // Loop, pulling messages from our iterator and dispatching them.
|
| + parallel.Ignore(parallel.Run(tasks, func(taskC chan<- func() error) {
|
| + for {
|
| + msg, err := it.Next()
|
| + switch err {
|
| + case nil:
|
| + c := log.SetFields(c, log.Fields{
|
| + "messageID": msg.ID,
|
| + "ackID": msg.AckID,
|
| + })
|
| +
|
| + // Dispatch an archive handler for this message.
|
| + taskC <- func() error {
|
| + deleteTask := false
|
| + defer func() {
|
| + msg.Done(deleteTask)
|
| + }()
|
| +
|
| + task, err := makePubSubArchivistTask(psContext, psSubscriptionName, msg)
|
| + if err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to unmarshal archive task from message.")
|
| + deleteTask = true
|
| + return nil
|
| + }
|
| +
|
| + startTime := clock.Now(c)
|
| + deleteTask = ar.ArchiveTask(c, task)
|
| + duration := clock.Now(c).Sub(startTime)
|
| +
|
| + if deleteTask {
|
| + log.Fields{
|
| + "duration": duration,
|
| + }.Infof(c, "Task successfully processed; deleting.")
|
| + } else {
|
| + log.Fields{
|
| + "duration": duration,
|
| + }.Infof(c, "Task processing incomplete. Not deleting.")
|
| + }
|
| + return nil
|
| + }
|
| +
|
| + case io.EOF, context.Canceled, context.DeadlineExceeded:
|
| + log.Infof(c, "Subscription iterator is finished.")
|
| + return
|
| +
|
| + default:
|
| + log.WithError(err).Warningf(c, "Subscription iterator returned error. Sleeping...")
|
| + clock.Sleep(c, subscriptionErrorDelay)
|
| + continue
|
| + }
|
| }
|
| - })
|
| + }))
|
|
|
| log.Debugf(c, "Archivist finished.")
|
| return nil
|
|
|