Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(6)

Unified Diff: server/internal/logdog/collector/collector_test.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Rebased, updated from comments. Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: server/internal/logdog/collector/collector_test.go
diff --git a/server/internal/logdog/collector/collector_test.go b/server/internal/logdog/collector/collector_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..4aaa68c4478e1a1e1614f3992e4918b61440a31d
--- /dev/null
+++ b/server/internal/logdog/collector/collector_test.go
@@ -0,0 +1,490 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package collector
+
+import (
+ "bytes"
+ "fmt"
+ "sort"
+ "strings"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/common/clock/testclock"
+ "github.com/luci/luci-go/common/errors"
+ "github.com/luci/luci-go/common/logdog/butlerproto"
+ "github.com/luci/luci-go/common/logdog/types"
+ "github.com/luci/luci-go/common/proto/google"
+ "github.com/luci/luci-go/common/proto/logdog/logpb"
+ "github.com/luci/luci-go/server/logdog/storage"
+ "github.com/luci/luci-go/server/logdog/storage/memory"
+ "golang.org/x/net/context"
+
+ . "github.com/smartystreets/goconvey/convey"
+)
+
+// testSecret is the fixed stream secret shared by every test bundle entry.
+var testSecret = bytes.Repeat([]byte{0x55}, types.StreamSecretLength)
+
+// bundleBuilder incrementally accumulates ButlerLogBundle entries and renders
+// them into serialized Butler bundle frames for Collector tests. It embeds a
+// context.Context so the test clock can be read via clock.Now(b).
+type bundleBuilder struct {
martiniss 2016/01/27 22:19:44 What does this do? Build bundles I guess, but what
dnj 2016/01/29 20:46:52 Done.
+ context.Context
+
+ base time.Time
+ entries []*logpb.ButlerLogBundle_Entry
+}
+
+// addBundleEntry appends a bundle entry to the pending set. The first entry
+// added latches the builder's base time from the test clock.
+func (b *bundleBuilder) addBundleEntry(be *logpb.ButlerLogBundle_Entry) {
+ if b.base.IsZero() {
+ // b embeds context.Context, so it can be passed directly as the clock source.
+ b.base = clock.Now(b)
martiniss 2016/01/27 22:19:44 This confused me at first, but b is a context.
dnj 2016/01/29 20:46:52 Acknowledged.
+ }
+
+ b.entries = append(b.entries, be)
+}
+
+// genBundleEntry builds (but does not add) a bundle entry for the stream
+// `name`, containing one single-line TEXT log entry per index in idxs.
+//
+// If at least one log is present and tidx >= 0, the entry is marked terminal
+// at index tidx. With no idxs the entry is never marked terminal, regardless
+// of tidx.
+func (b *bundleBuilder) genBundleEntry(name string, tidx int, idxs ...int) *logpb.ButlerLogBundle_Entry {
+ p, n := types.StreamPath(name).Split()
+ be := logpb.ButlerLogBundle_Entry{
+ Secret: testSecret,
+ Desc: &logpb.LogStreamDescriptor{
+ Prefix: string(p),
+ Name: string(n),
+ ContentType: "application/test-message",
+ StreamType: logpb.LogStreamDescriptor_TEXT,
+ Timestamp: google.NewTimestamp(clock.Now(b)),
+ },
+ }
+
+ if len(idxs) > 0 {
+ be.Logs = make([]*logpb.LogEntry, len(idxs))
+ for i, idx := range idxs {
+ be.Logs[i] = b.logEntry(idx)
+ }
+ // Terminal state is only meaningful when the entry carries logs.
+ if tidx >= 0 {
+ be.Terminal = true
+ be.TerminalIndex = uint64(tidx)
+ }
+ }
+
+ return &be
+}
+
+// addStreamEntries adds a bundle entry for `name` holding the given log
+// indices, marked terminal at `term` when term >= 0.
+func (b *bundleBuilder) addStreamEntries(name string, term int, idxs ...int) {
+ b.addBundleEntry(b.genBundleEntry(name, term, idxs...))
+}
+
+// addFullStream adds a complete stream for `name`: contiguous log entries
+// [0..count-1], terminal at index count-1.
+func (b *bundleBuilder) addFullStream(name string, count int) {
+ idxs := make([]int, count)
+ for i := range idxs {
+ idxs[i] = i
+ }
+ b.addStreamEntries(name, count-1, idxs...)
+}
+
+// logEntry builds a single-line TEXT log entry whose stream index and
+// sequence number are both idx, with a newline-delimited "Line #<idx>" body.
+func (b *bundleBuilder) logEntry(idx int) *logpb.LogEntry {
+ return &logpb.LogEntry{
+ StreamIndex: uint64(idx),
+ Sequence: uint64(idx),
+ Content: &logpb.LogEntry_Text{
+ Text: &logpb.Text{
+ Lines: []*logpb.Text_Line{
+ {
+ Value: fmt.Sprintf("Line #%d", idx),
+ Delimiter: "\n",
+ },
+ },
+ },
+ },
+ }
+}
+
+// bundle renders all currently-accumulated entries into a single serialized
+// Butler bundle frame, clearing the pending entry list in the process.
+func (b *bundleBuilder) bundle() []byte {
+ // Renamed from `bytes`: the original local shadowed the imported "bytes"
+ // package inside this function.
+ data := b.bundleWithEntries(b.entries...)
+ b.entries = nil
+ return data
+}
+
+// bundleWithEntries serializes the supplied entries into a compressed Butler
+// bundle frame, stamped with the current test clock time. As a test-only
+// helper, it panics if serialization fails.
+func (b *bundleBuilder) bundleWithEntries(e ...*logpb.ButlerLogBundle_Entry) []byte {
+ bundle := logpb.ButlerLogBundle{
+ Source: "test stream",
+ Timestamp: google.NewTimestamp(clock.Now(b)),
+ Entries: e,
+ }
+
+ buf := bytes.Buffer{}
+ w := butlerproto.Writer{Compress: true}
+ if err := w.Write(&buf, &bundle); err != nil {
+ panic(err)
+ }
+ return buf.Bytes()
+}
+
// indexRange describes a closed range of log stream indices, used to express
// expectations about contiguous stored records.
type indexRange struct {
	start int
	end   int
}

