Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(296)

Side by Side Diff: appengine/logdog/coordinator/backend/util.go

Issue 1844963002: Iterate archive query alongside task queue. (Closed) Base URL: https://github.com/luci/luci-go@collector-gae-classic
Patch Set: Respond to code review comments. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package backend 5 package backend
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "net/http" 9 "net/http"
10 "net/url" 10 "net/url"
11 11
12 "github.com/golang/protobuf/proto" 12 "github.com/golang/protobuf/proto"
13 tq "github.com/luci/gae/service/taskqueue" 13 tq "github.com/luci/gae/service/taskqueue"
14 "github.com/luci/luci-go/common/errors"
15 log "github.com/luci/luci-go/common/logging" 14 log "github.com/luci/luci-go/common/logging"
16 "golang.org/x/net/context" 15 "golang.org/x/net/context"
17 ) 16 )
18 17
19 const (
20 // defaultMultiTaskBatchSize is the default value for Backend's
21 // multiTaskBatchSize parameter.
22 defaultMultiTaskBatchSize = 100
23 )
24
25 // httpError 18 // httpError
26 type httpError struct { 19 type httpError struct {
27 reason error 20 reason error
28 code int 21 code int
29 } 22 }
30 23
31 func (e *httpError) Error() string { 24 func (e *httpError) Error() string {
32 if r := e.reason; r != nil { 25 if r := e.reason; r != nil {
33 return fmt.Sprintf("%v: %q", e.reason, http.StatusText(e.code)) 26 return fmt.Sprintf("%v: %q", e.reason, http.StatusText(e.code))
34 } 27 }
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
84 77
85 if msg != nil { 78 if msg != nil {
86 var err error 79 var err error
87 t.Payload, err = proto.Marshal(msg) 80 t.Payload, err = proto.Marshal(msg)
88 if err != nil { 81 if err != nil {
89 return nil, err 82 return nil, err
90 } 83 }
91 } 84 }
92 return &t, nil 85 return &t, nil
93 } 86 }
94
95 func (b *Backend) multiTask(c context.Context, q string, f func(chan<- *tq.Task) ) (int, error) {
96 batch := b.multiTaskBatchSize
97 if batch <= 0 {
98 batch = defaultMultiTaskBatchSize
99 }
100
101 ti := tq.Get(c)
102 send := func(tasks []*tq.Task) int {
103 sent := len(tasks)
104 if sent == 0 {
105 return 0
106 }
107
108 // Add the tasks. If an error occurs, log each specific error.
109 if err := errors.Filter(ti.AddMulti(tasks, q), tq.ErrTaskAlready Added); err != nil {
110 switch t := err.(type) {
111 case errors.MultiError:
112 // Some tasks failed to be added.
113 for i, e := range t {
114 if e != nil {
115 log.Fields{
116 log.ErrorKey: e,
117 "index": i,
118 "taskPath": tasks[i].P ath,
119 "taskParams": string(tas ks[i].Payload),
120 }.Errorf(c, "Failed to add task queue task.")
121 sent--
122 }
123 }
124
125 default:
126 // General AddMulti error.
127 log.WithError(t).Errorf(c, "Failed to add task q ueue tasks.")
128 return 0
129 }
130 }
131
132 return sent
133 }
134
135 // Run our generator function in a separate goroutine.
136 taskC := make(chan *tq.Task, batch)
137 go func() {
138 defer close(taskC)
139 f(taskC)
140 }()
141
142 // Pull tasks from our task channel and dispatch them in batches via sen d.
143 tasks := make([]*tq.Task, 0, batch)
144 var total, numSent int
145 for t := range taskC {
146 total++
147
148 tasks = append(tasks, t)
149 if len(tasks) >= batch {
150 numSent += send(tasks)
151 tasks = tasks[:0]
152 }
153
154 }
155
156 // Final send, in case a not-full batch of tasks built up.
157 numSent += send(tasks)
158
159 if numSent != total {
160 log.Fields{
161 "total": total,
162 "added": numSent,
163 }.Errorf(c, "Not all tasks could be added.")
164 return numSent, errors.New("error adding tasks")
165 }
166 return numSent, nil
167 }
OLDNEW
« no previous file with comments | « appengine/logdog/coordinator/backend/backend.go ('k') | appengine/logdog/coordinator/backend/util_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698