| Index: server/internal/logdog/collector/coordinator.go
|
| diff --git a/server/internal/logdog/collector/coordinator.go b/server/internal/logdog/collector/coordinator.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..abe1b8341cbf86dfc74bab4d03ce61ccdae716c7
|
| --- /dev/null
|
| +++ b/server/internal/logdog/collector/coordinator.go
|
| @@ -0,0 +1,81 @@
|
| +// Copyright 2016 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package collector
|
| +
|
| +import (
|
| + "time"
|
| +
|
| + "github.com/luci/luci-go/common/logdog/types"
|
| + log "github.com/luci/luci-go/common/logging"
|
| + "github.com/luci/luci-go/common/retry"
|
| + cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient"
|
| + "golang.org/x/net/context"
|
| +)
|
| +
|
| +// CoordinatorClient is an interface to the methods in the Coordinator service
|
| +// that the Collector uses.
|
| +//
|
| +// cc.Client implements this interface.
|
| +type CoordinatorClient interface {
|
| + // RegisterStream registers a stream's state with the Coordinator. On success,
|
| + // it will return the Coordinator's view of the stream state.
|
| + //
|
| + // This operation is idempotent. It may return an errors.Transient error if
|
| + // a transient failure happened.
|
| + RegisterStream(context.Context, cc.State) (*cc.State, error)
|
| +
|
| + // TerminateStream registers the stream's terminal index with the Coordinator.
|
| + // If successful, the terminal index was successfully registered.
|
| + //
|
| + // This operation is idempotent. It may return an errors.Transient error if
|
| + // a transient failure happened.
|
| + TerminateStream(ctx context.Context, p types.StreamPath, s []byte, idx types.MessageIndex) error
|
| +}
|
| +
|
| +// RetryCoordinatorClient wraps a CoordinatorClient, retrying transient errors.
|
| +type RetryCoordinatorClient struct {
|
| + // Client is the CoordinatorClient that is being wrapped.
|
| + Client CoordinatorClient
|
| +
|
| + // G is the retry.Generator to use to generate retry.Iterator instances. If
|
| + // nil, retry.Default will be used.
|
| + F retry.Factory
|
| +}
|
| +
|
| +// RegisterStream implements CoordinatorClient.
|
| +func (c *RetryCoordinatorClient) RegisterStream(ctx context.Context, st cc.State) (rst *cc.State, err error) {
|
| + err = retry.Retry(ctx, c.retryFactory(), func() (err error) {
|
| + rst, err = c.Client.RegisterStream(ctx, st)
|
| + return
|
| + }, func(err error, d time.Duration) {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "delay": d,
|
| + }.Warningf(ctx, "Transient error registering stream. Retrying...")
|
| + })
|
| + return
|
| +}
|
| +
|
| +// TerminateStream implements CoordinatorClient.
|
| +func (c *RetryCoordinatorClient) TerminateStream(ctx context.Context, p types.StreamPath, s []byte, idx types.MessageIndex) error {
|
| + return retry.Retry(ctx, c.retryFactory(), func() error {
|
| + return c.Client.TerminateStream(ctx, p, s, idx)
|
| + }, func(err error, d time.Duration) {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "delay": d,
|
| + }.Warningf(ctx, "Transient error terminating stream. Retrying...")
|
| + })
|
| +}
|
| +
|
| +func (c *RetryCoordinatorClient) retryFactory() retry.Factory {
|
| + var f retry.Factory
|
| + if c.F != nil {
|
| + f = c.F
|
| + } else {
|
| + f = retry.Default
|
| + }
|
| + return retry.TransientOnly(f)
|
| +}
|
|
|