OLD | NEW |
(Empty) | |
| 1 # Copyright 2013 Google Inc. All Rights Reserved. |
| 2 # |
| 3 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 # you may not use this file except in compliance with the License. |
| 5 # You may obtain a copy of the License at |
| 6 # |
| 7 # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 # |
| 9 # Unless required by applicable law or agreed to in writing, |
| 10 # software distributed under the License is distributed on an |
| 11 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, |
| 12 # either express or implied. See the License for the specific |
| 13 # language governing permissions and limitations under the License. |
| 14 |
| 15 """Util functions and classes for cloudstorage_api.""" |
| 16 |
| 17 |
| 18 |
| 19 __all__ = ['set_default_retry_params', |
| 20 'RetryParams', |
| 21 ] |
| 22 |
| 23 import copy |
| 24 import httplib |
| 25 import logging |
| 26 import math |
| 27 import os |
| 28 import threading |
| 29 import time |
| 30 import urllib |
| 31 |
| 32 |
| 33 try: |
| 34 from google.appengine.api import app_identity |
| 35 from google.appengine.api import urlfetch |
| 36 from google.appengine.datastore import datastore_rpc |
| 37 from google.appengine.ext import ndb |
| 38 from google.appengine.ext.ndb import eventloop |
| 39 from google.appengine.ext.ndb import tasklets |
| 40 from google.appengine.ext.ndb import utils |
| 41 from google.appengine import runtime |
| 42 from google.appengine.runtime import apiproxy_errors |
| 43 except ImportError: |
| 44 from google.appengine.api import app_identity |
| 45 from google.appengine.api import urlfetch |
| 46 from google.appengine.datastore import datastore_rpc |
| 47 from google.appengine import runtime |
| 48 from google.appengine.runtime import apiproxy_errors |
| 49 from google.appengine.ext import ndb |
| 50 from google.appengine.ext.ndb import eventloop |
| 51 from google.appengine.ext.ndb import tasklets |
| 52 from google.appengine.ext.ndb import utils |
| 53 |
| 54 |
# Transient urlfetch / App Engine API failures that are worth retrying.
_RETRIABLE_EXCEPTIONS = (urlfetch.DownloadError,
                         apiproxy_errors.Error,
                         app_identity.InternalError,
                         app_identity.BackendDeadlineExceeded)

# Per-thread storage for the default RetryParams of the current request;
# read by _get_default_retry_params and written by set_default_retry_params.
_thread_local_settings = threading.local()
_thread_local_settings.default_retry_params = None
| 62 |
| 63 |
def set_default_retry_params(retry_params):
  """Install retry_params as the default for the current thread and request.

  A private copy is stored, so mutating the caller's object afterwards has
  no effect on the stored default.
  """
  _thread_local_settings.default_retry_params = copy.copy(retry_params)
| 67 |
| 68 |
def _get_default_retry_params():
  """Get default RetryParams for current request and current thread.

  Returns:
    A new instance of the default RetryParams. Falls back to a fresh
    RetryParams() when no default was set on this thread, or when the
    stored default was created during a different request.
  """
  current = getattr(_thread_local_settings, 'default_retry_params', None)
  if current is not None and current.belong_to_current_request():
    return copy.copy(current)
  return RetryParams()
| 80 |
| 81 |
def _quote_filename(filename):
  """Percent-encode a filename so it is a valid URI path component.

  Args:
    filename: user provided filename. /bucket/filename.

  Returns:
    The filename properly quoted to use as URI's path component.
  """
  return urllib.quote(filename)
| 92 |
| 93 |
def _unquote_filename(filename):
  """Decode a percent-encoded URI path back to its filename.

  Inverse of _quote_filename.

  Args:
    filename: a quoted filename. /bucket/some%20filename.

  Returns:
    The filename unquoted.
  """
  return urllib.unquote(filename)
| 106 |
| 107 |
def _should_retry(resp):
  """Return True when a urlfetch response (408 or any 5xx) is retriable."""
  status = resp.status_code
  return status == httplib.REQUEST_TIMEOUT or 500 <= status < 600
