Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(96)

Side by Side Diff: client/internal/logdog/butler/bundler/parser.go

Issue 1412063008: logdog: Add bundler library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Enhanced doc.go. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package bundler
6
7 import (
8 "fmt"
9 "time"
10
11 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
12 "github.com/luci/luci-go/common/chunkstream"
13 "github.com/luci/luci-go/common/logdog/protocol"
14 "github.com/luci/luci-go/common/logdog/types"
15 "github.com/luci/luci-go/common/proto/google"
16 )
17
18 // constraints is the set of Constraints to apply when generating a LogEntry.
19 type constraints struct {
20 // limit is the maximum size, in bytes, of the serialized LogEntry proto buf
21 // that may be produced.
22 limit int
23
24 // truncate indicates that bundles should be generated to fill the speci fied
25 // space. The bundler may choose to forego bundling if the result is ver y
26 // suboptimal, but is encouraged to fill the space if it's reasonable.
27 truncate bool
28
29 // closed means that bundles should be aggressively generated with the
30 // expectation that no further data will be buffered. It is only relevan t
31 // if truncate is also true.
32 closed bool
33 }
34
35 // parser is a stateful presence bound to a single log stream. A parser yields
36 // LogEntry messages one at a time and shapes them based on constraints.
37 //
38 // parser instances are owned by a single Stream and are not goroutine-safe.
39 type parser interface {
40 // appendData adds a data chunk to this parser's chunk.Buffer.
41 appendData(Data)
42
43 // nextEntry returns the next LogEntry in the stream.
44 //
45 // This method may return nil if there is insuffuicient data to produce a
46 // LogEntry given the
47 nextEntry(*constraints) (*protocol.LogEntry, error)
48
49 bufferedBytes() int64
50
51 firstChunkTime() (time.Time, bool)
52 }
53
54 func newParser(p *streamproto.Properties, c *counter) (parser, error) {
55 base := baseParser{
56 counter: c,
57 timeBase: p.Timestamp.Time(),
58 }
59
60 switch p.StreamType {
61 case protocol.LogStreamDescriptor_TEXT:
62 return &textParser{
63 baseParser: base,
64 }, nil
65
66 case protocol.LogStreamDescriptor_BINARY:
67 return &binaryParser{
68 baseParser: base,
69 }, nil
70
71 case protocol.LogStreamDescriptor_DATAGRAM:
72 return &datagramParser{
73 baseParser: base,
74 maxSize: int64(types.MaxDatagramSize),
75 }, nil
76
77 default:
78 return nil, fmt.Errorf("unknown stream type: %v", p.StreamType)
79 }
80 }
81
82 // baseParser is a common set of parser capabilities.
83 type baseParser struct {
84 chunkstream.Buffer
85
86 counter *counter
87
88 timeBase time.Time
89 nextIndex int64
90 }
91
92 func (p *baseParser) baseLogEntry(ts time.Time) *protocol.LogEntry {
93 e := protocol.LogEntry{
94 TimeOffset: google.NewDuration(ts.Sub(p.timeBase)),
95 PrefixIndex: uint64(p.counter.next()),
96 StreamIndex: uint64(p.nextIndex),
97 }
98 p.nextIndex++
99 return &e
100 }
101
102 func (p *baseParser) appendData(d Data) {
103 p.Append(d)
104 }
105
106 func (p *baseParser) bufferedBytes() int64 {
107 return p.Len()
108 }
109
110 func (p *baseParser) firstChunkTime() (time.Time, bool) {
111 // Get the first data chunk in our Buffer.
112 chunk := p.FirstChunk()
113 if chunk == nil {
114 return time.Time{}, false
115 }
116
117 return chunk.(Data).Timestamp(), true
118 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698