Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(46)

Side by Side Diff: client/internal/logdog/butler/bundler/stream.go

Issue 1412063008: logdog: Add bundler library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Enhanced doc.go. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package bundler
6
7 import (
8 "sync"
9 "sync/atomic"
10 "time"
11
12 "github.com/luci/luci-go/common/logdog/protocol"
13 )
14
15 var (
16 // dataBufferSize is the size (in bytes) of the Data objects that a Stre am
17 // will lease.
18 dataBufferSize = 4096
19 )
20
21 // Stream is an individual Bundler Stream. Data is added to the Stream as a
22 // series of ordered binary chunks.
23 //
24 // A Stream is not goroutine-safe.
25 type Stream interface {
26 // LeaseData allocates and returns a Data block that stream data can be
27 // loaded into. The caller should Release() the Data, or transfer owners hip to
28 // something that will (e.g., Append()).
29 LeaseData() Data
30
31 // Append adds a sequential chunk of data to the Stream. Append may bloc k if
32 // the data isn't ready to be consumed.
33 //
34 // Append takes possession of the supplied Data, and will Release it whe n
35 // finished.
36 Append(Data) error
37
38 // Close closes the Stream, flushing any remaining data.
39 Close()
40 }
41
42 // streamConfig is the set of static configuration parameters for the stream.
43 type streamConfig struct {
44 // name is the name of this stream.
45 name string
46
47 // parser is the stream parser to use.
48 parser parser
49
50 // maximumBufferedBytes is the maximum number of bytes that this stream will
51 // retain in its parser before blocking subsequent Append attempts.
52 maximumBufferedBytes int64
53 // maximumBufferDuration is the maximum amount of time that a block of d ata
54 // can be comfortably buffered in the stream.
55 maximumBufferDuration time.Duration
56
57 // template is the minimally-populated Butler log bundle entry.
58 template protocol.ButlerLogBundle_Entry
59
60 // onAppend, if not nil, is invoked when an attempt to append data to th e
61 // stream occurs. If true is passed, the data was successfully appended. If
62 // false was passed, the data could not be appended immediately and the stream
63 // will block pending data consumption.
64 //
65 // The stream's append lock will be held when this method is called.
66 onAppend func(bool)
67 }
68
69 // streamImpl is a Stream implementation that is bound to a Bundler.
70 type streamImpl struct {
71 c *streamConfig
72
73 // drained is true if the stream is finished emitting data.
74 //
75 // It is an atomic value, with zero indicating not drained and non-zero
76 // indicating drained. It should be accessed via isDrained, and set with
77 // setDrained.
78 drained int32
79
80 // parserLock is a Mutex protecting the stream's parser instance and its
81 // underlying chunk.Buffer. Any access to either of these fields must be done
82 // while holding this lock.
83 parserLock sync.Mutex
84
85 // dataConsumedSignalC is a channel that can be used to signal when data has
86 // been consumed. It is set via signalDataConsumed.
87 dataConsumedSignalC chan struct{}
88 // appendErrValue is an atomically-set error. It will be returned by App end if
89 // an error occurs during stream processing.
90 appendErrValue atomic.Value
91
92 // stateLock protects stream state against concurrent access.
93 stateLock sync.Mutex
94 // closed, if non-zero, indicates that we have been closed and our strea m has
95 // finished reading.
96 //
97 // stateLock must be held when accessing this field.
98 closed bool
99 // lastLogEntry is a pointer to the last LogEntry that was exported.
100 //
101 // stateLock must be held when accessing this field.
102 lastLogEntry *protocol.LogEntry
103
104 // testAppendWaitCallback, if not nil, is called before Append blocks.
105 // This callback is used for testing coordination.
106 testAppendWaitCallback func()
107 }
108
109 func newStream(c streamConfig) *streamImpl {
110 return &streamImpl{
111 c: &c,
112
113 dataConsumedSignalC: make(chan struct{}, 1),
114 }
115 }
116
117 func (s *streamImpl) LeaseData() Data {
118 return globalDataPoolRegistry.getPool(dataBufferSize).getData()
119 }
120
121 func (s *streamImpl) Append(d Data) error {
122 defer func() {
123 if d != nil {
124 d.Release()
125 }
126 }()
127
128 // Block/loop until we've successfully appended the data.
129 for {
130 if err := s.appendError(); err != nil || d.Len() == 0 {
131 return err
132 }
133
134 s.withParserLock(func() {
135 if s.c.parser.bufferedBytes() == 0 ||
136 s.c.parser.bufferedBytes()+int64(d.Len()) <= s.c .maximumBufferedBytes {
137 s.c.parser.appendData(d)
138 d = nil
139 }
140 })
141
142 // The data was appended; we're done.
143 if s.c.onAppend != nil {
144 s.c.onAppend(d == nil)
145 }
146 if d == nil {
147 break
148 }
149
150 // Not ready to append; wait for a data event and re-evaluate.
151 <-s.dataConsumedSignalC
152 }
153
154 return nil
155 }
156
157 // Signals our Append loop that data has been consumed.
158 func (s *streamImpl) signalDataConsumed() {
159 select {
160 case s.dataConsumedSignalC <- struct{}{}:
161 break
162
163 default:
164 break
165 }
166 }
167
168 func (s *streamImpl) Close() {
169 s.stateLock.Lock()
170 defer s.stateLock.Unlock()
171
172 s.closed = true
173 s.maybeSetDrainedLocked()
174 }
175
176 func (s *streamImpl) name() string {
177 return s.c.name
178 }
179
180 // isDrained returns true if this stream is finished emitting data.
181 //
182 // This can happen if either:
183 // - The stream is closed and has no more buffered data, or
184 // - The strema has encountered a fatal error during processing.
185 func (s *streamImpl) isDrained() bool {
186 return atomic.LoadInt32(&s.drained) != 0
187 }
188
189 // setDrained marks this stream as drained.
190 func (s *streamImpl) setDrained() {
191 atomic.StoreInt32(&s.drained, 1)
192 }
193
194 // maybeSetDrainedLocked evaluates our buffer stream status. If the stream is
195 // closed and our buffer is empty, it will set the drained state to true.
196 //
197 // The stream's stateLock must be held when calling this method.
198 //
199 // The resulting drained state will be returned.
200 func (s *streamImpl) maybeSetDrainedLocked() bool {
201 if s.isDrained() {
202 return true
203 }
204
205 // Not drained ... should we be?
206 if s.closed {
207 bufSize := int64(0)
208 s.withParserLock(func() {
209 bufSize = s.c.parser.bufferedBytes()
210 })
211 if bufSize == 0 {
212 s.setDrained()
213 return true
214 }
215 }
216
217 return false
218 }
219
220 // expireTime returns the Time when the oldest chunk in the stream will expire.
221 //
222 // This is calculated ask:
223 // oldest.Timestamp + stream.maximumBufferDuration
224 // If there is no buffered data, oldest will return nil.
225 func (s *streamImpl) expireTime() (t time.Time, has bool) {
226 s.withParserLock(func() {
227 t, has = s.c.parser.firstChunkTime()
228 })
229
230 if has {
231 t = t.Add(s.c.maximumBufferDuration)
232 }
233 return
234 }
235
236 // nextBundleEntry generates bundles for this stream. The total bundle data size
237 // must not exceed the supplied size.
238 //
239 // If no bundle entry could be generated given the constraints, nil will be
240 // returned.
241 //
242 // It is possible for some entries to be returned alongside an error.
243 func (s *streamImpl) nextBundleEntry(bb *builder, aggressive bool) bool {
244 s.stateLock.Lock()
245 defer s.stateLock.Unlock()
246
247 // If we're not drained, try and get the next bundle.
248 modified := false
249 if !s.maybeSetDrainedLocked() {
250 err := error(nil)
251 modified, err = s.nextBundleEntryLocked(bb, aggressive)
252 if err != nil {
253 s.setAppendError(err)
254 s.setDrained()
255 }
256
257 if modified {
258 s.signalDataConsumed()
259 }
260 }
261
262 // If we're drained, populate our terminal state.
263 if s.maybeSetDrainedLocked() && s.lastLogEntry != nil {
264 bb.setStreamTerminal(&s.c.template, s.lastLogEntry.StreamIndex)
265 }
266
267 return modified
268 }
269
270 func (s *streamImpl) nextBundleEntryLocked(bb *builder, aggressive bool) (bool, error) {
271 c := constraints{
272 truncate: aggressive,
273 closed: s.closed,
274 }
275
276 // Extract as many entries as possible from the stream. As we extract, a djust
277 // our byte size.
278 //
279 // If we're closed, this will continue to consume until finished. If an error
280 // occurs, shut down data collection.
281 modified := false
282 ierr := error(nil)
283
284 for c.limit = bb.remaining(); c.limit > 0; c.limit = bb.remaining() {
285 emittedLog := false
286 s.withParserLock(func() {
287 le, err := s.c.parser.nextEntry(&c)
288 if err != nil {
289 ierr = err
290 return
291 }
292
293 if le == nil {
294 return
295 }
296
297 emittedLog = true
298 modified = true
299 s.lastLogEntry = le
300 bb.add(&s.c.template, le)
301 })
302
303 if !emittedLog {
304 break
305 }
306 }
307
308 return modified, ierr
309 }
310
311 func (s *streamImpl) withParserLock(f func()) {
312 s.parserLock.Lock()
313 defer s.parserLock.Unlock()
314
315 f()
316 }
317
318 func (s *streamImpl) appendError() error {
319 if err := s.appendErrValue.Load(); err != nil {
320 return err.(error)
321 }
322 return nil
323 }
324
325 func (s *streamImpl) setAppendError(err error) {
326 s.appendErrValue.Store(err)
327 s.signalDataConsumed()
328 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698