| 113 |
| 114 |
class _RetryWrapper(object):
  """A wrapper that wraps retry logic around any tasklet."""

  def __init__(self,
               retry_params,
               retriable_exceptions=_RETRIABLE_EXCEPTIONS,
               should_retry=lambda r: False):
    """Init.

    Args:
      retry_params: a RetryParams instance.
      retriable_exceptions: a tuple of exception classes that are retriable.
      should_retry: a function that takes a result from the tasklet and
        returns a boolean. True if the result should be retried.
    """
    self.retry_params = retry_params
    self.retriable_exceptions = retriable_exceptions
    self.should_retry = should_retry

  @ndb.tasklet
  def run(self, tasklet, **kwds):
    """Run a tasklet with retry.

    The retry should be transparent to the caller: if no results
    are successful, the exception or result from the last retry is returned
    to the caller.

    Args:
      tasklet: the tasklet to run.
      **kwds: keywords arguments to run the tasklet.

    Raises:
      The exception from running the tasklet.

    Returns:
      The result from running the tasklet.
    """
    start_time = time.time()
    n = 1  # Number of the current attempt; the first attempt is 1.

    while True:
      e = None
      result = None
      got_result = False

      try:
        result = yield tasklet(**kwds)
        got_result = True
        if not self.should_retry(result):
          raise ndb.Return(result)
      except runtime.DeadlineExceededError:
        # The request itself is out of time; retrying cannot succeed.
        logging.debug(
            'Tasklet has exceeded request deadline after %s seconds total',
            time.time() - start_time)
        raise
      except self.retriable_exceptions as err:
        # PEP 3110 `as` syntax (valid on Python 2.6+) replaces the removed
        # `except X, e` form. Rebind to an outer name because Python 3
        # unbinds the `as` target when the except block exits, and `e` is
        # used below.
        e = err

      if n == 1:
        logging.debug('Tasklet is %r', tasklet)

      delay = self.retry_params.delay(n, start_time)

      if delay <= 0:
        # Retry budget exhausted: surface the last result or exception.
        logging.debug(
            'Tasklet failed after %s attempts and %s seconds in total',
            n, time.time() - start_time)
        if got_result:
          raise ndb.Return(result)
        elif e is not None:
          raise e
        else:
          assert False, 'Should never reach here.'

      if got_result:
        logging.debug(
            'Got result %r from tasklet.', result)
      else:
        logging.debug(
            'Got exception "%r" from tasklet.', e)
      logging.debug('Retry in %s seconds.', delay)
      n += 1
      yield tasklets.sleep(delay)
