Chromium Code Reviews| Index: client/internal/logdog/butler/bundler/bundler_impl.go |
| diff --git a/client/internal/logdog/butler/bundler/bundler_impl.go b/client/internal/logdog/butler/bundler/bundler_impl.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..8a7c50edc409e6dc1696defe5d0c75fa4bfdfe52 |
| --- /dev/null |
| +++ b/client/internal/logdog/butler/bundler/bundler_impl.go |
| @@ -0,0 +1,272 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package bundler |
| + |
| +import ( |
| + "sort" |
| + |
| + "github.com/luci/luci-go/common/logdog/protocol" |
| + "github.com/luci/luci-go/common/logdog/protocol/protoutil" |
| +) |
| + |
| +// bundlerStream is an aggregate buffered log stream state. It consists of |
| +// a ButlerLogBundle_Entry and its aggregate logs. |
| +type bundlerStream protocol.ButlerLogBundle_Entry |
| + |
| +// mergeEntry merges the state of the supplied ButlerLogBundle_Entry into the |
| +// stream entry template. |
| +func (s *bundlerStream) mergeEntry(e *protocol.ButlerLogBundle_Entry) { |
| + if e.GetTerminal() && !s.entry().GetTerminal() { |
| + s.Terminal = e.Terminal |
| + s.TerminalIndex = e.TerminalIndex |
| + } |
| +} |
| + |
| +func (s *bundlerStream) entry() *protocol.ButlerLogBundle_Entry { |
| + return (*protocol.ButlerLogBundle_Entry)(s) |
| +} |
| + |
| +// bundlerImpl is an implementation of the Bundler interface. |
| +type bundlerImpl struct { |
| + *Config |
| + |
| + sizer Sizer |
| + entries map[string]*bundlerStream |
| + count int |
| + |
| + omitMap map[int]int64 |
| + round int64 |
| +} |
| + |
| +// New instantiates a new Bundler instance. |
| +func New(c Config) Bundler { |
| + // The template bundle may not have entries; clear our copy if it does. |
| + c.TemplateBundle.Entries = nil |
| + |
| + b := &bundlerImpl{ |
| + Config: &c, |
| + omitMap: map[int]int64{}, |
| + round: 1, // Start at "1" because "0" is default value. |
| + } |
| + b.reset() |
| + return b |
| +} |
| + |
| +func (b *bundlerImpl) Append(e *protocol.ButlerLogBundle_Entry) { |
| + logs := e.GetLogs() |
| + |
| + // We first test if a given ButlerLogBundle_Entry is worth |
| + // exporting. An entry is worth exporting if the infromation that it contains |
| + // provides new information about the log stream or its state. |
| + // |
| + // We consider a log stream worth exporting if it satisfies AT LEAST one of |
| + // the following conditions: |
| + // - It has log data len(e.Logs) > 0 |
| + // - It is terminal: e.GetTerminal() == true |
| + if !(len(logs) > 0 || e.GetTerminal()) { |
| + return |
| + } |
| + |
| + // Add this log to our reserve. |
| + path := string(protoutil.DescriptorPath(e.GetDesc())) |
| + |
| + if cur := b.entries[path]; cur != nil { |
| + // Augment the existing stream with the new logs and state. |
| + cur.mergeEntry(e) |
| + cur.Logs = append(cur.Logs, logs...) |
| + } else { |
| + // This is a new stream. Register the entry as the template for this stream. |
| + b.sizer.Append(e, nil) |
| + b.entries[path] = (*bundlerStream)(e) |
| + } |
| + |
| + for _, le := range logs { |
| + b.sizer.Append(e, le) |
| + } |
| + b.count += len(logs) |
| + return |
| +} |
| + |
| +func (b *bundlerImpl) reset() { |
| + b.sizer = b.newSizer(&b.TemplateBundle) |
| + b.count = 0 |
| + b.entries = map[string]*bundlerStream{} |
| +} |
| + |
| +func (b *bundlerImpl) Empty() bool { |
| + return len(b.entries) == 0 |
| +} |
| + |
| +func (b *bundlerImpl) Size() int64 { |
| + return b.sizer.Size() |
| +} |
| + |
| +func (b *bundlerImpl) GetBundles() []*protocol.ButlerLogBundle { |
| + return b.getBundlesImpl(b.Threshold) |
| +} |
| + |
| +func (b *bundlerImpl) getBundlesImpl(threshold int64) []*protocol.ButlerLogBundle { |
| + bundles := []*protocol.ButlerLogBundle(nil) |
| + |
| + for { |
| + bundle := b.getBundle(threshold) |
| + if bundle == nil { |
| + break |
| + } |
| + bundles = append(bundles, bundle) |
| + } |
| + |
| + // If we still have bundle entries, it is likely because no entries can |
| + // fit within the threshold. This is a configuration error, and our only |
| + // viable response is to clear them out (drop them). |
| + b.reset() |
| + |
| + return bundles |
| +} |
| + |
| +func (b *bundlerImpl) getBundle(threshold int64) *protocol.ButlerLogBundle { |
| + // Short circuit if we don't have any entries. |
| + if len(b.entries) == 0 { |
| + return nil |
| + } |
| + |
| + // Clone our template bundle, as we intend to modify it by adding entries. |
| + bundle := b.TemplateBundle |
| + |
| + // For determinism, add buffered entries in order of path. |
| + keys := make([]string, 0, len(b.entries)) |
| + for k := range b.entries { |
| + keys = append(keys, string(k)) |
| + } |
| + sort.Strings(keys) |
|
iannucci
2015/09/01 03:03:21
only for testing, right?
dnj
2015/09/02 01:58:15
I like determinism and predictability :/
So think
|
| + |
| + sizer := b.newSizer(&bundle) |
| + overThreshold := func() bool { |
| + return threshold != 0 && threshold < sizer.Size() |
| + } |
| + |
| + // For efficiency, we declare a single omit map. This map is keyed off of log |
| + // index. Each time we enter the bundler loop, we increment our "round" |
| + // variable. For any given loop, an entry is omitted if its omitMap |
| + // index equals the current round. |
| + for _, k := range keys { |
| + // Get the current round. |
| + round := b.round |
| + b.round++ |
| + |
| + // We assume that each entry in "entries" is worth exporting (see |
| + // comment in Append), else it would not have been added to the entries |
| + // map by Append. |
| + e := b.entries[k] |
| + |
| + // If this entry has no logs, it is a terminal entry. Try and export it as |
|
iannucci
2015/09/01 03:03:21
hm... there's no more explicit way to denote this?
dnj
2015/09/02 01:58:14
Well, kind of. The code drops any entry that doesn
|
| + // a standalone bundle entry. |
| + if len(e.Logs) == 0 { |
| + sizer.Append(e.entry(), nil) |
| + if overThreshold() { |
| + // We don't have enough room to export a bundle entry. Our bundle is |
| + // full for this round. |
| + break |
| + } |
| + |
| + // Our bundle is empty. This means that none of the entry's logs fit in |
| + // an empty bundle! Discard the full entry (data loss). |
|
iannucci
2015/09/01 03:03:22
this needs to be logged somehow. does logdog need
dnj
2015/09/02 01:58:15
Butler will output to STDOUT/STDERR, so Swarming t
|
| + bundle.Entries = append(bundle.Entries, e.entry()) |
| + delete(b.entries, k) |
| + continue |
| + } |
| + |
| + // Our entry has logs. Export any that fit into our bundle without violating |
| + // the threshold. |
| + omitted := 0 |
| + for i, le := range e.Logs { |
| + sizer.Append(e.entry(), le) |
| + if overThreshold() { |
| + sizer.Undo() |
| + b.omitMap[i] = round |
| + omitted++ |
| + } |
|
iannucci
2015/09/01 03:03:21
shouldn't we stop after the first overage? Or is t
dnj
2015/09/02 01:58:15
Yep that.
|
| + } |
| + |
| + // If all log entries available were successfully exported, we are finished |
| + // with this entry. We will take an optimized path and hand its pointer |
| + // directly to the export bundle. |
| + if omitted == 0 { |
| + bundle.Entries = append(bundle.Entries, e.entry()) |
| + delete(b.entries, k) |
| + continue |
| + } |
| + |
| + // If none of the entry's logs were exported, we're done with it for this |
| + // round. |
| + if omitted == len(e.Logs) { |
| + // If our bundle is empty, this means that NONE of the entry's logs fit in |
| + // an empty bundle! Since none will fit individually, we will discard the |
| + // full entry (data loss). |
|
iannucci
2015/09/01 03:03:21
log
dnj
2015/09/02 01:58:14
Done.
|
| + if len(bundle.GetEntries()) == 0 { |
| + delete(b.entries, k) |
| + } |
| + continue |
| + } |
| + |
| + // We are exporting some of the entry's logs. We will do this by exporting |
| + // a clone of the entry containing just the exported logs. |
| + // |
| + // We will reuse the entry's Logs array in order to avoid unnecessary |
| + // allocations. In order to do this, we will split the array into two |
|
iannucci
2015/09/01 03:03:21
*skeptical*
I think this would all be much (much)
dnj
2015/09/02 01:58:14
I'm expecting this to get thrashed pretty heavily
|
| + // slices. The first slice will contain the exported elements, and the |
| + // second will contain the remainder. |
| + // |
| + // (It should be noted that there is a potential issue if Append() were to |
| + // be called in between getBundle calls. This is firstly not an issue, since |
| + // getBundle in practie will empty "entries". However, even if that weren't |
|
iannucci
2015/09/01 03:03:21
s/practie/practice/?
dnj
2015/09/02 01:58:15
Done.
|
| + // the case, since the retained omitted logs are at the end of the array, |
| + // calling append on that slice would properly extend it.) |
| + divider := len(e.Logs) |
| + for i := range e.Logs { |
| + if i >= divider { |
| + // Everything to the right of divider is omitted, so we're done. |
| + break |
| + } |
| + |
| + // Scan forwards until we find an omit spot. |
| + if b.omitMap[i] != round { |
| + continue |
| + } |
| + |
| + // Scan backwards from the end of our log list to find a non-omitted |
| + // entry. |
| + for j := divider - 1; j > i; j-- { |
| + if b.omitMap[j] != round { |
| + // We're omitting "i", and not omitting "j". Swap! |
| + e.Logs[i], e.Logs[j] = e.Logs[j], e.Logs[i] |
| + divider = j |
| + omitted -= 1 |
| + break |
| + } |
| + } |
| + } |
| + divider -= omitted |
| + |
| + // Since our entry has more logs, we must retain it. We will export a clone |
| + // of the entry. |
| + ec := *e.entry() |
| + ec.Logs, e.Logs = e.Logs[:divider], e.Logs[divider:] |
|
iannucci
2015/09/01 03:03:21
won't you end up just growing Logs's underlying ar
dnj
2015/09/02 01:58:15
Yes, until it empties, in which case it's unrefere
|
| + bundle.Entries = append(bundle.Entries, &ec) |
| + } |
| + |
| + if len(bundle.Entries) == 0 { |
| + return nil |
| + } |
| + return &bundle |
| +} |
| + |
| +func (b *bundlerImpl) newSizer(bundle *protocol.ButlerLogBundle) Sizer { |
| + nb := b.NewSizer |
| + if nb == nil { |
| + nb = NewFastSizer |
| + } |
| + return nb(bundle) |
| +} |