Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(806)

Side by Side Diff: client/internal/logdog/butler/bundler/parser.go

Issue 1412063008: logdog: Add bundler library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Updated from comments. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package bundler
6
7 import (
8 "fmt"
9 "time"
10
11 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
12 "github.com/luci/luci-go/common/chunkstream"
13 "github.com/luci/luci-go/common/logdog/protocol"
14 "github.com/luci/luci-go/common/logdog/types"
15 "github.com/luci/luci-go/common/proto/google"
16 )
17
18 // constraints is the set of Constraints to apply when generating a LogEntry.
19 type constraints struct {
20 // limit is the maximum size, in bytes, of the serialized LogEntry proto buf
21 // that may be produced.
22 limit int
23
24 // allowSplit indicates that bundles should be generated to fill as much of
25 // the specified space as possible, splitting them across multiple bundl es if
26 // necessary.
27 //
28 // The parser may choose to forego bundling if the result is very subopt imal,
29 // but is encouraged to fill the space if it's reasonable.
30 allowSplit bool
31
32 // closed means that bundles should be aggressively generated with the
33 // expectation that no further data will be buffered. It is only relevan t
34 // if allowSplit is also true.
35 closed bool
36 }
37
38 // parser is a stateful presence bound to a single log stream. A parser yields
39 // LogEntry messages one at a time and shapes them based on constraints.
40 //
41 // parser instances are owned by a single Stream and are not goroutine-safe.
42 type parser interface {
43 // appendData adds a data chunk to this parser's chunk.Buffer.
44 appendData(Data)
45
46 // nextEntry returns the next LogEntry in the stream.
47 //
48 // This method may return nil if there is insuffuicient data to produce a
49 // LogEntry given the
50 nextEntry(*constraints) (*protocol.LogEntry, error)
51
52 bufferedBytes() int64
53
54 firstChunkTime() (time.Time, bool)
55 }
56
57 func newParser(p *streamproto.Properties, c *counter) (parser, error) {
58 base := baseParser{
59 counter: c,
60 timeBase: p.Timestamp.Time(),
61 }
62
63 switch p.StreamType {
64 case protocol.LogStreamDescriptor_TEXT:
65 return &textParser{
66 baseParser: base,
67 }, nil
68
69 case protocol.LogStreamDescriptor_BINARY:
70 return &binaryParser{
71 baseParser: base,
72 }, nil
73
74 case protocol.LogStreamDescriptor_DATAGRAM:
75 return &datagramParser{
76 baseParser: base,
77 maxSize: int64(types.MaxDatagramSize),
78 }, nil
79
80 default:
81 return nil, fmt.Errorf("unknown stream type: %v", p.StreamType)
82 }
83 }
84
85 // baseParser is a common set of parser capabilities.
86 type baseParser struct {
87 chunkstream.Buffer
88
89 counter *counter
90
91 timeBase time.Time
92 nextIndex int64
93 }
94
95 func (p *baseParser) baseLogEntry(ts time.Time) *protocol.LogEntry {
96 e := protocol.LogEntry{
97 TimeOffset: google.NewDuration(ts.Sub(p.timeBase)),
98 PrefixIndex: uint64(p.counter.next()),
99 StreamIndex: uint64(p.nextIndex),
100 }
101 p.nextIndex++
102 return &e
103 }
104
105 func (p *baseParser) appendData(d Data) {
106 p.Append(d)
107 }
108
109 func (p *baseParser) bufferedBytes() int64 {
110 return p.Len()
111 }
112
113 func (p *baseParser) firstChunkTime() (time.Time, bool) {
114 // Get the first data chunk in our Buffer.
115 chunk := p.FirstChunk()
116 if chunk == nil {
117 return time.Time{}, false
118 }
119
120 return chunk.(Data).Timestamp(), true
121 }
122
123 func memoryCorruptionIf(cond bool, err error) {
124 if cond {
125 memoryCorruption(err)
126 }
127 }
128
129 func memoryCorruption(err error) {
130 if err != nil {
131 panic(fmt.Errorf("bundler: memory corruption: %s", err))
132 }
133 }
OLDNEW
« no previous file with comments | « client/internal/logdog/butler/bundler/doc.go ('k') | client/internal/logdog/butler/bundler/parser_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698