OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package meter | 5 package meter |
6 | 6 |
7 import ( | 7 import ( |
8 "errors" | 8 "errors" |
9 | 9 |
10 "github.com/luci/luci-go/common/clock" | 10 "github.com/luci/luci-go/common/clock" |
(...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
113 default: | 113 default: |
114 break | 114 break |
115 } | 115 } |
116 } | 116 } |
117 | 117 |
118 // Main buffering function, which runs in a goroutine. | 118 // Main buffering function, which runs in a goroutine. |
119 func (m *meter) consumeWork() { | 119 func (m *meter) consumeWork() { |
120 // Acknowledge when this goroutine finishes. | 120 // Acknowledge when this goroutine finishes. |
121 defer close(m.closeAckC) | 121 defer close(m.closeAckC) |
122 | 122 |
123 ctx := log.SetFilter(m.ctx, "meter") | |
124 | |
125 timerRunning := false | 123 timerRunning := false |
126 » flushTimer := clock.NewTimer(ctx) | 124 » flushTimer := clock.NewTimer(m.ctx) |
127 defer func() { | 125 defer func() { |
128 flushTimer.Stop() | 126 flushTimer.Stop() |
129 }() | 127 }() |
130 | 128 |
131 flushCount := m.Count | 129 flushCount := m.Count |
132 flushTime := m.Delay | 130 flushTime := m.Delay |
133 | 131 |
134 // Build our work buffer. | 132 // Build our work buffer. |
135 workBufferSize := initialWorkBufferSize | 133 workBufferSize := initialWorkBufferSize |
136 if flushCount > 0 { | 134 if flushCount > 0 { |
137 // Will never buffer more than this much Work. | 135 // Will never buffer more than this much Work. |
138 workBufferSize = flushCount | 136 workBufferSize = flushCount |
139 } | 137 } |
140 bufferedWork := make([]interface{}, 0, workBufferSize) | 138 bufferedWork := make([]interface{}, 0, workBufferSize) |
141 | 139 |
142 » log.Debugf(ctx, "Starting work loop.") | 140 » log.Debugf(m.ctx, "Starting work loop.") |
143 active := true | 141 active := true |
144 for active { | 142 for active { |
145 flushRound := false | 143 flushRound := false |
146 | 144 |
147 select { | 145 select { |
148 case work, ok := <-m.workC: | 146 case work, ok := <-m.workC: |
149 if !ok { | 147 if !ok { |
150 » » » » log.Debugf(ctx, "Received instance close signal;
terminating.") | 148 » » » » log.Debugf(m.ctx, "Received instance close signa
l; terminating.") |
151 | 149 |
152 // Don't immediately exit the loop, since there
may still be buffered | 150 // Don't immediately exit the loop, since there
may still be buffered |
153 // Work to flush. | 151 // Work to flush. |
154 active = false | 152 active = false |
155 flushRound = true | 153 flushRound = true |
156 break | 154 break |
157 } | 155 } |
158 | 156 |
159 // Count the work in this group. If we're not using a gi
ven metric, try | 157 // Count the work in this group. If we're not using a gi
ven metric, try |
160 // and avoid wasting time collecting it. | 158 // and avoid wasting time collecting it. |
161 bufferedWork = append(bufferedWork, work) | 159 bufferedWork = append(bufferedWork, work) |
162 | 160 |
163 // Handle work count threshold. We do this first, since
it's trivial to | 161 // Handle work count threshold. We do this first, since
it's trivial to |
164 // setup/compute. | 162 // setup/compute. |
165 if flushCount > 0 && len(bufferedWork) >= flushCount { | 163 if flushCount > 0 && len(bufferedWork) >= flushCount { |
166 flushRound = true | 164 flushRound = true |
167 } | 165 } |
168 // Start our flush timer, if it's not already ticking. O
nly waste time | 166 // Start our flush timer, if it's not already ticking. O
nly waste time |
169 // doing this if we're not already flushing, since flush
ing will clear the | 167 // doing this if we're not already flushing, since flush
ing will clear the |
170 // timer. | 168 // timer. |
171 if !flushRound && flushTime > 0 && !timerRunning { | 169 if !flushRound && flushTime > 0 && !timerRunning { |
172 » » » » log.Infof(log.SetFields(ctx, log.Fields{ | 170 » » » » log.Infof(log.SetFields(m.ctx, log.Fields{ |
173 "flushInterval": flushTime, | 171 "flushInterval": flushTime, |
174 }), "Starting flush timer.") | 172 }), "Starting flush timer.") |
175 flushTimer.Reset(flushTime) | 173 flushTimer.Reset(flushTime) |
176 timerRunning = true | 174 timerRunning = true |
177 } | 175 } |
178 | 176 |
179 // Invoke work callback, if one is set. | 177 // Invoke work callback, if one is set. |
180 if m.IngestCallback != nil { | 178 if m.IngestCallback != nil { |
181 flushRound = m.IngestCallback(work) || flushRoun
d | 179 flushRound = m.IngestCallback(work) || flushRoun
d |
182 } | 180 } |
183 | 181 |
184 case <-m.flushNowC: | 182 case <-m.flushNowC: |
185 flushRound = true | 183 flushRound = true |
186 | 184 |
187 case <-flushTimer.GetC(): | 185 case <-flushTimer.GetC(): |
188 » » » log.Infof(ctx, "Flush timer has triggered.") | 186 » » » log.Infof(m.ctx, "Flush timer has triggered.") |
189 timerRunning = false | 187 timerRunning = false |
190 flushRound = true | 188 flushRound = true |
191 } | 189 } |
192 | 190 |
193 // Should we flush? | 191 // Should we flush? |
194 if flushRound { | 192 if flushRound { |
195 flushTimer.Stop() | 193 flushTimer.Stop() |
196 timerRunning = false | 194 timerRunning = false |
197 | 195 |
198 if len(bufferedWork) > 0 { | 196 if len(bufferedWork) > 0 { |
199 // Clone bufferedWork, since we re-use it. | 197 // Clone bufferedWork, since we re-use it. |
200 workToSend := make([]interface{}, len(bufferedWo
rk)) | 198 workToSend := make([]interface{}, len(bufferedWo
rk)) |
201 copy(workToSend, bufferedWork) | 199 copy(workToSend, bufferedWork) |
202 | 200 |
203 // Clear our Work slice for re-use. This does no
t resize the underlying | 201 // Clear our Work slice for re-use. This does no
t resize the underlying |
204 // array, since it's likely to fill again. | 202 // array, since it's likely to fill again. |
205 for idx := range bufferedWork { | 203 for idx := range bufferedWork { |
206 bufferedWork[idx] = nil | 204 bufferedWork[idx] = nil |
207 } | 205 } |
208 bufferedWork = bufferedWork[:0] | 206 bufferedWork = bufferedWork[:0] |
209 | 207 |
210 // Callback with this work. | 208 // Callback with this work. |
211 m.Callback(workToSend) | 209 m.Callback(workToSend) |
212 } | 210 } |
213 } | 211 } |
214 } | 212 } |
215 } | 213 } |
OLD | NEW |