| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package backend | 5 package backend |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "net/http" | 9 "net/http" |
| 10 "net/url" | 10 "net/url" |
| 11 | 11 |
| 12 "github.com/golang/protobuf/proto" | 12 "github.com/golang/protobuf/proto" |
| 13 tq "github.com/luci/gae/service/taskqueue" | 13 tq "github.com/luci/gae/service/taskqueue" |
| 14 "github.com/luci/luci-go/common/errors" | |
| 15 log "github.com/luci/luci-go/common/logging" | 14 log "github.com/luci/luci-go/common/logging" |
| 16 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
| 17 ) | 16 ) |
| 18 | 17 |
| 19 const ( | |
| 20 // defaultMultiTaskBatchSize is the default value for Backend's | |
| 21 // multiTaskBatchSize parameter. | |
| 22 defaultMultiTaskBatchSize = 100 | |
| 23 ) | |
| 24 | |
| 25 // httpError | 18 // httpError |
| 26 type httpError struct { | 19 type httpError struct { |
| 27 reason error | 20 reason error |
| 28 code int | 21 code int |
| 29 } | 22 } |
| 30 | 23 |
| 31 func (e *httpError) Error() string { | 24 func (e *httpError) Error() string { |
| 32 if r := e.reason; r != nil { | 25 if r := e.reason; r != nil { |
| 33 return fmt.Sprintf("%v: %q", e.reason, http.StatusText(e.code)) | 26 return fmt.Sprintf("%v: %q", e.reason, http.StatusText(e.code)) |
| 34 } | 27 } |
| (...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 84 | 77 |
| 85 if msg != nil { | 78 if msg != nil { |
| 86 var err error | 79 var err error |
| 87 t.Payload, err = proto.Marshal(msg) | 80 t.Payload, err = proto.Marshal(msg) |
| 88 if err != nil { | 81 if err != nil { |
| 89 return nil, err | 82 return nil, err |
| 90 } | 83 } |
| 91 } | 84 } |
| 92 return &t, nil | 85 return &t, nil |
| 93 } | 86 } |
| 94 | |
| 95 func (b *Backend) multiTask(c context.Context, q string, f func(chan<- *tq.Task)
) (int, error) { | |
| 96 batch := b.multiTaskBatchSize | |
| 97 if batch <= 0 { | |
| 98 batch = defaultMultiTaskBatchSize | |
| 99 } | |
| 100 | |
| 101 ti := tq.Get(c) | |
| 102 send := func(tasks []*tq.Task) int { | |
| 103 sent := len(tasks) | |
| 104 if sent == 0 { | |
| 105 return 0 | |
| 106 } | |
| 107 | |
| 108 // Add the tasks. If an error occurs, log each specific error. | |
| 109 if err := errors.Filter(ti.AddMulti(tasks, q), tq.ErrTaskAlready
Added); err != nil { | |
| 110 switch t := err.(type) { | |
| 111 case errors.MultiError: | |
| 112 // Some tasks failed to be added. | |
| 113 for i, e := range t { | |
| 114 if e != nil { | |
| 115 log.Fields{ | |
| 116 log.ErrorKey: e, | |
| 117 "index": i, | |
| 118 "taskPath": tasks[i].P
ath, | |
| 119 "taskParams": string(tas
ks[i].Payload), | |
| 120 }.Errorf(c, "Failed to add task
queue task.") | |
| 121 sent-- | |
| 122 } | |
| 123 } | |
| 124 | |
| 125 default: | |
| 126 // General AddMulti error. | |
| 127 log.WithError(t).Errorf(c, "Failed to add task q
ueue tasks.") | |
| 128 return 0 | |
| 129 } | |
| 130 } | |
| 131 | |
| 132 return sent | |
| 133 } | |
| 134 | |
| 135 // Run our generator function in a separate goroutine. | |
| 136 taskC := make(chan *tq.Task, batch) | |
| 137 go func() { | |
| 138 defer close(taskC) | |
| 139 f(taskC) | |
| 140 }() | |
| 141 | |
| 142 // Pull tasks from our task channel and dispatch them in batches via sen
d. | |
| 143 tasks := make([]*tq.Task, 0, batch) | |
| 144 var total, numSent int | |
| 145 for t := range taskC { | |
| 146 total++ | |
| 147 | |
| 148 tasks = append(tasks, t) | |
| 149 if len(tasks) >= batch { | |
| 150 numSent += send(tasks) | |
| 151 tasks = tasks[:0] | |
| 152 } | |
| 153 | |
| 154 } | |
| 155 | |
| 156 // Final send, in case a not-full batch of tasks built up. | |
| 157 numSent += send(tasks) | |
| 158 | |
| 159 if numSent != total { | |
| 160 log.Fields{ | |
| 161 "total": total, | |
| 162 "added": numSent, | |
| 163 }.Errorf(c, "Not all tasks could be added.") | |
| 164 return numSent, errors.New("error adding tasks") | |
| 165 } | |
| 166 return numSent, nil | |
| 167 } | |
| OLD | NEW |