Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(430)

Unified Diff: server/internal/logdog/collector/streamstatecache_test.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Rebased, updated from comments. Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: server/internal/logdog/collector/streamstatecache_test.go
diff --git a/server/internal/logdog/collector/streamstatecache_test.go b/server/internal/logdog/collector/streamstatecache_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..48ae6b68a9248b181642518a8e7194ba9c00d0f8
--- /dev/null
+++ b/server/internal/logdog/collector/streamstatecache_test.go
@@ -0,0 +1,255 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package collector
+
+import (
+ "fmt"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/luci/luci-go/common/api/logdog_coordinator/service/v1"
+ "github.com/luci/luci-go/common/clock/testclock"
+ "github.com/luci/luci-go/common/errors"
+ "github.com/luci/luci-go/common/logdog/types"
+ cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient"
+ "golang.org/x/net/context"
+
+ . "github.com/smartystreets/goconvey/convey"
+)
+
+func TestStreamStateCache(t *testing.T) {
+ t.Parallel()
+
+ Convey(`Using a test configuration`, t, func() {
+ c, tc := testclock.UseTime(context.Background(), testclock.TestTimeLocal)
+ tcc := testCoordinatorClient{}
+
+ o := streamStateCacheOptions{
+ coordinator: &tcc,
+ cacheSize: 4,
+ expiration: 1 * time.Second,
+ }
+
+ st := cc.State{
+ Path: "foo/+/bar",
+ State: &service.LogStreamState{
+ TerminalIndex: -1,
+ },
+ }
+
+ // Note: In all of these tests, we check if "proto" field (ProtoVersion)
+ // is "remote". We use ProtoVersion as a channel between our fake remote
+ // service. When our fake remote service returns a LogStreamState, it sets
+ // "remote" to true to differentiate it from the local pushed state.
+ //
+ // If a LogStreamState has "remote" set to true, that implies that it was
+ // sent by the fake testing service rather than the local test.
+ Convey(`A streamStateCache`, func() {
+ ssc := newStreamStateCache(o)
+
+ resultC := make(chan *stateProxy)
+ req := func(s cc.State) {
+ var res *stateProxy
+ defer func() {
+ resultC <- res
+ }()
+
+ st, err := ssc.getOrRegister(c, &s)
+ if err == nil {
+ res = st
+ }
+ }
+
+ Convey(`Can register a stream`, func() {
+ s, err := ssc.getOrRegister(c, &st)
+ So(err, ShouldBeNil)
+ So(s.proto, ShouldEqual, "remote")
+ So(tcc.calls, ShouldEqual, 1)
+
+ Convey(`Will not re-register the same stream.`, func() {
+ st.ProtoVersion = ""
+
+ s, err := ssc.getOrRegister(c, &st)
+ So(err, ShouldBeNil)
+ So(s.proto, ShouldEqual, "remote")
+ So(tcc.calls, ShouldEqual, 1)
+ })
+
+ Convey(`When the registration expires`, func() {
+ st.ProtoVersion = ""
+ tc.Add(time.Second)
+
+ Convey(`Will re-register the stream.`, func() {
+ s, err := ssc.getOrRegister(c, &st)
+ So(err, ShouldBeNil)
+ So(s.proto, ShouldEqual, "remote")
+ So(tcc.calls, ShouldEqual, 2)
+ })
+ })
+
+ Convey(`Can terminate a registered stream`, func() {
+ s.terminalIndex = 1337
+ So(ssc.setTerminalIndex(c, s), ShouldBeNil)
+ So(tcc.calls, ShouldEqual, 2) // +1 call
+
+ Convey(`Registering the stream will include the terminal index.`, func() {
+ // Fill it in with junk to make sure we are getting cached.
+ st.State.TerminalIndex = 123
+ st.ProtoVersion = ""
+
+ s, err := ssc.getOrRegister(c, &st)
+ So(err, ShouldBeNil)
+ So(s.proto, ShouldEqual, "remote")
+ So(s.terminalIndex, ShouldEqual, 1337)
+ So(tcc.calls, ShouldEqual, 2) // No additional calls.
+ })
+ })
+ })
+
+ Convey(`When the terminal index is set before the fetch finishes, it will be returned.`, func() {
+ tcc.callC = make(chan struct{})
+ tcc.errC = make(chan error)
+
+ go req(st)
+
+ // Wait for our request to block on RegisterStream.
+ <-tcc.callC
+
+ // Set the terminal index. We will use a minimal stateProxy. We know
+ // that this will happen after the streamStateCacheEntry is registered
+ // because both block on the LRU cache's Mutate, which is atomic, and
+ // getOrRegister must have added the streamStateCacheEntry in order for
+ // the lock to be available for setTerminalIndex to proceed.
+ terminalErrC := make(chan error)
+ go func() {
+ terminalErrC <- ssc.setTerminalIndex(c, &stateProxy{
+ path: st.Path,
+ terminalIndex: 1337,
+ })
+ }()
+
+ // Let both requests succeed.
+ <-tcc.callC
+ tcc.errC <- nil
+ tcc.errC <- nil
+
+ // Read the stateProxy from our getOrRegister request.
+ s := <-resultC
+ So(s, ShouldNotBeNil)
+ So(s.terminalIndex, ShouldEqual, 1337)
+ })
+
+ Convey(`When multiple goroutines register the same stream, it gets registered once.`, func() {
+ tcc.callC = make(chan struct{})
+ tcc.errC = make(chan error)
+
+ errs := make(errors.MultiError, 256)
+ for i := 0; i < len(errs); i++ {
+ go req(st)
+ }
+
+ <-tcc.callC
+ tcc.errC <- nil
+ for i := 0; i < len(errs); i++ {
+ <-resultC
+ }
+
+ So(errors.SingleError(errs), ShouldBeNil)
+ So(tcc.calls, ShouldEqual, 1)
+ })
+
+ Convey(`Multiple registrations for the same stream will result in two requests if the first expires.`, func() {
+ tcc.callC = make(chan struct{})
+ tcc.errC = make(chan error)
+
+ // First request.
+ go req(st)
+
+ // Wait for the request to happen, then advance time past the request's
+ // expiration.
+ <-tcc.callC
+ tc.Add(time.Second)
+
+ // Second request.
+ go req(st)
+
+ // Release both calls and reap the results.
+ <-tcc.callC
+ tcc.errC <- nil
+ tcc.errC <- nil
+
+ r1 := <-resultC
+ r2 := <-resultC
+
+ So(r1.proto, ShouldEqual, "remote")
+ So(r2.proto, ShouldEqual, "remote")
+ So(tcc.calls, ShouldEqual, 2)
+ })
+
+ Convey(`A registration error will result in a getOrRegister error.`, func() {
+ tcc.errC = make(chan error, 1)
+ tcc.errC <- errors.New("test error")
+
+ _, err := ssc.getOrRegister(c, &st)
+ So(err, ShouldNotBeNil)
+ So(tcc.calls, ShouldEqual, 1)
+
+ Convey(`A second registration without error will make a new request.`, func() {
+ tcc.errC = nil
+
+ _, err := ssc.getOrRegister(c, &st)
+ So(err, ShouldBeNil)
+ So(tcc.calls, ShouldEqual, 2)
+ })
+ })
+ })
+
+ Convey(`A streamStateCache can register multiple streams at once.`, func() {
+ tcc.callC = make(chan struct{})
+ tcc.errC = make(chan error)
+ ssc := newStreamStateCache(o)
+
+ count := 256
+ wg := sync.WaitGroup{}
+ errs := make(errors.MultiError, count)
+ state := make([]*stateProxy, count)
+ wg.Add(count)
+ for i := 0; i < count; i++ {
+ st := st
+ st.Path = types.StreamPath(fmt.Sprintf("foo/+/bar%d", i))
+
+ go func(i int) {
+ defer wg.Done()
+ state[i], errs[i] = ssc.getOrRegister(c, &st)
+ }(i)
+ }
+
+ // Wait for all of them to simultaneously call.
+ for i := 0; i < count; i++ {
+ <-tcc.callC
+ }
+
+ // They're all blocked on errC; allow them to continue.
+ for i := 0; i < count; i++ {
+ tcc.errC <- nil
+ }
+
+ // Wait for them to finish.
+ wg.Wait()
+
+ // Confirm that all registered successfully.
+ So(errors.SingleError(errs), ShouldBeNil)
+
+ remotes := 0
+ for i := 0; i < count; i++ {
+ if state[i].proto == "remote" {
+ remotes++
+ }
+ }
+ So(remotes, ShouldEqual, count)
+ })
+ })
+}

Powered by Google App Engine
This is Rietveld 408576698