Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package bundler | |
| 6 | |
| 7 import ( | |
| 8 "container/heap" | |
| 9 "fmt" | |
| 10 "sync" | |
| 11 "time" | |
| 12 | |
| 13 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" | |
| 14 "github.com/luci/luci-go/common/clock" | |
| 15 "github.com/luci/luci-go/common/logdog/protocol" | |
| 16 "github.com/luci/luci-go/common/logdog/types" | |
| 17 "github.com/luci/luci-go/common/proto/google" | |
| 18 ) | |
| 19 | |
| 20 // Config is the Bundler configuration. | |
| 21 type Config struct { | |
| 22 // Clock is the clock instance that will be used for Bundler and stream | |
| 23 // timing. | |
| 24 Clock clock.Clock | |
| 25 | |
| 26 // Source is the bundle source string to use. This can be empty if there is no | |
| 27 // source information to include. | |
| 28 Source string | |
| 29 | |
| 30 // MaxBufferedBytes is the maximum number of bytes to buffer in memory p er | |
| 31 // stream. | |
| 32 MaxBufferedBytes int64 | |
| 33 | |
| 34 // MaxBundleSize is the maximum bundle size in bytes that may be generat ed. | |
| 35 // | |
| 36 // If this value is zero, no size constraint will be applied to generate d | |
| 37 // bundles. | |
| 38 MaxBundleSize int | |
| 39 | |
| 40 // MaxBufferDelay is the maximum amount of time we're willing to buffer | |
| 41 // bundled data. Other factors can cause the bundle to be sent before th is, | |
| 42 // but it is an upper bound. | |
| 43 MaxBufferDelay time.Duration | |
| 44 } | |
| 45 | |
| 46 type bundlerStream interface { | |
| 47 isDrained() bool | |
| 48 name() string | |
| 49 expireTime() (time.Time, bool) | |
| 50 nextBundleEntry(*builder, bool) bool | |
| 51 } | |
| 52 | |
| 53 // Bundler is the main Bundler instance. It exposes goroutine-safe endpoints for | |
| 54 // stream registration and bundle consumption. | |
| 55 type Bundler interface { | |
| 56 Register(streamproto.Properties) (Stream, error) | |
| 57 | |
| 58 // Next causes the Bundler to pack a new ButlerLogBundle message for | |
| 59 // transmission. | |
| 60 // | |
| 61 // The maximum marshalled byte size of the bundle's protobuf is supplied ; the | |
| 62 // generated bundle will not exceed this size. | |
| 63 Next() *protocol.ButlerLogBundle | |
| 64 | |
| 65 // CloseAndFlush flushes the Bundler, blocking until all registered Stre ams | |
| 66 // have closed and all buffered data has been output. | |
| 67 CloseAndFlush() | |
| 68 } | |
| 69 | |
| 70 type bundlerImpl struct { | |
| 71 *Config | |
| 72 | |
| 73 // finishedC is closed when makeBundles goroutine has terminated. | |
| 74 finishedC chan struct{} | |
| 75 bundleC chan *protocol.ButlerLogBundle | |
| 76 | |
| 77 // streamsLock is a lock around the `streams` map and its contents. | |
| 78 streamsLock sync.Mutex | |
| 79 // streamsCond is a Cond bound to streamsLock, used to signal Next() whe n new | |
| 80 // data is available. | |
| 81 streamsCond *timeoutCond | |
| 82 // streams is the set of currently-registered Streams. | |
| 83 streams map[string]bundlerStream | |
| 84 // flushing is true if we're blocking on CloseAndFlush(). | |
| 85 flushing bool | |
| 86 | |
| 87 // prefixCounter is a global counter for Prefix-wide streams. | |
| 88 prefixCounter counter | |
| 89 } | |
| 90 | |
| 91 // New instantiates a new Bundler instance. | |
| 92 func New(c Config) Bundler { | |
| 93 b := bundlerImpl{ | |
| 94 Config: &c, | |
| 95 finishedC: make(chan struct{}), | |
| 96 bundleC: make(chan *protocol.ButlerLogBundle), | |
| 97 streams: map[string]bundlerStream{}, | |
| 98 } | |
| 99 b.streamsCond = newTimeoutCond(b.getClock(), &b.streamsLock) | |
| 100 | |
| 101 go b.makeBundles() | |
| 102 return &b | |
| 103 } | |
| 104 | |
| 105 func (b *bundlerImpl) Register(p streamproto.Properties) (Stream, error) { | |
| 106 b.streamsLock.Lock() | |
| 107 defer b.streamsLock.Unlock() | |
| 108 | |
| 109 if s := b.streams[p.Name]; s != nil { | |
| 110 return nil, fmt.Errorf("a Stream is already registered for %q", p.Name) | |
| 111 } | |
| 112 | |
| 113 // Our Properties must validate. | |
| 114 if err := p.Validate(); err != nil { | |
| 115 return nil, err | |
| 116 } | |
|
iannucci
2015/11/13 07:16:38
can we do this before the lock? actually... can't
dnj
2015/11/14 00:30:36
Yep, Done.
| |
| 117 | |
| 118 // Construct a parser for this stream. | |
| 119 c := streamConfig{ | |
| 120 name: p.Name, | |
| 121 template: protocol.ButlerLogBundle_Entry{ | |
| 122 Desc: &p.LogStreamDescriptor, | |
| 123 }, | |
| 124 maximumBufferDuration: b.MaxBufferDelay, | |
| 125 maximumBufferedBytes: b.MaxBufferedBytes, | |
| 126 onAppend: func(appended bool) { | |
| 127 if appended { | |
| 128 b.signalStreamUpdate() | |
| 129 } | |
| 130 }, | |
| 131 } | |
| 132 | |
| 133 err := error(nil) | |
| 134 c.parser, err = newParser(&p, &b.prefixCounter) | |
| 135 if err != nil { | |
| 136 return nil, fmt.Errorf("failed to create stream parser: %s", err ) | |
| 137 } | |
| 138 | |
| 139 // Generate a secret for this Stream instance. | |
| 140 c.template.Secret, err = types.NewStreamSecret() | |
| 141 if err != nil { | |
| 142 return nil, fmt.Errorf("failed to generate stream secret: %s", e rr) | |
| 143 } | |
| 144 | |
| 145 // Create a new stream. This will kick off its processing goroutine, whi ch | |
| 146 // will not stop until it is closed. | |
| 147 s := newStream(c) | |
| 148 b.registerStreamLocked(s) | |
| 149 return s, nil | |
| 150 } | |
| 151 | |
| 152 func (b *bundlerImpl) CloseAndFlush() { | |
| 153 // Mark that we're flushing. This will cause us to perform more aggressi ve | |
| 154 // bundling in Next(). | |
| 155 b.startFlushing() | |
| 156 <-b.finishedC | |
| 157 } | |
| 158 | |
| 159 func (b *bundlerImpl) Next() *protocol.ButlerLogBundle { | |
| 160 return <-b.bundleC | |
| 161 } | |
| 162 | |
| 163 func (b *bundlerImpl) startFlushing() { | |
| 164 b.streamsLock.Lock() | |
| 165 defer b.streamsLock.Unlock() | |
| 166 | |
| 167 if !b.flushing { | |
| 168 b.flushing = true | |
| 169 b.signalStreamUpdateLocked() | |
| 170 } | |
| 171 } | |
| 172 | |
| 173 // makeBundles is run in its own goroutine. It runs continuously, responding | |
| 174 // to Stream constraints and availability and sending ButlerLogBundles through | |
| 175 // bundleC when available. | |
| 176 // | |
| 177 // makeBundles will terminate when closeC is closed and all streams are drained. | |
| 178 func (b *bundlerImpl) makeBundles() { | |
| 179 defer close(b.finishedC) | |
| 180 defer close(b.bundleC) | |
| 181 | |
| 182 b.streamsLock.Lock() | |
| 183 defer b.streamsLock.Unlock() | |
| 184 | |
| 185 bb := (*builder)(nil) | |
| 186 defer func() { | |
| 187 if bb != nil && bb.hasContent() { | |
| 188 b.bundleC <- bb.bundle() | |
| 189 } | |
| 190 }() | |
| 191 | |
| 192 for { | |
| 193 bb = &builder{ | |
| 194 template: protocol.ButlerLogBundle{ | |
| 195 Source: b.Source, | |
| 196 Timestamp: google.NewTimestamp(b.getClock().Now( )), | |
| 197 }, | |
| 198 size: b.MaxBundleSize, | |
| 199 } | |
| 200 oldestContentTime := time.Time{} | |
| 201 | |
| 202 for { | |
| 203 state := b.getStreamStateLocked() | |
| 204 | |
| 205 // Attempt to create more bundles. | |
| 206 sendNow := b.bundleRoundLocked(bb, state) | |
| 207 | |
| 208 // Prune any drained streams. | |
| 209 state.forEachStream(func(s bundlerStream) bool { | |
| 210 if s.isDrained() { | |
| 211 state.removeStream(s.name()) | |
| 212 b.unregisterStreamLocked(s) | |
| 213 } | |
| 214 | |
| 215 return true | |
| 216 }) | |
| 217 | |
| 218 if b.flushing && len(state.streams) == 0 { | |
| 219 // We're flushing, and there are no more streams , so we're completely | |
| 220 // finished. | |
| 221 // | |
| 222 // If we have any content in our builder, it wil l be exported via defer. | |
| 223 return | |
| 224 } | |
| 225 | |
| 226 // If we have content, consider emitting this bundle. | |
| 227 if bb.hasContent() { | |
| 228 if b.MaxBufferDelay == 0 || sendNow || bb.ready( ) { | |
| 229 break | |
| 230 } | |
| 231 } | |
| 232 | |
| 233 // Mark the first time this round where we actually saw data. | |
| 234 if oldestContentTime.IsZero() && bb.hasContent() { | |
| 235 oldestContentTime = state.now | |
| 236 } | |
| 237 | |
| 238 // We will yield our stream lock and sleep, waiting for either: | |
| 239 // 1) The earliest expiration time. | |
| 240 // 2) A streams channel signal. | |
| 241 // | |
| 242 // We use a Cond here because we want Streams to be able to be added | |
| 243 // while we're waiting for stream data. | |
| 244 nextExpire, has := state.nextExpire() | |
| 245 | |
| 246 // If we have an oldest content time, that also means th at we have | |
| 247 // content. Factor this constraint in. | |
| 248 if !oldestContentTime.IsZero() { | |
| 249 roundExpire := oldestContentTime.Add(b.MaxBuffer Delay) | |
| 250 if !roundExpire.After(state.now) { | |
| 251 break | |
| 252 } | |
| 253 | |
| 254 if !has || roundExpire.Before(nextExpire) { | |
| 255 nextExpire = roundExpire | |
| 256 has = true | |
| 257 } | |
| 258 } | |
| 259 | |
| 260 if has { | |
| 261 b.streamsCond.waitTimeout(nextExpire.Sub(state.n ow)) | |
| 262 } else { | |
| 263 // No data, no expire constraints. Wait indefini tely for something | |
| 264 // to change. | |
| 265 b.streamsCond.Wait() | |
| 266 } | |
| 267 } | |
| 268 | |
| 269 // If our bundler has contents, send them. | |
| 270 if bb.hasContent() { | |
| 271 b.bundleC <- bb.bundle() | |
| 272 bb = nil | |
| 273 } | |
| 274 } | |
| 275 } | |
| 276 | |
| 277 // Performs a round of bundling with a stream state. polling stream state and ad ding | |
|
iannucci
2015/11/13 07:16:38
this seems incomplete?
dnj
2015/11/14 00:30:36
Done.
| |
| 278 // | |
| 279 // This method will block until a suitable bundle is available. Availability | |
| 280 // is subject both to time and data constraints: | |
| 281 // - If data has exceeded its buffer duration threshold, a Bundle will be cut | |
| 282 // immediately. | |
|
iannucci
2015/11/13 07:16:38
are thresholds calculated absolutely, or from the
dnj
2015/11/14 00:30:36
They are relative to when the buffered data was in
| |
| 283 // - If no data is set to expire, the Bundler may wait for more data to | |
| 284 // produce a more optimally-packed bundle. | |
| 285 // | |
| 286 // At a high level, Next operates as follows: | |
| 287 // 1) Freeze all stream state. | |
| 288 // 2) Scan streams for data that has exceeded its threshold; if data is found: | |
| 289 // - Aggressively pack expired data into a Bundle. | |
| 290 // - Optimally pack the remainder of the Bundle with any available data. | |
| 291 // - Return the Bundle. | |
| 292 // | |
| 293 // 3) Examine the remaining data sizes, waiting for either: | |
| 294 // - Enough stream data to fill our Bundle. | |
| 295 // - Our timeout, if the Bundler is not closed. | |
| 296 // 4) Pack a Bundle with the remaining data optimally, emphasizing streams | |
| 297 // with older data. | |
| 298 // | |
| 299 // Returns true if bundle some data was added that should be sent immediately. | |
| 300 func (b *bundlerImpl) bundleRoundLocked(bb *builder, state *streamState) bool { | |
| 301 sendNow := false | |
| 302 | |
| 303 // First pass: non-blocking data that has exceeded its storage threshold . | |
| 304 for bb.remaining() > 0 { | |
| 305 s := state.next() | |
| 306 if s == nil { | |
| 307 break | |
| 308 } | |
| 309 | |
| 310 if et, has := s.expireTime(); !has || et.After(state.now) { | |
| 311 // This stream (and all other streams, since we're sorte d) expires in | |
| 312 // the future, so we're done with the first pass. | |
| 313 break | |
| 314 } | |
| 315 | |
| 316 // Pull bundles from this stream. | |
| 317 if modified := s.nextBundleEntry(bb, true); modified { | |
| 318 state.streamUpdated(s.name()) | |
| 319 | |
| 320 // We have at least one time-sensitive bundle, so send t his round. | |
| 321 sendNow = true | |
| 322 } | |
| 323 } | |
| 324 | |
| 325 // Second pass: bundle any available data. | |
| 326 state.forEachStream(func(s bundlerStream) bool { | |
| 327 if bb.remaining() == 0 { | |
| 328 return false | |
| 329 } | |
| 330 | |
| 331 if modified := s.nextBundleEntry(bb, b.flushing); modified { | |
| 332 state.streamUpdated(s.name()) | |
| 333 } | |
| 334 return true | |
| 335 }) | |
| 336 | |
| 337 return sendNow | |
| 338 } | |
| 339 | |
| 340 func (b *bundlerImpl) getStreamStateLocked() *streamState { | |
| 341 // Lock and collect each stream. | |
| 342 state := &streamState{ | |
| 343 streams: make([]bundlerStream, 0, len(b.streams)), | |
| 344 now: b.getClock().Now(), | |
| 345 } | |
| 346 | |
| 347 for _, s := range b.streams { | |
| 348 state.streams = append(state.streams, s) | |
| 349 } | |
| 350 heap.Init(state) | |
| 351 | |
| 352 return state | |
| 353 } | |
| 354 | |
| 355 func (b *bundlerImpl) registerStreamLocked(s bundlerStream) { | |
| 356 b.streams[s.name()] = s | |
| 357 b.signalStreamUpdateLocked() | |
| 358 } | |
| 359 | |
| 360 func (b *bundlerImpl) unregisterStreamLocked(s bundlerStream) { | |
| 361 delete(b.streams, s.name()) | |
| 362 } | |
| 363 | |
| 364 func (b *bundlerImpl) signalStreamUpdate() { | |
| 365 b.streamsLock.Lock() | |
| 366 defer b.streamsLock.Unlock() | |
| 367 | |
| 368 b.signalStreamUpdateLocked() | |
| 369 } | |
| 370 | |
| 371 func (b *bundlerImpl) signalStreamUpdateLocked() { | |
| 372 b.streamsCond.Broadcast() | |
| 373 } | |
| 374 | |
| 375 // nextPrefixIndex is a goroutine-safe method that returns the next prefix index | |
| 376 // for the given Bundler. | |
| 377 func (b *bundlerImpl) nextPrefixIndex() uint64 { | |
| 378 return uint64(b.prefixCounter.next()) | |
| 379 } | |
| 380 | |
| 381 func (b *bundlerImpl) getClock() clock.Clock { | |
| 382 c := b.Clock | |
| 383 if c != nil { | |
| 384 return c | |
| 385 } | |
| 386 return clock.GetSystemClock() | |
| 387 } | |
| 388 | |
| 389 // streamState is a snapshot of the current stream registration. All operations | |
| 390 // performed on the state require streamLock to be held. | |
| 391 // | |
| 392 // streamState implements heap.Interface for its streams array. Streams without | |
| 393 // data times (nil) are considered to be greater than those with times. | |
| 394 type streamState struct { | |
| 395 streams []bundlerStream | |
| 396 now time.Time | |
| 397 } | |
| 398 | |
| 399 var _ heap.Interface = (*streamState)(nil) | |
| 400 | |
| 401 func (s *streamState) next() bundlerStream { | |
| 402 if len(s.streams) == 0 { | |
| 403 return nil | |
| 404 } | |
| 405 return s.streams[0] | |
| 406 } | |
| 407 | |
| 408 func (s *streamState) nextExpire() (time.Time, bool) { | |
| 409 if next := s.next(); next != nil { | |
| 410 if ts, ok := next.expireTime(); ok { | |
| 411 return ts, true | |
| 412 } | |
| 413 } | |
| 414 return time.Time{}, false | |
| 415 } | |
| 416 | |
| 417 func (s *streamState) streamUpdated(name string) { | |
| 418 if si, idx := s.streamIndex(name); si != nil { | |
| 419 heap.Fix(s, idx) | |
| 420 } | |
| 421 } | |
| 422 | |
| 423 func (s *streamState) forEachStream(f func(bundlerStream) bool) { | |
| 424 // Clone our streams, since the callback may mutate their order. | |
| 425 streams := make([]bundlerStream, len(s.streams)) | |
| 426 for i, s := range s.streams { | |
| 427 streams[i] = s | |
| 428 } | |
| 429 | |
| 430 for _, s := range streams { | |
| 431 if !f(s) { | |
| 432 break | |
| 433 } | |
| 434 } | |
| 435 } | |
| 436 | |
| 437 // removeStream removes a stream from the stream state. | |
| 438 func (s *streamState) removeStream(name string) bundlerStream { | |
| 439 if si, idx := s.streamIndex(name); si != nil { | |
| 440 heap.Remove(s, idx) | |
| 441 return si | |
| 442 } | |
| 443 return nil | |
| 444 } | |
| 445 | |
| 446 func (s *streamState) streamIndex(name string) (bundlerStream, int) { | |
| 447 for i, si := range s.streams { | |
| 448 if si.name() == name { | |
| 449 return si, i | |
| 450 } | |
| 451 } | |
| 452 return nil, -1 | |
| 453 } | |
| 454 | |
| 455 func (s *streamState) Len() int { | |
| 456 return len(s.streams) | |
| 457 } | |
| 458 | |
| 459 func (s *streamState) Less(i, j int) bool { | |
| 460 si, sj := s.streams[i], s.streams[j] | |
| 461 | |
| 462 if it, ok := si.expireTime(); ok { | |
| 463 if jt, ok := sj.expireTime(); ok { | |
| 464 return it.Before(jt) | |
| 465 } | |
| 466 | |
| 467 // i has data, but j does not, so i < j. | |
| 468 return true | |
| 469 } | |
| 470 | |
| 471 // i has no data, so i us greater than all other streams. | |
| 472 return false | |
| 473 } | |
| 474 | |
| 475 func (s *streamState) Swap(i, j int) { | |
| 476 s.streams[i], s.streams[j] = s.streams[j], s.streams[i] | |
| 477 } | |
| 478 | |
| 479 func (s *streamState) Push(x interface{}) { | |
| 480 s.streams = append(s.streams, x.(bundlerStream)) | |
| 481 } | |
| 482 | |
| 483 func (s *streamState) Pop() interface{} { | |
| 484 last := s.streams[len(s.streams)-1] | |
| 485 s.streams = s.streams[:len(s.streams)-1] | |
| 486 return last | |
| 487 } | |
| OLD | NEW |