Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(238)

Side by Side Diff: third_party/cloud_storage/cloudstorage/api_utils.py

Issue 1031663002: Increase maximum file upload to 100MB, use cloudstorage python library (Closed) Base URL: https://github.com/dart-lang/pub-dartlang.git@master
Patch Set: Add deprecation comment to old cloud_storage.py:open() function Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2013 Google Inc. All Rights Reserved.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing,
10 # software distributed under the License is distributed on an
11 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
12 # either express or implied. See the License for the specific
13 # language governing permissions and limitations under the License.
14
15 """Util functions and classes for cloudstorage_api."""
16
17
18
19 __all__ = ['set_default_retry_params',
20 'RetryParams',
21 ]
22
23 import copy
24 import httplib
25 import logging
26 import math
27 import os
28 import threading
29 import time
30 import urllib
31
32
33 try:
34 from google.appengine.api import app_identity
35 from google.appengine.api import urlfetch
36 from google.appengine.datastore import datastore_rpc
37 from google.appengine.ext import ndb
38 from google.appengine.ext.ndb import eventloop
39 from google.appengine.ext.ndb import tasklets
40 from google.appengine.ext.ndb import utils
41 from google.appengine import runtime
42 from google.appengine.runtime import apiproxy_errors
43 except ImportError:
44 from google.appengine.api import app_identity
45 from google.appengine.api import urlfetch
46 from google.appengine.datastore import datastore_rpc
47 from google.appengine import runtime
48 from google.appengine.runtime import apiproxy_errors
49 from google.appengine.ext import ndb
50 from google.appengine.ext.ndb import eventloop
51 from google.appengine.ext.ndb import tasklets
52 from google.appengine.ext.ndb import utils
53
54
# Transient App Engine service failures that are safe to retry blindly.
_RETRIABLE_EXCEPTIONS = (urlfetch.DownloadError,
                         apiproxy_errors.Error,
                         app_identity.InternalError,
                         app_identity.BackendDeadlineExceeded)

# Per-thread storage for the default RetryParams; read/written by
# set_default_retry_params() and _get_default_retry_params().
_thread_local_settings = threading.local()
_thread_local_settings.default_retry_params = None
62
63
def set_default_retry_params(retry_params):
  """Install retry_params as the default for the current thread and request.

  A private copy is stored, so mutating the caller's object afterwards has
  no effect on the stored default.
  """
  params_copy = copy.copy(retry_params)
  _thread_local_settings.default_retry_params = params_copy
67
68
def _get_default_retry_params():
  """Get the default RetryParams for the current request and thread.

  Returns:
    A new instance of the default RetryParams: a copy of the thread-local
    default if one was set during this request, otherwise a fresh
    RetryParams built from the standard defaults.
  """
  default = getattr(_thread_local_settings, 'default_retry_params', None)
  if default is not None and default.belong_to_current_request():
    return copy.copy(default)
  return RetryParams()
