| Index: server/internal/logdog/collector/streamstatecache_test.go
|
| diff --git a/server/internal/logdog/collector/streamstatecache_test.go b/server/internal/logdog/collector/streamstatecache_test.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..48ae6b68a9248b181642518a8e7194ba9c00d0f8
|
| --- /dev/null
|
| +++ b/server/internal/logdog/collector/streamstatecache_test.go
|
| @@ -0,0 +1,255 @@
|
| +// Copyright 2016 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package collector
|
| +
|
| +import (
|
| + "fmt"
|
| + "sync"
|
| + "testing"
|
| + "time"
|
| +
|
| + "github.com/luci/luci-go/common/api/logdog_coordinator/service/v1"
|
| + "github.com/luci/luci-go/common/clock/testclock"
|
| + "github.com/luci/luci-go/common/errors"
|
| + "github.com/luci/luci-go/common/logdog/types"
|
| + cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient"
|
| + "golang.org/x/net/context"
|
| +
|
| + . "github.com/smartystreets/goconvey/convey"
|
| +)
|
| +
|
| +func TestStreamStateCache(t *testing.T) {
|
| + t.Parallel()
|
| +
|
| + Convey(`Using a test configuration`, t, func() {
|
| + c, tc := testclock.UseTime(context.Background(), testclock.TestTimeLocal)
|
| + tcc := testCoordinatorClient{}
|
| +
|
| + o := streamStateCacheOptions{
|
| + coordinator: &tcc,
|
| + cacheSize: 4,
|
| + expiration: 1 * time.Second,
|
| + }
|
| +
|
| + st := cc.State{
|
| + Path: "foo/+/bar",
|
| + State: &service.LogStreamState{
|
| + TerminalIndex: -1,
|
| + },
|
| + }
|
| +
|
| + // Note: In all of these tests, we check whether the "proto" field
|
| + // (ProtoVersion) is "remote". We use ProtoVersion as a side channel between
|
| + // our fake remote service and the test: when the fake remote service
|
| + // returns a LogStreamState, it sets ProtoVersion to "remote" to
|
| + // differentiate it from the locally-pushed state.
|
| + // Thus, a stateProxy whose proto is "remote" holds state that was sent by
|
| + // the fake testing service rather than installed by the local test.
|
| + Convey(`A streamStateCache`, func() {
|
| + ssc := newStreamStateCache(o)
|
| +
|
| + resultC := make(chan *stateProxy)
|
| + req := func(s cc.State) {
|
| + var res *stateProxy
|
| + defer func() {
|
| + resultC <- res
|
| + }()
|
| +
|
| + st, err := ssc.getOrRegister(c, &s)
|
| + if err == nil {
|
| + res = st
|
| + }
|
| + }
|
| +
|
| + Convey(`Can register a stream`, func() {
|
| + s, err := ssc.getOrRegister(c, &st)
|
| + So(err, ShouldBeNil)
|
| + So(s.proto, ShouldEqual, "remote")
|
| + So(tcc.calls, ShouldEqual, 1)
|
| +
|
| + Convey(`Will not re-register the same stream.`, func() {
|
| + st.ProtoVersion = ""
|
| +
|
| + s, err := ssc.getOrRegister(c, &st)
|
| + So(err, ShouldBeNil)
|
| + So(s.proto, ShouldEqual, "remote")
|
| + So(tcc.calls, ShouldEqual, 1)
|
| + })
|
| +
|
| + Convey(`When the registration expires`, func() {
|
| + st.ProtoVersion = ""
|
| + tc.Add(time.Second)
|
| +
|
| + Convey(`Will re-register the stream.`, func() {
|
| + s, err := ssc.getOrRegister(c, &st)
|
| + So(err, ShouldBeNil)
|
| + So(s.proto, ShouldEqual, "remote")
|
| + So(tcc.calls, ShouldEqual, 2)
|
| + })
|
| + })
|
| +
|
| + Convey(`Can terminate a registered stream`, func() {
|
| + s.terminalIndex = 1337
|
| + So(ssc.setTerminalIndex(c, s), ShouldBeNil)
|
| + So(tcc.calls, ShouldEqual, 2) // +1 call
|
| +
|
| + Convey(`Registering the stream will include the terminal index.`, func() {
|
| + // Fill it in with junk to make sure we are getting cached.
|
| + st.State.TerminalIndex = 123
|
| + st.ProtoVersion = ""
|
| +
|
| + s, err := ssc.getOrRegister(c, &st)
|
| + So(err, ShouldBeNil)
|
| + So(s.proto, ShouldEqual, "remote")
|
| + So(s.terminalIndex, ShouldEqual, 1337)
|
| + So(tcc.calls, ShouldEqual, 2) // No additional calls.
|
| + })
|
| + })
|
| + })
|
| +
|
| + Convey(`When the terminal index is set before the fetch finishes, it will be returned.`, func() {
|
| + tcc.callC = make(chan struct{})
|
| + tcc.errC = make(chan error)
|
| +
|
| + go req(st)
|
| +
|
| + // Wait for our request to block on RegisterStream.
|
| + <-tcc.callC
|
| +
|
| + // Set the terminal index. We will use a minimal stateProxy. We know
|
| + // that this will happen after the streamStateCacheEntry is registered
|
| + // because both block on the LRU cache's Mutate, which is atomic, and
|
| + // getOrRegister must have added the streamStateCacheEntry in order for
|
| + // the lock to be available for setTerminalIndex to proceed.
|
| + terminalErrC := make(chan error)
|
| + go func() {
|
| + terminalErrC <- ssc.setTerminalIndex(c, &stateProxy{
|
| + path: st.Path,
|
| + terminalIndex: 1337,
|
| + })
|
| + }()
|
| +
|
| + // Let both requests succeed.
|
| + <-tcc.callC
|
| + tcc.errC <- nil
|
| + tcc.errC <- nil
|
| +
|
| + // Read the stateProxy from our getOrRegister request.
|
| + s := <-resultC
|
| + So(s, ShouldNotBeNil)
|
| + So(s.terminalIndex, ShouldEqual, 1337)
|
| + })
|
| +
|
| + Convey(`When multiple goroutines register the same stream, it gets registered once.`, func() {
|
| + tcc.callC = make(chan struct{})
|
| + tcc.errC = make(chan error)
|
| +
|
| + errs := make(errors.MultiError, 256)
|
| + for i := 0; i < len(errs); i++ {
|
| + go req(st)
|
| + }
|
| +
|
| + <-tcc.callC
|
| + tcc.errC <- nil
|
| + for i := 0; i < len(errs); i++ {
|
| + <-resultC
|
| + }
|
| +
|
| + So(errors.SingleError(errs), ShouldBeNil)
|
| + So(tcc.calls, ShouldEqual, 1)
|
| + })
|
| +
|
| + Convey(`Multiple registrations for the same stream will result in two requests if the first expires.`, func() {
|
| + tcc.callC = make(chan struct{})
|
| + tcc.errC = make(chan error)
|
| +
|
| + // First request.
|
| + go req(st)
|
| +
|
| + // Wait for the request to happen, then advance time past the request's
|
| + // expiration.
|
| + <-tcc.callC
|
| + tc.Add(time.Second)
|
| +
|
| + // Second request.
|
| + go req(st)
|
| +
|
| + // Release both calls and reap the results.
|
| + <-tcc.callC
|
| + tcc.errC <- nil
|
| + tcc.errC <- nil
|
| +
|
| + r1 := <-resultC
|
| + r2 := <-resultC
|
| +
|
| + So(r1.proto, ShouldEqual, "remote")
|
| + So(r2.proto, ShouldEqual, "remote")
|
| + So(tcc.calls, ShouldEqual, 2)
|
| + })
|
| +
|
| + Convey(`A registration error will result in a getOrRegister error.`, func() {
|
| + tcc.errC = make(chan error, 1)
|
| + tcc.errC <- errors.New("test error")
|
| +
|
| + _, err := ssc.getOrRegister(c, &st)
|
| + So(err, ShouldNotBeNil)
|
| + So(tcc.calls, ShouldEqual, 1)
|
| +
|
| + Convey(`A second registration without error will make a new request.`, func() {
|
| + tcc.errC = nil
|
| +
|
| + _, err := ssc.getOrRegister(c, &st)
|
| + So(err, ShouldBeNil)
|
| + So(tcc.calls, ShouldEqual, 2)
|
| + })
|
| + })
|
| + })
|
| +
|
| + Convey(`A streamStateCache can register multiple streams at once.`, func() {
|
| + tcc.callC = make(chan struct{})
|
| + tcc.errC = make(chan error)
|
| + ssc := newStreamStateCache(o)
|
| +
|
| + count := 256
|
| + wg := sync.WaitGroup{}
|
| + errs := make(errors.MultiError, count)
|
| + state := make([]*stateProxy, count)
|
| + wg.Add(count)
|
| + for i := 0; i < count; i++ {
|
| + st := st
|
| + st.Path = types.StreamPath(fmt.Sprintf("foo/+/bar%d", i))
|
| +
|
| + go func(i int) {
|
| + defer wg.Done()
|
| + state[i], errs[i] = ssc.getOrRegister(c, &st)
|
| + }(i)
|
| + }
|
| +
|
| + // Wait for all of them to simultaneously call.
|
| + for i := 0; i < count; i++ {
|
| + <-tcc.callC
|
| + }
|
| +
|
| + // They're all blocked on errC; allow them to continue.
|
| + for i := 0; i < count; i++ {
|
| + tcc.errC <- nil
|
| + }
|
| +
|
| + // Wait for them to finish.
|
| + wg.Wait()
|
| +
|
| + // Confirm that all registered successfully.
|
| + So(errors.SingleError(errs), ShouldBeNil)
|
| +
|
| + remotes := 0
|
| + for i := 0; i < count; i++ {
|
| + if state[i].proto == "remote" {
|
| + remotes++
|
| + }
|
| + }
|
| + So(remotes, ShouldEqual, count)
|
| + })
|
| + })
|
| +}
|
|
|