| 198 |
| 199 |
class RetryParams(object):
  """Retry configuration parameters."""

  _DEFAULT_USER_AGENT = 'App Engine Python GCS Client'

  @datastore_rpc._positional(1)
  def __init__(self,
               backoff_factor=2.0,
               initial_delay=0.1,
               max_delay=10.0,
               min_retries=3,
               max_retries=6,
               max_retry_period=30.0,
               urlfetch_timeout=None,
               save_access_token=False,
               _user_agent=None):
    """Init.

    This object is unique per request per thread.

    Library will retry according to this setting when App Engine Server
    can't call urlfetch, urlfetch timed out, or urlfetch got a 408 or
    500-600 response.

    Args:
      backoff_factor: exponential backoff multiplier.
      initial_delay: seconds to delay for the first retry.
      max_delay: max seconds to delay for every retry.
      min_retries: min number of times to retry. This value is automatically
        capped by max_retries.
      max_retries: max number of times to retry. Set this to 0 for no retry.
      max_retry_period: max total seconds spent on retry. Retry stops when
        this period passed AND min_retries has been attempted.
      urlfetch_timeout: timeout for urlfetch in seconds. Could be None,
        in which case the value will be chosen by urlfetch module.
      save_access_token: persist access token to datastore to avoid
        excessive usage of GetAccessToken API. Usually the token is cached
        in process and in memcache. In some cases, memcache isn't very
        reliable.
      _user_agent: The user agent string that you want to use in your
        requests.
    """
    self.backoff_factor = self._check('backoff_factor', backoff_factor)
    self.initial_delay = self._check('initial_delay', initial_delay)
    self.max_delay = self._check('max_delay', max_delay)
    self.max_retry_period = self._check('max_retry_period', max_retry_period)
    self.max_retries = self._check('max_retries', max_retries, True, int)
    # min_retries is silently capped by max_retries rather than rejected.
    self.min_retries = min(
        self._check('min_retries', min_retries, True, int), self.max_retries)

    if urlfetch_timeout is None:
      self.urlfetch_timeout = None
    else:
      self.urlfetch_timeout = self._check('urlfetch_timeout', urlfetch_timeout)
    self.save_access_token = self._check('save_access_token', save_access_token,
                                         True, bool)
    self._user_agent = _user_agent or self._DEFAULT_USER_AGENT

    # Recorded so belong_to_current_request can tell whether this instance
    # was created during the request that is now running.
    self._request_id = os.getenv('REQUEST_LOG_ID')

  def __eq__(self, other):
    # Value equality: same class and identical attribute dicts.
    return (isinstance(other, self.__class__) and
            self.__dict__ == other.__dict__)

  def __ne__(self, other):
    return not self.__eq__(other)

  @classmethod
  def _check(cls, name, val, can_be_zero=False, val_type=float):
    """Check init arguments.

    Args:
      name: name of the argument. For logging purpose.
      val: value. Value has to be non negative number.
      can_be_zero: whether value can be zero.
      val_type: Python type of the value.

    Returns:
      The value.

    Raises:
      ValueError: when invalid value is passed in.
      TypeError: when invalid value type is passed in.
    """
    # An int is always acceptable where a float is expected.
    acceptable_types = [val_type]
    if val_type is float:
      acceptable_types.append(int)

    # NOTE: exact type() check (not isinstance) so e.g. bool is rejected
    # where an int is expected.
    if type(val) not in acceptable_types:
      raise TypeError(
          'Expect type %s for parameter %s' % (val_type.__name__, name))
    if val < 0:
      raise ValueError(
          'Value for parameter %s has to be greater than 0' % name)
    if val == 0 and not can_be_zero:
      raise ValueError(
          'Value for parameter %s can not be 0' % name)
    return val

  def belong_to_current_request(self):
    # True when this instance was created during the currently running
    # request (compared via the REQUEST_LOG_ID environment variable).
    return os.getenv('REQUEST_LOG_ID') == self._request_id

  def delay(self, n, start_time):
    """Calculate delay before the next retry.

    Args:
      n: the number of current attempt. The first attempt should be 1.
      start_time: the time when retry started in unix time.

    Returns:
      Number of seconds to wait before next retry. -1 if retry should give up.
    """
    out_of_retries = n > self.max_retries
    out_of_time = (n > self.min_retries and
                   time.time() - start_time > self.max_retry_period)
    if out_of_retries or out_of_time:
      return -1
    # Exponential backoff, capped at max_delay.
    return min(self.initial_delay * math.pow(self.backoff_factor, n - 1),
               self.max_delay)
| 319 |
| 320 |
def _run_until_rpc():
  """Eagerly evaluate tasklets until one is blocking on some RPC.

  Usually the ndb eventloop isn't run until some code calls
  future.get_result().

  When an async tasklet is called, the tasklet wrapper evaluates the tasklet
  code into a generator, enqueues a callback _help_tasklet_along onto
  the eventloop's `current` queue, and returns a future.

  _help_tasklet_along, when called by the eventloop, gets one yielded value
  from the generator. If the value is another future, it sets up a callback
  _on_future_complete to invoke _help_tasklet_along when the dependent
  future fulfills. If the value is an RPC, it sets up a callback
  _on_rpc_complete to invoke _help_tasklet_along when the RPC fulfills.
  Thus _help_tasklet_along drills down the chain of futures until some
  future is blocked by an RPC. The eventloop runs all callbacks and
  constantly checks pending RPC status.
  """
  loop = eventloop.get_event_loop()
  while loop.current:
    loop.run0()
| 342 |
| 343 |
def _eager_tasklet(tasklet):
  """Decorator that makes a tasklet start running as soon as it is called."""

  @utils.wrapping(tasklet)
  def eager_wrapper(*args, **kwds):
    # Kick off the tasklet, then drive the event loop until it blocks on
    # an RPC, so work starts immediately instead of at get_result() time.
    future = tasklet(*args, **kwds)
    _run_until_rpc()
    return future

  return eager_wrapper
OLD | NEW |