| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package coordinatorClient |
| 6 |
| 7 import ( |
| 8 "encoding/base64" |
| 9 "fmt" |
| 10 "net/http" |
| 11 |
| 12 "github.com/golang/protobuf/proto" |
| 13 "github.com/luci/luci-go/common/api/logdog_coordinator/service/v1" |
| 14 "github.com/luci/luci-go/common/auth" |
| 15 "github.com/luci/luci-go/common/errors" |
| 16 "github.com/luci/luci-go/common/logdog/types" |
| 17 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 18 "golang.org/x/net/context" |
| 19 "google.golang.org/api/googleapi" |
| 20 ) |
| 21 |
| 22 var ( |
| 23 // ServiceScopes is the set of OAuth scopes required to communicate with
this |
| 24 // service. |
| 25 ServiceScopes = []string{ |
| 26 auth.OAuthScopeEmail, |
| 27 } |
| 28 ) |
| 29 |
| 30 // State is a representation of the remote Coordinator state. |
| 31 type State struct { |
| 32 // Path is the stream path. |
| 33 Path types.StreamPath |
| 34 |
| 35 // Secret is the stream secret value. It must be included if Descriptor
is not |
| 36 // nil. |
| 37 Secret []byte |
| 38 |
| 39 // ProtoVersion is the protobuf version string. |
| 40 ProtoVersion string |
| 41 |
| 42 // Descriptor is the new stream state to push. If nil, no registration w
ill |
| 43 // occur. |
| 44 State *service.LogStreamState |
| 45 // Descriptor is the new stream state to push. If nil, no registration w
ill |
| 46 // occur. |
| 47 Descriptor *logpb.LogStreamDescriptor |
| 48 } |
| 49 |
| 50 func loadState(path string, secret string, desc string, state *service.LogStream
State) (*State, error) { |
| 51 s := State{ |
| 52 Path: types.StreamPath(path), |
| 53 State: state, |
| 54 } |
| 55 if err := s.Path.Validate(); err != nil { |
| 56 return nil, fmt.Errorf("failed to validate stream path: %v", err
) |
| 57 } |
| 58 |
| 59 if secret != "" { |
| 60 var err error |
| 61 s.Secret, err = base64.StdEncoding.DecodeString(secret) |
| 62 if err != nil { |
| 63 return nil, fmt.Errorf("failed to decode secret: %v", er
r) |
| 64 } |
| 65 } |
| 66 |
| 67 if desc != "" { |
| 68 d, err := base64.StdEncoding.DecodeString(desc) |
| 69 if err != nil { |
| 70 return nil, fmt.Errorf("failed to decode descriptor: %v"
, err) |
| 71 } |
| 72 |
| 73 lsd := logpb.LogStreamDescriptor{} |
| 74 if err := proto.Unmarshal(d, &lsd); err != nil { |
| 75 return nil, fmt.Errorf("failed to unmarshal descriptor:
%v", err) |
| 76 } |
| 77 s.Descriptor = &lsd |
| 78 } |
| 79 |
| 80 if state != nil { |
| 81 s.ProtoVersion = state.ProtoVersion |
| 82 } |
| 83 |
| 84 return &s, nil |
| 85 } |
| 86 |
| 87 // clientSideValidate performs a set of basic sanity checks to not waste time |
| 88 // on something the Coordinator is known to reject. |
| 89 func (s *State) clientSideValidate() error { |
| 90 // Let's do some client-side validation and not waste the server's time
if |
| 91 // something is obviously wrong! |
| 92 if err := s.Path.Validate(); err != nil { |
| 93 return err |
| 94 } |
| 95 if s.Secret == nil { |
| 96 return errors.New("missing stream secret") |
| 97 } |
| 98 if s.ProtoVersion == "" { |
| 99 return errors.New("missing protobuf version") |
| 100 } |
| 101 return nil |
| 102 } |
| 103 |
| 104 // ServiceConfig is the structure returned by the GetConfig service API call. |
| 105 type ServiceConfig struct { |
| 106 service.GetConfigResponse |
| 107 } |
| 108 |
| 109 // Archived returns true if the log stream is marked as archived. |
| 110 func (s *State) Archived() bool { |
| 111 if st := s.State; st != nil { |
| 112 return !(st.ArchiveDataURL == "" && st.ArchiveIndexURL == "" &&
st.ArchiveStreamURL == "") |
| 113 } |
| 114 return false |
| 115 } |
| 116 |
| 117 // Options is the set of options to supply to a new Client instance. |
| 118 type Options struct { |
| 119 // Client is the authenticated HTTP client to use. |
| 120 Client *http.Client |
| 121 |
| 122 // BasePath is the API base path. If empty, the generated endpoint defau
lt |
| 123 // base path will be used. |
| 124 // |
| 125 // This should not include the service endpoint, e.g.: |
| 126 // https://logdog.example.com/api/ |
| 127 BasePath string |
| 128 |
| 129 // UserAgent, if supplied, will be included in the user agent string for |
| 130 // endpoint requests. |
| 131 UserAgent string |
| 132 } |
| 133 |
| 134 // Client is a LogDog Coordinator client. |
| 135 // |
| 136 // Client methods will return an errors.Transient error if the failure is |
| 137 // considered transient. |
| 138 type Client struct { |
| 139 *Options |
| 140 |
| 141 svc *service.Service |
| 142 } |
| 143 |
| 144 // New returns a new production Client using the supplied authenticated HTTP |
| 145 // Client. |
| 146 func New(o Options) *Client { |
| 147 svc, err := service.New(o.Client) |
| 148 if err != nil { |
| 149 // This will only happen if the supplied Client is nil, which is
a bug. |
| 150 panic(err) |
| 151 } |
| 152 if o.BasePath != "" { |
| 153 svc.BasePath = fmt.Sprintf("%sservice/v1/", o.BasePath) |
| 154 } |
| 155 svc.UserAgent = o.UserAgent |
| 156 |
| 157 return &Client{ |
| 158 Options: &o, |
| 159 svc: svc, |
| 160 } |
| 161 } |
| 162 |
| 163 // GetConfig loads the service configuration from the Coordinator. |
| 164 func (c *Client) GetConfig(ctx context.Context) (*ServiceConfig, error) { |
| 165 // Retrieve the global configuration. |
| 166 gcfg, err := c.svc.GetConfig().Context(ctx).Do() |
| 167 if err != nil { |
| 168 return nil, translateError(err) |
| 169 } |
| 170 |
| 171 return &ServiceConfig{ |
| 172 GetConfigResponse: *gcfg, |
| 173 }, nil |
| 174 } |
| 175 |
| 176 // LoadStream loads the named stream parameters. |
| 177 func (c *Client) LoadStream(ctx context.Context, path types.StreamPath) (*State,
error) { |
| 178 if err := path.Validate(); err != nil { |
| 179 return nil, err |
| 180 } |
| 181 |
| 182 resp, err := c.svc.LoadStream(string(path)).Context(ctx).Do() |
| 183 if err != nil { |
| 184 return nil, translateError(err) |
| 185 } |
| 186 |
| 187 s, err := loadState(resp.Path, resp.Secret, resp.Descriptor, resp.State) |
| 188 if err != nil { |
| 189 return nil, err |
| 190 } |
| 191 return s, nil |
| 192 } |
| 193 |
| 194 // RegisterStream registers stream metadata with the Coordinator. The |
| 195 // Coordinator will respond with its own version of that State on success. |
| 196 // This is idempotent so long as the data is consistent, so it may be called |
| 197 // multiple times. |
| 198 func (c *Client) RegisterStream(ctx context.Context, s State) (*State, error) { |
| 199 if err := s.clientSideValidate(); err != nil { |
| 200 return nil, err |
| 201 } |
| 202 |
| 203 desc := []byte(nil) |
| 204 if s.Descriptor != nil { |
| 205 err := error(nil) |
| 206 desc, err = proto.Marshal(s.Descriptor) |
| 207 if err != nil { |
| 208 return nil, err |
| 209 } |
| 210 } |
| 211 |
| 212 // No point in including the Descriptor; clear it (if it's set). |
| 213 resp, err := c.svc.RegisterStream(&service.RegisterStreamRequest{ |
| 214 ProtoVersion: s.ProtoVersion, |
| 215 Descriptor: base64.StdEncoding.EncodeToString(desc), |
| 216 Path: string(s.Path), |
| 217 Secret: base64.StdEncoding.EncodeToString(s.Secret), |
| 218 }).Context(ctx).Do() |
| 219 if err != nil { |
| 220 return nil, translateError(err) |
| 221 } |
| 222 |
| 223 rs, err := loadState(resp.Path, resp.Secret, "", resp.State) |
| 224 if err != nil { |
| 225 return nil, err |
| 226 } |
| 227 rs.Descriptor = s.Descriptor |
| 228 return rs, nil |
| 229 } |
| 230 |
| 231 // TerminateStream registers the terminal index for the named stream. |
| 232 func (c *Client) TerminateStream(ctx context.Context, p types.StreamPath, s []by
te, tidx types.MessageIndex) error { |
| 233 if tidx < 0 { |
| 234 return errors.New("stream state has non-terminal index") |
| 235 } |
| 236 |
| 237 err := c.svc.TerminateStream(&service.TerminateStreamRequest{ |
| 238 Path: string(p), |
| 239 Secret: base64.StdEncoding.EncodeToString(s), |
| 240 TerminalIndex: int64(tidx), |
| 241 }).Context(ctx).Do() |
| 242 if err != nil { |
| 243 return translateError(err) |
| 244 } |
| 245 return nil |
| 246 } |
| 247 |
| 248 func translateError(err error) error { |
| 249 if gerr, ok := err.(*googleapi.Error); ok { |
| 250 // Auth and server errors are considered transient. |
| 251 switch { |
| 252 case gerr.Code == http.StatusUnauthorized: |
| 253 fallthrough |
| 254 case gerr.Code == http.StatusForbidden: |
| 255 fallthrough |
| 256 case gerr.Code >= http.StatusInternalServerError: |
| 257 return errors.WrapTransient(err) |
| 258 } |
| 259 |
| 260 return err |
| 261 } |
| 262 |
| 263 // Not a Google API error. Assume it's transient. |
| 264 return errors.WrapTransient(err) |
| 265 } |
| OLD | NEW |