| Index: appengine/logdog/coordinator/backend/util.go
|
| diff --git a/appengine/logdog/coordinator/backend/util.go b/appengine/logdog/coordinator/backend/util.go
|
| index 10d2a5a5442042d05e6f60d081cc155ef8c2352f..adac00752bf23d605c561060ee69e4008df251f7 100644
|
| --- a/appengine/logdog/coordinator/backend/util.go
|
| +++ b/appengine/logdog/coordinator/backend/util.go
|
| @@ -11,17 +11,10 @@ import (
|
|
|
| "github.com/golang/protobuf/proto"
|
| tq "github.com/luci/gae/service/taskqueue"
|
| - "github.com/luci/luci-go/common/errors"
|
| log "github.com/luci/luci-go/common/logging"
|
| "golang.org/x/net/context"
|
| )
|
|
|
| -const (
|
| - // defaultMultiTaskBatchSize is the default value for Backend's
|
| - // multiTaskBatchSize parameter.
|
| - defaultMultiTaskBatchSize = 100
|
| -)
|
| -
|
| // httpError
|
| type httpError struct {
|
| reason error
|
| @@ -91,77 +84,3 @@ func createPullTask(msg proto.Message) (*tq.Task, error) {
|
| }
|
| return &t, nil
|
| }
|
| -
|
| -func (b *Backend) multiTask(c context.Context, q string, f func(chan<- *tq.Task)) (int, error) {
|
| - batch := b.multiTaskBatchSize
|
| - if batch <= 0 {
|
| - batch = defaultMultiTaskBatchSize
|
| - }
|
| -
|
| - ti := tq.Get(c)
|
| - send := func(tasks []*tq.Task) int {
|
| - sent := len(tasks)
|
| - if sent == 0 {
|
| - return 0
|
| - }
|
| -
|
| - // Add the tasks. If an error occurs, log each specific error.
|
| - if err := errors.Filter(ti.AddMulti(tasks, q), tq.ErrTaskAlreadyAdded); err != nil {
|
| - switch t := err.(type) {
|
| - case errors.MultiError:
|
| - // Some tasks failed to be added.
|
| - for i, e := range t {
|
| - if e != nil {
|
| - log.Fields{
|
| - log.ErrorKey: e,
|
| - "index": i,
|
| - "taskPath": tasks[i].Path,
|
| - "taskParams": string(tasks[i].Payload),
|
| - }.Errorf(c, "Failed to add task queue task.")
|
| - sent--
|
| - }
|
| - }
|
| -
|
| - default:
|
| - // General AddMulti error.
|
| - log.WithError(t).Errorf(c, "Failed to add task queue tasks.")
|
| - return 0
|
| - }
|
| - }
|
| -
|
| - return sent
|
| - }
|
| -
|
| - // Run our generator function in a separate goroutine.
|
| - taskC := make(chan *tq.Task, batch)
|
| - go func() {
|
| - defer close(taskC)
|
| - f(taskC)
|
| - }()
|
| -
|
| - // Pull tasks from our task channel and dispatch them in batches via send.
|
| - tasks := make([]*tq.Task, 0, batch)
|
| - var total, numSent int
|
| - for t := range taskC {
|
| - total++
|
| -
|
| - tasks = append(tasks, t)
|
| - if len(tasks) >= batch {
|
| - numSent += send(tasks)
|
| - tasks = tasks[:0]
|
| - }
|
| -
|
| - }
|
| -
|
| - // Final send, in case a not-full batch of tasks built up.
|
| - numSent += send(tasks)
|
| -
|
| - if numSent != total {
|
| - log.Fields{
|
| - "total": total,
|
| - "added": numSent,
|
| - }.Errorf(c, "Not all tasks could be added.")
|
| - return numSent, errors.New("error adding tasks")
|
| - }
|
| - return numSent, nil
|
| -}
|
|
|