80
81
82 def _quote_filename(filename):
83 """Quotes filename to use as a valid URI path.
84
85 Args:
86 filename: user provided filename. /bucket/filename.
87
88 Returns:
89 The filename properly quoted to use as URI's path component.
90 """
91 return urllib.quote(filename)
92
93
94 def _unquote_filename(filename):
95 """Unquotes a valid URI path back to its filename.
96
97 This is the opposite of _quote_filename.
98
99 Args:
100 filename: a quoted filename. /bucket/some%20filename.
101
102 Returns:
103 The filename unquoted.
104 """
105 return urllib.unquote(filename)
106
107
108 def _should_retry(resp):
109 """Given a urlfetch response, decide whether to retry that request."""
110 return (resp.status_code == httplib.REQUEST_TIMEOUT or
111 (resp.status_code >= 500 and
112 resp.status_code < 600))
113
114
class _RetryWrapper(object):
  """A wrapper that wraps retry logic around any tasklet."""

  def __init__(self,
               retry_params,
               retriable_exceptions=_RETRIABLE_EXCEPTIONS,
               should_retry=lambda r: False):
    """Init.

    Args:
      retry_params: a RetryParams instance.
      retriable_exceptions: a list of exception classes that are retriable.
      should_retry: a function that takes a result from the tasklet and returns
        a boolean. True if the result should be retried.
    """
    self.retry_params = retry_params
    self.retriable_exceptions = retriable_exceptions
    self.should_retry = should_retry

  @ndb.tasklet
  def run(self, tasklet, **kwds):
    """Run a tasklet with retry.

    The retry should be transparent to the caller: if no results
    are successful, the exception or result from the last retry is returned
    to the caller.

    Args:
      tasklet: the tasklet to run.
      **kwds: keywords arguments to run the tasklet.

    Raises:
      The exception from running the tasklet.

    Returns:
      The result from running the tasklet.
    """
    start_time = time.time()
    n = 1  # Attempt number; the first attempt is 1.

    while True:
      e = None
      result = None
      got_result = False

      try:
        result = yield tasklet(**kwds)
        got_result = True
        if not self.should_retry(result):
          raise ndb.Return(result)
      except runtime.DeadlineExceededError:
        # The overall request deadline is not retriable; propagate it.
        logging.debug(
            'Tasklet has exceeded request deadline after %s seconds total',
            time.time() - start_time)
        raise
      except self.retriable_exceptions as err:
        # Use PEP 3110 "as" syntax (the "except E, e" form is Python 2
        # only) and rebind to a separate name: Python 3 deletes the "as"
        # target when the handler exits, but e is inspected below.
        e = err

      if n == 1:
        logging.debug('Tasklet is %r', tasklet)

      delay = self.retry_params.delay(n, start_time)

      if delay <= 0:
        # Retry budget exhausted: surface the last result or exception.
        logging.debug(
            'Tasklet failed after %s attempts and %s seconds in total',
            n, time.time() - start_time)
        if got_result:
          raise ndb.Return(result)
        elif e is not None:
          raise e
        else:
          assert False, 'Should never reach here.'

      if got_result:
        logging.debug(
            'Got result %r from tasklet.', result)
      else:
        logging.debug(
            'Got exception "%r" from tasklet.', e)
      logging.debug('Retry in %s seconds.', delay)
      n += 1
      yield tasklets.sleep(delay)