// String renders the range as "[start..end]".
func (r *indexRange) String() string {
	s, e := r.start, r.end
	return fmt.Sprintf("[%d..%d]", s, e)
}
+
+// shouldHaveRegisteredStream asserts that a testCoordinatorClient has
+// registered a stream (string) and its terminal index (int).
+//
+// Note that only the sign of tidx is checked: tidx >= 0 requires the stream
+// to be terminated and tidx < 0 requires it not to be. The exact terminal
+// index value is never compared.
+func shouldHaveRegisteredStream(actual interface{}, expected ...interface{}) string {
+ tcc := actual.(*testCoordinatorClient)
+ name := expected[0].(string)
+ tidx := expected[1].(int)
+
+ cur, ok := tcc.stream(name)
+ if !ok {
+ return fmt.Sprintf("stream %q is not registered", name)
+ }
+ if tidx >= 0 && cur < 0 {
+ return fmt.Sprintf("stream %q is expected to be terminated, but isn't.", name)
+ }
+ if cur >= 0 && tidx < 0 {
+ return fmt.Sprintf("stream %q is NOT expected to be terminated, but it is.", name)
+ }
+ return ""
+}
+
+// shouldNotHaveRegisteredStream asserts that a testCoordinatorClient has not
+// registered a stream (string).
+//
+// Only expected[0] (the stream name) is examined; any additional expected
+// arguments are ignored.
+func shouldNotHaveRegisteredStream(actual interface{}, expected ...interface{}) string {
+ tcc := actual.(*testCoordinatorClient)
+ name := expected[0].(string)
+
+ if _, ok := tcc.stream(name); ok {
+ return fmt.Sprintf("stream %q is registered, but it should NOT be.", name)
martiniss 2016/01/27 22:19:43 typo
dnj 2016/01/29 20:46:52 Done.
+ }
+ return ""
+}
+
+// shouldHaveStoredStream asserts that a storage.Storage instance has contiguous
+// stream records in it.
+//
+// actual is the storage.Storage instance. expected is a stream name (string)
+// followed by a series of records to assert. This can either be a specific
+// integer index or an indexRange marking a closed range of indices.
+func shouldHaveStoredStream(actual interface{}, expected ...interface{}) string {
+ st := actual.(storage.Storage)
+ name := expected[0].(string)
+
+ // Load all entries for this stream.
+ req := storage.GetRequest{
+ Path: types.StreamPath(name),
+ }
+
+ entries := make(map[int]*logpb.LogEntry)
+ var ierr error
+ err := st.Get(&req, func(idx types.MessageIndex, d []byte) bool {
+ le := logpb.LogEntry{}
+ if ierr = proto.Unmarshal(d, &le); ierr != nil {
+ return false
+ }
+ entries[int(idx)] = &le
+ return true
+ })
+ // Prefer the unmarshal error captured inside the callback over Get's
+ // return value, since the callback aborts iteration on that error.
+ if ierr != nil {
+ err = ierr
+ }
+ // A missing stream is acceptable here; the expectation list below will
+ // flag any indices it expected to find.
+ if err != nil && err != storage.ErrDoesNotExist {
+ return fmt.Sprintf("error: %v", err)
+ }
+
+ // assertLogEntry consumes entries[i]: "" means present and consistent,
+ // "N" means record N is missing, and "*N" means record N's StreamIndex
+ // doesn't match its storage key. Consumed entries are deleted so that any
+ // leftovers can be reported as extras below.
+ assertLogEntry := func(i int) string {
+ le := entries[i]
+ if le == nil {
+ return fmt.Sprintf("%d", i)
+ }
+ delete(entries, i)
+
+ if le.StreamIndex != uint64(i) {
+ return fmt.Sprintf("*%d", i)
+ }
+ return ""
+ }
+
+ var failed []string
+ for _, exp := range expected[1:] {
+ switch e := exp.(type) {
+ case int:
+ if err := assertLogEntry(e); err != "" {
+ failed = append(failed, err)
+ }
+
+ case indexRange:
+ var errs []string
+ for i := e.start; i <= e.end; i++ {
+ if err := assertLogEntry(i); err != "" {
+ errs = append(errs, err)
+ }
+ }
+ if len(errs) > 0 {
+ failed = append(failed, fmt.Sprintf("%s{%s}", e.String(), strings.Join(errs, ",")))
+ }
+
+ default:
+ panic(fmt.Errorf("unknown expected type %T", e))
+ }
+ }
+
+ // Extras?
+ if len(entries) > 0 {
+ idxs := make([]int, 0, len(entries))
+ for i := range entries {
+ idxs = append(idxs, i)
+ }
+ sort.Ints(idxs)
+
+ extra := make([]string, len(idxs))
+ for i, idx := range idxs {
+ extra[i] = fmt.Sprintf("%d", idx)
+ }
+ failed = append(failed, fmt.Sprintf("extra{%s}", strings.Join(extra, ",")))
+ }
+
+ if len(failed) > 0 {
+ return strings.Join(failed, ", ")
+ }
+ return ""
+}
+
+// TestCollector runs through a series of end-to-end Collector workflows and
+// ensures that the Collector behaves appropriately.
+func TestCollector(t *testing.T) {
+ t.Parallel()
+
+ Convey(`Using a test configuration`, t, func() {
+ c, _ := testclock.UseTime(context.Background(), testclock.TestTimeLocal)
+
+ // tcc records Coordinator registrations; st wraps in-memory Storage and
+ // (via its err hook, used below) lets tests inject storage failures.
+ tcc := &testCoordinatorClient{}
+ st := &testStorage{Storage: &memory.Storage{}}
+
+ coll := New(Options{
+ Storage: st,
+ Coordinator: tcc,
+ })
+
+ bb := bundleBuilder{
+ Context: c,
+ }
+
+ Convey(`Can process multiple single full streams from a Butler bundle.`, func() {
+ bb.addFullStream("foo/+/bar", 128)
+ bb.addFullStream("foo/+/baz", 256)
+
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 127)
+ So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 127})
+
+ So(tcc, shouldHaveRegisteredStream, "foo/+/baz", 255)
+ So(st, shouldHaveStoredStream, "foo/+/baz", indexRange{0, 255})
+ })
+
+ Convey(`Will return an error if a transient error happened while registering.`, func() {
+ tcc.errC = make(chan error, 1)
+ tcc.errC <- errors.WrapTransient(errors.New("test error"))
+
+ bb.addFullStream("foo/+/bar", 128)
+ err := coll.Process(c, bb.bundle())
+ So(err, ShouldNotBeNil)
+ })
+
+ Convey(`Will not return an error if a non-transient error happened while registering.`, func() {
+ tcc.errC = make(chan error, 1)
+ tcc.errC <- errors.New("test error")
+
+ bb.addFullStream("foo/+/bar", 128)
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+ })
+
+ Convey(`Will return an error if a transient error happened while terminating.`, func() {
+ tcc.errC = make(chan error, 2)
+ tcc.errC <- nil // Register
+ tcc.errC <- errors.WrapTransient(errors.New("test error")) // Terminate
+
+ bb.addFullStream("foo/+/bar", 128)
+ So(coll.Process(c, bb.bundle()), ShouldNotBeNil)
+ })
+
+ Convey(`Will not return an error if a non-transient error happened while terminating.`, func() {
+ tcc.errC = make(chan error, 2)
+ tcc.errC <- nil // Register
+ tcc.errC <- errors.New("test error") // Terminate
+
+ bb.addFullStream("foo/+/bar", 128)
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+ })
+
+ Convey(`Will return an error if a transient error happened on storage.`, func() {
+ // Single transient error.
+ count := int32(0)
+ st.err = func() error {
+ if atomic.AddInt32(&count, 1) == 1 {
+ return errors.WrapTransient(errors.New("test error"))
+ }
+ return nil
+ }
+
+ bb.addFullStream("foo/+/bar", 128)
+ So(coll.Process(c, bb.bundle()), ShouldNotBeNil)
+ })
+
+ Convey(`Will not return an error if a non-transient error happened on storage.`, func() {
+ // Single non-transient error.
+ count := int32(0)
+ st.err = func() error {
+ if atomic.AddInt32(&count, 1) == 1 {
+ return errors.New("test error")
+ }
+ return nil
+ }
+
+ bb.addFullStream("foo/+/bar", 128)
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+ })
+
+ Convey(`Will drop invalid LogStreamDescriptor bundle entries and process the valid ones.`, func() {
+ be := bb.genBundleEntry("foo/+/trash", 1337, 4, 6, 7, 8)
+ be.Desc.ContentType = "" // Missing ContentType => invalid.
+
+ bb.addStreamEntries("foo/+/trash", -1, 0, 1, 2, 3, 5)
+ bb.addBundleEntry(be)
+ bb.addFullStream("foo/+/bar", 32)
+
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ // NOTE(review): addFullStream(..., 32) terminates at index 31; the 32
+ // here only needs to be >= 0, since shouldHaveRegisteredStream checks
+ // termination by sign only — consider 31 for clarity.
+ So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 32)
+ So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 31})
+
+ So(tcc, shouldHaveRegisteredStream, "foo/+/trash", -1)
+ So(st, shouldHaveStoredStream, "foo/+/trash", 0, 1, 2, 3, 5)
+ })
+
+ Convey(`Will drop streams with missing secrets.`, func() {
+ be := bb.genBundleEntry("foo/+/trash", 2, 0, 1, 2)
+ be.Secret = nil
+ bb.addBundleEntry(be)
+
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+ // NOTE(review): the trailing 127 is ignored here;
+ // shouldNotHaveRegisteredStream only reads the stream name.
+ So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar", 127)
+ })
+
+ Convey(`Will drop messages with mismatching secrets.`, func() {
+ bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2)
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ // Push another bundle with a different secret.
+ be := bb.genBundleEntry("foo/+/bar", 4, 3, 4)
+ be.Secret = bytes.Repeat([]byte{0xAA}, types.StreamSecretLength)
+ // This terminal index should be ignored along with the rest of the
+ // entry, because the secret mismatches (asserted by the -1 below).
+ be.TerminalIndex = 1337
+ bb.addBundleEntry(be)
+ bb.addFullStream("foo/+/baz", 3)
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1)
+ So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 2})
+
+ So(tcc, shouldHaveRegisteredStream, "foo/+/baz", 2)
+ So(st, shouldHaveStoredStream, "foo/+/baz", indexRange{0, 2})
+ })
+
+ Convey(`Will return no error if the data has a corrupt bundle header.`, func() {
+ So(coll.Process(c, []byte{0x00}), ShouldBeNil)
+ So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar")
+ })
+
+ Convey(`Will drop bundles with unknown ProtoVersion string.`, func() {
+ buf := bytes.Buffer{}
+ w := butlerproto.Writer{ProtoVersion: "!!!invalid!!!"}
+ w.Write(&buf, &logpb.ButlerLogBundle{})
+
+ So(coll.Process(c, buf.Bytes()), ShouldBeNil)
+
+ So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar")
+ })
+
+ Convey(`Will drop records beyond a local terminal index.`, func() {
+ bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 4)
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ bb.addStreamEntries("foo/+/bar", 4, 3, 5)
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 4)
+ So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 3})
+ })
+
+ Convey(`Will not ingest records beyond a remote terminal index.`, func() {
+ // Pre-register the stream with the Coordinator as terminated at 3.
+ tcc.register(stateProxy{
+ path: "foo/+/bar",
+ secret: testSecret,
+ terminalIndex: 3,
+ })
+
+ bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2)
+ bb.addStreamEntries("foo/+/bar", 4, 3, 5)
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 3)
+ So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 3})
+ })
+
+ Convey(`Will not ingest records if the stream is archived.`, func() {
+ tcc.register(stateProxy{
+ path: "foo/+/bar",
+ secret: testSecret,
+ terminalIndex: -1,
+ archived: true,
+ })
+
+ bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 4)
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1)
+ So(st, shouldHaveStoredStream, "foo/+/bar")
+ })
+
+ Convey(`Will not ingest records if the stream is purged.`, func() {
+ tcc.register(stateProxy{
+ path: "foo/+/bar",
+ secret: testSecret,
+ terminalIndex: -1,
+ purged: true,
+ })
+
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1)
+ So(st, shouldHaveStoredStream, "foo/+/bar")
+ })
+
+ Convey(`Will not ingest a bundle with no bundle entries.`, func() {
+ So(coll.Process(c, bb.bundleWithEntries()), ShouldBeNil)
+ })
+
+ Convey(`Will not ingest a bundle whose log entries don't match their descriptor.`, func() {
+ be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 3, 4)
+
+ // Add a binary log entry. This does NOT match the text descriptor, and
+ // should fail validation.
+ be.Logs = append(be.Logs, &logpb.LogEntry{
+ StreamIndex: 2,
+ Sequence: 2,
+ Content: &logpb.LogEntry_Binary{
+ &logpb.Binary{
+ Data: []byte{0xd0, 0x6f, 0x00, 0xd5},
+ },
+ },
+ })
+ bb.addBundleEntry(be)
+ So(coll.Process(c, bb.bundle()), ShouldBeNil)
+
+ So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 4)
+ So(st, shouldHaveStoredStream, "foo/+/bar", 0, 1, 3, 4)
+ })
+ })
+}

Powered by Google App Engine
This is Rietveld 408576698