Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(31)

Unified Diff: client/internal/logdog/butler/bundler/bundler_impl.go

Issue 1276923003: logdog: Add bundler library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Rewrote bundle logic (and associated updates). Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: client/internal/logdog/butler/bundler/bundler_impl.go
diff --git a/client/internal/logdog/butler/bundler/bundler_impl.go b/client/internal/logdog/butler/bundler/bundler_impl.go
new file mode 100644
index 0000000000000000000000000000000000000000..8a7c50edc409e6dc1696defe5d0c75fa4bfdfe52
--- /dev/null
+++ b/client/internal/logdog/butler/bundler/bundler_impl.go
@@ -0,0 +1,272 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package bundler
+
+import (
+ "sort"
+
+ "github.com/luci/luci-go/common/logdog/protocol"
+ "github.com/luci/luci-go/common/logdog/protocol/protoutil"
+)
+
+// bundlerStream is an aggregate buffered log stream state. It consists of
+// a ButlerLogBundle_Entry and its aggregate logs.
+//
+// It is a distinct named type defined over protocol.ButlerLogBundle_Entry, so
+// the entry's own methods are reached by converting back with entry().
+type bundlerStream protocol.ButlerLogBundle_Entry
+
+// mergeEntry folds the terminal state of e into this stream.
+//
+// The stream adopts e's Terminal and TerminalIndex values only when e is
+// terminal and the stream has not already been marked terminal; in every
+// other case the stream is left untouched.
+func (s *bundlerStream) mergeEntry(e *protocol.ButlerLogBundle_Entry) {
+	// Nothing to merge unless e is terminal and we are not terminal yet.
+	if !e.GetTerminal() || s.entry().GetTerminal() {
+		return
+	}
+
+	s.Terminal = e.Terminal
+	s.TerminalIndex = e.TerminalIndex
+}
+
+// entry returns the stream viewed as its underlying
+// *protocol.ButlerLogBundle_Entry. No copy is made; the result points at the
+// same value as s.
+func (s *bundlerStream) entry() *protocol.ButlerLogBundle_Entry {
+	underlying := (*protocol.ButlerLogBundle_Entry)(s)
+	return underlying
+}
+
+// bundlerImpl is an implementation of the Bundler interface.
+//
+// It buffers appended entries per stream (Append) and later splits them into
+// size-bounded bundles (getBundle).
+type bundlerImpl struct {
+ // Config is the bundler's configuration; New stores a pointer to its own
+ // copy of the Config it was given.
+ *Config
+
+ // sizer tracks the size of the data buffered since the last reset.
+ sizer Sizer
+ // entries holds the buffered streams, keyed by stream descriptor path.
+ entries map[string]*bundlerStream
+ // count is the number of log entries appended since the last reset.
+ count int
+
+ // omitMap records, per log index, the round in which that log was most
+ // recently omitted from a bundle. It is scratch space for getBundle.
+ omitMap map[int]int64
+ // round is the next getBundle round number; it is incremented once for
+ // every entry that getBundle examines.
+ round int64
+}
+
+// New constructs a Bundler configured by c.
+//
+// c is received by value, so the bundler works on its own copy of the
+// configuration. Any entries present on the copy's template bundle are
+// cleared, because the template bundle may not carry entries.
+func New(c Config) Bundler {
+	c.TemplateBundle.Entries = nil
+
+	impl := new(bundlerImpl)
+	impl.Config = &c
+	impl.omitMap = make(map[int]int64)
+	// Rounds start at 1 so that the zero value found in an untouched omitMap
+	// slot never matches a live round.
+	impl.round = 1
+
+	impl.reset()
+	return impl
+}
+
+// Append buffers the supplied entry for later bundling.
+//
+// An entry is only buffered when it carries new information about its
+// stream, meaning at least one of the following holds:
+//	- it has log data (len(e.Logs) > 0)
+//	- it is terminal (e.GetTerminal() == true)
+//
+// Entries that satisfy neither condition are ignored.
+func (b *bundlerImpl) Append(e *protocol.ButlerLogBundle_Entry) {
+	logs := e.GetLogs()
+	if len(logs) == 0 && !e.GetTerminal() {
+		// Nothing new to record for this stream.
+		return
+	}
+
+	// Streams are keyed by their descriptor path.
+	path := string(protoutil.DescriptorPath(e.GetDesc()))
+
+	if cur := b.entries[path]; cur == nil {
+		// First time we see this stream: account for the entry itself and
+		// register e as the template for the stream.
+		b.sizer.Append(e, nil)
+		b.entries[path] = (*bundlerStream)(e)
+	} else {
+		// Known stream: fold in the new terminal state and logs.
+		cur.mergeEntry(e)
+		cur.Logs = append(cur.Logs, logs...)
+	}
+
+	// Account for every appended log individually.
+	for i := range logs {
+		b.sizer.Append(e, logs[i])
+	}
+	b.count += len(logs)
+}
+
+// reset discards all buffered state: it installs a fresh sizer seeded with
+// the template bundle, zeroes the log count and empties the stream map.
+func (b *bundlerImpl) reset() {
+	template := &b.TemplateBundle
+	b.sizer = b.newSizer(template)
+
+	b.count = 0
+	b.entries = make(map[string]*bundlerStream)
+}
+
+// Empty reports whether the bundler currently has no buffered streams.
+func (b *bundlerImpl) Empty() bool {
+	buffered := len(b.entries)
+	return buffered == 0
+}
+
+// Size returns the size reported by the bundler's current sizer.
+func (b *bundlerImpl) Size() int64 {
+	size := b.sizer.Size()
+	return size
+}
+
+// GetBundles converts everything currently buffered into bundles, using the
+// configured Threshold as the per-bundle size limit.
+func (b *bundlerImpl) GetBundles() []*protocol.ButlerLogBundle {
+	limit := b.Threshold
+	return b.getBundlesImpl(limit)
+}
+
+// getBundlesImpl repeatedly asks getBundle for the next bundle until it
+// yields nil, collecting the results in order. The bundler is reset before
+// returning.
+//
+// The returned slice is nil when no bundle could be produced.
+func (b *bundlerImpl) getBundlesImpl(threshold int64) []*protocol.ButlerLogBundle {
+	var collected []*protocol.ButlerLogBundle
+	for next := b.getBundle(threshold); next != nil; next = b.getBundle(threshold) {
+		collected = append(collected, next)
+	}
+
+	// Anything still buffered at this point most likely cannot fit within the
+	// threshold at all. That is a configuration error; the only viable
+	// response is to drop the leftovers.
+	b.reset()
+
+	return collected
+}
+
+// getBundle assembles a single bundle from the buffered entries, adding
+// entries and logs for as long as the sizer reports that the bundle stays
+// within threshold. A threshold of 0 disables the size check.
+//
+// Entries that are exported in full are removed from b.entries; entries that
+// are only partially exported stay buffered with their remaining logs.
+//
+// It returns nil when there are no buffered entries or when nothing could be
+// added to the bundle.
+func (b *bundlerImpl) getBundle(threshold int64) *protocol.ButlerLogBundle {
+ // Short circuit if we don't have any entries.
+ if len(b.entries) == 0 {
+ return nil
+ }
+
+ // Clone our template bundle, as we intend to modify it by adding entries.
+ // (This is a shallow struct copy.)
+ bundle := b.TemplateBundle
+
+ // For determinism, add buffered entries in order of path.
+ keys := make([]string, 0, len(b.entries))
+ for k := range b.entries {
+ keys = append(keys, string(k))
+ }
+ sort.Strings(keys)
iannucci 2015/09/01 03:03:21 only for testing, right?
dnj 2015/09/02 01:58:15 I like determinism and predictability :/ So think
+
+ // This sizer is local to the bundle being built; it is distinct from
+ // b.sizer, which tracks everything that is buffered.
+ sizer := b.newSizer(&bundle)
+ overThreshold := func() bool {
+ return threshold != 0 && threshold < sizer.Size()
+ }
+
+ // For efficiency, we declare a single omit map (b.omitMap). This map is
+ // keyed off of log index. Each time we enter the bundler loop, we increment
+ // our "round" variable. For any given loop, an entry is omitted if its
+ // omitMap index equals the current round.
+ for _, k := range keys {
+ // Get the current round.
+ round := b.round
+ b.round++
+
+ // We assume that each entry in "entries" is worth exporting (see
+ // comment in Append), else it would not have been added to the entries
+ // map by Append.
+ e := b.entries[k]
+
+ // If this entry has no logs, it is a terminal entry. Try and export it as
iannucci 2015/09/01 03:03:21 hm... there's no more explicit way to denote this?
dnj 2015/09/02 01:58:14 Well, kind of. The code drops any entry that doesn
+ // a standalone bundle entry.
+ if len(e.Logs) == 0 {
+ sizer.Append(e.entry(), nil)
+ if overThreshold() {
+ // We don't have enough room to export a bundle entry. Our bundle is
+ // full for this round. Breaking here stops processing the remaining
+ // keys as well; the sizer is not consulted again after the loop.
+ break
+ }
+
+ // The terminal entry fits within the threshold. Export it and remove it
+ // from our buffered entries.
iannucci 2015/09/01 03:03:22 this needs to be logged somehow. does logdog need
dnj 2015/09/02 01:58:15 Butler will output to STDOUT/STDERR, so Swarming t
+ bundle.Entries = append(bundle.Entries, e.entry())
+ delete(b.entries, k)
+ continue
+ }
+
+ // Our entry has logs. Export any that fit into our bundle without violating
+ // the threshold.
+ //
+ // We do not stop at the first log that overflows the threshold: every log
+ // is tried, and each one that does not fit is marked as omitted for this
+ // round. (Undo presumably reverts the most recent Append -- confirm
+ // against the Sizer implementation.)
+ omitted := 0
+ for i, le := range e.Logs {
+ sizer.Append(e.entry(), le)
+ if overThreshold() {
+ sizer.Undo()
+ b.omitMap[i] = round
+ omitted++
+ }
iannucci 2015/09/01 03:03:21 shouldn't we stop after the first overage? Or is t
dnj 2015/09/02 01:58:15 Yep that.
+ }
+
+ // If all log entries available were successfully exported, we are finished
+ // with this entry. We will take an optimized path and hand its pointer
+ // directly to the export bundle.
+ if omitted == 0 {
+ bundle.Entries = append(bundle.Entries, e.entry())
+ delete(b.entries, k)
+ continue
+ }
+
+ // If none of the entry's logs were exported, we're done with it for this
+ // round.
+ if omitted == len(e.Logs) {
+ // If our bundle is empty, this means that NONE of the entry's logs fit in
+ // an empty bundle! Since none will fit individually, we will discard the
+ // full entry (data loss).
+ //
+ // If the bundle is not empty, the entry stays buffered so that a later
+ // getBundle call can try it again.
iannucci 2015/09/01 03:03:21 log
dnj 2015/09/02 01:58:14 Done.
+ if len(bundle.GetEntries()) == 0 {
+ delete(b.entries, k)
+ }
+ continue
+ }
+
+ // We are exporting some of the entry's logs. We will do this by exporting
+ // a clone of the entry containing just the exported logs.
+ //
+ // We will reuse the entry's Logs array in order to avoid unnecessary
+ // allocations. In order to do this, we will split the array into two
iannucci 2015/09/01 03:03:21 *skeptical* I think this would all be much (much)
dnj 2015/09/02 01:58:14 I'm expecting this to get thrashed pretty heavily
+ // slices. The first slice will contain the exported elements, and the
+ // second will contain the remainder.
+ //
+ // (It should be noted that there is a potential issue if Append() were to
+ // be called in between getBundle calls. This is firstly not an issue, since
+ // getBundle in practice will empty "entries". However, even if that weren't
iannucci 2015/09/01 03:03:21 s/practie/practice/?
dnj 2015/09/02 01:58:15 Done.
+ // the case, since the retained omitted logs are at the end of the array,
+ // calling append on that slice would properly extend it.)
+ //
+ // NOTE(review): the swaps below move later exported logs into the slots of
+ // earlier omitted ones, so the relative order of logs within the entry is
+ // not preserved -- confirm that consumers do not depend on it.
+ divider := len(e.Logs)
+ for i := range e.Logs {
+ if i >= divider {
+ // Everything to the right of divider is omitted, so we're done.
+ break
+ }
+
+ // Scan forwards until we find an omit spot.
+ if b.omitMap[i] != round {
+ continue
+ }
+
+ // Scan backwards from the end of our log list to find a non-omitted
+ // entry.
+ for j := divider - 1; j > i; j-- {
+ if b.omitMap[j] != round {
+ // We're omitting "i", and not omitting "j". Swap!
+ e.Logs[i], e.Logs[j] = e.Logs[j], e.Logs[i]
+ divider = j
+ omitted -= 1
+ break
+ }
+ }
+ }
+ // "omitted" now counts only the omitted logs that were not swapped. Those
+ // are left directly before divider, so step divider back over them.
+ divider -= omitted
+
+ // Since our entry has more logs, we must retain it. We will export a clone
+ // of the entry.
+ ec := *e.entry()
+ ec.Logs, e.Logs = e.Logs[:divider], e.Logs[divider:]
iannucci 2015/09/01 03:03:21 won't you end up just growing Logs's underlying ar
dnj 2015/09/02 01:58:15 Yes, until it empties, in which case it's unrefere
+ bundle.Entries = append(bundle.Entries, &ec)
+ }
+
+ // Nothing could be exported this round.
+ if len(bundle.Entries) == 0 {
+ return nil
+ }
+ return &bundle
+}
+
+// newSizer builds a Sizer for bundle. The factory configured in NewSizer is
+// used when one is set; otherwise NewFastSizer is used.
+func (b *bundlerImpl) newSizer(bundle *protocol.ButlerLogBundle) Sizer {
+	if factory := b.NewSizer; factory != nil {
+		return factory(bundle)
+	}
+	return NewFastSizer(bundle)
+}

Powered by Google App Engine
This is Rietveld 408576698