| Index: server/internal/logdog/collector/utils_test.go
|
| diff --git a/server/internal/logdog/collector/utils_test.go b/server/internal/logdog/collector/utils_test.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..d86435e72afe64f00e80520363f82093005945ae
|
| --- /dev/null
|
| +++ b/server/internal/logdog/collector/utils_test.go
|
| @@ -0,0 +1,145 @@
|
| +// Copyright 2016 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package collector
|
| +
|
| +import (
|
| + "fmt"
|
| + "sync"
|
| + "sync/atomic"
|
| +
|
| + "github.com/luci/luci-go/common/api/logdog_coordinator/service/v1"
|
| + "github.com/luci/luci-go/common/logdog/types"
|
| + cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient"
|
| + "github.com/luci/luci-go/server/logdog/storage"
|
| + "golang.org/x/net/context"
|
| +)
|
| +
|
| +// testCoordinatorClient is an implementation of CoordinatorClient that can be
|
| +// used for testing.
|
| +type testCoordinatorClient struct {
|
| + sync.Mutex
|
| +
|
| + // calls is the number of calls made to the interface's methods.
|
| + calls int32
|
| + // callC, if not nil, will have a token pushed to it when a call is made.
|
| + callC chan struct{}
|
| + // errC is a channel that error status will be read from if not nil.
|
| + errC chan error
|
| +
|
| + // state is the latest tracked stream state.
|
| + state map[string]*stateProxy
|
| +}
|
| +
|
| +func (c *testCoordinatorClient) register(s stateProxy) stateProxy {
|
| + c.Lock()
|
| + defer c.Unlock()
|
| +
|
| + // Update our state.
|
| + if c.state == nil {
|
| + c.state = make(map[string]*stateProxy)
|
| + }
|
| + if sp := c.state[string(s.path)]; sp != nil {
|
| + return *sp
|
| + }
|
| + c.state[string(s.path)] = &s
|
| + return s
|
| +}
|
| +
|
| +func (c *testCoordinatorClient) RegisterStream(ctx context.Context, s cc.State) (*cc.State, error) {
|
| + if err := c.incCalls(); err != nil {
|
| + return nil, err
|
| + }
|
| +
|
| + // Update our state.
|
| + sp := c.register(stateProxy{
|
| + path: s.Path,
|
| + proto: s.ProtoVersion,
|
| + secret: s.Secret,
|
| + terminalIndex: -1,
|
| + archived: s.Archived(),
|
| + purged: (s.State != nil && s.State.Purged),
|
| + })
|
| +
|
| + // Set the ProtoVersion to differentiate the output State from the input.
|
| + rs := cc.State{
|
| + Path: s.Path,
|
| + ProtoVersion: "remote",
|
| + Secret: s.Secret,
|
| + Descriptor: s.Descriptor,
|
| + State: &service.LogStreamState{
|
| + TerminalIndex: int64(sp.terminalIndex),
|
| + Purged: sp.purged,
|
| + },
|
| + }
|
| + if sp.archived {
|
| + rs.State.ArchiveStreamURL = "something so we are marked as archived"
|
| + }
|
| + return &rs, nil
|
| +}
|
| +
|
| +func (c *testCoordinatorClient) TerminateStream(ctx context.Context, p types.StreamPath, s []byte, idx types.MessageIndex) error {
|
| + if err := c.incCalls(); err != nil {
|
| + return err
|
| + }
|
| +
|
| + c.Lock()
|
| + defer c.Unlock()
|
| +
|
| + // Update our state.
|
| + sp, ok := c.state[string(p)]
|
| + if !ok {
|
| + return fmt.Errorf("no such stream: %v", p)
|
| + }
|
| + if sp.terminalIndex >= 0 && idx != sp.terminalIndex {
|
| + return fmt.Errorf("incompatible terminal indexes: %d != %d", idx, sp.terminalIndex)
|
| + }
|
| +
|
| + sp.terminalIndex = idx
|
| + return nil
|
| +}
|
| +
|
| +// incCalls is an entry point for client goroutines. It offers the opportunity
|
| +// to track call count as well as trap executing goroutines within client calls.
|
| +//
|
| +// This must not be called while the lock is held, else it could lead to
|
| +// deadlock if multiple goroutines are trapped.
|
| +func (c *testCoordinatorClient) incCalls() error {
|
| + if c.callC != nil {
|
| + c.callC <- struct{}{}
|
| + }
|
| +
|
| + atomic.AddInt32(&c.calls, 1)
|
| +
|
| + if c.errC != nil {
|
| + return <-c.errC
|
| + }
|
| + return nil
|
| +}
|
| +
|
| +func (c *testCoordinatorClient) stream(name string) (int, bool) {
|
| + c.Lock()
|
| + defer c.Unlock()
|
| +
|
| + sp, ok := c.state[name]
|
| + if !ok {
|
| + return 0, false
|
| + }
|
| + return int(sp.terminalIndex), true
|
| +}
|
| +
|
| +// testStorage is a testing storage instance that returns errors.
|
| +type testStorage struct {
|
| + storage.Storage
|
| + err func() error
|
| +}
|
| +
|
| +func (s *testStorage) Put(r *storage.PutRequest) error {
|
| + if s.err != nil {
|
| + if err := s.err(); err != nil {
|
| + return err
|
| + }
|
| + }
|
| + return s.Storage.Put(r)
|
| +}
|
|
|