| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package bundler |
| 6 |
| 7 import ( |
| 8 "sync" |
| 9 "sync/atomic" |
| 10 "time" |
| 11 |
| 12 "github.com/luci/luci-go/common/logdog/protocol" |
| 13 ) |
| 14 |
| 15 var ( |
| 16 // dataBufferSize is the size (in bytes) of the Data objects that a Stre
am |
| 17 // will lease. |
| 18 dataBufferSize = 4096 |
| 19 ) |
| 20 |
| 21 // Stream is an individual Bundler Stream. Data is added to the Stream as a |
| 22 // series of ordered binary chunks. |
| 23 // |
| 24 // A Stream is not goroutine-safe. |
| 25 type Stream interface { |
| 26 // LeaseData allocates and returns a Data block that stream data can be |
| 27 // loaded into. The caller should Release() the Data, or transfer owners
hip to |
| 28 // something that will (e.g., Append()). |
| 29 LeaseData() Data |
| 30 |
| 31 // Append adds a sequential chunk of data to the Stream. Append may bloc
k if |
| 32 // the data isn't ready to be consumed. |
| 33 // |
| 34 // Append takes possession of the supplied Data, and will Release it whe
n |
| 35 // finished. |
| 36 Append(Data) error |
| 37 |
| 38 // Close closes the Stream, flushing any remaining data. |
| 39 Close() |
| 40 } |
| 41 |
| 42 // streamConfig is the set of static configuration parameters for the stream. |
| 43 type streamConfig struct { |
| 44 // name is the name of this stream. |
| 45 name string |
| 46 |
| 47 // parser is the stream parser to use. |
| 48 parser parser |
| 49 |
| 50 // maximumBufferedBytes is the maximum number of bytes that this stream
will |
| 51 // retain in its parser before blocking subsequent Append attempts. |
| 52 maximumBufferedBytes int64 |
| 53 // maximumBufferDuration is the maximum amount of time that a block of d
ata |
| 54 // can be comfortably buffered in the stream. |
| 55 maximumBufferDuration time.Duration |
| 56 |
| 57 // template is the minimally-populated Butler log bundle entry. |
| 58 template protocol.ButlerLogBundle_Entry |
| 59 |
| 60 // onAppend, if not nil, is invoked when an attempt to append data to th
e |
| 61 // stream occurs. If true is passed, the data was successfully appended.
If |
| 62 // false was passed, the data could not be appended immediately and the
stream |
| 63 // will block pending data consumption. |
| 64 // |
| 65 // The stream's append lock will be held when this method is called. |
| 66 onAppend func(bool) |
| 67 } |
| 68 |
| 69 // streamImpl is a Stream implementation that is bound to a Bundler. |
| 70 type streamImpl struct { |
| 71 c *streamConfig |
| 72 |
| 73 // drained is true if the stream is finished emitting data. |
| 74 // |
| 75 // It is an atomic value, with zero indicating not drained and non-zero |
| 76 // indicating drained. It should be accessed via isDrained, and set with |
| 77 // setDrained. |
| 78 drained int32 |
| 79 |
| 80 // parserLock is a Mutex protecting the stream's parser instance and its |
| 81 // underlying chunk.Buffer. Any access to either of these fields must be
done |
| 82 // while holding this lock. |
| 83 parserLock sync.Mutex |
| 84 |
| 85 // dataConsumedSignalC is a channel that can be used to signal when data
has |
| 86 // been consumed. It is set via signalDataConsumed. |
| 87 dataConsumedSignalC chan struct{} |
| 88 // appendErrValue is an atomically-set error. It will be returned by App
end if |
| 89 // an error occurs during stream processing. |
| 90 appendErrValue atomic.Value |
| 91 |
| 92 // stateLock protects stream state against concurrent access. |
| 93 stateLock sync.Mutex |
| 94 // closed, if non-zero, indicates that we have been closed and our strea
m has |
| 95 // finished reading. |
| 96 // |
| 97 // stateLock must be held when accessing this field. |
| 98 closed bool |
| 99 // lastLogEntry is a pointer to the last LogEntry that was exported. |
| 100 // |
| 101 // stateLock must be held when accessing this field. |
| 102 lastLogEntry *protocol.LogEntry |
| 103 |
| 104 // testAppendWaitCallback, if not nil, is called before Append blocks. |
| 105 // This callback is used for testing coordination. |
| 106 testAppendWaitCallback func() |
| 107 } |
| 108 |
| 109 func newStream(c streamConfig) *streamImpl { |
| 110 return &streamImpl{ |
| 111 c: &c, |
| 112 |
| 113 dataConsumedSignalC: make(chan struct{}, 1), |
| 114 } |
| 115 } |
| 116 |
| 117 func (s *streamImpl) LeaseData() Data { |
| 118 return globalDataPoolRegistry.getPool(dataBufferSize).getData() |
| 119 } |
| 120 |
| 121 func (s *streamImpl) Append(d Data) error { |
| 122 defer func() { |
| 123 if d != nil { |
| 124 d.Release() |
| 125 } |
| 126 }() |
| 127 |
| 128 // Block/loop until we've successfully appended the data. |
| 129 for { |
| 130 dLen := int64(len(d.Bytes())) |
| 131 if err := s.appendError(); err != nil || dLen == 0 { |
| 132 return err |
| 133 } |
| 134 |
| 135 s.withParserLock(func() { |
| 136 if s.c.parser.bufferedBytes() == 0 || |
| 137 s.c.parser.bufferedBytes()+dLen <= s.c.maximumBu
fferedBytes { |
| 138 s.c.parser.appendData(d) |
| 139 d = nil |
| 140 } |
| 141 }) |
| 142 |
| 143 // The data was appended; we're done. |
| 144 if s.c.onAppend != nil { |
| 145 s.c.onAppend(d == nil) |
| 146 } |
| 147 if d == nil { |
| 148 break |
| 149 } |
| 150 |
| 151 // Not ready to append; wait for a data event and re-evaluate. |
| 152 <-s.dataConsumedSignalC |
| 153 } |
| 154 |
| 155 return nil |
| 156 } |
| 157 |
| 158 // Signals our Append loop that data has been consumed. |
| 159 func (s *streamImpl) signalDataConsumed() { |
| 160 select { |
| 161 case s.dataConsumedSignalC <- struct{}{}: |
| 162 break |
| 163 |
| 164 default: |
| 165 break |
| 166 } |
| 167 } |
| 168 |
| 169 func (s *streamImpl) Close() { |
| 170 s.stateLock.Lock() |
| 171 defer s.stateLock.Unlock() |
| 172 |
| 173 s.closed = true |
| 174 s.maybeSetDrainedLocked() |
| 175 } |
| 176 |
| 177 func (s *streamImpl) name() string { |
| 178 return s.c.name |
| 179 } |
| 180 |
| 181 // isDrained returns true if this stream is finished emitting data. |
| 182 // |
| 183 // This can happen if either: |
| 184 // - The stream is closed and has no more buffered data, or |
| 185 // - The strema has encountered a fatal error during processing. |
| 186 func (s *streamImpl) isDrained() bool { |
| 187 return atomic.LoadInt32(&s.drained) != 0 |
| 188 } |
| 189 |
| 190 // setDrained marks this stream as drained. |
| 191 func (s *streamImpl) setDrained() { |
| 192 atomic.StoreInt32(&s.drained, 1) |
| 193 } |
| 194 |
| 195 // maybeSetDrainedLocked evaluates our buffer stream status. If the stream is |
| 196 // closed and our buffer is empty, it will set the drained state to true. |
| 197 // |
| 198 // The stream's stateLock must be held when calling this method. |
| 199 // |
| 200 // The resulting drained state will be returned. |
| 201 func (s *streamImpl) maybeSetDrainedLocked() bool { |
| 202 if s.isDrained() { |
| 203 return true |
| 204 } |
| 205 |
| 206 // Not drained ... should we be? |
| 207 if s.closed { |
| 208 bufSize := int64(0) |
| 209 s.withParserLock(func() { |
| 210 bufSize = s.c.parser.bufferedBytes() |
| 211 }) |
| 212 if bufSize == 0 { |
| 213 s.setDrained() |
| 214 return true |
| 215 } |
| 216 } |
| 217 |
| 218 return false |
| 219 } |
| 220 |
| 221 // expireTime returns the Time when the oldest chunk in the stream will expire. |
| 222 // |
| 223 // This is calculated ask: |
| 224 // oldest.Timestamp + stream.maximumBufferDuration |
| 225 // If there is no buffered data, oldest will return nil. |
| 226 func (s *streamImpl) expireTime() (t time.Time, has bool) { |
| 227 s.withParserLock(func() { |
| 228 t, has = s.c.parser.firstChunkTime() |
| 229 }) |
| 230 |
| 231 if has { |
| 232 t = t.Add(s.c.maximumBufferDuration) |
| 233 } |
| 234 return |
| 235 } |
| 236 |
| 237 // nextBundleEntry generates bundles for this stream. The total bundle data size |
| 238 // must not exceed the supplied size. |
| 239 // |
| 240 // If no bundle entry could be generated given the constraints, nil will be |
| 241 // returned. |
| 242 // |
| 243 // It is possible for some entries to be returned alongside an error. |
| 244 func (s *streamImpl) nextBundleEntry(bb *builder, aggressive bool) bool { |
| 245 s.stateLock.Lock() |
| 246 defer s.stateLock.Unlock() |
| 247 |
| 248 // If we're not drained, try and get the next bundle. |
| 249 modified := false |
| 250 if !s.maybeSetDrainedLocked() { |
| 251 err := error(nil) |
| 252 modified, err = s.nextBundleEntryLocked(bb, aggressive) |
| 253 if err != nil { |
| 254 s.setAppendError(err) |
| 255 s.setDrained() |
| 256 } |
| 257 |
| 258 if modified { |
| 259 s.signalDataConsumed() |
| 260 } |
| 261 } |
| 262 |
| 263 // If we're drained, populate our terminal state. |
| 264 if s.maybeSetDrainedLocked() && s.lastLogEntry != nil { |
| 265 bb.setStreamTerminal(&s.c.template, s.lastLogEntry.StreamIndex) |
| 266 } |
| 267 |
| 268 return modified |
| 269 } |
| 270 |
| 271 func (s *streamImpl) nextBundleEntryLocked(bb *builder, aggressive bool) (bool,
error) { |
| 272 c := constraints{ |
| 273 allowSplit: aggressive, |
| 274 closed: s.closed, |
| 275 } |
| 276 |
| 277 // Extract as many entries as possible from the stream. As we extract, a
djust |
| 278 // our byte size. |
| 279 // |
| 280 // If we're closed, this will continue to consume until finished. If an
error |
| 281 // occurs, shut down data collection. |
| 282 modified := false |
| 283 ierr := error(nil) |
| 284 |
| 285 for c.limit = bb.remaining(); c.limit > 0; c.limit = bb.remaining() { |
| 286 emittedLog := false |
| 287 s.withParserLock(func() { |
| 288 le, err := s.c.parser.nextEntry(&c) |
| 289 if err != nil { |
| 290 ierr = err |
| 291 return |
| 292 } |
| 293 |
| 294 if le == nil { |
| 295 return |
| 296 } |
| 297 |
| 298 emittedLog = true |
| 299 modified = true |
| 300 s.lastLogEntry = le |
| 301 bb.add(&s.c.template, le) |
| 302 }) |
| 303 |
| 304 if !emittedLog { |
| 305 break |
| 306 } |
| 307 } |
| 308 |
| 309 return modified, ierr |
| 310 } |
| 311 |
| 312 func (s *streamImpl) withParserLock(f func()) { |
| 313 s.parserLock.Lock() |
| 314 defer s.parserLock.Unlock() |
| 315 |
| 316 f() |
| 317 } |
| 318 |
| 319 func (s *streamImpl) appendError() error { |
| 320 if err := s.appendErrValue.Load(); err != nil { |
| 321 return err.(error) |
| 322 } |
| 323 return nil |
| 324 } |
| 325 |
| 326 func (s *streamImpl) setAppendError(err error) { |
| 327 s.appendErrValue.Store(err) |
| 328 s.signalDataConsumed() |
| 329 } |
| OLD | NEW |