| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package bundler |
| 6 |
| 7 import ( |
| 8 "fmt" |
| 9 "sort" |
| 10 |
| 11 "github.com/luci/luci-go/common/logdog/protocol" |
| 12 ) |
| 13 |
| 14 // builderStream is builder data that is tracked for each individual stream. |
| 15 type builderStream struct { |
| 16 // ButlerLogBundle_Entry is the stream's in-progress bundle entry. |
| 17 protocol.ButlerLogBundle_Entry |
| 18 // size incrementally tracks the size of the stream's entry. |
| 19 size int |
| 20 } |
| 21 |
| 22 // builder incrementally constructs ButlerLogBundle entries. |
| 23 type builder struct { |
| 24 // size is the maximum permitted bundle size. |
| 25 size int |
| 26 |
| 27 // template is the base bundle template. |
| 28 template protocol.ButlerLogBundle |
| 29 // templateCachedSize is the cached size of the ButlerLogBundle template
. |
| 30 templateCachedSize int |
| 31 |
| 32 // smap maps the builder state for each individual stream by stream name
. |
| 33 streams map[string]*builderStream |
| 34 } |
| 35 |
| 36 func (b *builder) remaining() int { |
| 37 return b.size - b.bundleSize() |
| 38 } |
| 39 |
| 40 func (b *builder) ready() bool { |
| 41 // Have we reached our desired size? |
| 42 return b.hasContent() && (b.bundleSize() >= b.size) |
| 43 } |
| 44 |
| 45 func (b *builder) bundleSize() int { |
| 46 if b.templateCachedSize == 0 { |
| 47 b.templateCachedSize = protoSize(&b.template) |
| 48 } |
| 49 |
| 50 size := b.templateCachedSize |
| 51 for _, bs := range b.streams { |
| 52 size += sizeOfBundleEntryTag + varintLength(uint64(bs.size)) + b
s.size |
| 53 } |
| 54 |
| 55 return size |
| 56 } |
| 57 |
| 58 func (b *builder) hasContent() bool { |
| 59 return len(b.streams) > 0 |
| 60 } |
| 61 |
| 62 func (b *builder) add(template *protocol.ButlerLogBundle_Entry, le *protocol.Log
Entry) { |
| 63 bs := b.getCreateBuilderStream(template) |
| 64 |
| 65 bs.Logs = append(bs.Logs, le) |
| 66 psize := protoSize(le) |
| 67 |
| 68 // Pay the cost of the additional LogEntry. |
| 69 bs.size += sizeOfLogEntryTag + varintLength(uint64(psize)) + psize |
| 70 } |
| 71 |
| 72 func (b *builder) setStreamTerminal(template *protocol.ButlerLogBundle_Entry, ti
dx uint64) { |
| 73 bs := b.getCreateBuilderStream(template) |
| 74 if bs.Terminal { |
| 75 if bs.TerminalIndex != tidx { |
| 76 panic(fmt.Errorf("attempt to change terminal index %d =>
%d", bs.TerminalIndex, tidx)) |
| 77 } |
| 78 return |
| 79 } |
| 80 |
| 81 bs.Terminal = true |
| 82 bs.TerminalIndex = tidx |
| 83 |
| 84 // Pay the cost of the additional terminal fields. |
| 85 bs.size += ((sizeOfTerminalTag + sizeOfBoolTrue) + |
| 86 (sizeOfTerminalIndexTag + varintLength(bs.TerminalIndex))) |
| 87 } |
| 88 |
| 89 func (b *builder) bundle() *protocol.ButlerLogBundle { |
| 90 bundle := b.template |
| 91 |
| 92 names := make([]string, 0, len(b.streams)) |
| 93 for k := range b.streams { |
| 94 names = append(names, k) |
| 95 } |
| 96 sort.Strings(names) |
| 97 |
| 98 bundle.Entries = make([]*protocol.ButlerLogBundle_Entry, len(names)) |
| 99 for idx, name := range names { |
| 100 bundle.Entries[idx] = &b.streams[name].ButlerLogBundle_Entry |
| 101 } |
| 102 |
| 103 return &bundle |
| 104 } |
| 105 |
| 106 func (b *builder) getCreateBuilderStream(template *protocol.ButlerLogBundle_Entr
y) *builderStream { |
| 107 if bs := b.streams[template.Desc.Name]; bs != nil { |
| 108 return bs |
| 109 } |
| 110 |
| 111 // Initialize our maps (first time only). |
| 112 if b.streams == nil { |
| 113 b.streams = map[string]*builderStream{} |
| 114 } |
| 115 |
| 116 bs := builderStream{ |
| 117 ButlerLogBundle_Entry: *template, |
| 118 size: protoSize(template), |
| 119 } |
| 120 b.streams[template.Desc.Name] = &bs |
| 121 return &bs |
| 122 } |
| OLD | NEW |