| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package bundler |
| 6 |
| 7 import ( |
| 8 "fmt" |
| 9 "time" |
| 10 |
| 11 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" |
| 12 "github.com/luci/luci-go/common/chunkstream" |
| 13 "github.com/luci/luci-go/common/logdog/protocol" |
| 14 "github.com/luci/luci-go/common/logdog/types" |
| 15 "github.com/luci/luci-go/common/proto/google" |
| 16 ) |
| 17 |
| 18 // constraints is the set of Constraints to apply when generating a LogEntry. |
| 19 type constraints struct { |
| 20 // limit is the maximum size, in bytes, of the serialized LogEntry proto
buf |
| 21 // that may be produced. |
| 22 limit int |
| 23 |
| 24 // allowSplit indicates that bundles should be generated to fill as much
of |
| 25 // the specified space as possible, splitting them across multiple bundl
es if |
| 26 // necessary. |
| 27 // |
| 28 // The parser may choose to forego bundling if the result is very subopt
imal, |
| 29 // but is encouraged to fill the space if it's reasonable. |
| 30 allowSplit bool |
| 31 |
| 32 // closed means that bundles should be aggressively generated with the |
| 33 // expectation that no further data will be buffered. It is only relevan
t |
| 34 // if allowSplit is also true. |
| 35 closed bool |
| 36 } |
| 37 |
| 38 // parser is a stateful presence bound to a single log stream. A parser yields |
| 39 // LogEntry messages one at a time and shapes them based on constraints. |
| 40 // |
| 41 // parser instances are owned by a single Stream and are not goroutine-safe. |
| 42 type parser interface { |
| 43 // appendData adds a data chunk to this parser's chunk.Buffer. |
| 44 appendData(Data) |
| 45 |
| 46 // nextEntry returns the next LogEntry in the stream. |
| 47 // |
| 48 // This method may return nil if there is insuffuicient data to produce
a |
| 49 // LogEntry given the |
| 50 nextEntry(*constraints) (*protocol.LogEntry, error) |
| 51 |
| 52 bufferedBytes() int64 |
| 53 |
| 54 firstChunkTime() (time.Time, bool) |
| 55 } |
| 56 |
| 57 func newParser(p *streamproto.Properties, c *counter) (parser, error) { |
| 58 base := baseParser{ |
| 59 counter: c, |
| 60 timeBase: p.Timestamp.Time(), |
| 61 } |
| 62 |
| 63 switch p.StreamType { |
| 64 case protocol.LogStreamDescriptor_TEXT: |
| 65 return &textParser{ |
| 66 baseParser: base, |
| 67 }, nil |
| 68 |
| 69 case protocol.LogStreamDescriptor_BINARY: |
| 70 return &binaryParser{ |
| 71 baseParser: base, |
| 72 }, nil |
| 73 |
| 74 case protocol.LogStreamDescriptor_DATAGRAM: |
| 75 return &datagramParser{ |
| 76 baseParser: base, |
| 77 maxSize: int64(types.MaxDatagramSize), |
| 78 }, nil |
| 79 |
| 80 default: |
| 81 return nil, fmt.Errorf("unknown stream type: %v", p.StreamType) |
| 82 } |
| 83 } |
| 84 |
| 85 // baseParser is a common set of parser capabilities. |
| 86 type baseParser struct { |
| 87 chunkstream.Buffer |
| 88 |
| 89 counter *counter |
| 90 |
| 91 timeBase time.Time |
| 92 nextIndex int64 |
| 93 } |
| 94 |
| 95 func (p *baseParser) baseLogEntry(ts time.Time) *protocol.LogEntry { |
| 96 e := protocol.LogEntry{ |
| 97 TimeOffset: google.NewDuration(ts.Sub(p.timeBase)), |
| 98 PrefixIndex: uint64(p.counter.next()), |
| 99 StreamIndex: uint64(p.nextIndex), |
| 100 } |
| 101 p.nextIndex++ |
| 102 return &e |
| 103 } |
| 104 |
| 105 func (p *baseParser) appendData(d Data) { |
| 106 p.Append(d) |
| 107 } |
| 108 |
| 109 func (p *baseParser) bufferedBytes() int64 { |
| 110 return p.Len() |
| 111 } |
| 112 |
| 113 func (p *baseParser) firstChunkTime() (time.Time, bool) { |
| 114 // Get the first data chunk in our Buffer. |
| 115 chunk := p.FirstChunk() |
| 116 if chunk == nil { |
| 117 return time.Time{}, false |
| 118 } |
| 119 |
| 120 return chunk.(Data).Timestamp(), true |
| 121 } |
| 122 |
| 123 func memoryCorruptionIf(cond bool, err error) { |
| 124 if cond { |
| 125 memoryCorruption(err) |
| 126 } |
| 127 } |
| 128 |
| 129 func memoryCorruption(err error) { |
| 130 if err != nil { |
| 131 panic(fmt.Errorf("bundler: memory corruption: %s", err)) |
| 132 } |
| 133 } |
| OLD | NEW |