Chromium Code Reviews| Index: client/internal/logdog/butler/bundler/bundler_impl.go |
| diff --git a/client/internal/logdog/butler/bundler/bundler_impl.go b/client/internal/logdog/butler/bundler/bundler_impl.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..1f0aa07bd6beb65ee1fc103e0685c2488122368a |
| --- /dev/null |
| +++ b/client/internal/logdog/butler/bundler/bundler_impl.go |
| @@ -0,0 +1,190 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package bundler |
| + |
| +import ( |
| + "sort" |
| + |
| + "github.com/luci/luci-go/common/logdog/protocol" |
| + "github.com/luci/luci-go/common/logdog/protocol/protoutil" |
| + "github.com/luci/luci-go/common/logdog/types" |
| +) |
| + |
| +// bundlerStream is an aggregate buffered log stream state. It consists of |
| +// a ButlerLogBundle_Entry and its aggregate logs. |
| +type bundlerStream protocol.ButlerLogBundle_Entry |
| + |
| +// mergeEntry merges the state of the supplied ButlerLogBundle_Entry into the |
| +// stream entry template. |
| +func (s *bundlerStream) mergeEntry(e *protocol.ButlerLogBundle_Entry) { |
| + if e.GetTerminal() && !s.entry().GetTerminal() { |
| + s.Terminal = e.Terminal |
| + s.TerminalIndex = e.TerminalIndex |
| + } |
| +} |
| + |
| +func (s *bundlerStream) entry() *protocol.ButlerLogBundle_Entry { |
| + return (*protocol.ButlerLogBundle_Entry)(s) |
| +} |
| + |
| +// bundlerImpl is an implementation of the Bundler interface. |
| +type bundlerImpl struct { |
| + *Config |
| + |
| + sizer Sizer |
| + entries map[types.StreamPath]*bundlerStream |
| + count int |
| +} |
| + |
| +// New instantiates a new Bundler instance. |
| +func New(c Config) Bundler { |
| + // The template bundle may not have entries; clear our copy if it does. |
| + c.TemplateBundle.Entries = nil |
| + |
| + b := &bundlerImpl{ |
| + Config: &c, |
| + } |
| + b.reset() |
| + return b |
| +} |
| + |
| +func (b *bundlerImpl) Append(e *protocol.ButlerLogBundle_Entry) { |
| + logs := e.GetLogs() |
|
tandrii(chromium)
2015/08/11 18:31:05
e.GetLogs() is cheap, and IMO, it's cleaner in thi
dnj (Google)
2015/08/12 03:20:09
I think having the function calls inline in otherw
dnj
2015/09/02 01:58:14
Granted it's cheap, but I don't think it's cleaner
|
| + |
| + // We first test if a given ButlerLogBundle_Entry is worth |
| + // exporting. An entry is worth exporting if the infromation that it contains |
| + // provides new information about the log stream or its state. |
| + // |
| + // We consider a log stream worth exporting if it satisfies AT LEAST one of |
| + // the following conditions: |
| + // - It has log data len(e.Logs) > 0 |
| + // - It is terminal: e.GetTerminal() == true |
| + if !(len(logs) > 0 || e.GetTerminal()) { |
| + return |
| + } |
| + |
| + // Add this log to our reserve. |
| + path := protoutil.DescriptorPath(e.GetDesc()) |
| + cur, ok := b.entries[path] |
| + if !ok { |
| + // Add this new entry to the bundle. |
| + b.sizer.AppendBundleEntry(e) |
| + |
| + cur = (*bundlerStream)(e) |
|
tandrii(chromium)
2015/08/11 18:31:05
i'd merge this and following lines.
dnj (Google)
2015/08/12 03:20:09
Oh hey, I'm no longer using "cur" outside of this
dnj
2015/09/02 01:58:14
Oh good catch, I used to care about "cur" outside
|
| + b.entries[path] = cur |
| + } else { |
| + cur.mergeEntry(e) |
| + cur.Logs = append(cur.Logs, logs...) |
| + } |
| + |
| + for _, le := range logs { |
| + b.sizer.AppendLogEntry(e, le) |
| + } |
| + b.count += len(logs) |
| + return |
| +} |
| + |
| +func (b *bundlerImpl) reset() { |
| + b.sizer = b.newSizer(&b.TemplateBundle) |
| + b.count = 0 |
| + b.entries = map[types.StreamPath]*bundlerStream{} |
| +} |
| + |
| +func (b *bundlerImpl) Empty() bool { |
| + return len(b.entries) == 0 |
| +} |
| + |
| +func (b *bundlerImpl) Size() int64 { |
| + return b.sizer.Size() |
| +} |
| + |
| +func (b *bundlerImpl) GetBundles(threshold int64) []*protocol.ButlerLogBundle { |
| + bundles := []*protocol.ButlerLogBundle(nil) |
| + for { |
| + bundle := b.getBundle(threshold) |
| + if bundle == nil { |
| + break |
| + } |
| + bundles = append(bundles, bundle) |
| + } |
| + |
| + // If we still have bundle entries, it is likely because no entries fit with |
| + // the threshold. Clear them out. |
| + b.reset() |
| + |
| + return bundles |
| +} |
| + |
| +func (b *bundlerImpl) getBundle(threshold int64) *protocol.ButlerLogBundle { |
| + // Clone our template bundle, as we intend to modify it by adding entries. |
| + bundle := b.TemplateBundle |
| + |
| + // For determinism, add buffered entries in order of path. |
| + keys := make([]string, 0, len(b.entries)) |
| + for k := range b.entries { |
| + keys = append(keys, string(k)) |
| + } |
| + sort.Strings(keys) |
| + |
| + sizer := b.newSizer(&bundle) |
| + overThreshold := func() bool { |
| + return threshold != 0 && threshold < sizer.Size() |
| + } |
| + |
| + for _, k := range keys { |
| + // We assume that each entry in "entries" is worth exporting (see |
| + // comment in Append), else it would not have been added to the entries |
| + // map by Append. |
| + e := b.entries[types.StreamPath(k)] |
| + |
| + // Can we add this entry without exceeding our size threshold? |
| + sizer.AppendBundleEntry(e.entry()) |
| + if overThreshold() { |
|
tandrii(chromium)
2015/08/11 18:31:05
ok, I agree this is necessary.
|
| + break |
| + } |
| + |
| + // Count how many logs we can add without hitting our threshold. |
| + count := 0 |
| + for _, le := range e.Logs { |
| + sizer.AppendLogEntry(e.entry(), le) |
| + if overThreshold() { |
| + break |
| + } |
| + |
| + count++ |
| + } |
| + |
| + // If we can't add all of the logs, we will export a clone of the template |
| + // bundle entry with specifically the logs that fit. |
| + if count < len(e.Logs) { |
| + ec := *e.entry() |
| + ec.Logs = make([]*protocol.LogEntry, count) |
| + copy(ec.Logs, e.Logs) |
| + bundle.Entries = append(bundle.Entries, &ec) |
| + |
| + // Left-shift our retained logs to consume the ones that we've exported. |
| + e.Logs = append(e.Logs[:0], e.Logs[count:]...) |
| + break |
| + } |
| + |
| + // We've consumed all logs for this entry. Since we're deleting the |
| + // bundlerStream from the map, export its template and logs slice directly. |
| + bundle.Entries = append(bundle.Entries, e.entry()) |
| + delete(b.entries, types.StreamPath(k)) |
| + } |
| + |
| + if len(bundle.Entries) == 0 { |
| + return nil |
| + } |
| + return &bundle |
| +} |
| + |
| +func (b *bundlerImpl) newSizer(bundle *protocol.ButlerLogBundle) Sizer { |
| + nb := b.NewSizer |
| + if nb == nil { |
| + nb = NewFastSizer |
| + } |
| + return nb(bundle) |
| +} |