Chromium Code Reviews| Index: appengine/logdog/coordinator/backend/archiveCron.go |
| diff --git a/appengine/logdog/coordinator/backend/archiveCron.go b/appengine/logdog/coordinator/backend/archiveCron.go |
| index 9e8b104d74e6ba3b1c1965d86f758d8537cabe22..60eed2a845c0ec6b0df83d6a3284b030a6fd70f3 100644 |
| --- a/appengine/logdog/coordinator/backend/archiveCron.go |
| +++ b/appengine/logdog/coordinator/backend/archiveCron.go |
| @@ -5,18 +5,19 @@ |
| package backend |
| import ( |
| - "errors" |
| "fmt" |
| "net/http" |
| "time" |
| "github.com/julienschmidt/httprouter" |
| + "github.com/luci/gae/filter/dsQueryBatch" |
| ds "github.com/luci/gae/service/datastore" |
| tq "github.com/luci/gae/service/taskqueue" |
| "github.com/luci/luci-go/appengine/logdog/coordinator" |
| "github.com/luci/luci-go/appengine/logdog/coordinator/config" |
| "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| "github.com/luci/luci-go/common/clock" |
| + "github.com/luci/luci-go/common/errors" |
| log "github.com/luci/luci-go/common/logging" |
| "github.com/luci/luci-go/common/proto/logdog/svcconfig" |
| "golang.org/x/net/context" |
| @@ -32,6 +33,10 @@ func archiveTaskQueueName(cfg *svcconfig.Config) (string, error) { |
| return q, nil |
| } |
| +func archiveTaskNameForHash(hashID string) string { |
| + return fmt.Sprintf("archive--%s", hashID) |
|
iannucci
2016/03/31 22:52:31
let's put a version number in here too
or undo th
dnj
2016/04/01 00:21:52
Done.
|
| +} |
| + |
| // createArchiveTask creates a new archive Task. |
| func createArchiveTask(cfg *svcconfig.Coordinator, ls *coordinator.LogStream, complete bool) (*tq.Task, error) { |
| desc := logdog.ArchiveTask{ |
| @@ -43,7 +48,7 @@ func createArchiveTask(cfg *svcconfig.Coordinator, ls *coordinator.LogStream, co |
| return nil, err |
| } |
| - t.Name = fmt.Sprintf("archive-%s", ls.HashID()) |
| + t.Name = archiveTaskNameForHash(ls.HashID()) |
| return t, nil |
| } |
| @@ -121,55 +126,93 @@ func (b *Backend) archiveCron(c context.Context, complete bool) error { |
| } |
| q = q.Lte("Updated", now.Add(-threshold)) |
| - // Query and dispatch our tasks. |
| - var ierr error |
| - count, err := b.multiTask(c, queueName, func(taskC chan<- *tq.Task) { |
| - ierr = ds.Get(c).Run(q, func(ls *coordinator.LogStream) error { |
| + // If the log stream has a terminal index, and the time elapsed since its |
| + // Updated timestamp does not exceed the maximum archive delay, require this |
| + // archival to be complete (no missing LogEntry). |
| + // |
| + // If we're past maximum archive delay, settle for any (even empty) archival. |
| + // This is a failsafe to prevent logs from sitting in limbo forever. |
| + maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration() |
| + |
| + // Perform a query, dispatching tasks in batches. |
| + batch := b.getMultiTaskBatchSize() |
| + |
| + ti := tq.Get(c) |
| + tasks := make([]*tq.Task, 0, batch) |
| + totalScheduledTasks := 0 |
| + dispatchTasks := func() error { |
| + if len(tasks) == 0 { |
| + return nil |
| + } |
| + |
| + err := ti.AddMulti(tasks, queueName) |
| + if merr, ok := err.(errors.MultiError); ok { |
| + for _, e := range merr { |
| + log.Warningf(c, "Task add error: %v", e) |
| + } |
| + } |
| + if err := errors.Filter(err, tq.ErrTaskAlreadyAdded); err != nil { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "queue": queueName, |
| + "numTasks": len(tasks), |
| + "scheduledTaskCount": totalScheduledTasks, |
| + }.Errorf(c, "Failed to add tasks to task queue.") |
| + return errors.New("failed to add tasks to task queue") |
| + } |
| + |
| + totalScheduledTasks += len(tasks) |
| + tasks = tasks[:0] |
|
iannucci
2016/03/31 22:52:31
would it be cleaner to do this prune at the callsi
dnj
2016/04/01 00:21:52
I don't think so. Since this manages the "tasks" a
|
| + return nil |
| + } |
| + |
| + c = dsQueryBatch.BatchQueries(c, int32(batch)) |
| + err = ds.Get(c).Run(q, func(ls *coordinator.LogStream) error { |
|
iannucci
2016/03/31 22:52:31
dsS := ds.Get(dsQueryBatch.......)
or something
|
| + requireComplete := !now.After(ls.Updated.Add(maxDelay)) |
| + if !requireComplete { |
| + log.Fields{ |
| + "path": ls.Path(), |
| + "id": ls.HashID(), |
| + "updatedTimestamp": ls.Updated, |
| + "maxDelay": maxDelay, |
| + }.Warningf(c, "Identified log stream past maximum archival delay.") |
| + } else { |
| log.Fields{ |
| "id": ls.HashID(), |
| "updated": ls.Updated.String(), |
| }.Infof(c, "Identified log stream ready for archival.") |
| + } |
| - // If the log stream has a terminal index, and its Updated time is less than |
| - // the maximum archive delay, require this archival to be complete (no |
| - // missing LogEntry). |
| - // |
| - // If we're past maximum archive delay, settle for any (even empty) archival. |
| - // This is a failsafe to prevent logs from sitting in limbo forever. |
| - maxDelay := cfg.GetCoordinator().ArchiveDelayMax.Duration() |
| - requireComplete := !now.After(ls.Updated.Add(maxDelay)) |
| - if !requireComplete { |
| - log.Fields{ |
| - "path": ls.Path(), |
| - "updatedTimestamp": ls.Updated, |
| - "maxDelay": maxDelay, |
| - }.Warningf(c, "Log stream is past maximum archival delay. Dropping completeness requirement.") |
| - } |
| - |
| - task, err := createArchiveTask(cfg.GetCoordinator(), ls, requireComplete) |
| - if err != nil { |
| - log.Fields{ |
| - log.ErrorKey: err, |
| - "path": ls.Path(), |
| - }.Errorf(c, "Failed to create archive task.") |
| - return err |
| - } |
| + task, err := createArchiveTask(cfg.GetCoordinator(), ls, requireComplete) |
| + if err != nil { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "path": ls.Path(), |
| + }.Errorf(c, "Failed to create archive task.") |
| + return err |
| + } |
| - taskC <- task |
| - return nil |
| - }) |
| + tasks = append(tasks, task) |
| + if len(tasks) >= batch { |
| + return dispatchTasks() |
| + } |
| + return nil |
| }) |
| - if err != nil || ierr != nil { |
| + if err != nil { |
| log.Fields{ |
| - log.ErrorKey: err, |
| - "queryErr": ierr, |
| - "taskCount": count, |
| - }.Errorf(c, "Failed to dispatch archival tasks.") |
| - return errors.New("failed to dispatch archival tasks") |
| + log.ErrorKey: err, |
| + "scheduledTaskCount": totalScheduledTasks, |
| + }.Errorf(c, "Outer archive query failed.") |
| + return errors.New("outer archive query failed") |
| + } |
| + |
| + // Dispatch any remaining enqueued tasks. |
| + if err := dispatchTasks(); err != nil { |
| + return err |
| } |
| log.Fields{ |
| - "taskCount": count, |
| + "scheduledTaskCount": totalScheduledTasks, |
| }.Debugf(c, "Archive sweep completed successfully.") |
| return nil |
| } |