| Index: client/internal/logdog/butler/bundler/parser.go
|
| diff --git a/client/internal/logdog/butler/bundler/parser.go b/client/internal/logdog/butler/bundler/parser.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..4cd1009f2f89d5c6c8ec6198f9a2e6eafd265040
|
| --- /dev/null
|
| +++ b/client/internal/logdog/butler/bundler/parser.go
|
| @@ -0,0 +1,133 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package bundler
|
| +
|
| +import (
|
| + "fmt"
|
| + "time"
|
| +
|
| + "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
|
| + "github.com/luci/luci-go/common/chunkstream"
|
| + "github.com/luci/luci-go/common/logdog/protocol"
|
| + "github.com/luci/luci-go/common/logdog/types"
|
| + "github.com/luci/luci-go/common/proto/google"
|
| +)
|
| +
|
| +// constraints is the set of Constraints to apply when generating a LogEntry.
|
| +type constraints struct {
|
| + // limit is the maximum size, in bytes, of the serialized LogEntry protobuf
|
| + // that may be produced.
|
| + limit int
|
| +
|
| + // allowSplit indicates that bundles should be generated to fill as much of
|
| + // the specified space as possible, splitting them across multiple bundles if
|
| + // necessary.
|
| + //
|
| + // The parser may choose to forego bundling if the result is very suboptimal,
|
| + // but is encouraged to fill the space if it's reasonable.
|
| + allowSplit bool
|
| +
|
| + // closed means that bundles should be aggressively generated with the
|
| + // expectation that no further data will be buffered. It is only relevant
|
| + // if allowSplit is also true.
|
| + closed bool
|
| +}
|
| +
|
| +// parser is a stateful presence bound to a single log stream. A parser yields
|
| +// LogEntry messages one at a time and shapes them based on constraints.
|
| +//
|
| +// parser instances are owned by a single Stream and are not goroutine-safe.
|
| +type parser interface {
|
| + // appendData adds a data chunk to this parser's chunk.Buffer.
|
| + appendData(Data)
|
| +
|
| + // nextEntry returns the next LogEntry in the stream.
|
| + //
|
| + // This method may return nil if there is insuffuicient data to produce a
|
| + // LogEntry given the
|
| + nextEntry(*constraints) (*protocol.LogEntry, error)
|
| +
|
| + bufferedBytes() int64
|
| +
|
| + firstChunkTime() (time.Time, bool)
|
| +}
|
| +
|
| +func newParser(p *streamproto.Properties, c *counter) (parser, error) {
|
| + base := baseParser{
|
| + counter: c,
|
| + timeBase: p.Timestamp.Time(),
|
| + }
|
| +
|
| + switch p.StreamType {
|
| + case protocol.LogStreamDescriptor_TEXT:
|
| + return &textParser{
|
| + baseParser: base,
|
| + }, nil
|
| +
|
| + case protocol.LogStreamDescriptor_BINARY:
|
| + return &binaryParser{
|
| + baseParser: base,
|
| + }, nil
|
| +
|
| + case protocol.LogStreamDescriptor_DATAGRAM:
|
| + return &datagramParser{
|
| + baseParser: base,
|
| + maxSize: int64(types.MaxDatagramSize),
|
| + }, nil
|
| +
|
| + default:
|
| + return nil, fmt.Errorf("unknown stream type: %v", p.StreamType)
|
| + }
|
| +}
|
| +
|
| +// baseParser is a common set of parser capabilities.
|
| +type baseParser struct {
|
| + chunkstream.Buffer
|
| +
|
| + counter *counter
|
| +
|
| + timeBase time.Time
|
| + nextIndex int64
|
| +}
|
| +
|
| +func (p *baseParser) baseLogEntry(ts time.Time) *protocol.LogEntry {
|
| + e := protocol.LogEntry{
|
| + TimeOffset: google.NewDuration(ts.Sub(p.timeBase)),
|
| + PrefixIndex: uint64(p.counter.next()),
|
| + StreamIndex: uint64(p.nextIndex),
|
| + }
|
| + p.nextIndex++
|
| + return &e
|
| +}
|
| +
|
| +func (p *baseParser) appendData(d Data) {
|
| + p.Append(d)
|
| +}
|
| +
|
| +func (p *baseParser) bufferedBytes() int64 {
|
| + return p.Len()
|
| +}
|
| +
|
| +func (p *baseParser) firstChunkTime() (time.Time, bool) {
|
| + // Get the first data chunk in our Buffer.
|
| + chunk := p.FirstChunk()
|
| + if chunk == nil {
|
| + return time.Time{}, false
|
| + }
|
| +
|
| + return chunk.(Data).Timestamp(), true
|
| +}
|
| +
|
| +func memoryCorruptionIf(cond bool, err error) {
|
| + if cond {
|
| + memoryCorruption(err)
|
| + }
|
| +}
|
| +
|
| +func memoryCorruption(err error) {
|
| + if err != nil {
|
| + panic(fmt.Errorf("bundler: memory corruption: %s", err))
|
| + }
|
| +}
|
|
|