Index: appengine/logdog/coordinator/backend/archiveCron.go
diff --git a/appengine/logdog/coordinator/backend/archiveCron.go b/appengine/logdog/coordinator/backend/archiveCron.go
index 9e8b104d74e6ba3b1c1965d86f758d8537cabe22..f3b5d79e063b6fa2a1c0435ada4a5c63b51968fd 100644
--- a/appengine/logdog/coordinator/backend/archiveCron.go
+++ b/appengine/logdog/coordinator/backend/archiveCron.go
@@ -5,23 +5,26 @@
 package backend
 
 import (
-	"errors"
 	"fmt"
 	"net/http"
 	"time"
 
 	"github.com/julienschmidt/httprouter"
+	"github.com/luci/gae/filter/dsQueryBatch"
 	ds "github.com/luci/gae/service/datastore"
 	tq "github.com/luci/gae/service/taskqueue"
 	"github.com/luci/luci-go/appengine/logdog/coordinator"
 	"github.com/luci/luci-go/appengine/logdog/coordinator/config"
 	"github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
 	"github.com/luci/luci-go/common/clock"
+	"github.com/luci/luci-go/common/errors"
 	log "github.com/luci/luci-go/common/logging"
 	"github.com/luci/luci-go/common/proto/logdog/svcconfig"
 	"golang.org/x/net/context"
 )
 
+const archiveTaskVersion = "v1"
+
 // archiveTaskQueueName returns the task queue name for archival, or an error
 // if it's not configured.
 func archiveTaskQueueName(cfg *svcconfig.Config) (string, error) {
@@ -32,6 +35,10 @@ func archiveTaskQueueName(cfg *svcconfig.Config) (string, error) {
 	return q, nil
 }
 
+func archiveTaskNameForHash(hashID string) string {
+	return fmt.Sprintf("archive-%s-%s", hashID, archiveTaskVersion)
+}
+
 // createArchiveTask creates a new archive Task.
 func createArchiveTask(cfg *svcconfig.Coordinator, ls *coordinator.LogStream, complete bool) (*tq.Task, error) {
 	desc := logdog.ArchiveTask{
@@ -43,7 +50,7 @@ func createArchiveTask(cfg *svcconfig.Coordinator, ls *coordinator.LogStream, co
 		return nil, err
 	}
 
-	t.Name = fmt.Sprintf("archive-%s", ls.HashID())
+	t.Name = archiveTaskNameForHash(ls.HashID())
 	return t, nil
 }
 
@@ -121,55 +128,97 @@ func (b *Backend) archiveCron(c context.Context, complete bool) error {
 	}
 	q = q.Lte("Updated", now.Add(-threshold))
 
-	// Query and dispatch our tasks.
-	var ierr error
-	count, err := b.multiTask(c, queueName, func(taskC chan<- *tq.Task) {
-		ierr = ds.Get(c).Run(q, func(ls *coordinator.LogStream) error {
+	// If the log stream has a terminal index, and its Updated time is less than
+	// the maximum archive delay, require this archival to be complete (no
+	// missing LogEntry).
+	//
+	// If we're past maximum archive delay, settle for any (even empty) archival.
+	// This is a failsafe to prevent logs from sitting in limbo forever.
+	maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration()
+
+	// Perform a query, dispatching tasks in batches.
+	batch := b.getMultiTaskBatchSize()
+
+	ti := tq.Get(c)
+	tasks := make([]*tq.Task, 0, batch)
+	totalScheduledTasks := 0
+	addAndMaybeDispatchTasks := func(task *tq.Task) error {
+		switch task {
+		case nil:
+			if len(tasks) == 0 {
+				return nil
+			}
+
+		default:
+			tasks = append(tasks, task)
+			if len(tasks) < batch {
+				return nil
+			}
+		}
+
+		err := ti.AddMulti(tasks, queueName)
+		if merr, ok := err.(errors.MultiError); ok {
+			for _, e := range merr {
+				log.Warningf(c, "Task add error: %v", e)
+			}
+		}
+		if err := errors.Filter(err, tq.ErrTaskAlreadyAdded); err != nil {
+			log.Fields{
+				log.ErrorKey:         err,
+				"queue":              queueName,
+				"numTasks":           len(tasks),
+				"scheduledTaskCount": totalScheduledTasks,
+			}.Errorf(c, "Failed to add tasks to task queue.")
+			return errors.New("failed to add tasks to task queue")
+		}
+
+		totalScheduledTasks += len(tasks)
+		tasks = tasks[:0]
+		return nil
+	}
+
+	err = ds.Get(dsQueryBatch.BatchQueries(c, int32(batch))).Run(q, func(ls *coordinator.LogStream) error {
+		requireComplete := !now.After(ls.Updated.Add(maxDelay))
+		if !requireComplete {
+			log.Fields{
+				"path":             ls.Path(),
+				"id":               ls.HashID(),
+				"updatedTimestamp": ls.Updated,
+				"maxDelay":         maxDelay,
+			}.Warningf(c, "Identified log stream past maximum archival delay.")
+		} else {
 			log.Fields{
 				"id":      ls.HashID(),
 				"updated": ls.Updated.String(),
 			}.Infof(c, "Identified log stream ready for archival.")
+		}
 
-			// If the log stream has a terminal index, and its Updated time is less than
-			// the maximum archive delay, require this archival to be complete (no
-			// missing LogEntry).
-			//
-			// If we're past maximum archive delay, settle for any (even empty) archival.
-			// This is a failsafe to prevent logs from sitting in limbo forever.
-			maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration()
-			requireComplete := !now.After(ls.Updated.Add(maxDelay))
-			if !requireComplete {
-				log.Fields{
-					"path":             ls.Path(),
-					"updatedTimestamp": ls.Updated,
-					"maxDelay":         maxDelay,
-				}.Warningf(c, "Log stream is past maximum archival delay. Dropping completeness requirement.")
-			}
-
-			task, err := createArchiveTask(cfg.GetCoordinator(), ls, requireComplete)
-			if err != nil {
-				log.Fields{
-					log.ErrorKey: err,
-					"path":       ls.Path(),
-				}.Errorf(c, "Failed to create archive task.")
-				return err
-			}
+		task, err := createArchiveTask(cfg.GetCoordinator(), ls, requireComplete)
+		if err != nil {
+			log.Fields{
+				log.ErrorKey: err,
+				"path":       ls.Path(),
+			}.Errorf(c, "Failed to create archive task.")
+			return err
+		}
 
-			taskC <- task
-			return nil
-		})
+		return addAndMaybeDispatchTasks(task)
 	})
-	if err != nil || ierr != nil {
+	if err != nil {
 		log.Fields{
-			log.ErrorKey: err,
-			"queryErr":   ierr,
-			"taskCount":  count,
-		}.Errorf(c, "Failed to dispatch archival tasks.")
-		return errors.New("failed to dispatch archival tasks")
+			log.ErrorKey:         err,
+			"scheduledTaskCount": totalScheduledTasks,
+		}.Errorf(c, "Outer archive query failed.")
+		return errors.New("outer archive query failed")
+	}
+
+	// Dispatch any remaining enqueued tasks.
+	if err := addAndMaybeDispatchTasks(nil); err != nil {
+		return err
 	}
 
 	log.Fields{
-		"taskCount": count,
+		"scheduledTaskCount": totalScheduledTasks,
	}.Debugf(c, "Archive sweep completed successfully.")
 	return nil
 }
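
A few notes on the change, each with a small self-contained Go sketch.

First, the completeness window: requireComplete := !now.After(ls.Updated.Add(maxDelay)) reads as "now is at or before Updated + ArchiveDelayMax". A stream inside that window (boundary included) must archive completely; a stream past it settles for whatever archival is available. A minimal sketch of the boundary behavior; the times and delay below are made up for illustration, not real config values:

// delay_sketch.go: illustrative only; times and maxDelay are arbitrary.
package main

import (
	"fmt"
	"time"
)

func main() {
	maxDelay := time.Hour
	updated := time.Date(2016, 1, 1, 0, 0, 0, 0, time.UTC)

	for _, now := range []time.Time{
		updated.Add(30 * time.Minute), // inside the window: complete archive required
		updated.Add(maxDelay),         // exactly at the deadline: still required
		updated.Add(2 * time.Hour),    // past the window: settle for partial archival
	} {
		// Same predicate as the cron: "now <= updated + maxDelay".
		requireComplete := !now.After(updated.Add(maxDelay))
		fmt.Printf("now=%s requireComplete=%v\n", now.Format(time.RFC3339), requireComplete)
	}
}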
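
Second, the batch-and-flush pattern: addAndMaybeDispatchTasks buffers tasks, dispatches a full batch through AddMulti, and treats a nil task as "flush whatever remains" so the trailing partial batch is not dropped, while dsQueryBatch.BatchQueries runs the datastore query in batches of the same size. A standalone sketch of the buffering pattern; Task, Batcher, and dispatch are hypothetical stand-ins, not LUCI or App Engine APIs:

// batch_sketch.go: illustrative only; not the real taskqueue types.
package main

import "fmt"

type Task struct{ Name string }

// Batcher mirrors addAndMaybeDispatchTasks: it buffers tasks and invokes
// dispatch (AddMulti in the real code) whenever a full batch accumulates.
type Batcher struct {
	batch    int
	tasks    []*Task
	total    int
	dispatch func([]*Task) error
}

// Add buffers a task. Passing nil means "flush any remaining partial batch",
// matching the final addAndMaybeDispatchTasks(nil) call in the cron.
func (b *Batcher) Add(t *Task) error {
	if t == nil {
		if len(b.tasks) == 0 {
			return nil // nothing left to flush
		}
	} else {
		b.tasks = append(b.tasks, t)
		if len(b.tasks) < b.batch {
			return nil // batch not yet full
		}
	}
	if err := b.dispatch(b.tasks); err != nil {
		return err
	}
	b.total += len(b.tasks)
	b.tasks = b.tasks[:0] // reuse the backing array, as tasks = tasks[:0] does
	return nil
}

func main() {
	b := &Batcher{
		batch: 2,
		dispatch: func(ts []*Task) error {
			fmt.Printf("dispatching %d task(s)\n", len(ts))
			return nil
		},
	}
	for i := 0; i < 5; i++ {
		_ = b.Add(&Task{Name: fmt.Sprintf("archive-%d", i)})
	}
	_ = b.Add(nil) // flush the trailing batch of 1
	fmt.Println("total scheduled:", b.total)
}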
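
Finally, the task naming: archiveTaskNameForHash gives each stream's archival task a deterministic, versioned name ("archive-<hashID>-v1"), and the cron filters tq.ErrTaskAlreadyAdded out of the AddMulti result, so overlapping sweeps enqueue each stream at most once; bumping archiveTaskVersion retires the old names if the task payload ever changes. A toy sketch of dedup-by-name, using a map as a stand-in for the task queue's name-based deduplication:

// naming_sketch.go: illustrative only; the map-backed queue is a stand-in
// for the real task queue service.
package main

import (
	"errors"
	"fmt"
)

var errTaskAlreadyAdded = errors.New("task already added")

// queue remembers task names it has seen, the way the task queue rejects a
// second task enqueued under the same name.
type queue map[string]bool

func (q queue) add(name string) error {
	if q[name] {
		return errTaskAlreadyAdded
	}
	q[name] = true
	return nil
}

// archiveTaskNameForHash matches the helper in the diff: a deterministic,
// versioned name per log stream hash.
func archiveTaskNameForHash(hashID string) string {
	const archiveTaskVersion = "v1"
	return fmt.Sprintf("archive-%s-%s", hashID, archiveTaskVersion)
}

func main() {
	q := queue{}
	for sweep := 1; sweep <= 2; sweep++ { // two overlapping cron sweeps
		err := q.add(archiveTaskNameForHash("deadbeef"))
		if err == errTaskAlreadyAdded {
			err = nil // mirrors errors.Filter(err, tq.ErrTaskAlreadyAdded)
		}
		fmt.Printf("sweep %d: err=%v\n", sweep, err)
	}
}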