Chromium Code Reviews| Index: server/internal/logdog/collector/collector_test.go |
| diff --git a/server/internal/logdog/collector/collector_test.go b/server/internal/logdog/collector/collector_test.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..4aaa68c4478e1a1e1614f3992e4918b61440a31d |
| --- /dev/null |
| +++ b/server/internal/logdog/collector/collector_test.go |
| @@ -0,0 +1,490 @@ |
| +// Copyright 2016 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package collector |
| + |
| +import ( |
| + "bytes" |
| + "fmt" |
| + "sort" |
| + "strings" |
| + "sync/atomic" |
| + "testing" |
| + "time" |
| + |
| + "github.com/golang/protobuf/proto" |
| + "github.com/luci/luci-go/common/clock" |
| + "github.com/luci/luci-go/common/clock/testclock" |
| + "github.com/luci/luci-go/common/errors" |
| + "github.com/luci/luci-go/common/logdog/butlerproto" |
| + "github.com/luci/luci-go/common/logdog/types" |
| + "github.com/luci/luci-go/common/proto/google" |
| + "github.com/luci/luci-go/common/proto/logdog/logpb" |
| + "github.com/luci/luci-go/server/logdog/storage" |
| + "github.com/luci/luci-go/server/logdog/storage/memory" |
| + "golang.org/x/net/context" |
| + |
| + . "github.com/smartystreets/goconvey/convey" |
| +) |
| + |
| +var testSecret = bytes.Repeat([]byte{0x55}, types.StreamSecretLength) |
| + |
| +type bundleBuilder struct { |
|
martiniss
2016/01/27 22:19:44
What does this do? Build bundles I guess, but what
dnj
2016/01/29 20:46:52
Done.
|
| + context.Context |
| + |
| + base time.Time |
| + entries []*logpb.ButlerLogBundle_Entry |
| +} |
| + |
| +func (b *bundleBuilder) addBundleEntry(be *logpb.ButlerLogBundle_Entry) { |
| + if b.base.IsZero() { |
| + b.base = clock.Now(b) |
|
martiniss
2016/01/27 22:19:44
This confused me at first, but b is a context.
dnj
2016/01/29 20:46:52
Acknowledged.
|
| + } |
| + |
| + b.entries = append(b.entries, be) |
| +} |
| + |
| +func (b *bundleBuilder) genBundleEntry(name string, tidx int, idxs ...int) *logpb.ButlerLogBundle_Entry { |
| + p, n := types.StreamPath(name).Split() |
| + be := logpb.ButlerLogBundle_Entry{ |
| + Secret: testSecret, |
| + Desc: &logpb.LogStreamDescriptor{ |
| + Prefix: string(p), |
| + Name: string(n), |
| + ContentType: "application/test-message", |
| + StreamType: logpb.LogStreamDescriptor_TEXT, |
| + Timestamp: google.NewTimestamp(clock.Now(b)), |
| + }, |
| + } |
| + |
| + if len(idxs) > 0 { |
| + be.Logs = make([]*logpb.LogEntry, len(idxs)) |
| + for i, idx := range idxs { |
| + be.Logs[i] = b.logEntry(idx) |
| + } |
| + if tidx >= 0 { |
| + be.Terminal = true |
| + be.TerminalIndex = uint64(tidx) |
| + } |
| + } |
| + |
| + return &be |
| +} |
| + |
| +func (b *bundleBuilder) addStreamEntries(name string, term int, idxs ...int) { |
| + b.addBundleEntry(b.genBundleEntry(name, term, idxs...)) |
| +} |
| + |
| +func (b *bundleBuilder) addFullStream(name string, count int) { |
| + idxs := make([]int, count) |
| + for i := range idxs { |
| + idxs[i] = i |
| + } |
| + b.addStreamEntries(name, count-1, idxs...) |
| +} |
| + |
| +func (b *bundleBuilder) logEntry(idx int) *logpb.LogEntry { |
| + return &logpb.LogEntry{ |
| + StreamIndex: uint64(idx), |
| + Sequence: uint64(idx), |
| + Content: &logpb.LogEntry_Text{ |
| + Text: &logpb.Text{ |
| + Lines: []*logpb.Text_Line{ |
| + { |
| + Value: fmt.Sprintf("Line #%d", idx), |
| + Delimiter: "\n", |
| + }, |
| + }, |
| + }, |
| + }, |
| + } |
| +} |
| + |
| +func (b *bundleBuilder) bundle() []byte { |
| + bytes := b.bundleWithEntries(b.entries...) |
| + b.entries = nil |
| + return bytes |
| +} |
| + |
| +func (b *bundleBuilder) bundleWithEntries(e ...*logpb.ButlerLogBundle_Entry) []byte { |
| + bundle := logpb.ButlerLogBundle{ |
| + Source: "test stream", |
| + Timestamp: google.NewTimestamp(clock.Now(b)), |
| + Entries: e, |
| + } |
| + |
| + buf := bytes.Buffer{} |
| + w := butlerproto.Writer{Compress: true} |
| + if err := w.Write(&buf, &bundle); err != nil { |
| + panic(err) |
| + } |
| + return buf.Bytes() |
| +} |
| + |
| +type indexRange struct { |
| + start int |
| + end int |
| +} |
| + |
| +func (r *indexRange) String() string { return fmt.Sprintf("[%d..%d]", r.start, r.end) } |
| + |
| +// shouldHaveRegisteredStream asserts that a testCoordinatorClient has |
| +// registered a stream (string) and its terminal index (int). |
| +func shouldHaveRegisteredStream(actual interface{}, expected ...interface{}) string { |
| + tcc := actual.(*testCoordinatorClient) |
| + name := expected[0].(string) |
| + tidx := expected[1].(int) |
| + |
| + cur, ok := tcc.stream(name) |
| + if !ok { |
| + return fmt.Sprintf("stream %q is not registered", name) |
| + } |
| + if tidx >= 0 && cur < 0 { |
| + return fmt.Sprintf("stream %q is expected to be terminated, but isn't.", name) |
| + } |
| + if cur >= 0 && tidx < 0 { |
| + return fmt.Sprintf("stream %q is NOT expected to be terminated, but it is.", name) |
| + } |
| + return "" |
| +} |
| + |
| +// shoudNotHaveRegisteredStream asserts that a testCoordinatorClient has not |
| +// registered a stream (string). |
| +func shouldNotHaveRegisteredStream(actual interface{}, expected ...interface{}) string { |
| + tcc := actual.(*testCoordinatorClient) |
| + name := expected[0].(string) |
| + |
| + if _, ok := tcc.stream(name); ok { |
| + return fmt.Sprintf("stream %q is registered, but it shoult NOT be.", name) |
|
martiniss
2016/01/27 22:19:43
typo
dnj
2016/01/29 20:46:52
Done.
|
| + } |
| + return "" |
| +} |
| + |
| +// shouldHaveStoredStream asserts that a storage.Storage instance has contiguous |
| +// stream records in it. |
| +// |
| +// actual is the storage.Storage instance. expected is a stream name (string) |
| +// followed by a a series of records to assert. This can either be a specific |
| +// integer index or an intexRange marking a closed range of indices. |
| +func shouldHaveStoredStream(actual interface{}, expected ...interface{}) string { |
| + st := actual.(storage.Storage) |
| + name := expected[0].(string) |
| + |
| + // Load all entries for this stream. |
| + req := storage.GetRequest{ |
| + Path: types.StreamPath(name), |
| + } |
| + |
| + entries := make(map[int]*logpb.LogEntry) |
| + var ierr error |
| + err := st.Get(&req, func(idx types.MessageIndex, d []byte) bool { |
| + le := logpb.LogEntry{} |
| + if ierr = proto.Unmarshal(d, &le); ierr != nil { |
| + return false |
| + } |
| + entries[int(idx)] = &le |
| + return true |
| + }) |
| + if ierr != nil { |
| + err = ierr |
| + } |
| + if err != nil && err != storage.ErrDoesNotExist { |
| + return fmt.Sprintf("error: %v", err) |
| + } |
| + |
| + assertLogEntry := func(i int) string { |
| + le := entries[i] |
| + if le == nil { |
| + return fmt.Sprintf("%d", i) |
| + } |
| + delete(entries, i) |
| + |
| + if le.StreamIndex != uint64(i) { |
| + return fmt.Sprintf("*%d", i) |
| + } |
| + return "" |
| + } |
| + |
| + var failed []string |
| + for _, exp := range expected[1:] { |
| + switch e := exp.(type) { |
| + case int: |
| + if err := assertLogEntry(e); err != "" { |
| + failed = append(failed, err) |
| + } |
| + |
| + case indexRange: |
| + var errs []string |
| + for i := e.start; i <= e.end; i++ { |
| + if err := assertLogEntry(i); err != "" { |
| + errs = append(errs, err) |
| + } |
| + } |
| + if len(errs) > 0 { |
| + failed = append(failed, fmt.Sprintf("%s{%s}", e.String(), strings.Join(errs, ","))) |
| + } |
| + |
| + default: |
| + panic(fmt.Errorf("unknown expected type %T", e)) |
| + } |
| + } |
| + |
| + // Extras? |
| + if len(entries) > 0 { |
| + idxs := make([]int, 0, len(entries)) |
| + for i := range entries { |
| + idxs = append(idxs, i) |
| + } |
| + sort.Ints(idxs) |
| + |
| + extra := make([]string, len(idxs)) |
| + for i, idx := range idxs { |
| + extra[i] = fmt.Sprintf("%d", idx) |
| + } |
| + failed = append(failed, fmt.Sprintf("extra{%s}", strings.Join(extra, ","))) |
| + } |
| + |
| + if len(failed) > 0 { |
| + return strings.Join(failed, ", ") |
| + } |
| + return "" |
| +} |
| + |
| +// TestCollector runs through a series of end-to-end Collector workflows and |
| +// ensures that the Collector behaves appropriately. |
| +func TestCollector(t *testing.T) { |
| + t.Parallel() |
| + |
| + Convey(`Using a test configuration`, t, func() { |
| + c, _ := testclock.UseTime(context.Background(), testclock.TestTimeLocal) |
| + |
| + tcc := &testCoordinatorClient{} |
| + st := &testStorage{Storage: &memory.Storage{}} |
| + |
| + coll := New(Options{ |
| + Storage: st, |
| + Coordinator: tcc, |
| + }) |
| + |
| + bb := bundleBuilder{ |
| + Context: c, |
| + } |
| + |
| + Convey(`Can process multiple single full streams from a Butler bundle.`, func() { |
| + bb.addFullStream("foo/+/bar", 128) |
| + bb.addFullStream("foo/+/baz", 256) |
| + |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + |
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 127) |
| + So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 127}) |
| + |
| + So(tcc, shouldHaveRegisteredStream, "foo/+/baz", 255) |
| + So(st, shouldHaveStoredStream, "foo/+/baz", indexRange{0, 255}) |
| + }) |
| + |
| + Convey(`Will return an error if a transient error happened while registering.`, func() { |
| + tcc.errC = make(chan error, 1) |
| + tcc.errC <- errors.WrapTransient(errors.New("test error")) |
| + |
| + bb.addFullStream("foo/+/bar", 128) |
| + err := coll.Process(c, bb.bundle()) |
| + So(err, ShouldNotBeNil) |
| + }) |
| + |
| + Convey(`Will not return an error if a non-transient error happened while registering.`, func() { |
| + tcc.errC = make(chan error, 1) |
| + tcc.errC <- errors.New("test error") |
| + |
| + bb.addFullStream("foo/+/bar", 128) |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + }) |
| + |
| + Convey(`Will return an error if a transient error happened while terminating.`, func() { |
| + tcc.errC = make(chan error, 2) |
| + tcc.errC <- nil // Register |
| + tcc.errC <- errors.WrapTransient(errors.New("test error")) // Terminate |
| + |
| + bb.addFullStream("foo/+/bar", 128) |
| + So(coll.Process(c, bb.bundle()), ShouldNotBeNil) |
| + }) |
| + |
| + Convey(`Will not return an error if a non-transient error happened while terminating.`, func() { |
| + tcc.errC = make(chan error, 2) |
| + tcc.errC <- nil // Register |
| + tcc.errC <- errors.New("test error") // Terminate |
| + |
| + bb.addFullStream("foo/+/bar", 128) |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + }) |
| + |
| + Convey(`Will return an error if a transient error happened on storage.`, func() { |
| + // Single transient error. |
| + count := int32(0) |
| + st.err = func() error { |
| + if atomic.AddInt32(&count, 1) == 1 { |
| + return errors.WrapTransient(errors.New("test error")) |
| + } |
| + return nil |
| + } |
| + |
| + bb.addFullStream("foo/+/bar", 128) |
| + So(coll.Process(c, bb.bundle()), ShouldNotBeNil) |
| + }) |
| + |
| + Convey(`Will not return an error if a non-transient error happened on storage.`, func() { |
| + // Single non-transient error. |
| + count := int32(0) |
| + st.err = func() error { |
| + if atomic.AddInt32(&count, 1) == 1 { |
| + return errors.New("test error") |
| + } |
| + return nil |
| + } |
| + |
| + bb.addFullStream("foo/+/bar", 128) |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + }) |
| + |
| + Convey(`Will drop invalid LogStreamDescriptor bundle entries and process the valid ones.`, func() { |
| + be := bb.genBundleEntry("foo/+/trash", 1337, 4, 6, 7, 8) |
| + be.Desc.ContentType = "" // Missing ContentType => invalid. |
| + |
| + bb.addStreamEntries("foo/+/trash", -1, 0, 1, 2, 3, 5) |
| + bb.addBundleEntry(be) |
| + bb.addFullStream("foo/+/bar", 32) |
| + |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + |
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 32) |
| + So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 31}) |
| + |
| + So(tcc, shouldHaveRegisteredStream, "foo/+/trash", -1) |
| + So(st, shouldHaveStoredStream, "foo/+/trash", 0, 1, 2, 3, 5) |
| + }) |
| + |
| + Convey(`Will drop streams with missing secrets.`, func() { |
| + be := bb.genBundleEntry("foo/+/trash", 2, 0, 1, 2) |
| + be.Secret = nil |
| + bb.addBundleEntry(be) |
| + |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar", 127) |
| + }) |
| + |
| + Convey(`Will drop messages with mismatching secrets.`, func() { |
| + bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2) |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + |
| + // Push another bundle with a different secret. |
| + be := bb.genBundleEntry("foo/+/bar", 4, 3, 4) |
| + be.Secret = bytes.Repeat([]byte{0xAA}, types.StreamSecretLength) |
| + be.TerminalIndex = 1337 |
| + bb.addBundleEntry(be) |
| + bb.addFullStream("foo/+/baz", 3) |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + |
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1) |
| + So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 2}) |
| + |
| + So(tcc, shouldHaveRegisteredStream, "foo/+/baz", 2) |
| + So(st, shouldHaveStoredStream, "foo/+/baz", indexRange{0, 2}) |
| + }) |
| + |
| + Convey(`Will return no error if the data has a corrupt bundle header.`, func() { |
| + So(coll.Process(c, []byte{0x00}), ShouldBeNil) |
| + So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar") |
| + }) |
| + |
| + Convey(`Will drop bundles with unknown ProtoVersion string.`, func() { |
| + buf := bytes.Buffer{} |
| + w := butlerproto.Writer{ProtoVersion: "!!!invalid!!!"} |
| + w.Write(&buf, &logpb.ButlerLogBundle{}) |
| + |
| + So(coll.Process(c, buf.Bytes()), ShouldBeNil) |
| + |
| + So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar") |
| + }) |
| + |
| + Convey(`Will drop records beyond a local terminal index.`, func() { |
| + bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 4) |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + |
| + bb.addStreamEntries("foo/+/bar", 4, 3, 5) |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + |
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 4) |
| + So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 3}) |
| + }) |
| + |
| + Convey(`Will not ingest records beyond a remote terminal index.`, func() { |
| + tcc.register(stateProxy{ |
| + path: "foo/+/bar", |
| + secret: testSecret, |
| + terminalIndex: 3, |
| + }) |
| + |
| + bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2) |
| + bb.addStreamEntries("foo/+/bar", 4, 3, 5) |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + |
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 3) |
| + So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 3}) |
| + }) |
| + |
| + Convey(`Will not ingest records if the stream is archived.`, func() { |
| + tcc.register(stateProxy{ |
| + path: "foo/+/bar", |
| + secret: testSecret, |
| + terminalIndex: -1, |
| + archived: true, |
| + }) |
| + |
| + bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 4) |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + |
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1) |
| + So(st, shouldHaveStoredStream, "foo/+/bar") |
| + }) |
| + |
| + Convey(`Will not ingest records if the stream is purged.`, func() { |
| + tcc.register(stateProxy{ |
| + path: "foo/+/bar", |
| + secret: testSecret, |
| + terminalIndex: -1, |
| + purged: true, |
| + }) |
| + |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + |
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1) |
| + So(st, shouldHaveStoredStream, "foo/+/bar") |
| + }) |
| + |
| + Convey(`Will not ingest a bundle with no bundle entries.`, func() { |
| + So(coll.Process(c, bb.bundleWithEntries()), ShouldBeNil) |
| + }) |
| + |
| + Convey(`Will not ingest a bundle whose log entries don't match their descriptor.`, func() { |
| + be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 3, 4) |
| + |
| + // Add a binary log entry. This does NOT match the text descriptor, and |
| + // should fail validation. |
| + be.Logs = append(be.Logs, &logpb.LogEntry{ |
| + StreamIndex: 2, |
| + Sequence: 2, |
| + Content: &logpb.LogEntry_Binary{ |
| + &logpb.Binary{ |
| + Data: []byte{0xd0, 0x6f, 0x00, 0xd5}, |
| + }, |
| + }, |
| + }) |
| + bb.addBundleEntry(be) |
| + So(coll.Process(c, bb.bundle()), ShouldBeNil) |
| + |
| + So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 4) |
| + So(st, shouldHaveStoredStream, "foo/+/bar", 0, 1, 3, 4) |
| + }) |
| + }) |
| +} |