Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package bundler | |
| 6 | |
| 7 import ( | |
| 8 "sort" | |
| 9 | |
| 10 "github.com/luci/luci-go/common/logdog/protocol" | |
| 11 "github.com/luci/luci-go/common/logdog/protocol/protoutil" | |
| 12 "github.com/luci/luci-go/common/logdog/types" | |
| 13 ) | |
| 14 | |
| 15 // bundlerStream is an aggregate buffered log stream state. It consists of | |
| 16 // a ButlerLogBundle_Entry and its aggregate logs. | |
| 17 type bundlerStream protocol.ButlerLogBundle_Entry | |
| 18 | |
| 19 // mergeEntry merges the state of the supplied ButlerLogBundle_Entry into the | |
| 20 // stream entry template. | |
| 21 func (s *bundlerStream) mergeEntry(e *protocol.ButlerLogBundle_Entry) { | |
| 22 if e.GetTerminal() && !s.entry().GetTerminal() { | |
| 23 s.Terminal = e.Terminal | |
| 24 s.TerminalIndex = e.TerminalIndex | |
| 25 } | |
| 26 } | |
| 27 | |
| 28 func (s *bundlerStream) entry() *protocol.ButlerLogBundle_Entry { | |
| 29 return (*protocol.ButlerLogBundle_Entry)(s) | |
| 30 } | |
| 31 | |
| 32 // bundlerImpl is an implementation of the Bundler interface. | |
| 33 type bundlerImpl struct { | |
| 34 *Config | |
| 35 | |
| 36 sizer Sizer | |
| 37 entries map[types.StreamPath]*bundlerStream | |
| 38 count int | |
| 39 } | |
| 40 | |
| 41 // New instantiates a new Bundler instance. | |
| 42 func New(c Config) Bundler { | |
| 43 // The template bundle may not have entries; clear our copy if it does. | |
| 44 c.TemplateBundle.Entries = nil | |
| 45 | |
| 46 b := &bundlerImpl{ | |
| 47 Config: &c, | |
| 48 } | |
| 49 b.reset() | |
| 50 return b | |
| 51 } | |
| 52 | |
| 53 func (b *bundlerImpl) Append(e *protocol.ButlerLogBundle_Entry) { | |
| 54 logs := e.GetLogs() | |
|
tandrii(chromium)
2015/08/11 18:31:05
e.GetLogs() is cheap, and IMO, it's cleaner in thi
dnj (Google)
2015/08/12 03:20:09
I think having the function calls inline in otherw
dnj
2015/09/02 01:58:14
Granted it's cheap, but I don't think it's cleaner
| |
| 55 | |
| 56 // We first test if a given ButlerLogBundle_Entry is worth | |
| 57 // exporting. An entry is worth exporting if the infromation that it con tains | |
| 58 // provides new information about the log stream or its state. | |
| 59 // | |
| 60 // We consider a log stream worth exporting if it satisfies AT LEAST one of | |
| 61 // the following conditions: | |
| 62 // - It has log data len(e.Logs) > 0 | |
| 63 // - It is terminal: e.GetTerminal() == true | |
| 64 if !(len(logs) > 0 || e.GetTerminal()) { | |
| 65 return | |
| 66 } | |
| 67 | |
| 68 // Add this log to our reserve. | |
| 69 path := protoutil.DescriptorPath(e.GetDesc()) | |
| 70 cur, ok := b.entries[path] | |
| 71 if !ok { | |
| 72 // Add this new entry to the bundle. | |
| 73 b.sizer.AppendBundleEntry(e) | |
| 74 | |
| 75 cur = (*bundlerStream)(e) | |
|
tandrii(chromium)
2015/08/11 18:31:05
i'd merge this and following lines.
dnj (Google)
2015/08/12 03:20:09
Oh hey, I'm no longer using "cur" outside of this
dnj
2015/09/02 01:58:14
Oh good catch, I used to care about "cur" outside
| |
| 76 b.entries[path] = cur | |
| 77 } else { | |
| 78 cur.mergeEntry(e) | |
| 79 cur.Logs = append(cur.Logs, logs...) | |
| 80 } | |
| 81 | |
| 82 for _, le := range logs { | |
| 83 b.sizer.AppendLogEntry(e, le) | |
| 84 } | |
| 85 b.count += len(logs) | |
| 86 return | |
| 87 } | |
| 88 | |
| 89 func (b *bundlerImpl) reset() { | |
| 90 b.sizer = b.newSizer(&b.TemplateBundle) | |
| 91 b.count = 0 | |
| 92 b.entries = map[types.StreamPath]*bundlerStream{} | |
| 93 } | |
| 94 | |
| 95 func (b *bundlerImpl) Empty() bool { | |
| 96 return len(b.entries) == 0 | |
| 97 } | |
| 98 | |
| 99 func (b *bundlerImpl) Size() int64 { | |
| 100 return b.sizer.Size() | |
| 101 } | |
| 102 | |
| 103 func (b *bundlerImpl) GetBundles(threshold int64) []*protocol.ButlerLogBundle { | |
| 104 bundles := []*protocol.ButlerLogBundle(nil) | |
| 105 for { | |
| 106 bundle := b.getBundle(threshold) | |
| 107 if bundle == nil { | |
| 108 break | |
| 109 } | |
| 110 bundles = append(bundles, bundle) | |
| 111 } | |
| 112 | |
| 113 // If we still have bundle entries, it is likely because no entries fit with | |
| 114 // the threshold. Clear them out. | |
| 115 b.reset() | |
| 116 | |
| 117 return bundles | |
| 118 } | |
| 119 | |
| 120 func (b *bundlerImpl) getBundle(threshold int64) *protocol.ButlerLogBundle { | |
| 121 // Clone our template bundle, as we intend to modify it by adding entrie s. | |
| 122 bundle := b.TemplateBundle | |
| 123 | |
| 124 // For determinism, add buffered entries in order of path. | |
| 125 keys := make([]string, 0, len(b.entries)) | |
| 126 for k := range b.entries { | |
| 127 keys = append(keys, string(k)) | |
| 128 } | |
| 129 sort.Strings(keys) | |
| 130 | |
| 131 sizer := b.newSizer(&bundle) | |
| 132 overThreshold := func() bool { | |
| 133 return threshold != 0 && threshold < sizer.Size() | |
| 134 } | |
| 135 | |
| 136 for _, k := range keys { | |
| 137 // We assume that each entry in "entries" is worth exporting (se e | |
| 138 // comment in Append), else it would not have been added to the entries | |
| 139 // map by Append. | |
| 140 e := b.entries[types.StreamPath(k)] | |
| 141 | |
| 142 // Can we add this entry without exceeding our size threshold? | |
| 143 sizer.AppendBundleEntry(e.entry()) | |
| 144 if overThreshold() { | |
|
tandrii(chromium)
2015/08/11 18:31:05
ok, I agree this is necessary.
| |
| 145 break | |
| 146 } | |
| 147 | |
| 148 // Count how many logs we can add without hitting our threshold. | |
| 149 count := 0 | |
| 150 for _, le := range e.Logs { | |
| 151 sizer.AppendLogEntry(e.entry(), le) | |
| 152 if overThreshold() { | |
| 153 break | |
| 154 } | |
| 155 | |
| 156 count++ | |
| 157 } | |
| 158 | |
| 159 // If we can't add all of the logs, we will export a clone of th e template | |
| 160 // bundle entry with specifically the logs that fit. | |
| 161 if count < len(e.Logs) { | |
| 162 ec := *e.entry() | |
| 163 ec.Logs = make([]*protocol.LogEntry, count) | |
| 164 copy(ec.Logs, e.Logs) | |
| 165 bundle.Entries = append(bundle.Entries, &ec) | |
| 166 | |
| 167 // Left-shift our retained logs to consume the ones that we've exported. | |
| 168 e.Logs = append(e.Logs[:0], e.Logs[count:]...) | |
| 169 break | |
| 170 } | |
| 171 | |
| 172 // We've consumed all logs for this entry. Since we're deleting the | |
| 173 // bundlerStream from the map, export its template and logs slic e directly. | |
| 174 bundle.Entries = append(bundle.Entries, e.entry()) | |
| 175 delete(b.entries, types.StreamPath(k)) | |
| 176 } | |
| 177 | |
| 178 if len(bundle.Entries) == 0 { | |
| 179 return nil | |
| 180 } | |
| 181 return &bundle | |
| 182 } | |
| 183 | |
| 184 func (b *bundlerImpl) newSizer(bundle *protocol.ButlerLogBundle) Sizer { | |
| 185 nb := b.NewSizer | |
| 186 if nb == nil { | |
| 187 nb = NewFastSizer | |
| 188 } | |
| 189 return nb(bundle) | |
| 190 } | |
| OLD | NEW |