Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(32)

Side by Side Diff: client/internal/logdog/butler/bundler/bundler_impl.go

Issue 1276923003: logdog: Add bundler library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Rewrote bundle logic (and associated updates). Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package bundler
6
7 import (
8 "sort"
9
10 "github.com/luci/luci-go/common/logdog/protocol"
11 "github.com/luci/luci-go/common/logdog/protocol/protoutil"
12 )
13
14 // bundlerStream is an aggregate buffered log stream state. It consists of
15 // a ButlerLogBundle_Entry and its aggregate logs.
16 type bundlerStream protocol.ButlerLogBundle_Entry
17
18 // mergeEntry merges the state of the supplied ButlerLogBundle_Entry into the
19 // stream entry template.
20 func (s *bundlerStream) mergeEntry(e *protocol.ButlerLogBundle_Entry) {
21 if e.GetTerminal() && !s.entry().GetTerminal() {
22 s.Terminal = e.Terminal
23 s.TerminalIndex = e.TerminalIndex
24 }
25 }
26
27 func (s *bundlerStream) entry() *protocol.ButlerLogBundle_Entry {
28 return (*protocol.ButlerLogBundle_Entry)(s)
29 }
30
31 // bundlerImpl is an implementation of the Bundler interface.
32 type bundlerImpl struct {
33 *Config
34
35 sizer Sizer
36 entries map[string]*bundlerStream
37 count int
38
39 omitMap map[int]int64
40 round int64
41 }
42
43 // New instantiates a new Bundler instance.
44 func New(c Config) Bundler {
45 // The template bundle may not have entries; clear our copy if it does.
46 c.TemplateBundle.Entries = nil
47
48 b := &bundlerImpl{
49 Config: &c,
50 omitMap: map[int]int64{},
51 round: 1, // Start at "1" because "0" is default value.
52 }
53 b.reset()
54 return b
55 }
56
57 func (b *bundlerImpl) Append(e *protocol.ButlerLogBundle_Entry) {
58 logs := e.GetLogs()
59
60 // We first test if a given ButlerLogBundle_Entry is worth
61 // exporting. An entry is worth exporting if the infromation that it con tains
62 // provides new information about the log stream or its state.
63 //
64 // We consider a log stream worth exporting if it satisfies AT LEAST one of
65 // the following conditions:
66 // - It has log data len(e.Logs) > 0
67 // - It is terminal: e.GetTerminal() == true
68 if !(len(logs) > 0 || e.GetTerminal()) {
69 return
70 }
71
72 // Add this log to our reserve.
73 path := string(protoutil.DescriptorPath(e.GetDesc()))
74
75 if cur := b.entries[path]; cur != nil {
76 // Augment the existing stream with the new logs and state.
77 cur.mergeEntry(e)
78 cur.Logs = append(cur.Logs, logs...)
79 } else {
80 // This is a new stream. Register the entry as the template for this stream.
81 b.sizer.Append(e, nil)
82 b.entries[path] = (*bundlerStream)(e)
83 }
84
85 for _, le := range logs {
86 b.sizer.Append(e, le)
87 }
88 b.count += len(logs)
89 return
90 }
91
92 func (b *bundlerImpl) reset() {
93 b.sizer = b.newSizer(&b.TemplateBundle)
94 b.count = 0
95 b.entries = map[string]*bundlerStream{}
96 }
97
98 func (b *bundlerImpl) Empty() bool {
99 return len(b.entries) == 0
100 }
101
102 func (b *bundlerImpl) Size() int64 {
103 return b.sizer.Size()
104 }
105
106 func (b *bundlerImpl) GetBundles() []*protocol.ButlerLogBundle {
107 return b.getBundlesImpl(b.Threshold)
108 }
109
110 func (b *bundlerImpl) getBundlesImpl(threshold int64) []*protocol.ButlerLogBundl e {
111 bundles := []*protocol.ButlerLogBundle(nil)
112
113 for {
114 bundle := b.getBundle(threshold)
115 if bundle == nil {
116 break
117 }
118 bundles = append(bundles, bundle)
119 }
120
121 // If we still have bundle entries, it is likely because no entries can
122 // fit within the threshold. This is a configuration error, and our only
123 // viable response is to clear them out (drop them).
124 b.reset()
125
126 return bundles
127 }
128
129 func (b *bundlerImpl) getBundle(threshold int64) *protocol.ButlerLogBundle {
130 // Short circuit if we don't have any entries.
131 if len(b.entries) == 0 {
132 return nil
133 }
134
135 // Clone our template bundle, as we intend to modify it by adding entrie s.
136 bundle := b.TemplateBundle
137
138 // For determinism, add buffered entries in order of path.
139 keys := make([]string, 0, len(b.entries))
140 for k := range b.entries {
141 keys = append(keys, string(k))
142 }
143 sort.Strings(keys)
iannucci 2015/09/01 03:03:21 only for testing, right?
dnj 2015/09/02 01:58:15 I like determinism and predictability :/ So think
144
145 sizer := b.newSizer(&bundle)
146 overThreshold := func() bool {
147 return threshold != 0 && threshold < sizer.Size()
148 }
149
150 // For efficiency, we declare a single omit map. This map is keyed off o f log
151 // index. Each time we enter the bundler loop, we increment our "round"
152 // variable. For any given loop, an entry is omitted if its omitMap
153 // index equals the current round.
154 for _, k := range keys {
155 // Get the current round.
156 round := b.round
157 b.round++
158
159 // We assume that each entry in "entries" is worth exporting (se e
160 // comment in Append), else it would not have been added to the entries
161 // map by Append.
162 e := b.entries[k]
163
164 // If this entry has no logs, it is a terminal entry. Try and ex port it as
iannucci 2015/09/01 03:03:21 hm... there's no more explicit way to denote this?
dnj 2015/09/02 01:58:14 Well, kind of. The code drops any entry that doesn
165 // a standalone bundle entry.
166 if len(e.Logs) == 0 {
167 sizer.Append(e.entry(), nil)
168 if overThreshold() {
169 // We don't have enough room to export a bundle entry. Our bundle is
170 // full for this round.
171 break
172 }
173
174 // Our bundle is empty. This means that none of the entr y's logs fit in
175 // an empty bundle! Discard the full entry (data loss).
iannucci 2015/09/01 03:03:22 this needs to be logged somehow. does logdog need
dnj 2015/09/02 01:58:15 Butler will output to STDOUT/STDERR, so Swarming t
176 bundle.Entries = append(bundle.Entries, e.entry())
177 delete(b.entries, k)
178 continue
179 }
180
181 // Our entry has logs. Export any that fit into our bundle witho ut violating
182 // the threshold.
183 omitted := 0
184 for i, le := range e.Logs {
185 sizer.Append(e.entry(), le)
186 if overThreshold() {
187 sizer.Undo()
188 b.omitMap[i] = round
189 omitted++
190 }
iannucci 2015/09/01 03:03:21 shouldn't we stop after the first overage? Or is t
dnj 2015/09/02 01:58:15 Yep that.
191 }
192
193 // If all log entries available were successfully exported, we a re finished
194 // with this entry. We will take an optimized path and hand its pointer
195 // directly to the export bundle.
196 if omitted == 0 {
197 bundle.Entries = append(bundle.Entries, e.entry())
198 delete(b.entries, k)
199 continue
200 }
201
202 // If none of the entry's logs were exported, we're done with it for this
203 // round.
204 if omitted == len(e.Logs) {
205 // If our bundle is empty, this means that NONE of the e ntry's logs fit in
206 // an empty bundle! Since none will fit individually, we will discard the
207 // full entry (data loss).
iannucci 2015/09/01 03:03:21 log
dnj 2015/09/02 01:58:14 Done.
208 if len(bundle.GetEntries()) == 0 {
209 delete(b.entries, k)
210 }
211 continue
212 }
213
214 // We are exporting some of the entry's logs. We will do this by exporting
215 // a clone of the entry containing just the exported logs.
216 //
217 // We will reuse the entry's Logs array in order to avoid unnece ssary
218 // allocations. In order to do this, we will split the array int o two
iannucci 2015/09/01 03:03:21 *skeptical* I think this would all be much (much)
dnj 2015/09/02 01:58:14 I'm expecting this to get thrashed pretty heavily
219 // slices. The first slice will contain the exported elements, a nd the
220 // second will contain the remainder.
221 //
222 // (It should be noted that there is a potential issue if Append () were to
223 // be called in between getBundle calls. This is firstly not an issue, since
224 // getBundle in practie will empty "entries". However, even if t hat weren't
iannucci 2015/09/01 03:03:21 s/practie/practice/?
dnj 2015/09/02 01:58:15 Done.
225 // the case, since the retained omitted logs are at the end of t he array,
226 // calling append on that slice would properly extend it.)
227 divider := len(e.Logs)
228 for i := range e.Logs {
229 if i >= divider {
230 // Everything to the right of divider is omitted , so we're done.
231 break
232 }
233
234 // Scan forwards until we find an omit spot.
235 if b.omitMap[i] != round {
236 continue
237 }
238
239 // Scan backwards from the end of our log list to find a non-omitted
240 // entry.
241 for j := divider - 1; j > i; j-- {
242 if b.omitMap[j] != round {
243 // We're omitting "i", and not omitting "j". Swap!
244 e.Logs[i], e.Logs[j] = e.Logs[j], e.Logs [i]
245 divider = j
246 omitted -= 1
247 break
248 }
249 }
250 }
251 divider -= omitted
252
253 // Since our entry has more logs, we must retain it. We will exp ort a clone
254 // of the entry.
255 ec := *e.entry()
256 ec.Logs, e.Logs = e.Logs[:divider], e.Logs[divider:]
iannucci 2015/09/01 03:03:21 won't you end up just growing Logs's underlying ar
dnj 2015/09/02 01:58:15 Yes, until it empties, in which case it's unrefere
257 bundle.Entries = append(bundle.Entries, &ec)
258 }
259
260 if len(bundle.Entries) == 0 {
261 return nil
262 }
263 return &bundle
264 }
265
266 func (b *bundlerImpl) newSizer(bundle *protocol.ButlerLogBundle) Sizer {
267 nb := b.NewSizer
268 if nb == nil {
269 nb = NewFastSizer
270 }
271 return nb(bundle)
272 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698