Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(55)

Side by Side Diff: common/meter/meter.go

Issue 1622553005: Remove log filtering and add stringsetflag. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: error on empty Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « common/logging/gologger/logger.go ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package meter 5 package meter
6 6
7 import ( 7 import (
8 "errors" 8 "errors"
9 9
10 "github.com/luci/luci-go/common/clock" 10 "github.com/luci/luci-go/common/clock"
(...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after
113 default: 113 default:
114 break 114 break
115 } 115 }
116 } 116 }
117 117
118 // Main buffering function, which runs in a goroutine. 118 // Main buffering function, which runs in a goroutine.
119 func (m *meter) consumeWork() { 119 func (m *meter) consumeWork() {
120 // Acknowledge when this goroutine finishes. 120 // Acknowledge when this goroutine finishes.
121 defer close(m.closeAckC) 121 defer close(m.closeAckC)
122 122
123 ctx := log.SetFilter(m.ctx, "meter")
124
125 timerRunning := false 123 timerRunning := false
126 » flushTimer := clock.NewTimer(ctx) 124 » flushTimer := clock.NewTimer(m.ctx)
127 defer func() { 125 defer func() {
128 flushTimer.Stop() 126 flushTimer.Stop()
129 }() 127 }()
130 128
131 flushCount := m.Count 129 flushCount := m.Count
132 flushTime := m.Delay 130 flushTime := m.Delay
133 131
134 // Build our work buffer. 132 // Build our work buffer.
135 workBufferSize := initialWorkBufferSize 133 workBufferSize := initialWorkBufferSize
136 if flushCount > 0 { 134 if flushCount > 0 {
137 // Will never buffer more than this much Work. 135 // Will never buffer more than this much Work.
138 workBufferSize = flushCount 136 workBufferSize = flushCount
139 } 137 }
140 bufferedWork := make([]interface{}, 0, workBufferSize) 138 bufferedWork := make([]interface{}, 0, workBufferSize)
141 139
142 » log.Debugf(ctx, "Starting work loop.") 140 » log.Debugf(m.ctx, "Starting work loop.")
143 active := true 141 active := true
144 for active { 142 for active {
145 flushRound := false 143 flushRound := false
146 144
147 select { 145 select {
148 case work, ok := <-m.workC: 146 case work, ok := <-m.workC:
149 if !ok { 147 if !ok {
150 » » » » log.Debugf(ctx, "Received instance close signal; terminating.") 148 » » » » log.Debugf(m.ctx, "Received instance close signa l; terminating.")
151 149
152 // Don't immediately exit the loop, since there may still be buffered 150 // Don't immediately exit the loop, since there may still be buffered
153 // Work to flush. 151 // Work to flush.
154 active = false 152 active = false
155 flushRound = true 153 flushRound = true
156 break 154 break
157 } 155 }
158 156
159 // Count the work in this group. If we're not using a gi ven metric, try 157 // Count the work in this group. If we're not using a gi ven metric, try
160 // and avoid wasting time collecting it. 158 // and avoid wasting time collecting it.
161 bufferedWork = append(bufferedWork, work) 159 bufferedWork = append(bufferedWork, work)
162 160
163 // Handle work count threshold. We do this first, since it's trivial to 161 // Handle work count threshold. We do this first, since it's trivial to
164 // setup/compute. 162 // setup/compute.
165 if flushCount > 0 && len(bufferedWork) >= flushCount { 163 if flushCount > 0 && len(bufferedWork) >= flushCount {
166 flushRound = true 164 flushRound = true
167 } 165 }
168 // Start our flush timer, if it's not already ticking. O nly waste time 166 // Start our flush timer, if it's not already ticking. O nly waste time
169 // doing this if we're not already flushing, since flush ing will clear the 167 // doing this if we're not already flushing, since flush ing will clear the
170 // timer. 168 // timer.
171 if !flushRound && flushTime > 0 && !timerRunning { 169 if !flushRound && flushTime > 0 && !timerRunning {
172 » » » » log.Infof(log.SetFields(ctx, log.Fields{ 170 » » » » log.Infof(log.SetFields(m.ctx, log.Fields{
173 "flushInterval": flushTime, 171 "flushInterval": flushTime,
174 }), "Starting flush timer.") 172 }), "Starting flush timer.")
175 flushTimer.Reset(flushTime) 173 flushTimer.Reset(flushTime)
176 timerRunning = true 174 timerRunning = true
177 } 175 }
178 176
179 // Invoke work callback, if one is set. 177 // Invoke work callback, if one is set.
180 if m.IngestCallback != nil { 178 if m.IngestCallback != nil {
181 flushRound = m.IngestCallback(work) || flushRoun d 179 flushRound = m.IngestCallback(work) || flushRoun d
182 } 180 }
183 181
184 case <-m.flushNowC: 182 case <-m.flushNowC:
185 flushRound = true 183 flushRound = true
186 184
187 case <-flushTimer.GetC(): 185 case <-flushTimer.GetC():
188 » » » log.Infof(ctx, "Flush timer has triggered.") 186 » » » log.Infof(m.ctx, "Flush timer has triggered.")
189 timerRunning = false 187 timerRunning = false
190 flushRound = true 188 flushRound = true
191 } 189 }
192 190
193 // Should we flush? 191 // Should we flush?
194 if flushRound { 192 if flushRound {
195 flushTimer.Stop() 193 flushTimer.Stop()
196 timerRunning = false 194 timerRunning = false
197 195
198 if len(bufferedWork) > 0 { 196 if len(bufferedWork) > 0 {
199 // Clone bufferedWork, since we re-use it. 197 // Clone bufferedWork, since we re-use it.
200 workToSend := make([]interface{}, len(bufferedWo rk)) 198 workToSend := make([]interface{}, len(bufferedWo rk))
201 copy(workToSend, bufferedWork) 199 copy(workToSend, bufferedWork)
202 200
203 // Clear our Work slice for re-use. This does no t resize the underlying 201 // Clear our Work slice for re-use. This does no t resize the underlying
204 // array, since it's likely to fill again. 202 // array, since it's likely to fill again.
205 for idx := range bufferedWork { 203 for idx := range bufferedWork {
206 bufferedWork[idx] = nil 204 bufferedWork[idx] = nil
207 } 205 }
208 bufferedWork = bufferedWork[:0] 206 bufferedWork = bufferedWork[:0]
209 207
210 // Callback with this work. 208 // Callback with this work.
211 m.Callback(workToSend) 209 m.Callback(workToSend)
212 } 210 }
213 } 211 }
214 } 212 }
215 } 213 }
OLDNEW
« no previous file with comments | « common/logging/gologger/logger.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698