Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(61)

Side by Side Diff: server/internal/logdog/coordinatorClient/client.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Rebased, updated from comments. Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package coordinatorClient
6
7 import (
8 "encoding/base64"
9 "fmt"
10 "net/http"
11
12 "github.com/golang/protobuf/proto"
13 "github.com/luci/luci-go/common/api/logdog_coordinator/service/v1"
14 "github.com/luci/luci-go/common/auth"
15 "github.com/luci/luci-go/common/errors"
16 "github.com/luci/luci-go/common/logdog/types"
17 "github.com/luci/luci-go/common/proto/logdog/logpb"
18 "golang.org/x/net/context"
19 "google.golang.org/api/googleapi"
20 )
21
22 var (
23 // ServiceScopes is the set of OAuth scopes required to communicate with this
24 // service.
25 ServiceScopes = []string{
26 auth.OAuthScopeEmail,
27 }
28 )
29
30 // State is a representation of the remote Coordinator state.
31 type State struct {
32 // Path is the stream path.
33 Path types.StreamPath
34
35 // Secret is the stream secret value. It must be included if Descriptor is not
36 // nil.
37 Secret []byte
38
39 // ProtoVersion is the protobuf version string.
40 ProtoVersion string
41
42 // Descriptor is the new stream state to push. If nil, no registration w ill
43 // occur.
44 State *service.LogStreamState
45 // Descriptor is the new stream state to push. If nil, no registration w ill
46 // occur.
47 Descriptor *logpb.LogStreamDescriptor
48 }
49
50 func loadState(path string, secret string, desc string, state *service.LogStream State) (*State, error) {
51 s := State{
52 Path: types.StreamPath(path),
53 State: state,
54 }
55 if err := s.Path.Validate(); err != nil {
56 return nil, fmt.Errorf("failed to validate stream path: %v", err )
57 }
58
59 if secret != "" {
60 var err error
61 s.Secret, err = base64.StdEncoding.DecodeString(secret)
62 if err != nil {
63 return nil, fmt.Errorf("failed to decode secret: %v", er r)
64 }
65 }
66
67 if desc != "" {
68 d, err := base64.StdEncoding.DecodeString(desc)
69 if err != nil {
70 return nil, fmt.Errorf("failed to decode descriptor: %v" , err)
71 }
72
73 lsd := logpb.LogStreamDescriptor{}
74 if err := proto.Unmarshal(d, &lsd); err != nil {
75 return nil, fmt.Errorf("failed to unmarshal descriptor: %v", err)
76 }
77 s.Descriptor = &lsd
78 }
79
80 if state != nil {
81 s.ProtoVersion = state.ProtoVersion
82 }
83
84 return &s, nil
85 }
86
87 // clientSideValidate performs a set of basic sanity checks to not waste time
88 // on something the Coordinator is known to reject.
89 func (s *State) clientSideValidate() error {
90 // Let's do some client-side validation and not waste the server's time if
91 // something is obviously wrong!
92 if err := s.Path.Validate(); err != nil {
93 return err
94 }
95 if s.Secret == nil {
96 return errors.New("missing stream secret")
97 }
98 if s.ProtoVersion == "" {
99 return errors.New("missing protobuf version")
100 }
101 return nil
102 }
103
104 // ServiceConfig is the structure returned by the GetConfig service API call.
105 type ServiceConfig struct {
106 service.GetConfigResponse
107 }
108
109 // Archived returns true if the log stream is marked as archived.
110 func (s *State) Archived() bool {
111 if st := s.State; st != nil {
112 return !(st.ArchiveDataURL == "" && st.ArchiveIndexURL == "" && st.ArchiveStreamURL == "")
113 }
114 return false
115 }
116
117 // Options is the set of options to supply to a new Client instance.
118 type Options struct {
119 // Client is the authenticated HTTP client to use.
120 Client *http.Client
121
122 // BasePath is the API base path. If empty, the generated endpoint defau lt
123 // base path will be used.
124 //
125 // This should not include the service endpoint, e.g.:
126 // https://logdog.example.com/api/
127 BasePath string
128
129 // UserAgent, if supplied, will be included in the user agent string for
130 // endpoint requests.
131 UserAgent string
132 }
133
134 // Client is a LogDog Coordinator client.
135 //
136 // Client methods will return an errors.Transient error if the failure is
137 // considered transient.
138 type Client struct {
139 *Options
140
141 svc *service.Service
142 }
143
144 // New returns a new production Client using the supplied authenticated HTTP
145 // Client.
146 func New(o Options) *Client {
147 svc, err := service.New(o.Client)
148 if err != nil {
149 // This will only happen if the supplied Client is nil, which is a bug.
150 panic(err)
151 }
152 if o.BasePath != "" {
153 svc.BasePath = fmt.Sprintf("%sservice/v1/", o.BasePath)
154 }
155 svc.UserAgent = o.UserAgent
156
157 return &Client{
158 Options: &o,
159 svc: svc,
160 }
161 }
162
163 // GetConfig loads the service configuration from the Coordinator.
164 func (c *Client) GetConfig(ctx context.Context) (*ServiceConfig, error) {
165 // Retrieve the global configuration.
166 gcfg, err := c.svc.GetConfig().Context(ctx).Do()
167 if err != nil {
168 return nil, translateError(err)
169 }
170
171 return &ServiceConfig{
172 GetConfigResponse: *gcfg,
173 }, nil
174 }
175
176 // LoadStream loads the named stream parameters.
177 func (c *Client) LoadStream(ctx context.Context, path types.StreamPath) (*State, error) {
178 if err := path.Validate(); err != nil {
179 return nil, err
180 }
181
182 resp, err := c.svc.LoadStream(string(path)).Context(ctx).Do()
183 if err != nil {
184 return nil, translateError(err)
185 }
186
187 s, err := loadState(resp.Path, resp.Secret, resp.Descriptor, resp.State)
188 if err != nil {
189 return nil, err
190 }
191 return s, nil
192 }
193
194 // RegisterStream registers stream metadata with the Coordinator. The
195 // Coordinator will respond with its own version of that State on success.
196 // This is idempotent so long as the data is consistent, so it may be called
197 // multiple times.
198 func (c *Client) RegisterStream(ctx context.Context, s State) (*State, error) {
199 if err := s.clientSideValidate(); err != nil {
200 return nil, err
201 }
202
203 desc := []byte(nil)
204 if s.Descriptor != nil {
205 err := error(nil)
206 desc, err = proto.Marshal(s.Descriptor)
207 if err != nil {
208 return nil, err
209 }
210 }
211
212 // No point in including the Descriptor; clear it (if it's set).
213 resp, err := c.svc.RegisterStream(&service.RegisterStreamRequest{
214 ProtoVersion: s.ProtoVersion,
215 Descriptor: base64.StdEncoding.EncodeToString(desc),
216 Path: string(s.Path),
217 Secret: base64.StdEncoding.EncodeToString(s.Secret),
218 }).Context(ctx).Do()
219 if err != nil {
220 return nil, translateError(err)
221 }
222
223 rs, err := loadState(resp.Path, resp.Secret, "", resp.State)
224 if err != nil {
225 return nil, err
226 }
227 rs.Descriptor = s.Descriptor
228 return rs, nil
229 }
230
231 // TerminateStream registers the terminal index for the named stream.
232 func (c *Client) TerminateStream(ctx context.Context, p types.StreamPath, s []by te, tidx types.MessageIndex) error {
233 if tidx < 0 {
234 return errors.New("stream state has non-terminal index")
235 }
236
237 err := c.svc.TerminateStream(&service.TerminateStreamRequest{
238 Path: string(p),
239 Secret: base64.StdEncoding.EncodeToString(s),
240 TerminalIndex: int64(tidx),
241 }).Context(ctx).Do()
242 if err != nil {
243 return translateError(err)
244 }
245 return nil
246 }
247
248 func translateError(err error) error {
249 if gerr, ok := err.(*googleapi.Error); ok {
250 // Auth and server errors are considered transient.
251 switch {
252 case gerr.Code == http.StatusUnauthorized:
253 fallthrough
254 case gerr.Code == http.StatusForbidden:
255 fallthrough
256 case gerr.Code >= http.StatusInternalServerError:
257 return errors.WrapTransient(err)
258 }
259
260 return err
261 }
262
263 // Not a Google API error. Assume it's transient.
264 return errors.WrapTransient(err)
265 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698