OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package prpc | |
6 | |
7 import ( | |
8 "bytes" | |
9 "fmt" | |
10 "io" | |
11 "io/ioutil" | |
12 "net/http" | |
13 "net/url" | |
14 "strconv" | |
15 "time" | |
16 | |
17 "github.com/golang/protobuf/proto" | |
18 "golang.org/x/net/context" | |
19 "google.golang.org/grpc" | |
20 "google.golang.org/grpc/codes" | |
21 "google.golang.org/grpc/metadata" | |
22 | |
23 "github.com/luci/luci-go/common/errors" | |
24 "github.com/luci/luci-go/common/logging" | |
25 "github.com/luci/luci-go/common/retry" | |
26 "github.com/luci/luci-go/common/transport" | |
27 ) | |
28 | |
29 // Client can make pRPC calls. | |
30 type Client struct { | |
31 Host string // host and optionally a port number of the target serv er. | |
32 Options *Options // if nil, DefaultOptions() are used. | |
33 } | |
34 | |
35 // NewClient creates a new pRPC client with default options. | |
36 func NewClient(host string) *Client { | |
dnj
2016/01/21 07:44:27
This seems unnecessary, since "Client" has no priv
nodir
2016/01/22 00:47:24
Done.
| |
37 return &Client{Host: host, Options: DefaultOptions()} | |
38 } | |
39 | |
40 // renderOptions copies client options and applies opts. | |
41 func (c *Client) renderOptions(opts []grpc.CallOption) *Options { | |
42 var options Options | |
43 if c.Options != nil { | |
44 options = *c.Options | |
45 } else { | |
46 options = *DefaultOptions() | |
dnj
2016/01/21 07:44:28
DefaultOptions generates a new instance anyway. Cl
nodir
2016/01/22 00:47:24
Done.
| |
47 } | |
48 options.apply(opts) | |
49 return &options | |
50 } | |
51 | |
52 // Call makes an RPC. | |
53 // Retries on transient errors according to retry options. | |
54 // Logs HTTP errors. | |
55 // | |
56 // opts must be created by this package. | |
57 // Calling from multiple goroutines concurrently is safe, unless Client is mutat ed. | |
58 // Called from generated code. | |
59 func (c *Client) Call(ctx context.Context, serviceName, methodName string, in, o ut proto.Message, opts ...grpc.CallOption) error { | |
60 options := c.renderOptions(opts) | |
61 | |
62 reqBody, err := proto.Marshal(in) | |
63 if err != nil { | |
64 return err | |
65 } | |
66 | |
67 req := prepareRequest(c.Host, serviceName, methodName, len(reqBody), opt ions) | |
68 fields := logging.Fields{ | |
dnj
2016/01/21 07:44:28
Fields layer naturally via "SetFields". ctx = logg
nodir
2016/01/22 00:47:23
ah, I see. Done, but I don't see much code-size s
dnj
2016/01/22 01:16:27
Looks like you are.
It's less about code size cha
nodir
2016/01/22 02:57:50
ok so there is no "a lot less code"
dnj (Google)
2016/01/22 04:06:23
Well, depends, each logging message where you don'
| |
69 "Host": c.Host, | |
70 "Service": serviceName, | |
71 "Method": methodName, | |
72 } | |
73 | |
74 // Send the request in a retry loop. | |
75 onRetry := func(err error, sleepTime time.Duration) { | |
dnj
2016/01/21 07:44:27
I personally like to inline this w/ the retry.Retr
nodir
2016/01/22 00:47:23
Done.
| |
76 fields := fields.Copy(logging.Fields{ | |
dnj
2016/01/21 07:44:27
(Layering fields is a lot cleaner than explicitly
nodir
2016/01/22 00:47:23
Done.
| |
77 logging.ErrorKey: err, | |
78 "SleepTime": sleepTime, | |
79 }) | |
80 fields.Warningf(ctx, "RPC failed transiently. Will retry in %s", sleepTime) | |
81 } | |
82 var client http.Client | |
83 client.Transport = transport.Get(ctx) | |
dnj
2016/01/21 07:44:27
I seriously dislike embedding Transport in context
nodir
2016/01/22 00:47:24
If it is that serious, please start a discussion i
| |
84 err = retry.Retry(ctx, retry.TransientOnly(options.Retry()), func() erro r { | |
85 fields.Infof(ctx, "RPC %s/%s.%s", c.Host, serviceName, methodNam e) | |
dnj
2016/01/21 07:44:27
This might be a bit wordy even for info level. May
nodir
2016/01/22 00:47:24
Done. It is wordy indeed, we use info in https://g
| |
86 | |
87 // Send the request. | |
88 req.Body = ioutil.NopCloser(bytes.NewBuffer(reqBody)) | |
dnj
2016/01/21 07:44:27
bytes.NewReader is more lightweight and appropriat
nodir
2016/01/22 00:47:24
Done.
| |
89 res, err := client.Do(req) | |
90 if err != nil { | |
91 return errors.WrapTransient(fmt.Errorf("failed to send r equest: %s", err)) | |
dnj
2016/01/21 07:44:28
IMO you should handle transient errors more surgic
nodir
2016/01/22 00:47:24
status is not available if err != nil
dnj
2016/01/22 01:16:27
This sounds like a hard failure to me then, not a
nodir
2016/01/22 02:57:50
Client.Do may return an error on URLFetch proxy fa
dnj (Google)
2016/01/22 04:06:23
Ugh okay. I really dislike that they don't give us
| |
92 } | |
93 defer res.Body.Close() | |
94 | |
95 if options.resHeaderMetadata != nil { | |
96 *options.resHeaderMetadata = metadata.MD(res.Header).Cop y() | |
97 } | |
98 | |
99 // Check response status code. | |
100 if res.StatusCode >= 300 { | |
dnj
2016/01/21 07:44:27
Maybe http.StatusMultipleChoices instead of 300?
nodir
2016/01/22 00:47:24
name "MultipleChoises" does not imply it is the fi
dnj
2016/01/22 01:16:27
Actually now that I think about it, you control th
nodir
2016/01/22 02:57:50
Why couple tightly? HTTP status codes < 300 are no
dnj (Google)
2016/01/22 04:06:23
If we fully control our server's response codes an
| |
101 return responseErr(ctx, res) | |
102 } | |
103 | |
104 // Read and parse response message. | |
105 buf, err := ioutil.ReadAll(res.Body) | |
dnj
2016/01/21 07:44:27
Since you're doing this in a retry loop, you could
nodir
2016/01/22 00:47:24
Done.
| |
106 if err != nil { | |
107 return errors.WrapTransient(fmt.Errorf("failed to read r esponse body: %s", err)) | |
dnj
2016/01/21 07:44:27
Why is a failure to read body transient? This woul
nodir
2016/01/22 00:47:24
Isn't TCP interruption transient? In other words,
dnj
2016/01/22 01:16:27
It depends why the interruption happened. e.g, if
nodir
2016/01/22 02:57:50
Done
| |
108 } | |
109 | |
110 if options.resTrailerMetadata != nil { | |
111 *options.resTrailerMetadata = metadata.MD(res.Trailer).C opy() | |
112 } | |
113 | |
114 return proto.Unmarshal(buf, out) // non-transient error | |
115 }, onRetry) | |
116 | |
117 if err != nil { | |
118 fields[logging.ErrorKey] = err | |
dnj
2016/01/21 07:44:27
If you set fields, you can make this:
logging.With
nodir
2016/01/22 00:47:24
Done
| |
119 fields.Warningf(ctx, "RPC failed permanently: %s", err) | |
120 } | |
121 | |
122 // We have to unwrap gRPC errors because | |
123 // grpc.Code and grpc.ErrorDesc functions do not work with error wrapper s. | |
124 // https://github.com/grpc/grpc-go/issues/494 | |
125 return errors.UnwrapAll(err) | |
126 } | |
127 | |
128 // prepareRequest creates an HTTP request for an RPC, | |
129 // except it does not set the request body. | |
130 func prepareRequest(host, serviceName, methodName string, contentLength int, opt ions *Options) *http.Request { | |
131 if host == "" { | |
132 panic("Host is not set") | |
133 } | |
134 req := &http.Request{ | |
135 Method: "POST", | |
136 URL: &url.URL{ | |
137 Scheme: "https", | |
138 Host: host, | |
139 Path: fmt.Sprintf("/prpc/%s/%s", serviceName, methodNa me), | |
140 }, | |
141 Header: http.Header{}, | |
142 } | |
143 if options.Insecure { | |
144 req.URL.Scheme = "http" | |
145 } | |
146 | |
147 // Set headers. | |
148 const mediaType = "application/prpc" // binary | |
149 req.Header.Set("Content-Type", mediaType) | |
150 req.Header.Set("Accept", mediaType) | |
151 if options.UserAgent != "" { | |
152 req.Header.Set("User-Agent", options.UserAgent) | |
dnj
2016/01/21 07:44:27
Endpoints clients have a default user agent (e.g.,
nodir
2016/01/22 00:47:24
ah right, forgot about that
| |
153 } | |
154 req.ContentLength = int64(contentLength) | |
155 req.Header.Set("Content-Length", strconv.Itoa(contentLength)) | |
156 // TODO(nodir): add "Accept-Encoding: gzip" when pRPC server supports it . | |
157 return req | |
158 } | |
159 | |
160 // responseErr converts an HTTP response to a gRPC error. | |
161 // The returned error may be wrapped with errors.WrapTransient. | |
162 func responseErr(c context.Context, res *http.Response) error { | |
163 // Read first 256 bytes of the body. Leave empty if cannot read. | |
164 var body string | |
165 const maxBody = 256 | |
dnj
2016/01/21 07:44:27
Perhaps this could be a Client setting? That way,
nodir
2016/01/22 00:47:24
Done.
| |
166 bodyBytes, err := ioutil.ReadAll(io.LimitReader(res.Body, maxBody)) | |
167 if err == nil { | |
168 if len(bodyBytes) == maxBody { | |
169 bodyBytes = append(bodyBytes, []byte("...")...) | |
170 } | |
171 body = string(bodyBytes) | |
172 } | |
173 | |
174 const noCode = 0xffffffff | |
dnj
2016/01/21 07:44:27
Rather than use a sentinel value, just use a boole
nodir
2016/01/22 00:47:24
not relevant any more
| |
175 code := codes.Code(noCode) | |
176 // Read explicit gRPC code. | |
177 const headerName = "X-Prpc-Grpc-Code" | |
dnj
2016/01/21 07:44:28
Shared constant w/ server?
nodir
2016/01/22 00:47:23
Added a constant
| |
178 if codeHeader := res.Header.Get(headerName); codeHeader != "" { | |
179 if intCode, err := strconv.Atoi(codeHeader); err != nil { | |
180 logging.Warningf(c, "could not parse %s header: %s", hea derName, err) | |
dnj
2016/01/21 07:44:28
Capitalize logging messages. Use fields:
logging.
nodir
2016/01/22 00:47:23
Done
dnj
2016/01/22 01:16:27
Acknowledged.
| |
181 } else { | |
182 code = codes.Code(intCode) | |
183 } | |
184 } | |
185 if code == noCode { | |
186 code = StatusCode(res.StatusCode) | |
187 } | |
188 | |
189 // Return the error. | |
190 err = grpc.Errorf(code, "HTTP %s: %s", res.Status, body) | |
191 if isTransientStatus(res.StatusCode) { | |
192 err = errors.WrapTransient(err) | |
193 } | |
194 return err | |
195 } | |
196 | |
197 // isTransientStatus returns true if an HTTP status code indicates a transient e rror. | |
198 func isTransientStatus(status int) bool { | |
199 return status >= 500 || status == http.StatusRequestTimeout | |
200 } | |
OLD | NEW |