Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(114)

Side by Side Diff: client/internal/logdog/butler/bundler/bundler_impl.go

Issue 1276923003: logdog: Add bundler library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: More documentation, simplified "bundlerStream" struct. Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package bundler
6
7 import (
8 "sort"
9
10 "github.com/luci/luci-go/common/logdog/protocol"
11 "github.com/luci/luci-go/common/logdog/protocol/protoutil"
12 "github.com/luci/luci-go/common/logdog/types"
13 )
14
15 // bundlerStream is an aggregate buffered log stream state. It consists of
16 // a ButlerLogBundle_Entry and its aggregate logs.
17 type bundlerStream protocol.ButlerLogBundle_Entry
18
19 // mergeEntry merges the state of the supplied ButlerLogBundle_Entry into the
20 // stream entry template.
21 func (s *bundlerStream) mergeEntry(e *protocol.ButlerLogBundle_Entry) {
22 if e.GetTerminal() && !s.entry().GetTerminal() {
23 s.Terminal = e.Terminal
24 s.TerminalIndex = e.TerminalIndex
25 }
26 }
27
28 func (s *bundlerStream) entry() *protocol.ButlerLogBundle_Entry {
29 return (*protocol.ButlerLogBundle_Entry)(s)
30 }
31
32 // bundlerImpl is an implementation of the Bundler interface.
33 type bundlerImpl struct {
34 *Config
35
36 sizer Sizer
37 entries map[types.StreamPath]*bundlerStream
38 count int
39 }
40
41 // New instantiates a new Bundler instance.
42 func New(c Config) Bundler {
43 // The template bundle may not have entries; clear our copy if it does.
44 c.TemplateBundle.Entries = nil
45
46 b := &bundlerImpl{
47 Config: &c,
48 }
49 b.reset()
50 return b
51 }
52
53 func (b *bundlerImpl) Append(e *protocol.ButlerLogBundle_Entry) {
54 logs := e.GetLogs()
tandrii(chromium) 2015/08/11 18:31:05 e.GetLogs() is cheap, and IMO, it's cleaner in thi
dnj (Google) 2015/08/12 03:20:09 I think having the function calls inline in otherw
dnj 2015/09/02 01:58:14 Granted it's cheap, but I don't think it's cleaner
55
56 // We first test if a given ButlerLogBundle_Entry is worth
57 // exporting. An entry is worth exporting if the infromation that it con tains
58 // provides new information about the log stream or its state.
59 //
60 // We consider a log stream worth exporting if it satisfies AT LEAST one of
61 // the following conditions:
62 // - It has log data len(e.Logs) > 0
63 // - It is terminal: e.GetTerminal() == true
64 if !(len(logs) > 0 || e.GetTerminal()) {
65 return
66 }
67
68 // Add this log to our reserve.
69 path := protoutil.DescriptorPath(e.GetDesc())
70 cur, ok := b.entries[path]
71 if !ok {
72 // Add this new entry to the bundle.
73 b.sizer.AppendBundleEntry(e)
74
75 cur = (*bundlerStream)(e)
tandrii(chromium) 2015/08/11 18:31:05 i'd merge this and following lines.
dnj (Google) 2015/08/12 03:20:09 Oh hey, I'm no longer using "cur" outside of this
dnj 2015/09/02 01:58:14 Oh good catch, I used to care about "cur" outside
76 b.entries[path] = cur
77 } else {
78 cur.mergeEntry(e)
79 cur.Logs = append(cur.Logs, logs...)
80 }
81
82 for _, le := range logs {
83 b.sizer.AppendLogEntry(e, le)
84 }
85 b.count += len(logs)
86 return
87 }
88
89 func (b *bundlerImpl) reset() {
90 b.sizer = b.newSizer(&b.TemplateBundle)
91 b.count = 0
92 b.entries = map[types.StreamPath]*bundlerStream{}
93 }
94
95 func (b *bundlerImpl) Empty() bool {
96 return len(b.entries) == 0
97 }
98
99 func (b *bundlerImpl) Size() int64 {
100 return b.sizer.Size()
101 }
102
103 func (b *bundlerImpl) GetBundles(threshold int64) []*protocol.ButlerLogBundle {
104 bundles := []*protocol.ButlerLogBundle(nil)
105 for {
106 bundle := b.getBundle(threshold)
107 if bundle == nil {
108 break
109 }
110 bundles = append(bundles, bundle)
111 }
112
113 // If we still have bundle entries, it is likely because no entries fit with
114 // the threshold. Clear them out.
115 b.reset()
116
117 return bundles
118 }
119
120 func (b *bundlerImpl) getBundle(threshold int64) *protocol.ButlerLogBundle {
121 // Clone our template bundle, as we intend to modify it by adding entrie s.
122 bundle := b.TemplateBundle
123
124 // For determinism, add buffered entries in order of path.
125 keys := make([]string, 0, len(b.entries))
126 for k := range b.entries {
127 keys = append(keys, string(k))
128 }
129 sort.Strings(keys)
130
131 sizer := b.newSizer(&bundle)
132 overThreshold := func() bool {
133 return threshold != 0 && threshold < sizer.Size()
134 }
135
136 for _, k := range keys {
137 // We assume that each entry in "entries" is worth exporting (se e
138 // comment in Append), else it would not have been added to the entries
139 // map by Append.
140 e := b.entries[types.StreamPath(k)]
141
142 // Can we add this entry without exceeding our size threshold?
143 sizer.AppendBundleEntry(e.entry())
144 if overThreshold() {
tandrii(chromium) 2015/08/11 18:31:05 ok, I agree this is necessary.
145 break
146 }
147
148 // Count how many logs we can add without hitting our threshold.
149 count := 0
150 for _, le := range e.Logs {
151 sizer.AppendLogEntry(e.entry(), le)
152 if overThreshold() {
153 break
154 }
155
156 count++
157 }
158
159 // If we can't add all of the logs, we will export a clone of th e template
160 // bundle entry with specifically the logs that fit.
161 if count < len(e.Logs) {
162 ec := *e.entry()
163 ec.Logs = make([]*protocol.LogEntry, count)
164 copy(ec.Logs, e.Logs)
165 bundle.Entries = append(bundle.Entries, &ec)
166
167 // Left-shift our retained logs to consume the ones that we've exported.
168 e.Logs = append(e.Logs[:0], e.Logs[count:]...)
169 break
170 }
171
172 // We've consumed all logs for this entry. Since we're deleting the
173 // bundlerStream from the map, export its template and logs slic e directly.
174 bundle.Entries = append(bundle.Entries, e.entry())
175 delete(b.entries, types.StreamPath(k))
176 }
177
178 if len(bundle.Entries) == 0 {
179 return nil
180 }
181 return &bundle
182 }
183
184 func (b *bundlerImpl) newSizer(bundle *protocol.ButlerLogBundle) Sizer {
185 nb := b.NewSizer
186 if nb == nil {
187 nb = NewFastSizer
188 }
189 return nb(bundle)
190 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698