| Index: server/internal/logdog/coordinatorClient/client.go
|
| diff --git a/server/internal/logdog/coordinatorClient/client.go b/server/internal/logdog/coordinatorClient/client.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..68a2d8fcb62302c0fee703591658864db9e8481d
|
| --- /dev/null
|
| +++ b/server/internal/logdog/coordinatorClient/client.go
|
| @@ -0,0 +1,265 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package coordinatorClient
|
| +
|
| +import (
|
| + "encoding/base64"
|
| + "fmt"
|
| + "net/http"
|
| +
|
| + "github.com/golang/protobuf/proto"
|
| + "github.com/luci/luci-go/common/api/logdog_coordinator/service/v1"
|
| + "github.com/luci/luci-go/common/auth"
|
| + "github.com/luci/luci-go/common/errors"
|
| + "github.com/luci/luci-go/common/logdog/types"
|
| + "github.com/luci/luci-go/common/proto/logdog/logpb"
|
| + "golang.org/x/net/context"
|
| + "google.golang.org/api/googleapi"
|
| +)
|
| +
|
| +var (
|
| + // ServiceScopes is the set of OAuth scopes required to communicate with this
|
| + // service.
|
| + ServiceScopes = []string{
|
| + auth.OAuthScopeEmail,
|
| + }
|
| +)
|
| +
|
| +// State is a representation of the remote Coordinator state.
|
| +type State struct {
|
| + // Path is the stream path.
|
| + Path types.StreamPath
|
| +
|
| + // Secret is the stream secret value. It must be included if Descriptor is not
|
| + // nil.
|
| + Secret []byte
|
| +
|
| + // ProtoVersion is the protobuf version string.
|
| + ProtoVersion string
|
| +
|
| + // Descriptor is the new stream state to push. If nil, no registration will
|
| + // occur.
|
| + State *service.LogStreamState
|
| + // Descriptor is the new stream state to push. If nil, no registration will
|
| + // occur.
|
| + Descriptor *logpb.LogStreamDescriptor
|
| +}
|
| +
|
| +func loadState(path string, secret string, desc string, state *service.LogStreamState) (*State, error) {
|
| + s := State{
|
| + Path: types.StreamPath(path),
|
| + State: state,
|
| + }
|
| + if err := s.Path.Validate(); err != nil {
|
| + return nil, fmt.Errorf("failed to validate stream path: %v", err)
|
| + }
|
| +
|
| + if secret != "" {
|
| + var err error
|
| + s.Secret, err = base64.StdEncoding.DecodeString(secret)
|
| + if err != nil {
|
| + return nil, fmt.Errorf("failed to decode secret: %v", err)
|
| + }
|
| + }
|
| +
|
| + if desc != "" {
|
| + d, err := base64.StdEncoding.DecodeString(desc)
|
| + if err != nil {
|
| + return nil, fmt.Errorf("failed to decode descriptor: %v", err)
|
| + }
|
| +
|
| + lsd := logpb.LogStreamDescriptor{}
|
| + if err := proto.Unmarshal(d, &lsd); err != nil {
|
| + return nil, fmt.Errorf("failed to unmarshal descriptor: %v", err)
|
| + }
|
| + s.Descriptor = &lsd
|
| + }
|
| +
|
| + if state != nil {
|
| + s.ProtoVersion = state.ProtoVersion
|
| + }
|
| +
|
| + return &s, nil
|
| +}
|
| +
|
| +// clientSideValidate performs a set of basic sanity checks to not waste time
|
| +// on something the Coordinator is known to reject.
|
| +func (s *State) clientSideValidate() error {
|
| + // Let's do some client-side validation and not waste the server's time if
|
| + // something is obviously wrong!
|
| + if err := s.Path.Validate(); err != nil {
|
| + return err
|
| + }
|
| + if s.Secret == nil {
|
| + return errors.New("missing stream secret")
|
| + }
|
| + if s.ProtoVersion == "" {
|
| + return errors.New("missing protobuf version")
|
| + }
|
| + return nil
|
| +}
|
| +
|
| +// ServiceConfig is the structure returned by the GetConfig service API call.
|
| +type ServiceConfig struct {
|
| + service.GetConfigResponse
|
| +}
|
| +
|
| +// Archived returns true if the log stream is marked as archived.
|
| +func (s *State) Archived() bool {
|
| + if st := s.State; st != nil {
|
| + return !(st.ArchiveDataURL == "" && st.ArchiveIndexURL == "" && st.ArchiveStreamURL == "")
|
| + }
|
| + return false
|
| +}
|
| +
|
| +// Options is the set of options to supply to a new Client instance.
|
| +type Options struct {
|
| + // Client is the authenticated HTTP client to use.
|
| + Client *http.Client
|
| +
|
| + // BasePath is the API base path. If empty, the generated endpoint default
|
| + // base path will be used.
|
| + //
|
| + // This should not include the service endpoint, e.g.:
|
| + // https://logdog.example.com/api/
|
| + BasePath string
|
| +
|
| + // UserAgent, if supplied, will be included in the user agent string for
|
| + // endpoint requests.
|
| + UserAgent string
|
| +}
|
| +
|
| +// Client is a LogDog Coordinator client.
|
| +//
|
| +// Client methods will return an errors.Transient error if the failure is
|
| +// considered transient.
|
| +type Client struct {
|
| + *Options
|
| +
|
| + svc *service.Service
|
| +}
|
| +
|
| +// New returns a new production Client using the supplied authenticated HTTP
|
| +// Client.
|
| +func New(o Options) *Client {
|
| + svc, err := service.New(o.Client)
|
| + if err != nil {
|
| + // This will only happen if the supplied Client is nil, which is a bug.
|
| + panic(err)
|
| + }
|
| + if o.BasePath != "" {
|
| + svc.BasePath = fmt.Sprintf("%sservice/v1/", o.BasePath)
|
| + }
|
| + svc.UserAgent = o.UserAgent
|
| +
|
| + return &Client{
|
| + Options: &o,
|
| + svc: svc,
|
| + }
|
| +}
|
| +
|
| +// GetConfig loads the service configuration from the Coordinator.
|
| +func (c *Client) GetConfig(ctx context.Context) (*ServiceConfig, error) {
|
| + // Retrieve the global configuration.
|
| + gcfg, err := c.svc.GetConfig().Context(ctx).Do()
|
| + if err != nil {
|
| + return nil, translateError(err)
|
| + }
|
| +
|
| + return &ServiceConfig{
|
| + GetConfigResponse: *gcfg,
|
| + }, nil
|
| +}
|
| +
|
| +// LoadStream loads the named stream parameters.
|
| +func (c *Client) LoadStream(ctx context.Context, path types.StreamPath) (*State, error) {
|
| + if err := path.Validate(); err != nil {
|
| + return nil, err
|
| + }
|
| +
|
| + resp, err := c.svc.LoadStream(string(path)).Context(ctx).Do()
|
| + if err != nil {
|
| + return nil, translateError(err)
|
| + }
|
| +
|
| + s, err := loadState(resp.Path, resp.Secret, resp.Descriptor, resp.State)
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| + return s, nil
|
| +}
|
| +
|
| +// RegisterStream registers stream metadata with the Coordinator. The
|
| +// Coordinator will respond with its own version of that State on success.
|
| +// This is idempotent so long as the data is consistent, so it may be called
|
| +// multiple times.
|
| +func (c *Client) RegisterStream(ctx context.Context, s State) (*State, error) {
|
| + if err := s.clientSideValidate(); err != nil {
|
| + return nil, err
|
| + }
|
| +
|
| + desc := []byte(nil)
|
| + if s.Descriptor != nil {
|
| + err := error(nil)
|
| + desc, err = proto.Marshal(s.Descriptor)
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| + }
|
| +
|
| + // No point in including the Descriptor; clear it (if it's set).
|
| + resp, err := c.svc.RegisterStream(&service.RegisterStreamRequest{
|
| + ProtoVersion: s.ProtoVersion,
|
| + Descriptor: base64.StdEncoding.EncodeToString(desc),
|
| + Path: string(s.Path),
|
| + Secret: base64.StdEncoding.EncodeToString(s.Secret),
|
| + }).Context(ctx).Do()
|
| + if err != nil {
|
| + return nil, translateError(err)
|
| + }
|
| +
|
| + rs, err := loadState(resp.Path, resp.Secret, "", resp.State)
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| + rs.Descriptor = s.Descriptor
|
| + return rs, nil
|
| +}
|
| +
|
| +// TerminateStream registers the terminal index for the named stream.
|
| +func (c *Client) TerminateStream(ctx context.Context, p types.StreamPath, s []byte, tidx types.MessageIndex) error {
|
| + if tidx < 0 {
|
| + return errors.New("stream state has non-terminal index")
|
| + }
|
| +
|
| + err := c.svc.TerminateStream(&service.TerminateStreamRequest{
|
| + Path: string(p),
|
| + Secret: base64.StdEncoding.EncodeToString(s),
|
| + TerminalIndex: int64(tidx),
|
| + }).Context(ctx).Do()
|
| + if err != nil {
|
| + return translateError(err)
|
| + }
|
| + return nil
|
| +}
|
| +
|
| +func translateError(err error) error {
|
| + if gerr, ok := err.(*googleapi.Error); ok {
|
| + // Auth and server errors are considered transient.
|
| + switch {
|
| + case gerr.Code == http.StatusUnauthorized:
|
| + fallthrough
|
| + case gerr.Code == http.StatusForbidden:
|
| + fallthrough
|
| + case gerr.Code >= http.StatusInternalServerError:
|
| + return errors.WrapTransient(err)
|
| + }
|
| +
|
| + return err
|
| + }
|
| +
|
| + // Not a Google API error. Assume it's transient.
|
| + return errors.WrapTransient(err)
|
| +}
|
|
|