Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Unified Diff: appengine/logdog/coordinator/backend/archiveCron.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Code review comments, use Pub/Sub, archival staging, quality of life. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/logdog/coordinator/backend/archiveCron.go
diff --git a/appengine/logdog/coordinator/backend/archiveCron.go b/appengine/logdog/coordinator/backend/archiveCron.go
index f3b5d79e063b6fa2a1c0435ada4a5c63b51968fd..d06558df20c8e70814d1fd72e01f86bee11ba561 100644
--- a/appengine/logdog/coordinator/backend/archiveCron.go
+++ b/appengine/logdog/coordinator/backend/archiveCron.go
@@ -7,218 +7,150 @@ package backend
import (
"fmt"
"net/http"
- "time"
+ "sync/atomic"
"github.com/julienschmidt/httprouter"
"github.com/luci/gae/filter/dsQueryBatch"
ds "github.com/luci/gae/service/datastore"
- tq "github.com/luci/gae/service/taskqueue"
+ "github.com/luci/gae/service/info"
"github.com/luci/luci-go/appengine/logdog/coordinator"
- "github.com/luci/luci-go/appengine/logdog/coordinator/config"
- "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
"github.com/luci/luci-go/common/clock"
"github.com/luci/luci-go/common/errors"
log "github.com/luci/luci-go/common/logging"
- "github.com/luci/luci-go/common/proto/logdog/svcconfig"
+ "github.com/luci/luci-go/common/parallel"
"golang.org/x/net/context"
)
-const archiveTaskVersion = "v1"
-
-// archiveTaskQueueName returns the task queue name for archival, or an error
-// if it's not configured.
-func archiveTaskQueueName(cfg *svcconfig.Config) (string, error) {
- q := cfg.GetCoordinator().ArchiveTaskQueue
- if q == "" {
- return "", errors.New("missing archive task queue name")
- }
- return q, nil
-}
-
-func archiveTaskNameForHash(hashID string) string {
- return fmt.Sprintf("archive-%s-%s", hashID, archiveTaskVersion)
-}
-
-// createArchiveTask creates a new archive Task.
-func createArchiveTask(cfg *svcconfig.Coordinator, ls *coordinator.LogStream, complete bool) (*tq.Task, error) {
- desc := logdog.ArchiveTask{
- Path: string(ls.Path()),
- Complete: complete,
- }
- t, err := createPullTask(&desc)
- if err != nil {
- return nil, err
- }
-
- t.Name = archiveTaskNameForHash(ls.HashID())
- return t, nil
-}
+const archiveTaskVersion = "v4"
// HandleArchiveCron is the handler for the archive cron endpoint. This scans
-// for terminal log streams that are ready for archival.
+// for log streams that are ready for archival.
//
// This will be called periodically by AppEngine cron.
func (b *Backend) HandleArchiveCron(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
errorWrapper(c, w, func() error {
- return b.archiveCron(c, true)
- })
-}
-
-// HandleArchiveCronNT is the handler for the archive non-terminal cron
-// endpoint. This scans for non-terminal log streams that have not been updated
-// in sufficiently long that we're willing to declare them complete and mark
-// them terminal.
-//
-// This will be called periodically by AppEngine cron.
-func (b *Backend) HandleArchiveCronNT(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
- errorWrapper(c, w, func() error {
- return b.archiveCron(c, false)
- })
-}
-
-// HandleArchiveCronPurge purges all archival tasks from the task queue.
-func (b *Backend) HandleArchiveCronPurge(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
- errorWrapper(c, w, func() error {
- cfg, err := config.Load(c)
- if err != nil {
- log.WithError(err).Errorf(c, "Failed to load configuration.")
- return err
- }
-
- queueName, err := archiveTaskQueueName(cfg)
- if err != nil {
- log.Errorf(c, "Failed to get task queue name.")
- return err
- }
-
- if err := tq.Get(c).Purge(queueName); err != nil {
- log.Fields{
- log.ErrorKey: err,
- "queue": queueName,
- }.Errorf(c, "Failed to purge task queue.")
- return err
- }
- return nil
+ return b.archiveCron(c)
})
}
-func (b *Backend) archiveCron(c context.Context, complete bool) error {
- cfg, err := config.Load(c)
+func (b *Backend) archiveCron(c context.Context) error {
+ services := b.GetServices()
+ _, cfg, err := services.Config(c)
if err != nil {
- log.WithError(err).Errorf(c, "Failed to load configuration.")
- return err
+ return fmt.Errorf("failed to load configuration: %v", err)
}
- queueName, err := archiveTaskQueueName(cfg)
- if err != nil {
- log.Errorf(c, "Failed to get task queue name.")
- return err
+ archiveDelayMax := cfg.Coordinator.ArchiveDelayMax.Duration()
+ if archiveDelayMax <= 0 {
+ return fmt.Errorf("must have positive maximum archive delay, not %q", archiveDelayMax.String())
}
- now := clock.Now(c).UTC()
- q := ds.NewQuery("LogStream")
-
- var threshold time.Duration
- if complete {
- threshold = cfg.GetCoordinator().ArchiveDelay.Duration()
- q = q.Eq("State", coordinator.LSTerminated)
- } else {
- threshold = cfg.GetCoordinator().ArchiveDelayMax.Duration()
- q = q.Eq("State", coordinator.LSPending)
+ ap, err := services.ArchivalPublisher(c)
+ if err != nil {
+ return fmt.Errorf("failed to get archival publisher: %v", err)
}
- q = q.Lte("Updated", now.Add(-threshold))
- // If the log stream has a terminal index, and its Updated time is less than
- // the maximum archive delay, require this archival to be complete (no
- // missing LogEntry).
+ threshold := clock.Now(c).UTC().Add(-archiveDelayMax)
+ log.Fields{
+ "threshold": threshold,
+ }.Infof(c, "Querying for all streaming logs created before max archival threshold.")
+
+ // Query for log streams that were created <= our threshold and that are
+ // still in LSStreaming state.
//
- // If we're past maximum archive delay, settle for any (even empty) archival.
- // This is a failsafe to prevent logs from sitting in limbo forever.
- maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration()
+ // We order descending because this is already an index that we use for our
+ // "logdog.Logs.Query".
+ q := ds.NewQuery("LogStream").
+ KeysOnly(true).
+ Eq("State", coordinator.LSStreaming).
+ Lte("Created", threshold).
+ Order("-Created", "State")
+
+ // Since these logs are beyond maximum archival delay, we will dispatch
+ // archival immediately.
+ params := coordinator.ArchivalParams{
+ RequestID: info.Get(c).RequestID(),
+ }
- // Perform a query, dispatching tasks in batches.
+ // Create archive tasks for our expired log streams in parallel.
batch := b.getMultiTaskBatchSize()
-
- ti := tq.Get(c)
- tasks := make([]*tq.Task, 0, batch)
- totalScheduledTasks := 0
- addAndMaybeDispatchTasks := func(task *tq.Task) error {
- switch task {
- case nil:
- if len(tasks) == 0 {
+ var tasked int32
+ var failed int32
+
+ var ierr error
+ parallel.Ignore(parallel.Run(batch, func(taskC chan<- func() error) {
+ // Run a batched query across the expired log stream space.
+ ierr = ds.Get(dsQueryBatch.BatchQueries(c, int32(batch))).Run(q, func(lsKey *ds.Key) error {
+ var ls coordinator.LogStream
+ ds.PopulateKey(&ls, lsKey)
+
+ // Archive this log stream in a transaction.
+ taskC <- func() error {
+ err := ds.Get(c).RunInTransaction(func(c context.Context) error {
+ if err := ds.Get(c).Get(&ls); err != nil {
+ log.WithError(err).Errorf(c, "Failed to load stream.")
+ return err
+ }
+
+ log.Fields{
+ "path": ls.Path(),
+ "id": ls.HashID,
+ }.Infof(c, "Identified expired log stream.")
+
+ if err := params.PublishTask(c, ap, &ls); err != nil {
dnj 2016/04/11 17:20:04 PublishTask is transaction-friendly, as it interna
+ if err == coordinator.ErrArchiveTasked {
+ log.Warningf(c, "Archival has already been tasked for this stream.")
+ return nil
+ }
+ return err
+ }
+ return ds.Get(c).Put(&ls)
+ }, nil)
+
+ if err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "path": ls.Path(),
+ }.Errorf(c, "Failed to archive log stream.")
+ atomic.AddInt32(&failed, 1)
+ return nil // Nothing will consume it anyway.
+ }
+
+ log.Fields{
+ "path": ls.Path(),
+ "id": ls.HashID,
+ "archiveTopic": cfg.Coordinator.ArchiveTopic,
+ }.Infof(c, "Created archive task.")
+ atomic.AddInt32(&tasked, 1)
return nil
}
- default:
- tasks = append(tasks, task)
- if len(tasks) < batch {
- return nil
- }
- }
+ return nil
+ })
+ }))
- err := ti.AddMulti(tasks, queueName)
- if merr, ok := err.(errors.MultiError); ok {
- for _, e := range merr {
- log.Warningf(c, "Task add error: %v", e)
- }
- }
- if err := errors.Filter(err, tq.ErrTaskAlreadyAdded); err != nil {
- log.Fields{
- log.ErrorKey: err,
- "queue": queueName,
- "numTasks": len(tasks),
- "scheduledTaskCount": totalScheduledTasks,
- }.Errorf(c, "Failed to add tasks to task queue.")
- return errors.New("failed to add tasks to task queue")
- }
-
- totalScheduledTasks += len(tasks)
- tasks = tasks[:0]
- return nil
- }
+ // Return an error code if we experienced any failures. This doesn't really
+ // have an impact, but it will show up as a "!" in the cron UI.
+ switch {
+ case ierr != nil:
+ log.Fields{
+ log.ErrorKey: err,
+ "archiveCount": tasked,
+ }.Errorf(c, "Failed to execute expired tasks query.")
+ return ierr
- err = ds.Get(dsQueryBatch.BatchQueries(c, int32(batch))).Run(q, func(ls *coordinator.LogStream) error {
- requireComplete := !now.After(ls.Updated.Add(maxDelay))
- if !requireComplete {
- log.Fields{
- "path": ls.Path(),
- "id": ls.HashID(),
- "updatedTimestamp": ls.Updated,
- "maxDelay": maxDelay,
- }.Warningf(c, "Identified log stream past maximum archival delay.")
- } else {
- log.Fields{
- "id": ls.HashID(),
- "updated": ls.Updated.String(),
- }.Infof(c, "Identified log stream ready for archival.")
- }
-
- task, err := createArchiveTask(cfg.GetCoordinator(), ls, requireComplete)
- if err != nil {
- log.Fields{
- log.ErrorKey: err,
- "path": ls.Path(),
- }.Errorf(c, "Failed to create archive task.")
- return err
- }
-
- return addAndMaybeDispatchTasks(task)
- })
- if err != nil {
+ case failed > 0:
log.Fields{
- log.ErrorKey: err,
- "scheduledTaskCount": totalScheduledTasks,
- }.Errorf(c, "Outer archive query failed.")
- return errors.New("outer archive query failed")
- }
+ log.ErrorKey: err,
+ "archiveCount": tasked,
+ "failCount": failed,
+ }.Errorf(c, "Failed to archive all candidate streams.")
+ return errors.New("failed to archive all candidate streams")
dnj 2016/04/11 17:20:04 Since this is cron, we're basically just using the
- // Dispatch any remaining enqueued tasks.
- if err := addAndMaybeDispatchTasks(nil); err != nil {
- return err
+ default:
+ log.Fields{
+ "archiveCount": tasked,
+ }.Infof(c, "Archive sweep completed successfully.")
+ return nil
}
-
- log.Fields{
- "scheduledTaskCount": totalScheduledTasks,
- }.Debugf(c, "Archive sweep completed successfully.")
- return nil
}

Powered by Google App Engine
This is Rietveld 408576698