Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2019)

Unified Diff: appengine/logdog/coordinator/backend/util.go

Issue 1844963002: Iterate archive query alongside task queue. (Closed) Base URL: https://github.com/luci/luci-go@collector-gae-classic
Patch Set: Respond to code review comments. Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « appengine/logdog/coordinator/backend/backend.go ('k') | appengine/logdog/coordinator/backend/util_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: appengine/logdog/coordinator/backend/util.go
diff --git a/appengine/logdog/coordinator/backend/util.go b/appengine/logdog/coordinator/backend/util.go
index 10d2a5a5442042d05e6f60d081cc155ef8c2352f..adac00752bf23d605c561060ee69e4008df251f7 100644
--- a/appengine/logdog/coordinator/backend/util.go
+++ b/appengine/logdog/coordinator/backend/util.go
@@ -11,17 +11,10 @@ import (
"github.com/golang/protobuf/proto"
tq "github.com/luci/gae/service/taskqueue"
- "github.com/luci/luci-go/common/errors"
log "github.com/luci/luci-go/common/logging"
"golang.org/x/net/context"
)
-const (
- // defaultMultiTaskBatchSize is the default value for Backend's
- // multiTaskBatchSize parameter.
- defaultMultiTaskBatchSize = 100
-)
-
// httpError
type httpError struct {
reason error
@@ -91,77 +84,3 @@ func createPullTask(msg proto.Message) (*tq.Task, error) {
}
return &t, nil
}
-
-func (b *Backend) multiTask(c context.Context, q string, f func(chan<- *tq.Task)) (int, error) {
- batch := b.multiTaskBatchSize
- if batch <= 0 {
- batch = defaultMultiTaskBatchSize
- }
-
- ti := tq.Get(c)
- send := func(tasks []*tq.Task) int {
- sent := len(tasks)
- if sent == 0 {
- return 0
- }
-
- // Add the tasks. If an error occurs, log each specific error.
- if err := errors.Filter(ti.AddMulti(tasks, q), tq.ErrTaskAlreadyAdded); err != nil {
- switch t := err.(type) {
- case errors.MultiError:
- // Some tasks failed to be added.
- for i, e := range t {
- if e != nil {
- log.Fields{
- log.ErrorKey: e,
- "index": i,
- "taskPath": tasks[i].Path,
- "taskParams": string(tasks[i].Payload),
- }.Errorf(c, "Failed to add task queue task.")
- sent--
- }
- }
-
- default:
- // General AddMulti error.
- log.WithError(t).Errorf(c, "Failed to add task queue tasks.")
- return 0
- }
- }
-
- return sent
- }
-
- // Run our generator function in a separate goroutine.
- taskC := make(chan *tq.Task, batch)
- go func() {
- defer close(taskC)
- f(taskC)
- }()
-
- // Pull tasks from our task channel and dispatch them in batches via send.
- tasks := make([]*tq.Task, 0, batch)
- var total, numSent int
- for t := range taskC {
- total++
-
- tasks = append(tasks, t)
- if len(tasks) >= batch {
- numSent += send(tasks)
- tasks = tasks[:0]
- }
-
- }
-
- // Final send, in case a not-full batch of tasks built up.
- numSent += send(tasks)
-
- if numSent != total {
- log.Fields{
- "total": total,
- "added": numSent,
- }.Errorf(c, "Not all tasks could be added.")
- return numSent, errors.New("error adding tasks")
- }
- return numSent, nil
-}
« no previous file with comments | « appengine/logdog/coordinator/backend/backend.go ('k') | appengine/logdog/coordinator/backend/util_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698