| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package bundler |
| 6 |
| 7 import ( |
| 8 "fmt" |
| 9 "time" |
| 10 |
| 11 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" |
| 12 "github.com/luci/luci-go/common/chunkstream" |
| 13 "github.com/luci/luci-go/common/logdog/protocol" |
| 14 "github.com/luci/luci-go/common/logdog/types" |
| 15 "github.com/luci/luci-go/common/proto/google" |
| 16 ) |
| 17 |
| 18 // constraints is the set of Constraints to apply when generating a LogEntry. |
| 19 type constraints struct { |
| 20 // limit is the maximum size, in bytes, of the serialized LogEntry proto
buf |
| 21 // that may be produced. |
| 22 limit int |
| 23 |
| 24 // truncate indicates that bundles should be generated to fill the speci
fied |
| 25 // space. The bundler may choose to forego bundling if the result is ver
y |
| 26 // suboptimal, but is encouraged to fill the space if it's reasonable. |
| 27 truncate bool |
| 28 |
| 29 // closed means that bundles should be aggressively generated with the |
| 30 // expectation that no further data will be buffered. It is only relevan
t |
| 31 // if truncate is also true. |
| 32 closed bool |
| 33 } |
| 34 |
| 35 // parser is a stateful presence bound to a single log stream. A parser yields |
| 36 // LogEntry messages one at a time and shapes them based on constraints. |
| 37 // |
| 38 // parser instances are owned by a single Stream and are not goroutine-safe. |
| 39 type parser interface { |
| 40 // appendData adds a data chunk to this parser's chunk.Buffer. |
| 41 appendData(Data) |
| 42 |
| 43 // nextEntry returns the next LogEntry in the stream. |
| 44 // |
| 45 // This method may return nil if there is insuffuicient data to produce
a |
| 46 // LogEntry given the |
| 47 nextEntry(*constraints) (*protocol.LogEntry, error) |
| 48 |
| 49 bufferedBytes() int64 |
| 50 |
| 51 firstChunkTime() (time.Time, bool) |
| 52 } |
| 53 |
| 54 func newParser(p *streamproto.Properties, c *counter) (parser, error) { |
| 55 base := baseParser{ |
| 56 counter: c, |
| 57 timeBase: p.Timestamp.Time(), |
| 58 } |
| 59 |
| 60 switch p.StreamType { |
| 61 case protocol.LogStreamDescriptor_TEXT: |
| 62 return &textParser{ |
| 63 baseParser: base, |
| 64 }, nil |
| 65 |
| 66 case protocol.LogStreamDescriptor_BINARY: |
| 67 return &binaryParser{ |
| 68 baseParser: base, |
| 69 }, nil |
| 70 |
| 71 case protocol.LogStreamDescriptor_DATAGRAM: |
| 72 return &datagramParser{ |
| 73 baseParser: base, |
| 74 maxSize: int64(types.MaxDatagramSize), |
| 75 }, nil |
| 76 |
| 77 default: |
| 78 return nil, fmt.Errorf("unknown stream type: %v", p.StreamType) |
| 79 } |
| 80 } |
| 81 |
| 82 // baseParser is a common set of parser capabilities. |
| 83 type baseParser struct { |
| 84 chunkstream.Buffer |
| 85 |
| 86 counter *counter |
| 87 |
| 88 timeBase time.Time |
| 89 nextIndex int64 |
| 90 } |
| 91 |
| 92 func (p *baseParser) baseLogEntry(ts time.Time) *protocol.LogEntry { |
| 93 e := protocol.LogEntry{ |
| 94 TimeOffset: google.NewDuration(ts.Sub(p.timeBase)), |
| 95 PrefixIndex: uint64(p.counter.next()), |
| 96 StreamIndex: uint64(p.nextIndex), |
| 97 } |
| 98 p.nextIndex++ |
| 99 return &e |
| 100 } |
| 101 |
| 102 func (p *baseParser) appendData(d Data) { |
| 103 p.Append(d) |
| 104 } |
| 105 |
| 106 func (p *baseParser) bufferedBytes() int64 { |
| 107 return p.Len() |
| 108 } |
| 109 |
| 110 func (p *baseParser) firstChunkTime() (time.Time, bool) { |
| 111 // Get the first data chunk in our Buffer. |
| 112 chunk := p.FirstChunk() |
| 113 if chunk == nil { |
| 114 return time.Time{}, false |
| 115 } |
| 116 |
| 117 return chunk.(Data).Timestamp(), true |
| 118 } |
| OLD | NEW |