Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(555)

Unified Diff: server/cmd/logdog_archivist/main.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Code review comments, use Pub/Sub, archival staging, quality of life. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: server/cmd/logdog_archivist/main.go
diff --git a/server/cmd/logdog_archivist/main.go b/server/cmd/logdog_archivist/main.go
index ecb0609cb3af8d158d3908b72d6a7fabacadf045..4f40243cd0ea918dadb8f4602d8a44f454fef4a8 100644
--- a/server/cmd/logdog_archivist/main.go
+++ b/server/cmd/logdog_archivist/main.go
@@ -5,21 +5,33 @@
package main
import (
+ "io"
+ "time"
+
"github.com/luci/luci-go/common/auth"
"github.com/luci/luci-go/common/clock"
"github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/gcloud/gs"
+ "github.com/luci/luci-go/common/gcloud/pubsub"
log "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/parallel"
"github.com/luci/luci-go/server/internal/logdog/archivist"
"github.com/luci/luci-go/server/internal/logdog/service"
- "github.com/luci/luci-go/server/taskqueueClient"
"golang.org/x/net/context"
+ "google.golang.org/cloud"
+ gcps "google.golang.org/cloud/pubsub"
)
var (
errInvalidConfig = errors.New("invalid configuration")
)
+const (
+ // subscriptionErrorDelay is the amount of time to sleep after a subscription
+ // iterator returns a non-terminal error.
+ subscriptionErrorDelay = 10 * time.Second
+)
+
// application is the Archivist application state.
type application struct {
service.Service
@@ -33,34 +45,70 @@ func (a *application) runArchivist(c context.Context) error {
switch {
case coordCfg == nil:
fallthrough
- case coordCfg.Project == "":
- return errors.New("missing coordinator project name")
- case coordCfg.ArchiveTaskQueue == "":
- return errors.New("missing archive task queue name")
case acfg == nil:
return errors.New("missing Archivist configuration")
case acfg.GsBase == "":
return errors.New("missing archive GS bucket")
+ case acfg.GsStagingBase == "":
+ return errors.New("missing archive staging GS bucket")
}
- // Construct and validate our GS base.
+ // Construct and validate our GS bases.
gsBase := gs.Path(acfg.GsBase)
if gsBase.Bucket() == "" {
log.Fields{
- "gsBase": acfg.GsBase,
+ "value": gsBase,
}.Errorf(c, "Google Storage base does not include a bucket name.")
return errors.New("invalid Google Storage base")
}
- // Initialize task queue client.
- tqClient, err := a.AuthenticatedClient(func(o *auth.Options) {
- o.Scopes = taskqueueClient.Scopes
+ gsStagingBase := gs.Path(acfg.GsStagingBase)
+ if gsStagingBase.Bucket() == "" {
+ log.Fields{
+ "value": gsStagingBase,
+ }.Errorf(c, "Google Storage staging base does not include a bucket name.")
+ return errors.New("invalid Google Storage staging base")
+ }
+
+ // Initialize Pub/Sub client.
+ //
+ // We will initialize both an authenticated Client instance and an
+ // authenticated Context, since we need the latter for raw ACK deadline
+ // updates.
+ taskSub := pubsub.Subscription(acfg.Subscription)
+ if err := taskSub.Validate(); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "value": taskSub,
+ }.Errorf(c, "Task subscription did not validate.")
+ return errors.New("invalid task subscription name")
+ }
+ psProject, psSubscriptionName := taskSub.Split()
+
+ psAuth, err := a.Authenticator(func(o *auth.Options) {
+ o.Scopes = pubsub.SubscriberScopes
})
if err != nil {
- log.WithError(err).Errorf(c, "Failed to get task queue client.")
+ log.WithError(err).Errorf(c, "Failed to get Pub/Sub authenticator.")
+ return err
+ }
+
+ // Pub/Sub: HTTP Client => Context
+ psHTTPClient, err := psAuth.Client()
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to create authenticated Pub/Sub transport.")
return err
}
+ psContext := cloud.WithContext(c, psProject, psHTTPClient)
+
+ // Pub/Sub: TokenSource => Client
+ psClient, err := gcps.NewClient(c, psProject, cloud.WithTokenSource(psAuth.TokenSource()))
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.")
+ return err
+ }
+ sub := psClient.Subscription(psSubscriptionName)
// Initialize our Storage.
s, err := a.IntermediateStorage(c)
@@ -78,65 +126,87 @@ func (a *application) runArchivist(c context.Context) error {
}
defer gsClient.Close()
- // Application shutdown will now operate by cancelling the Collector's
- // shutdown Context.
- shutdownCtx, shutdownFunc := context.WithCancel(c)
- a.SetShutdownFunc(shutdownFunc)
-
ar := archivist.Archivist{
Service: a.Coordinator(),
Storage: s,
GSClient: gsClient,
GSBase: gsBase,
+ GSStagingBase: gsStagingBase,
StreamIndexRange: int(acfg.StreamIndexRange),
PrefixIndexRange: int(acfg.PrefixIndexRange),
ByteRange: int(acfg.ByteRange),
}
- tqOpts := taskqueueClient.Options{
- Project: coordCfg.Project,
- Queue: coordCfg.ArchiveTaskQueue,
- Client: tqClient,
- UserAgent: "LogDog Archivist",
- Tasks: int(acfg.Tasks),
+ tasks := int(acfg.Tasks)
+ if tasks <= 0 {
+ tasks = 1
}
log.Fields{
- "project": tqOpts.Project,
- "queue": tqOpts.Queue,
- }.Infof(c, "Pulling tasks from task queue.")
- taskqueueClient.RunTasks(shutdownCtx, tqOpts, func(c context.Context, t taskqueueClient.Task) bool {
- c = log.SetField(c, "taskID", t.ID)
-
- startTime := clock.Now(c)
- err := ar.ArchiveTask(c, t.Payload)
- duration := clock.Now(c).Sub(startTime)
-
- switch {
- case errors.IsTransient(err):
- // Do not consume
- log.Fields{
- log.ErrorKey: err,
- "duration": duration,
- }.Warningf(c, "TRANSIENT error processing task.")
- return false
-
- case err == nil:
- log.Fields{
- "duration": duration,
- }.Infof(c, "Task successfully processed; deleting.")
- return true
-
- default:
- log.Fields{
- log.ErrorKey: err,
- "duration": duration,
- }.Errorf(c, "Non-transient error processing task; deleting.")
- return true
- }
+ "subscription": taskSub,
+ }.Infof(c, "Pulling tasks from Pub/Sub subscription.")
+ cancelCtx, cancelFunc := context.WithCancel(c)
+ it, err := sub.Pull(cancelCtx, gcps.MaxExtension(pubsub.MaxACKDeadline), gcps.MaxPrefetch(tasks))
+ if err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "subscription": taskSub,
+ }.Errorf(c, "Failed to create Pub/Sub subscription iterator.")
+ }
+ defer it.Stop()
+
+ // Application shutdown will now operate by stopping the Iterator.
+ a.SetShutdownFunc(func() {
+ cancelFunc()
+ it.Stop()
})
+ // Loop, pulling messages from our iterator and dispatching them.
+ parallel.Ignore(parallel.Run(tasks, func(taskC chan<- func() error) {
+ for {
+ msg, err := it.Next()
+ switch err {
+ case nil:
+ // Dispatch an archive handler for this message.
+ taskC <- func() error {
+ c = log.SetFields(c, log.Fields{
+ "messageID": msg.ID,
+ })
+
+ startTime := clock.Now(c)
+ deleteTask := ar.ArchiveTask(c, &pubSubArchivistTask{
+ Context: psContext,
+ subscriptionName: psSubscriptionName,
+ msg: msg,
+ })
+ duration := clock.Now(c).Sub(startTime)
+
+ if deleteTask {
+ log.Fields{
+ "duration": duration,
+ }.Infof(c, "Task successfully processed; deleting.")
+ } else {
+ log.Fields{
+ "duration": duration,
+ }.Infof(c, "Task processing incomplete. Not deleting.")
+ }
+ msg.Done(deleteTask)
+ return nil
+ }
+
+ case io.EOF, context.Canceled, context.DeadlineExceeded:
+ log.Infof(c, "Subscription iterator is finished.")
+ return
+
+ default:
+ log.WithError(err).Warningf(c, "Subscription iterator returned error. Sleeping...")
+ clock.Sleep(c, subscriptionErrorDelay)
+ continue
+ }
+ }
+ }))
+
log.Debugf(c, "Archivist finished.")
return nil
}

Powered by Google App Engine
This is Rietveld 408576698