Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(395)

Side by Side Diff: common/prpc/client.go

Issue 1605363002: common/prpc, tools/cmd/cproto: prpc client (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: rebased and addressed comments Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « common/logging/fields.go ('k') | common/prpc/client_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package prpc
6
7 import (
8 "bytes"
9 "fmt"
10 "io"
11 "io/ioutil"
12 "net/http"
13 "net/url"
14 "strconv"
15 "strings"
16 "time"
17
18 "github.com/golang/protobuf/proto"
19 "golang.org/x/net/context"
20 "golang.org/x/net/context/ctxhttp"
21 "google.golang.org/grpc"
22 "google.golang.org/grpc/codes"
23 "google.golang.org/grpc/metadata"
24
25 "github.com/luci/luci-go/common/errors"
26 "github.com/luci/luci-go/common/grpcutil"
27 "github.com/luci/luci-go/common/logging"
28 "github.com/luci/luci-go/common/retry"
29 )
30
31 const (
32 // HeaderGRPCCode is a name of the HTTP header that specifies the
33 // gRPC code in the response.
34 // A pRPC server must always specify it.
35 HeaderGRPCCode = "X-Prpc-Grpc-Code"
36 )
37
38 // DefaultUserAgent is default User-Agent HTTP header for pRPC requests.
39 var DefaultUserAgent = "pRPC Client 1.0"
40
41 // Client can make pRPC calls.
42 type Client struct {
43 C *http.Client // if nil, uses http.DefaultClient
44
45 // ErrBodySize is the number of bytes to read from a HTTP response
46 // with status >= 300 and include in the error.
47 // If non-positive, defaults to 256.
48 ErrBodySize int
49
50 Host string // host and optionally a port number of the target serv er.
51 Options *Options // if nil, DefaultOptions() are used.
52 }
53
54 // renderOptions copies client options and applies opts.
55 func (c *Client) renderOptions(opts []grpc.CallOption) *Options {
56 var options *Options
57 if c.Options != nil {
58 cpy := *c.Options
59 options = &cpy
60 } else {
61 options = DefaultOptions()
62 }
63 options.apply(opts)
64 return options
65 }
66
67 func (c *Client) getHTTPClient() *http.Client {
68 if c.C == nil {
69 return http.DefaultClient
70 }
71 return c.C
72 }
73
74 // Call makes an RPC.
75 // Retries on transient errors according to retry options.
76 // Logs HTTP errors.
77 //
78 // opts must be created by this package.
79 // Calling from multiple goroutines concurrently is safe, unless Client is mutat ed.
80 // Called from generated code.
81 func (c *Client) Call(ctx context.Context, serviceName, methodName string, in, o ut proto.Message, opts ...grpc.CallOption) error {
82 options := c.renderOptions(opts)
83
84 reqBody, err := proto.Marshal(in)
85 if err != nil {
86 return err
87 }
88
89 req := prepareRequest(c.Host, serviceName, methodName, len(reqBody), opt ions)
90 ctx = logging.SetFields(ctx, logging.Fields{
91 "host": c.Host,
92 "service": serviceName,
93 "method": methodName,
94 })
95
96 // Send the request in a retry loop.
97 var buf bytes.Buffer
98 err = retry.Retry(
99 ctx,
100 retry.TransientOnly(options.Retry),
101 func() error {
102 logging.Debugf(ctx, "RPC %s/%s.%s", c.Host, serviceName, methodName)
103
104 // Send the request.
105 req.Body = ioutil.NopCloser(bytes.NewReader(reqBody))
106 res, err := ctxhttp.Do(ctx, c.getHTTPClient(), req)
107 if err != nil {
108 return errors.WrapTransient(fmt.Errorf("failed t o send request: %s", err))
109 }
110 defer res.Body.Close()
111
112 if options.resHeaderMetadata != nil {
113 *options.resHeaderMetadata = metadata.MD(res.Hea der).Copy()
114 }
115
116 // Read the response body.
117 buf.Reset()
118 var body io.Reader = res.Body
119 if res.ContentLength > 0 {
120 buf.Grow(int(res.ContentLength))
121 body = io.LimitReader(body, res.ContentLength)
122 }
123 if _, err = buf.ReadFrom(body); err != nil {
124 return fmt.Errorf("failed to read response body: %s", err)
125 }
126
127 if options.resTrailerMetadata != nil {
128 *options.resTrailerMetadata = metadata.MD(res.Tr ailer).Copy()
129 }
130
131 codeHeader := res.Header.Get(HeaderGRPCCode)
132 if codeHeader == "" {
133 // Not a valid pRPC response.
134 body := buf.String()
135 bodySize := c.ErrBodySize
136 if bodySize <= 0 {
137 bodySize = 256
138 }
139 if len(body) > bodySize {
140 body = body[:bodySize] + "..."
141 }
142 return fmt.Errorf("HTTP %d: no gRPC code. Body: %s", res.StatusCode, body)
143 }
144
145 codeInt, err := strconv.Atoi(codeHeader)
146 if err != nil {
147 // Not a valid pRPC response.
148 return fmt.Errorf("invalid grpc code %q: %s", co deHeader, err)
149 }
150
151 code := codes.Code(codeInt)
152 if code != codes.OK {
153 desc := strings.TrimSuffix(buf.String(), "\n")
154 err := grpcutil.Errf(code, "%s", desc)
155 if isTransientCode(code) {
156 err = errors.WrapTransient(err)
157 }
158 return err
159 }
160
161 return proto.Unmarshal(buf.Bytes(), out) // non-transien t error
162 },
163 func(err error, sleepTime time.Duration) {
164 logging.Fields{
165 logging.ErrorKey: err,
166 "sleepTime": sleepTime,
167 }.Warningf(ctx, "RPC failed transiently. Will retry in % s", sleepTime)
168 },
169 )
170
171 if err != nil {
172 logging.WithError(err).Warningf(ctx, "RPC failed permanently: %s ", err)
173 }
174
175 // We have to unwrap gRPC errors because
176 // grpc.Code and grpc.ErrorDesc functions do not work with error wrapper s.
177 // https://github.com/grpc/grpc-go/issues/494
178 return errors.UnwrapAll(err)
179 }
180
181 // prepareRequest creates an HTTP request for an RPC,
182 // except it does not set the request body.
183 func prepareRequest(host, serviceName, methodName string, contentLength int, opt ions *Options) *http.Request {
184 if host == "" {
185 panic("Host is not set")
186 }
187 req := &http.Request{
188 Method: "POST",
189 URL: &url.URL{
190 Scheme: "https",
191 Host: host,
192 Path: fmt.Sprintf("/prpc/%s/%s", serviceName, methodNa me),
193 },
194 Header: http.Header{},
195 }
196 if options.Insecure {
197 req.URL.Scheme = "http"
198 }
199
200 // Set headers.
201 const mediaType = "application/prpc" // binary
202 req.Header.Set("Content-Type", mediaType)
203 req.Header.Set("Accept", mediaType)
204 userAgent := options.UserAgent
205 if userAgent == "" {
206 userAgent = DefaultUserAgent
207 }
208 req.Header.Set("User-Agent", userAgent)
209 req.ContentLength = int64(contentLength)
210 req.Header.Set("Content-Length", strconv.Itoa(contentLength))
211 // TODO(nodir): add "Accept-Encoding: gzip" when pRPC server supports it .
212 return req
213 }
214
215 func isTransientCode(code codes.Code) bool {
216 switch code {
217 case codes.Internal, codes.Unknown:
218 return true
219
220 default:
221 return false
222 }
223 }
OLDNEW
« no previous file with comments | « common/logging/fields.go ('k') | common/prpc/client_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698