Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(210)

Side by Side Diff: client/internal/logdog/butler/bundler/bundler.go

Issue 1412063008: logdog: Add bundler library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Enhanced doc.go. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package bundler
6
7 import (
8 "container/heap"
9 "fmt"
10 "sync"
11 "time"
12
13 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
14 "github.com/luci/luci-go/common/clock"
15 "github.com/luci/luci-go/common/logdog/protocol"
16 "github.com/luci/luci-go/common/logdog/types"
17 "github.com/luci/luci-go/common/proto/google"
18 )
19
20 // Config is the Bundler configuration.
21 type Config struct {
22 // Clock is the clock instance that will be used for Bundler and stream
23 // timing.
24 Clock clock.Clock
25
26 // Source is the bundle source string to use. This can be empty if there is no
27 // source information to include.
28 Source string
29
30 // MaxBufferedBytes is the maximum number of bytes to buffer in memory p er
31 // stream.
32 MaxBufferedBytes int64
33
34 // MaxBundleSize is the maximum bundle size in bytes that may be generat ed.
35 //
36 // If this value is zero, no size constraint will be applied to generate d
37 // bundles.
38 MaxBundleSize int
39
40 // MaxBufferDelay is the maximum amount of time we're willing to buffer
41 // bundled data. Other factors can cause the bundle to be sent before th is,
42 // but it is an upper bound.
43 MaxBufferDelay time.Duration
44 }
45
46 type bundlerStream interface {
47 isDrained() bool
48 name() string
49 expireTime() (time.Time, bool)
50 nextBundleEntry(*builder, bool) bool
51 }
52
53 // Bundler is the main Bundler instance. It exposes goroutine-safe endpoints for
54 // stream registration and bundle consumption.
55 type Bundler interface {
56 Register(streamproto.Properties) (Stream, error)
57
58 // Next causes the Bundler to pack a new ButlerLogBundle message for
59 // transmission.
60 //
61 // The maximum marshalled byte size of the bundle's protobuf is supplied ; the
62 // generated bundle will not exceed this size.
63 Next() *protocol.ButlerLogBundle
64
65 // CloseAndFlush flushes the Bundler, blocking until all registered Stre ams
66 // have closed and all buffered data has been output.
67 CloseAndFlush()
68 }
69
70 type bundlerImpl struct {
71 *Config
72
73 // finishedC is closed when makeBundles goroutine has terminated.
74 finishedC chan struct{}
75 bundleC chan *protocol.ButlerLogBundle
76
77 // streamsLock is a lock around the `streams` map and its contents.
78 streamsLock sync.Mutex
79 // streamsCond is a Cond bound to streamsLock, used to signal Next() whe n new
80 // data is available.
81 streamsCond *timeoutCond
82 // streams is the set of currently-registered Streams.
83 streams map[string]bundlerStream
84 // flushing is true if we're blocking on CloseAndFlush().
85 flushing bool
86
87 // prefixCounter is a global counter for Prefix-wide streams.
88 prefixCounter counter
89 }
90
91 // New instantiates a new Bundler instance.
92 func New(c Config) Bundler {
93 b := bundlerImpl{
94 Config: &c,
95 finishedC: make(chan struct{}),
96 bundleC: make(chan *protocol.ButlerLogBundle),
97 streams: map[string]bundlerStream{},
98 }
99 b.streamsCond = newTimeoutCond(b.getClock(), &b.streamsLock)
100
101 go b.makeBundles()
102 return &b
103 }
104
105 func (b *bundlerImpl) Register(p streamproto.Properties) (Stream, error) {
106 b.streamsLock.Lock()
107 defer b.streamsLock.Unlock()
108
109 if s := b.streams[p.Name]; s != nil {
110 return nil, fmt.Errorf("a Stream is already registered for %q", p.Name)
111 }
112
113 // Our Properties must validate.
114 if err := p.Validate(); err != nil {
115 return nil, err
116 }
iannucci 2015/11/13 07:16:38 can we do this before the lock? actually... can't
dnj 2015/11/14 00:30:36 Yep, Done.
117
118 // Construct a parser for this stream.
119 c := streamConfig{
120 name: p.Name,
121 template: protocol.ButlerLogBundle_Entry{
122 Desc: &p.LogStreamDescriptor,
123 },
124 maximumBufferDuration: b.MaxBufferDelay,
125 maximumBufferedBytes: b.MaxBufferedBytes,
126 onAppend: func(appended bool) {
127 if appended {
128 b.signalStreamUpdate()
129 }
130 },
131 }
132
133 err := error(nil)
134 c.parser, err = newParser(&p, &b.prefixCounter)
135 if err != nil {
136 return nil, fmt.Errorf("failed to create stream parser: %s", err )
137 }
138
139 // Generate a secret for this Stream instance.
140 c.template.Secret, err = types.NewStreamSecret()
141 if err != nil {
142 return nil, fmt.Errorf("failed to generate stream secret: %s", e rr)
143 }
144
145 // Create a new stream. This will kick off its processing goroutine, whi ch
146 // will not stop until it is closed.
147 s := newStream(c)
148 b.registerStreamLocked(s)
149 return s, nil
150 }
151
152 func (b *bundlerImpl) CloseAndFlush() {
153 // Mark that we're flushing. This will cause us to perform more aggressi ve
154 // bundling in Next().
155 b.startFlushing()
156 <-b.finishedC
157 }
158
159 func (b *bundlerImpl) Next() *protocol.ButlerLogBundle {
160 return <-b.bundleC
161 }
162
163 func (b *bundlerImpl) startFlushing() {
164 b.streamsLock.Lock()
165 defer b.streamsLock.Unlock()
166
167 if !b.flushing {
168 b.flushing = true
169 b.signalStreamUpdateLocked()
170 }
171 }
172
173 // makeBundles is run in its own goroutine. It runs continuously, responding
174 // to Stream constraints and availability and sending ButlerLogBundles through
175 // bundleC when available.
176 //
177 // makeBundles will terminate when closeC is closed and all streams are drained.
178 func (b *bundlerImpl) makeBundles() {
179 defer close(b.finishedC)
180 defer close(b.bundleC)
181
182 b.streamsLock.Lock()
183 defer b.streamsLock.Unlock()
184
185 bb := (*builder)(nil)
186 defer func() {
187 if bb != nil && bb.hasContent() {
188 b.bundleC <- bb.bundle()
189 }
190 }()
191
192 for {
193 bb = &builder{
194 template: protocol.ButlerLogBundle{
195 Source: b.Source,
196 Timestamp: google.NewTimestamp(b.getClock().Now( )),
197 },
198 size: b.MaxBundleSize,
199 }
200 oldestContentTime := time.Time{}
201
202 for {
203 state := b.getStreamStateLocked()
204
205 // Attempt to create more bundles.
206 sendNow := b.bundleRoundLocked(bb, state)
207
208 // Prune any drained streams.
209 state.forEachStream(func(s bundlerStream) bool {
210 if s.isDrained() {
211 state.removeStream(s.name())
212 b.unregisterStreamLocked(s)
213 }
214
215 return true
216 })
217
218 if b.flushing && len(state.streams) == 0 {
219 // We're flushing, and there are no more streams , so we're completely
220 // finished.
221 //
222 // If we have any content in our builder, it wil l be exported via defer.
223 return
224 }
225
226 // If we have content, consider emitting this bundle.
227 if bb.hasContent() {
228 if b.MaxBufferDelay == 0 || sendNow || bb.ready( ) {
229 break
230 }
231 }
232
233 // Mark the first time this round where we actually saw data.
234 if oldestContentTime.IsZero() && bb.hasContent() {
235 oldestContentTime = state.now
236 }
237
238 // We will yield our stream lock and sleep, waiting for either:
239 // 1) The earliest expiration time.
240 // 2) A streams channel signal.
241 //
242 // We use a Cond here because we want Streams to be able to be added
243 // while we're waiting for stream data.
244 nextExpire, has := state.nextExpire()
245
246 // If we have an oldest content time, that also means th at we have
247 // content. Factor this constraint in.
248 if !oldestContentTime.IsZero() {
249 roundExpire := oldestContentTime.Add(b.MaxBuffer Delay)
250 if !roundExpire.After(state.now) {
251 break
252 }
253
254 if !has || roundExpire.Before(nextExpire) {
255 nextExpire = roundExpire
256 has = true
257 }
258 }
259
260 if has {
261 b.streamsCond.waitTimeout(nextExpire.Sub(state.n ow))
262 } else {
263 // No data, no expire constraints. Wait indefini tely for something
264 // to change.
265 b.streamsCond.Wait()
266 }
267 }
268
269 // If our bundler has contents, send them.
270 if bb.hasContent() {
271 b.bundleC <- bb.bundle()
272 bb = nil
273 }
274 }
275 }
276
277 // Performs a round of bundling with a stream state. polling stream state and ad ding
iannucci 2015/11/13 07:16:38 this seems incomplete?
dnj 2015/11/14 00:30:36 Done.
278 //
279 // This method will block until a suitable bundle is available. Availability
280 // is subject both to time and data constraints:
281 // - If data has exceeded its buffer duration threshold, a Bundle will be cut
282 // immediately.
iannucci 2015/11/13 07:16:38 are thresholds calculated absolutely, or from the
dnj 2015/11/14 00:30:36 They are relative to when the buffered data was in
283 // - If no data is set to expire, the Bundler may wait for more data to
284 // produce a more optimally-packed bundle.
285 //
286 // At a high level, Next operates as follows:
287 // 1) Freeze all stream state.
288 // 2) Scan streams for data that has exceeded its threshold; if data is found:
289 // - Aggressively pack expired data into a Bundle.
290 // - Optimally pack the remainder of the Bundle with any available data.
291 // - Return the Bundle.
292 //
293 // 3) Examine the remaining data sizes, waiting for either:
294 // - Enough stream data to fill our Bundle.
295 // - Our timeout, if the Bundler is not closed.
296 // 4) Pack a Bundle with the remaining data optimally, emphasizing streams
297 // with older data.
298 //
299 // Returns true if bundle some data was added that should be sent immediately.
300 func (b *bundlerImpl) bundleRoundLocked(bb *builder, state *streamState) bool {
301 sendNow := false
302
303 // First pass: non-blocking data that has exceeded its storage threshold .
304 for bb.remaining() > 0 {
305 s := state.next()
306 if s == nil {
307 break
308 }
309
310 if et, has := s.expireTime(); !has || et.After(state.now) {
311 // This stream (and all other streams, since we're sorte d) expires in
312 // the future, so we're done with the first pass.
313 break
314 }
315
316 // Pull bundles from this stream.
317 if modified := s.nextBundleEntry(bb, true); modified {
318 state.streamUpdated(s.name())
319
320 // We have at least one time-sensitive bundle, so send t his round.
321 sendNow = true
322 }
323 }
324
325 // Second pass: bundle any available data.
326 state.forEachStream(func(s bundlerStream) bool {
327 if bb.remaining() == 0 {
328 return false
329 }
330
331 if modified := s.nextBundleEntry(bb, b.flushing); modified {
332 state.streamUpdated(s.name())
333 }
334 return true
335 })
336
337 return sendNow
338 }
339
340 func (b *bundlerImpl) getStreamStateLocked() *streamState {
341 // Lock and collect each stream.
342 state := &streamState{
343 streams: make([]bundlerStream, 0, len(b.streams)),
344 now: b.getClock().Now(),
345 }
346
347 for _, s := range b.streams {
348 state.streams = append(state.streams, s)
349 }
350 heap.Init(state)
351
352 return state
353 }
354
355 func (b *bundlerImpl) registerStreamLocked(s bundlerStream) {
356 b.streams[s.name()] = s
357 b.signalStreamUpdateLocked()
358 }
359
360 func (b *bundlerImpl) unregisterStreamLocked(s bundlerStream) {
361 delete(b.streams, s.name())
362 }
363
364 func (b *bundlerImpl) signalStreamUpdate() {
365 b.streamsLock.Lock()
366 defer b.streamsLock.Unlock()
367
368 b.signalStreamUpdateLocked()
369 }
370
371 func (b *bundlerImpl) signalStreamUpdateLocked() {
372 b.streamsCond.Broadcast()
373 }
374
375 // nextPrefixIndex is a goroutine-safe method that returns the next prefix index
376 // for the given Bundler.
377 func (b *bundlerImpl) nextPrefixIndex() uint64 {
378 return uint64(b.prefixCounter.next())
379 }
380
381 func (b *bundlerImpl) getClock() clock.Clock {
382 c := b.Clock
383 if c != nil {
384 return c
385 }
386 return clock.GetSystemClock()
387 }
388
389 // streamState is a snapshot of the current stream registration. All operations
390 // performed on the state require streamLock to be held.
391 //
392 // streamState implements heap.Interface for its streams array. Streams without
393 // data times (nil) are considered to be greater than those with times.
394 type streamState struct {
395 streams []bundlerStream
396 now time.Time
397 }
398
399 var _ heap.Interface = (*streamState)(nil)
400
401 func (s *streamState) next() bundlerStream {
402 if len(s.streams) == 0 {
403 return nil
404 }
405 return s.streams[0]
406 }
407
408 func (s *streamState) nextExpire() (time.Time, bool) {
409 if next := s.next(); next != nil {
410 if ts, ok := next.expireTime(); ok {
411 return ts, true
412 }
413 }
414 return time.Time{}, false
415 }
416
417 func (s *streamState) streamUpdated(name string) {
418 if si, idx := s.streamIndex(name); si != nil {
419 heap.Fix(s, idx)
420 }
421 }
422
423 func (s *streamState) forEachStream(f func(bundlerStream) bool) {
424 // Clone our streams, since the callback may mutate their order.
425 streams := make([]bundlerStream, len(s.streams))
426 for i, s := range s.streams {
427 streams[i] = s
428 }
429
430 for _, s := range streams {
431 if !f(s) {
432 break
433 }
434 }
435 }
436
437 // removeStream removes a stream from the stream state.
438 func (s *streamState) removeStream(name string) bundlerStream {
439 if si, idx := s.streamIndex(name); si != nil {
440 heap.Remove(s, idx)
441 return si
442 }
443 return nil
444 }
445
446 func (s *streamState) streamIndex(name string) (bundlerStream, int) {
447 for i, si := range s.streams {
448 if si.name() == name {
449 return si, i
450 }
451 }
452 return nil, -1
453 }
454
455 func (s *streamState) Len() int {
456 return len(s.streams)
457 }
458
459 func (s *streamState) Less(i, j int) bool {
460 si, sj := s.streams[i], s.streams[j]
461
462 if it, ok := si.expireTime(); ok {
463 if jt, ok := sj.expireTime(); ok {
464 return it.Before(jt)
465 }
466
467 // i has data, but j does not, so i < j.
468 return true
469 }
470
471 // i has no data, so i us greater than all other streams.
472 return false
473 }
474
475 func (s *streamState) Swap(i, j int) {
476 s.streams[i], s.streams[j] = s.streams[j], s.streams[i]
477 }
478
479 func (s *streamState) Push(x interface{}) {
480 s.streams = append(s.streams, x.(bundlerStream))
481 }
482
483 func (s *streamState) Pop() interface{} {
484 last := s.streams[len(s.streams)-1]
485 s.streams = s.streams[:len(s.streams)-1]
486 return last
487 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698