Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package bundler | |
| 6 | |
| 7 import ( | |
| 8 "sort" | |
| 9 | |
| 10 "github.com/luci/luci-go/common/logdog/protocol" | |
| 11 "github.com/luci/luci-go/common/logdog/protocol/protoutil" | |
| 12 ) | |
| 13 | |
| 14 // bundlerStream is an aggregate buffered log stream state. It consists of | |
| 15 // a ButlerLogBundle_Entry and its aggregate logs. | |
| 16 type bundlerStream protocol.ButlerLogBundle_Entry | |
| 17 | |
| 18 // mergeEntry merges the state of the supplied ButlerLogBundle_Entry into the | |
| 19 // stream entry template. | |
| 20 func (s *bundlerStream) mergeEntry(e *protocol.ButlerLogBundle_Entry) { | |
| 21 if e.GetTerminal() && !s.entry().GetTerminal() { | |
| 22 s.Terminal = e.Terminal | |
| 23 s.TerminalIndex = e.TerminalIndex | |
| 24 } | |
| 25 } | |
| 26 | |
| 27 func (s *bundlerStream) entry() *protocol.ButlerLogBundle_Entry { | |
| 28 return (*protocol.ButlerLogBundle_Entry)(s) | |
| 29 } | |
| 30 | |
| 31 // bundlerImpl is an implementation of the Bundler interface. | |
| 32 type bundlerImpl struct { | |
| 33 *Config | |
| 34 | |
| 35 sizer Sizer | |
| 36 entries map[string]*bundlerStream | |
| 37 count int | |
| 38 | |
| 39 omitMap map[int]int64 | |
| 40 round int64 | |
| 41 } | |
| 42 | |
| 43 // New instantiates a new Bundler instance. | |
| 44 func New(c Config) Bundler { | |
| 45 // The template bundle may not have entries; clear our copy if it does. | |
| 46 c.TemplateBundle.Entries = nil | |
| 47 | |
| 48 b := &bundlerImpl{ | |
| 49 Config: &c, | |
| 50 omitMap: map[int]int64{}, | |
| 51 round: 1, // Start at "1" because "0" is default value. | |
| 52 } | |
| 53 b.reset() | |
| 54 return b | |
| 55 } | |
| 56 | |
| 57 func (b *bundlerImpl) Append(e *protocol.ButlerLogBundle_Entry) { | |
| 58 logs := e.GetLogs() | |
| 59 | |
| 60 // We first test if a given ButlerLogBundle_Entry is worth | |
| 61 // exporting. An entry is worth exporting if the infromation that it con tains | |
| 62 // provides new information about the log stream or its state. | |
| 63 // | |
| 64 // We consider a log stream worth exporting if it satisfies AT LEAST one of | |
| 65 // the following conditions: | |
| 66 // - It has log data len(e.Logs) > 0 | |
| 67 // - It is terminal: e.GetTerminal() == true | |
| 68 if !(len(logs) > 0 || e.GetTerminal()) { | |
| 69 return | |
| 70 } | |
| 71 | |
| 72 // Add this log to our reserve. | |
| 73 path := string(protoutil.DescriptorPath(e.GetDesc())) | |
| 74 | |
| 75 if cur := b.entries[path]; cur != nil { | |
| 76 // Augment the existing stream with the new logs and state. | |
| 77 cur.mergeEntry(e) | |
| 78 cur.Logs = append(cur.Logs, logs...) | |
| 79 } else { | |
| 80 // This is a new stream. Register the entry as the template for this stream. | |
| 81 b.sizer.Append(e, nil) | |
| 82 b.entries[path] = (*bundlerStream)(e) | |
| 83 } | |
| 84 | |
| 85 for _, le := range logs { | |
| 86 b.sizer.Append(e, le) | |
| 87 } | |
| 88 b.count += len(logs) | |
| 89 return | |
| 90 } | |
| 91 | |
| 92 func (b *bundlerImpl) reset() { | |
| 93 b.sizer = b.newSizer(&b.TemplateBundle) | |
| 94 b.count = 0 | |
| 95 b.entries = map[string]*bundlerStream{} | |
| 96 } | |
| 97 | |
| 98 func (b *bundlerImpl) Empty() bool { | |
| 99 return len(b.entries) == 0 | |
| 100 } | |
| 101 | |
| 102 func (b *bundlerImpl) Size() int64 { | |
| 103 return b.sizer.Size() | |
| 104 } | |
| 105 | |
| 106 func (b *bundlerImpl) GetBundles() []*protocol.ButlerLogBundle { | |
| 107 return b.getBundlesImpl(b.Threshold) | |
| 108 } | |
| 109 | |
| 110 func (b *bundlerImpl) getBundlesImpl(threshold int64) []*protocol.ButlerLogBundl e { | |
| 111 bundles := []*protocol.ButlerLogBundle(nil) | |
| 112 | |
| 113 for { | |
| 114 bundle := b.getBundle(threshold) | |
| 115 if bundle == nil { | |
| 116 break | |
| 117 } | |
| 118 bundles = append(bundles, bundle) | |
| 119 } | |
| 120 | |
| 121 // If we still have bundle entries, it is likely because no entries can | |
| 122 // fit within the threshold. This is a configuration error, and our only | |
| 123 // viable response is to clear them out (drop them). | |
| 124 b.reset() | |
| 125 | |
| 126 return bundles | |
| 127 } | |
| 128 | |
| 129 func (b *bundlerImpl) getBundle(threshold int64) *protocol.ButlerLogBundle { | |
| 130 // Short circuit if we don't have any entries. | |
| 131 if len(b.entries) == 0 { | |
| 132 return nil | |
| 133 } | |
| 134 | |
| 135 // Clone our template bundle, as we intend to modify it by adding entrie s. | |
| 136 bundle := b.TemplateBundle | |
| 137 | |
| 138 // For determinism, add buffered entries in order of path. | |
| 139 keys := make([]string, 0, len(b.entries)) | |
| 140 for k := range b.entries { | |
| 141 keys = append(keys, string(k)) | |
| 142 } | |
| 143 sort.Strings(keys) | |
|
iannucci
2015/09/01 03:03:21
only for testing, right?
dnj
2015/09/02 01:58:15
I like determinism and predictability :/
So think
| |
| 144 | |
| 145 sizer := b.newSizer(&bundle) | |
| 146 overThreshold := func() bool { | |
| 147 return threshold != 0 && threshold < sizer.Size() | |
| 148 } | |
| 149 | |
| 150 // For efficiency, we declare a single omit map. This map is keyed off o f log | |
| 151 // index. Each time we enter the bundler loop, we increment our "round" | |
| 152 // variable. For any given loop, an entry is omitted if its omitMap | |
| 153 // index equals the current round. | |
| 154 for _, k := range keys { | |
| 155 // Get the current round. | |
| 156 round := b.round | |
| 157 b.round++ | |
| 158 | |
| 159 // We assume that each entry in "entries" is worth exporting (se e | |
| 160 // comment in Append), else it would not have been added to the entries | |
| 161 // map by Append. | |
| 162 e := b.entries[k] | |
| 163 | |
| 164 // If this entry has no logs, it is a terminal entry. Try and ex port it as | |
|
iannucci
2015/09/01 03:03:21
hm... there's no more explicit way to denote this?
dnj
2015/09/02 01:58:14
Well, kind of. The code drops any entry that doesn
| |
| 165 // a standalone bundle entry. | |
| 166 if len(e.Logs) == 0 { | |
| 167 sizer.Append(e.entry(), nil) | |
| 168 if overThreshold() { | |
| 169 // We don't have enough room to export a bundle entry. Our bundle is | |
| 170 // full for this round. | |
| 171 break | |
| 172 } | |
| 173 | |
| 174 // Our bundle is empty. This means that none of the entr y's logs fit in | |
| 175 // an empty bundle! Discard the full entry (data loss). | |
|
iannucci
2015/09/01 03:03:22
this needs to be logged somehow. does logdog need
dnj
2015/09/02 01:58:15
Butler will output to STDOUT/STDERR, so Swarming t
| |
| 176 bundle.Entries = append(bundle.Entries, e.entry()) | |
| 177 delete(b.entries, k) | |
| 178 continue | |
| 179 } | |
| 180 | |
| 181 // Our entry has logs. Export any that fit into our bundle witho ut violating | |
| 182 // the threshold. | |
| 183 omitted := 0 | |
| 184 for i, le := range e.Logs { | |
| 185 sizer.Append(e.entry(), le) | |
| 186 if overThreshold() { | |
| 187 sizer.Undo() | |
| 188 b.omitMap[i] = round | |
| 189 omitted++ | |
| 190 } | |
|
iannucci
2015/09/01 03:03:21
shouldn't we stop after the first overage? Or is t
dnj
2015/09/02 01:58:15
Yep that.
| |
| 191 } | |
| 192 | |
| 193 // If all log entries available were successfully exported, we a re finished | |
| 194 // with this entry. We will take an optimized path and hand its pointer | |
| 195 // directly to the export bundle. | |
| 196 if omitted == 0 { | |
| 197 bundle.Entries = append(bundle.Entries, e.entry()) | |
| 198 delete(b.entries, k) | |
| 199 continue | |
| 200 } | |
| 201 | |
| 202 // If none of the entry's logs were exported, we're done with it for this | |
| 203 // round. | |
| 204 if omitted == len(e.Logs) { | |
| 205 // If our bundle is empty, this means that NONE of the e ntry's logs fit in | |
| 206 // an empty bundle! Since none will fit individually, we will discard the | |
| 207 // full entry (data loss). | |
|
iannucci
2015/09/01 03:03:21
log
dnj
2015/09/02 01:58:14
Done.
| |
| 208 if len(bundle.GetEntries()) == 0 { | |
| 209 delete(b.entries, k) | |
| 210 } | |
| 211 continue | |
| 212 } | |
| 213 | |
| 214 // We are exporting some of the entry's logs. We will do this by exporting | |
| 215 // a clone of the entry containing just the exported logs. | |
| 216 // | |
| 217 // We will reuse the entry's Logs array in order to avoid unnece ssary | |
| 218 // allocations. In order to do this, we will split the array int o two | |
|
iannucci
2015/09/01 03:03:21
*skeptical*
I think this would all be much (much)
dnj
2015/09/02 01:58:14
I'm expecting this to get thrashed pretty heavily
| |
| 219 // slices. The first slice will contain the exported elements, a nd the | |
| 220 // second will contain the remainder. | |
| 221 // | |
| 222 // (It should be noted that there is a potential issue if Append () were to | |
| 223 // be called in between getBundle calls. This is firstly not an issue, since | |
| 224 // getBundle in practie will empty "entries". However, even if t hat weren't | |
|
iannucci
2015/09/01 03:03:21
s/practie/practice/?
dnj
2015/09/02 01:58:15
Done.
| |
| 225 // the case, since the retained omitted logs are at the end of t he array, | |
| 226 // calling append on that slice would properly extend it.) | |
| 227 divider := len(e.Logs) | |
| 228 for i := range e.Logs { | |
| 229 if i >= divider { | |
| 230 // Everything to the right of divider is omitted , so we're done. | |
| 231 break | |
| 232 } | |
| 233 | |
| 234 // Scan forwards until we find an omit spot. | |
| 235 if b.omitMap[i] != round { | |
| 236 continue | |
| 237 } | |
| 238 | |
| 239 // Scan backwards from the end of our log list to find a non-omitted | |
| 240 // entry. | |
| 241 for j := divider - 1; j > i; j-- { | |
| 242 if b.omitMap[j] != round { | |
| 243 // We're omitting "i", and not omitting "j". Swap! | |
| 244 e.Logs[i], e.Logs[j] = e.Logs[j], e.Logs [i] | |
| 245 divider = j | |
| 246 omitted -= 1 | |
| 247 break | |
| 248 } | |
| 249 } | |
| 250 } | |
| 251 divider -= omitted | |
| 252 | |
| 253 // Since our entry has more logs, we must retain it. We will exp ort a clone | |
| 254 // of the entry. | |
| 255 ec := *e.entry() | |
| 256 ec.Logs, e.Logs = e.Logs[:divider], e.Logs[divider:] | |
|
iannucci
2015/09/01 03:03:21
won't you end up just growing Logs's underlying ar
dnj
2015/09/02 01:58:15
Yes, until it empties, in which case it's unrefere
| |
| 257 bundle.Entries = append(bundle.Entries, &ec) | |
| 258 } | |
| 259 | |
| 260 if len(bundle.Entries) == 0 { | |
| 261 return nil | |
| 262 } | |
| 263 return &bundle | |
| 264 } | |
| 265 | |
| 266 func (b *bundlerImpl) newSizer(bundle *protocol.ButlerLogBundle) Sizer { | |
| 267 nb := b.NewSizer | |
| 268 if nb == nil { | |
| 269 nb = NewFastSizer | |
| 270 } | |
| 271 return nb(bundle) | |
| 272 } | |
| OLD | NEW |