Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(675)

Unified Diff: appengine/logdog/coordinator/backend/archiveCron.go

Issue 1844963002: Iterate archive query alongside task queue. (Closed) Base URL: https://github.com/luci/luci-go@collector-gae-classic
Patch Set: Use new GAE filter for query batching. Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | appengine/logdog/coordinator/backend/backend.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: appengine/logdog/coordinator/backend/archiveCron.go
diff --git a/appengine/logdog/coordinator/backend/archiveCron.go b/appengine/logdog/coordinator/backend/archiveCron.go
index 9e8b104d74e6ba3b1c1965d86f758d8537cabe22..60eed2a845c0ec6b0df83d6a3284b030a6fd70f3 100644
--- a/appengine/logdog/coordinator/backend/archiveCron.go
+++ b/appengine/logdog/coordinator/backend/archiveCron.go
@@ -5,18 +5,19 @@
package backend
import (
- "errors"
"fmt"
"net/http"
"time"
"github.com/julienschmidt/httprouter"
+ "github.com/luci/gae/filter/dsQueryBatch"
ds "github.com/luci/gae/service/datastore"
tq "github.com/luci/gae/service/taskqueue"
"github.com/luci/luci-go/appengine/logdog/coordinator"
"github.com/luci/luci-go/appengine/logdog/coordinator/config"
"github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
"github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/common/errors"
log "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/proto/logdog/svcconfig"
"golang.org/x/net/context"
@@ -32,6 +33,10 @@ func archiveTaskQueueName(cfg *svcconfig.Config) (string, error) {
return q, nil
}
+func archiveTaskNameForHash(hashID string) string {
+ return fmt.Sprintf("archive--%s", hashID)
iannucci 2016/03/31 22:52:31 let's put a version number in here too or undo th... [comment truncated]
dnj 2016/04/01 00:21:52 Done.
+}
+
// createArchiveTask creates a new archive Task.
func createArchiveTask(cfg *svcconfig.Coordinator, ls *coordinator.LogStream, complete bool) (*tq.Task, error) {
desc := logdog.ArchiveTask{
@@ -43,7 +48,7 @@ func createArchiveTask(cfg *svcconfig.Coordinator, ls *coordinator.LogStream, co
return nil, err
}
- t.Name = fmt.Sprintf("archive-%s", ls.HashID())
+ t.Name = archiveTaskNameForHash(ls.HashID())
return t, nil
}
@@ -121,55 +126,93 @@ func (b *Backend) archiveCron(c context.Context, complete bool) error {
}
q = q.Lte("Updated", now.Add(-threshold))
- // Query and dispatch our tasks.
- var ierr error
- count, err := b.multiTask(c, queueName, func(taskC chan<- *tq.Task) {
- ierr = ds.Get(c).Run(q, func(ls *coordinator.LogStream) error {
+ // If the log stream has a terminal index, and its Updated time is less than
+ // the maximum archive delay, require this archival to be complete (no
+ // missing LogEntry).
+ //
+ // If we're past maximum archive delay, settle for any (even empty) archival.
+ // This is a failsafe to prevent logs from sitting in limbo forever.
+ maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration()
+
+ // Perform a query, dispatching tasks in batches.
+ batch := b.getMultiTaskBatchSize()
+
+ ti := tq.Get(c)
+ tasks := make([]*tq.Task, 0, batch)
+ totalScheduledTasks := 0
+ dispatchTasks := func() error {
+ if len(tasks) == 0 {
+ return nil
+ }
+
+ err := ti.AddMulti(tasks, queueName)
+ if merr, ok := err.(errors.MultiError); ok {
+ for _, e := range merr {
+ log.Warningf(c, "Task add error: %v", e)
+ }
+ }
+ if err := errors.Filter(err, tq.ErrTaskAlreadyAdded); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "queue": queueName,
+ "numTasks": len(tasks),
+ "scheduledTaskCount": totalScheduledTasks,
+ }.Errorf(c, "Failed to add tasks to task queue.")
+ return errors.New("failed to add tasks to task queue")
+ }
+
+ totalScheduledTasks += len(tasks)
+ tasks = tasks[:0]
iannucci 2016/03/31 22:52:31 would it be cleaner to do this prune at the callsi... [comment truncated]
dnj 2016/04/01 00:21:52 I don't think so. Since this manages the "tasks" a... [comment truncated]
+ return nil
+ }
+
+ c = dsQueryBatch.BatchQueries(c, int32(batch))
+ err = ds.Get(c).Run(q, func(ls *coordinator.LogStream) error {
iannucci 2016/03/31 22:52:31 dsS := ds.Get(dsQueryBatch.......) or something
+ requireComplete := !now.After(ls.Updated.Add(maxDelay))
+ if !requireComplete {
+ log.Fields{
+ "path": ls.Path(),
+ "id": ls.HashID(),
+ "updatedTimestamp": ls.Updated,
+ "maxDelay": maxDelay,
+ }.Warningf(c, "Identified log stream past maximum archival delay.")
+ } else {
log.Fields{
"id": ls.HashID(),
"updated": ls.Updated.String(),
}.Infof(c, "Identified log stream ready for archival.")
+ }
- // If the log stream has a terminal index, and its Updated time is less than
- // the maximum archive delay, require this archival to be complete (no
- // missing LogEntry).
- //
- // If we're past maximum archive delay, settle for any (even empty) archival.
- // This is a failsafe to prevent logs from sitting in limbo forever.
- maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration()
- requireComplete := !now.After(ls.Updated.Add(maxDelay))
- if !requireComplete {
- log.Fields{
- "path": ls.Path(),
- "updatedTimestamp": ls.Updated,
- "maxDelay": maxDelay,
- }.Warningf(c, "Log stream is past maximum archival delay. Dropping completeness requirement.")
- }
-
- task, err := createArchiveTask(cfg.GetCoordinator(), ls, requireComplete)
- if err != nil {
- log.Fields{
- log.ErrorKey: err,
- "path": ls.Path(),
- }.Errorf(c, "Failed to create archive task.")
- return err
- }
+ task, err := createArchiveTask(cfg.GetCoordinator(), ls, requireComplete)
+ if err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "path": ls.Path(),
+ }.Errorf(c, "Failed to create archive task.")
+ return err
+ }
- taskC <- task
- return nil
- })
+ tasks = append(tasks, task)
+ if len(tasks) >= batch {
+ return dispatchTasks()
+ }
+ return nil
})
- if err != nil || ierr != nil {
+ if err != nil {
log.Fields{
- log.ErrorKey: err,
- "queryErr": ierr,
- "taskCount": count,
- }.Errorf(c, "Failed to dispatch archival tasks.")
- return errors.New("failed to dispatch archival tasks")
+ log.ErrorKey: err,
+ "scheduledTaskCount": totalScheduledTasks,
+ }.Errorf(c, "Outer archive query failed.")
+ return errors.New("outer archive query failed")
+ }
+
+ // Dispatch any remaining enqueued tasks.
+ if err := dispatchTasks(); err != nil {
+ return err
}
log.Fields{
- "taskCount": count,
+ "scheduledTaskCount": totalScheduledTasks,
}.Debugf(c, "Archive sweep completed successfully.")
return nil
}
« no previous file with comments | « no previous file | appengine/logdog/coordinator/backend/backend.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698