198
199
class RetryParams(object):
  """Retry configuration parameters."""

  _DEFAULT_USER_AGENT = 'App Engine Python GCS Client'

  @datastore_rpc._positional(1)
  def __init__(self,
               backoff_factor=2.0,
               initial_delay=0.1,
               max_delay=10.0,
               min_retries=3,
               max_retries=6,
               max_retry_period=30.0,
               urlfetch_timeout=None,
               save_access_token=False,
               _user_agent=None):
    """Init.

    This object is unique per request per thread.

    The library retries according to these settings when App Engine Server
    can't call urlfetch, urlfetch times out, or urlfetch returns a 408 or
    a 500-600 response.

    Args:
      backoff_factor: exponential backoff multiplier.
      initial_delay: seconds to delay for the first retry.
      max_delay: max seconds to delay for every retry.
      min_retries: min number of times to retry. This value is automatically
        capped by max_retries.
      max_retries: max number of times to retry. Set this to 0 for no retry.
      max_retry_period: max total seconds spent on retry. Retry stops when
        this period passed AND min_retries has been attempted.
      urlfetch_timeout: timeout for urlfetch in seconds. Could be None,
        in which case the value will be chosen by urlfetch module.
      save_access_token: persist access token to datastore to avoid
        excessive usage of GetAccessToken API. Usually the token is cached
        in process and in memcache. In some cases, memcache isn't very
        reliable.
      _user_agent: The user agent string that you want to use in your requests.
    """
    checked = self._check
    self.backoff_factor = checked('backoff_factor', backoff_factor)
    self.initial_delay = checked('initial_delay', initial_delay)
    self.max_delay = checked('max_delay', max_delay)
    self.max_retry_period = checked('max_retry_period', max_retry_period)
    self.max_retries = checked('max_retries', max_retries, True, int)
    # min_retries may never exceed max_retries.
    self.min_retries = min(
        checked('min_retries', min_retries, True, int), self.max_retries)

    if urlfetch_timeout is None:
      self.urlfetch_timeout = None
    else:
      self.urlfetch_timeout = checked('urlfetch_timeout', urlfetch_timeout)
    self.save_access_token = checked(
        'save_access_token', save_access_token, True, bool)
    self._user_agent = _user_agent or self._DEFAULT_USER_AGENT

    # Remembered so belong_to_current_request() can tell whether this
    # instance was created during the current request.
    self._request_id = os.getenv('REQUEST_LOG_ID')

  def __eq__(self, other):
    """Equal iff other is the same class with identical settings."""
    return (isinstance(other, self.__class__) and
            self.__dict__ == other.__dict__)

  def __ne__(self, other):
    return not self.__eq__(other)

  @classmethod
  def _check(cls, name, val, can_be_zero=False, val_type=float):
    """Check one init argument.

    Args:
      name: name of the argument, used in error messages.
      val: the value. Has to be a non negative number.
      can_be_zero: whether zero is an acceptable value.
      val_type: expected Python type of the value.

    Returns:
      The value.

    Raises:
      ValueError: when invalid value is passed in.
      TypeError: when invalid value type is passed in.
    """
    # An int is acceptable wherever a float is expected. Note the exact
    # type is compared (not isinstance), matching the original contract:
    # e.g. a bool is never accepted where an int is expected.
    accepted = (val_type, int) if val_type is float else (val_type,)

    if type(val) not in accepted:
      raise TypeError(
          'Expect type %s for parameter %s' % (val_type.__name__, name))
    if val < 0:
      raise ValueError(
          'Value for parameter %s has to be greater than 0' % name)
    if val == 0 and not can_be_zero:
      raise ValueError(
          'Value for parameter %s can not be 0' % name)
    return val

  def belong_to_current_request(self):
    """True if this instance was created during the current request."""
    return os.getenv('REQUEST_LOG_ID') == self._request_id

  def delay(self, n, start_time):
    """Calculate delay before the next retry.

    Args:
      n: the number of current attempt. The first attempt should be 1.
      start_time: the time when retry started in unix time.

    Returns:
      Number of seconds to wait before next retry. -1 if retry should give up.
    """
    if n > self.max_retries:
      return -1
    if (n > self.min_retries and
        time.time() - start_time > self.max_retry_period):
      return -1
    # Exponential backoff, capped at max_delay.
    return min(self.initial_delay * math.pow(self.backoff_factor, n - 1),
               self.max_delay)
319
320
def _run_until_rpc():
  """Eagerly run the ndb event loop until it blocks on an RPC.

  Normally the ndb event loop isn't run until some code calls
  future.get_result(). When an async tasklet is called, its wrapper turns
  the body into a generator, enqueues a _help_tasklet_along callback on
  the loop's "current" queue, and returns a future.

  When the loop invokes _help_tasklet_along, it pulls one yielded value
  from the generator. If that value is another future, a callback
  (_on_future_complete) re-invokes _help_tasklet_along once the dependent
  future fulfills; if it is an RPC, _on_rpc_complete does the same when
  the RPC fulfills. _help_tasklet_along therefore drills down the chain
  of futures until some future is blocked on an RPC. The event loop runs
  all callbacks and constantly checks pending RPC status.

  Draining the "current" queue here pushes every tasklet forward to the
  point where it is genuinely waiting on an RPC.
  """
  loop = eventloop.get_event_loop()
  while loop.current:
    loop.run0()
342
343
def _eager_tasklet(tasklet):
  """Decorator that makes a tasklet begin executing immediately.

  The wrapped tasklet is started, the event loop is drained until the
  underlying work blocks on an RPC, and the resulting future is returned.
  """

  @utils.wrapping(tasklet)
  def eager_wrapper(*args, **kwds):
    future = tasklet(*args, **kwds)
    _run_until_rpc()
    return future

  return eager_wrapper
OLDNEW
« no previous file with comments | « third_party/cloud_storage/cloudstorage/__init__.py ('k') | third_party/cloud_storage/cloudstorage/cloudstorage_api.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698