Index: client/internal/logdog/butler/bundler/stream_test.go |
diff --git a/client/internal/logdog/butler/bundler/stream_test.go b/client/internal/logdog/butler/bundler/stream_test.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..dffea4dac8f2aba7810f9f61595ed3549ad11c16 |
--- /dev/null |
+++ b/client/internal/logdog/butler/bundler/stream_test.go |
@@ -0,0 +1,452 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+package bundler |
+ |
+import ( |
+ "bytes" |
+ "errors" |
+ "fmt" |
+ "strconv" |
+ "testing" |
+ "time" |
+ |
+ "github.com/luci/luci-go/common/clock/testclock" |
+ "github.com/luci/luci-go/common/logdog/protocol" |
+ . "github.com/smartystreets/goconvey/convey" |
+) |
+ |
+type testParserCommand struct { |
+ // data is the data content of this command. |
+ data []byte |
+ // ts is the timestamp, which is valid if this is a data command. |
+ ts time.Time |
+ // truncate causes this command to only return if configured to truncate. |
+ truncateToggle *bool |
+ // closed causes this command to only return if configured to truncate and closed. |
+ closedToggle *bool |
+ // err, if not nil, is returned when this command is encountered. |
+ err error |
+} |
+ |
+var errTestInduced = errors.New("test error") |
+ |
+// testParser is a parser implementation that allows specifically-configured |
+// data to be emitted. It consumes commands, some of which alter its behavior |
+// and others of which present data. The resulting state affects how it emits |
+// LogEntry records via nextEntry. |
+type testParser struct { |
+ commands []*testParserCommand |
+ |
+ appendedData []byte |
+ truncateOn bool |
+ closedOn bool |
+ err error |
+} |
+ |
+func (p *testParser) addCommand(r *testParserCommand) { |
+ p.commands = append(p.commands, r) |
+} |
+ |
+func (p *testParser) nextCommand(pop bool) *testParserCommand { |
+ if len(p.commands) == 0 { |
+ return nil |
+ } |
+ cmd := p.commands[0] |
+ if pop { |
+ p.commands = p.commands[1:] |
+ } |
+ return cmd |
+} |
+ |
+func (p *testParser) popData() (r *testParserCommand) { |
+ for i, cmd := range p.commands { |
+ if cmd.data != nil { |
+ p.commands = p.commands[i+1:] |
+ return cmd |
+ } |
+ } |
+ return nil |
+} |
+ |
+func (p *testParser) tags(ts time.Time, commands ...string) { |
+ for _, c := range commands { |
+ p.addTag(c, ts) |
+ } |
+} |
+ |
+func (p *testParser) addError(err error) { |
+ p.addCommand(&testParserCommand{ |
+ err: err, |
+ }) |
+} |
+ |
+func (p *testParser) addTag(tag string, ts time.Time) { |
+ p.addData([]byte(tag), ts) |
+} |
+ |
+func (p *testParser) addData(d []byte, ts time.Time) { |
+ p.addCommand(&testParserCommand{ |
+ data: d, |
+ ts: ts, |
+ }) |
+} |
+ |
+func (p *testParser) setTrunc(value bool) { |
+ p.addCommand(&testParserCommand{ |
+ truncateToggle: &value, |
+ }) |
+} |
+ |
+func (p *testParser) setClosed(value bool) { |
+ p.addCommand(&testParserCommand{ |
+ closedToggle: &value, |
+ }) |
+} |
+ |
+func (p *testParser) appendData(d Data) { |
+ p.addData(d.Bytes(), d.Timestamp()) |
+} |
+ |
+func (p *testParser) nextEntry(c *constraints) (*protocol.LogEntry, error) { |
+ // Process records until we hit data or run out. |
+ for p.err == nil { |
+ rec := p.nextCommand(false) |
+ if rec == nil { |
+ return nil, p.err |
+ } |
+ |
+ // If this is a data record, process. |
+ if rec.data != nil { |
+ break |
+ } |
+ |
+ // Ingest commands, repeat. |
+ if rec.err != nil { |
+ p.err = rec.err |
+ break |
+ } |
+ |
+ if rec.truncateToggle != nil { |
+ p.truncateOn = *rec.truncateToggle |
+ } |
+ if rec.closedToggle != nil { |
+ p.closedOn = *rec.closedToggle |
+ } |
+ p.nextCommand(true) |
+ } |
+ |
+ if p.err != nil { |
+ return nil, p.err |
+ } |
+ |
+ // This is a data record. If we're configured to not yield it, leave it and |
+ // return nil. |
+ if p.truncateOn && (!c.truncate || (p.closedOn && !c.closed)) { |
+ return nil, nil |
+ } |
+ |
+ // Consume this record. |
+ rec := p.nextCommand(true) |
+ return &protocol.LogEntry{ |
+ Content: &protocol.LogEntry_Text{Text: &protocol.Text{ |
+ Lines: []*protocol.Text_Line{ |
+ {Value: string(rec.data)}, |
+ }, |
+ }}, |
+ }, nil |
+} |
+ |
+func (p *testParser) bufferedBytes() (r int64) { |
+ for _, rec := range p.commands { |
+ r += int64(len(rec.data)) |
+ } |
+ return |
+} |
+ |
+func (p *testParser) firstChunkTime() (time.Time, bool) { |
+ for _, c := range p.commands { |
+ if c.data != nil { |
+ return c.ts, true |
+ } |
+ } |
+ return time.Time{}, false |
+} |
+ |
+func TestStream(t *testing.T) { |
+ Convey(`A testing stream config`, t, func() { |
+ tc := testclock.New(time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC)) |
+ tp := testParser{} |
+ c := streamConfig{ |
+ name: "test", |
+ parser: &tp, |
+ template: protocol.ButlerLogBundle_Entry{ |
+ Desc: &protocol.LogStreamDescriptor{ |
+ Prefix: "test-prefix", |
+ Name: "test", |
+ }, |
+ }, |
+ } |
+ |
+ Convey(`With a 64-byte maximum buffer and 1 second maximum duration`, func() { |
+ c.maximumBufferedBytes = 64 |
+ c.maximumBufferDuration = time.Second |
+ s := newStream(c) |
+ |
+ Convey(`Is not drained by default`, func() { |
+ So(s.isDrained(), ShouldBeFalse) |
+ |
+ Convey(`When closed, is drained.`, func() { |
+ s.Close() |
+ So(s.isDrained(), ShouldBeTrue) |
+ |
+ Convey(`When closed again, is still drained.`, func() { |
+ s.Close() |
+ So(s.isDrained(), ShouldBeTrue) |
+ }) |
+ }) |
+ }) |
+ |
+ Convey(`With no data, has no expiration time.`, func() { |
+ _, has := s.expireTime() |
+ So(has, ShouldBeFalse) |
+ }) |
+ |
+ Convey(`Append will ignore a 0-byte chunk.`, func() { |
+ d := data(tc.Now()) |
+ So(s.Append(d), ShouldBeNil) |
+ So(d.released, ShouldBeTrue) |
+ }) |
+ |
+ Convey(`Append will add two 32-byte chunks.`, func() { |
+ content := bytes.Repeat([]byte{0xAA}, 32) |
+ So(s.Append(data(tc.Now(), content...)), ShouldBeNil) |
+ So(s.Append(data(tc.Now(), content...)), ShouldBeNil) |
+ }) |
+ |
+ Convey(`Append will add a large chunk when there are no other Data blocks.`, func() { |
+ d := data(tc.Now(), bytes.Repeat([]byte{0xAA}, 128)...) |
+ So(s.Append(d), ShouldBeNil) |
+ |
+ Convey(`Will use that data's timestamp as expiration time.`, func() { |
+ t, has := s.expireTime() |
+ So(has, ShouldBeTrue) |
+ So(t.Equal(tc.Now().Add(time.Second)), ShouldBeTrue) |
+ }) |
+ }) |
+ |
+ Convey(`Append will block if the chunk exceeds the buffer size.`, func() { |
+ signalC := make(chan struct{}) |
+ s.c.onAppend = func(appended bool) { |
+ if !appended { |
+ // We're waiting. |
+ close(signalC) |
+ } |
+ } |
+ |
+ // Add one chunk so we don't hit the "only byte" condition. |
+ So(s.Append(data(tc.Now(), bytes.Repeat([]byte{0xAA}, 34)...)), ShouldBeNil) |
+ |
+ // Wait until we get the signal that Append() will block, then consume |
+ // some data and unblock Append(). |
+ blocked := false |
+ go func() { |
+ <-signalC |
+ |
+ s.withParserLock(func() { |
+ tp.popData() |
+ }) |
+ blocked = true |
+ s.signalDataConsumed() |
+ }() |
+ |
+ // Add one chunk so we don't hit the "only byte" condition. |
+ So(s.Append(data(tc.Now(), bytes.Repeat([]byte{0xBB}, 32)...)), ShouldBeNil) |
+ So(blocked, ShouldBeTrue) |
+ }) |
+ |
+ Convey(`Append in an error state`, func() { |
+ terr := errors.New("test error") |
+ |
+ Convey(`Will return the error state.`, func() { |
+ s.setAppendError(terr) |
+ |
+ d := data(tc.Now(), bytes.Repeat([]byte{0xAA}, 32)...) |
+ So(s.Append(d), ShouldEqual, terr) |
+ So(d.released, ShouldBeTrue) |
+ }) |
+ |
+ Convey(`Will block if the chunk exceeds buffer size, and return error state.`, func() { |
+ signalC := make(chan struct{}) |
+ s.c.onAppend = func(appended bool) { |
+ if !appended { |
+ // Waiting, notify our goroutine that we're going to be waiting. |
+ close(signalC) |
+ } |
+ } |
+ |
+ // Add one chunk so we don't hit the "only byte" condition. |
+ So(s.Append(data(tc.Now(), bytes.Repeat([]byte{0xAA}, 34)...)), ShouldBeNil) |
+ |
+ // Wait until we get the signal that Append() will block, then consume |
+ // some data and unblock Append(). |
+ go func() { |
+ <-signalC |
+ s.setAppendError(terr) |
+ }() |
+ |
+ // Add one chunk so we don't hit the "only byte" condition. |
+ for _, sz := range []int{32, 1, 0} { |
+ d := data(tc.Now(), bytes.Repeat([]byte{0xAA}, sz)...) |
+ So(s.Append(d), ShouldEqual, terr) |
+ So(d.released, ShouldBeTrue) |
+ } |
+ }) |
+ }) |
+ }) |
+ |
+ Convey(`When building bundle entries`, func() { |
+ bb := &builder{ |
+ size: 1024, |
+ } |
+ s := newStream(c) |
+ |
+ Convey(`Returns nil with no buffered data.`, func() { |
+ So(s.nextBundleEntry(bb, false), ShouldBeFalse) |
+ So(bb.bundle(), shouldHaveBundleEntries) |
+ }) |
+ |
+ Convey(`With a single record, returns that entry.`, func() { |
+ tp.tags(tc.Now(), "a", "b") |
+ |
+ So(s.nextBundleEntry(bb, false), ShouldBeTrue) |
+ So(bb.bundle(), shouldHaveBundleEntries, "test:a:b") |
+ }) |
+ |
+ Convey(`When truncate is required, returns nil.`, func() { |
+ tp.tags(tc.Now(), "a", "b") |
+ tp.setTrunc(true) |
+ tp.tags(tc.Now(), "c") |
+ |
+ So(s.nextBundleEntry(bb, false), ShouldBeTrue) |
+ So(bb.bundle(), shouldHaveBundleEntries, "test:a:b") |
+ So(s.nextBundleEntry(bb, false), ShouldBeFalse) |
+ |
+ So(s.nextBundleEntry(bb, true), ShouldBeTrue) |
+ So(bb.bundle(), shouldHaveBundleEntries, "test:a:b:c") |
+ }) |
+ |
+ Convey(`When an error occurs during stream parsing, drains stream.`, func() { |
+ So(s.isDrained(), ShouldBeFalse) |
+ tp.tags(tc.Now(), "a") |
+ tp.addError(errTestInduced) |
+ tp.tags(tc.Now(), "b") |
+ |
+ So(s.nextBundleEntry(bb, false), ShouldBeTrue) |
+ So(s.isDrained(), ShouldBeTrue) |
+ So(bb.bundle(), shouldHaveBundleEntries, "+test:a") |
+ So(s.nextBundleEntry(bb, false), ShouldBeFalse) |
+ }) |
+ |
+ Convey(`With only an error, returns no bundle entry.`, func() { |
+ So(s.isDrained(), ShouldBeFalse) |
+ tp.addError(errTestInduced) |
+ tp.tags(tc.Now(), "a") |
+ tp.tags(tc.Now(), "b") |
+ |
+ So(s.nextBundleEntry(bb, false), ShouldBeFalse) |
+ So(bb.bundle(), shouldHaveBundleEntries) |
+ So(s.isDrained(), ShouldBeTrue) |
+ }) |
+ }) |
+ }) |
+} |
+ |
+// TestStreamSmoke tests a Stream in an actual multi-goroutine workflow. |
+func TestStreamSmoke(t *testing.T) { |
+ Convey(`When running a smoke test`, t, func() { |
+ tc := testclock.New(time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC)) |
+ tp := testParser{} |
+ c := streamConfig{ |
+ name: "test", |
+ parser: &tp, |
+ template: protocol.ButlerLogBundle_Entry{ |
+ Desc: &protocol.LogStreamDescriptor{ |
+ Prefix: "test-prefix", |
+ Name: "test", |
+ }, |
+ }, |
+ } |
+ s := newStream(c) |
+ |
+ // Appender goroutine, constantly appends data. |
+ // |
+ // This will be inherently throttled by the nextBundle consumption. |
+ go func() { |
+ defer s.Close() |
+ |
+ for i := 0; i < 512; i++ { |
+ s.Append(data(tc.Now(), []byte(fmt.Sprintf("%d", i))...)) |
+ tc.Add(time.Second) |
+ } |
+ }() |
+ |
+ // The consumer goroutine will consume bundles from the stream. |
+ consumerC := make(chan struct{}) |
+ bundleC := make(chan *protocol.ButlerLogBundle) |
+ for i := 0; i < 32; i++ { |
+ go func() { |
+ defer func() { |
+ consumerC <- struct{}{} |
+ }() |
+ |
+ b := (*builder)(nil) |
+ for !s.isDrained() { |
+ if b == nil { |
+ b = &builder{ |
+ size: 128, |
+ } |
+ } |
+ |
+ s.nextBundleEntry(b, false) |
+ if b.hasContent() { |
+ bundleC <- b.bundle() |
+ b = nil |
+ } else { |
+ // No content! Sleep for a second and check again. |
+ tc.Sleep(time.Second) |
+ } |
+ } |
+ }() |
+ } |
+ |
+ // Collect all bundles. |
+ gotIt := map[int]struct{}{} |
+ collectDoneC := make(chan struct{}) |
+ go func() { |
+ defer close(collectDoneC) |
+ |
+ for bundle := range bundleC { |
+ for _, be := range bundle.Entries { |
+ for _, le := range be.Logs { |
+ idx, _ := strconv.Atoi(logEntryName(le)) |
+ gotIt[idx] = struct{}{} |
+ } |
+ } |
+ } |
+ }() |
+ |
+ for i := 0; i < 32; i++ { |
+ <-consumerC |
+ } |
+ close(bundleC) |
+ |
+ // Did we get them all? |
+ <-collectDoneC |
+ for i := 0; i < 512; i++ { |
+ _, ok := gotIt[i] |
+ So(ok, ShouldBeTrue) |
+ } |
+ }) |
+} |