| Index: third_party/cloud_storage/cloudstorage/api_utils.py
|
| diff --git a/third_party/cloud_storage/cloudstorage/api_utils.py b/third_party/cloud_storage/cloudstorage/api_utils.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..680ac6bcbfadb5eae4582851a22d63d67b92363c
|
| --- /dev/null
|
| +++ b/third_party/cloud_storage/cloudstorage/api_utils.py
|
| @@ -0,0 +1,353 @@
|
| +# Copyright 2013 Google Inc. All Rights Reserved.
|
| +#
|
| +# Licensed under the Apache License, Version 2.0 (the "License");
|
| +# you may not use this file except in compliance with the License.
|
| +# You may obtain a copy of the License at
|
| +#
|
| +# http://www.apache.org/licenses/LICENSE-2.0
|
| +#
|
| +# Unless required by applicable law or agreed to in writing,
|
| +# software distributed under the License is distributed on an
|
| +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
|
| +# either express or implied. See the License for the specific
|
| +# language governing permissions and limitations under the License.
|
| +
|
| +"""Util functions and classes for cloudstorage_api."""
|
| +
|
| +
|
| +
|
| +__all__ = ['set_default_retry_params',
|
| + 'RetryParams',
|
| + ]
|
| +
|
| +import copy
|
| +import httplib
|
| +import logging
|
| +import math
|
| +import os
|
| +import threading
|
| +import time
|
| +import urllib
|
| +
|
| +
|
| +try:
|
| + from google.appengine.api import app_identity
|
| + from google.appengine.api import urlfetch
|
| + from google.appengine.datastore import datastore_rpc
|
| + from google.appengine.ext import ndb
|
| + from google.appengine.ext.ndb import eventloop
|
| + from google.appengine.ext.ndb import tasklets
|
| + from google.appengine.ext.ndb import utils
|
| + from google.appengine import runtime
|
| + from google.appengine.runtime import apiproxy_errors
|
| +except ImportError:
|
| + from google.appengine.api import app_identity
|
| + from google.appengine.api import urlfetch
|
| + from google.appengine.datastore import datastore_rpc
|
| + from google.appengine import runtime
|
| + from google.appengine.runtime import apiproxy_errors
|
| + from google.appengine.ext import ndb
|
| + from google.appengine.ext.ndb import eventloop
|
| + from google.appengine.ext.ndb import tasklets
|
| + from google.appengine.ext.ndb import utils
|
| +
|
| +
|
| +_RETRIABLE_EXCEPTIONS = (urlfetch.DownloadError,
|
| + apiproxy_errors.Error,
|
| + app_identity.InternalError,
|
| + app_identity.BackendDeadlineExceeded)
|
| +
|
| +_thread_local_settings = threading.local()
|
| +_thread_local_settings.default_retry_params = None
|
| +
|
| +
|
| +def set_default_retry_params(retry_params):
|
| + """Set a default RetryParams for current thread current request."""
|
| + _thread_local_settings.default_retry_params = copy.copy(retry_params)
|
| +
|
| +
|
| +def _get_default_retry_params():
|
| + """Get default RetryParams for current request and current thread.
|
| +
|
| + Returns:
|
| + A new instance of the default RetryParams.
|
| + """
|
| + default = getattr(_thread_local_settings, 'default_retry_params', None)
|
| + if default is None or not default.belong_to_current_request():
|
| + return RetryParams()
|
| + else:
|
| + return copy.copy(default)
|
| +
|
| +
|
| +def _quote_filename(filename):
|
| + """Quotes filename to use as a valid URI path.
|
| +
|
| + Args:
|
| + filename: user provided filename. /bucket/filename.
|
| +
|
| + Returns:
|
| + The filename properly quoted to use as URI's path component.
|
| + """
|
| + return urllib.quote(filename)
|
| +
|
| +
|
| +def _unquote_filename(filename):
|
| + """Unquotes a valid URI path back to its filename.
|
| +
|
| + This is the opposite of _quote_filename.
|
| +
|
| + Args:
|
| + filename: a quoted filename. /bucket/some%20filename.
|
| +
|
| + Returns:
|
| + The filename unquoted.
|
| + """
|
| + return urllib.unquote(filename)
|
| +
|
| +
|
| +def _should_retry(resp):
|
| + """Given a urlfetch response, decide whether to retry that request."""
|
| + return (resp.status_code == httplib.REQUEST_TIMEOUT or
|
| + (resp.status_code >= 500 and
|
| + resp.status_code < 600))
|
| +
|
| +
|
| +class _RetryWrapper(object):
|
| + """A wrapper that wraps retry logic around any tasklet."""
|
| +
|
| + def __init__(self,
|
| + retry_params,
|
| + retriable_exceptions=_RETRIABLE_EXCEPTIONS,
|
| + should_retry=lambda r: False):
|
| + """Init.
|
| +
|
| + Args:
|
| + retry_params: an RetryParams instance.
|
| + retriable_exceptions: a list of exception classes that are retriable.
|
| + should_retry: a function that takes a result from the tasklet and returns
|
| + a boolean. True if the result should be retried.
|
| + """
|
| + self.retry_params = retry_params
|
| + self.retriable_exceptions = retriable_exceptions
|
| + self.should_retry = should_retry
|
| +
|
| + @ndb.tasklet
|
| + def run(self, tasklet, **kwds):
|
| + """Run a tasklet with retry.
|
| +
|
| + The retry should be transparent to the caller: if no results
|
| + are successful, the exception or result from the last retry is returned
|
| + to the caller.
|
| +
|
| + Args:
|
| + tasklet: the tasklet to run.
|
| + **kwds: keywords arguments to run the tasklet.
|
| +
|
| + Raises:
|
| + The exception from running the tasklet.
|
| +
|
| + Returns:
|
| + The result from running the tasklet.
|
| + """
|
| + start_time = time.time()
|
| + n = 1
|
| +
|
| + while True:
|
| + e = None
|
| + result = None
|
| + got_result = False
|
| +
|
| + try:
|
| + result = yield tasklet(**kwds)
|
| + got_result = True
|
| + if not self.should_retry(result):
|
| + raise ndb.Return(result)
|
| + except runtime.DeadlineExceededError:
|
| + logging.debug(
|
| + 'Tasklet has exceeded request deadline after %s seconds total',
|
| + time.time() - start_time)
|
| + raise
|
| + except self.retriable_exceptions, e:
|
| + pass
|
| +
|
| + if n == 1:
|
| + logging.debug('Tasklet is %r', tasklet)
|
| +
|
| + delay = self.retry_params.delay(n, start_time)
|
| +
|
| + if delay <= 0:
|
| + logging.debug(
|
| + 'Tasklet failed after %s attempts and %s seconds in total',
|
| + n, time.time() - start_time)
|
| + if got_result:
|
| + raise ndb.Return(result)
|
| + elif e is not None:
|
| + raise e
|
| + else:
|
| + assert False, 'Should never reach here.'
|
| +
|
| + if got_result:
|
| + logging.debug(
|
| + 'Got result %r from tasklet.', result)
|
| + else:
|
| + logging.debug(
|
| + 'Got exception "%r" from tasklet.', e)
|
| + logging.debug('Retry in %s seconds.', delay)
|
| + n += 1
|
| + yield tasklets.sleep(delay)
|
| +
|
| +
|
| +class RetryParams(object):
|
| + """Retry configuration parameters."""
|
| +
|
| + _DEFAULT_USER_AGENT = 'App Engine Python GCS Client'
|
| +
|
| + @datastore_rpc._positional(1)
|
| + def __init__(self,
|
| + backoff_factor=2.0,
|
| + initial_delay=0.1,
|
| + max_delay=10.0,
|
| + min_retries=3,
|
| + max_retries=6,
|
| + max_retry_period=30.0,
|
| + urlfetch_timeout=None,
|
| + save_access_token=False,
|
| + _user_agent=None):
|
| + """Init.
|
| +
|
| + This object is unique per request per thread.
|
| +
|
| + Library will retry according to this setting when App Engine Server
|
| + can't call urlfetch, urlfetch timed out, or urlfetch got a 408 or
|
| + 500-600 response.
|
| +
|
| + Args:
|
| + backoff_factor: exponential backoff multiplier.
|
| + initial_delay: seconds to delay for the first retry.
|
| + max_delay: max seconds to delay for every retry.
|
| + min_retries: min number of times to retry. This value is automatically
|
| + capped by max_retries.
|
| + max_retries: max number of times to retry. Set this to 0 for no retry.
|
| + max_retry_period: max total seconds spent on retry. Retry stops when
|
| + this period passed AND min_retries has been attempted.
|
| + urlfetch_timeout: timeout for urlfetch in seconds. Could be None,
|
| + in which case the value will be chosen by urlfetch module.
|
| + save_access_token: persist access token to datastore to avoid
|
| + excessive usage of GetAccessToken API. Usually the token is cached
|
| + in process and in memcache. In some cases, memcache isn't very
|
| + reliable.
|
| + _user_agent: The user agent string that you want to use in your requests.
|
| + """
|
| + self.backoff_factor = self._check('backoff_factor', backoff_factor)
|
| + self.initial_delay = self._check('initial_delay', initial_delay)
|
| + self.max_delay = self._check('max_delay', max_delay)
|
| + self.max_retry_period = self._check('max_retry_period', max_retry_period)
|
| + self.max_retries = self._check('max_retries', max_retries, True, int)
|
| + self.min_retries = self._check('min_retries', min_retries, True, int)
|
| + if self.min_retries > self.max_retries:
|
| + self.min_retries = self.max_retries
|
| +
|
| + self.urlfetch_timeout = None
|
| + if urlfetch_timeout is not None:
|
| + self.urlfetch_timeout = self._check('urlfetch_timeout', urlfetch_timeout)
|
| + self.save_access_token = self._check('save_access_token', save_access_token,
|
| + True, bool)
|
| + self._user_agent = _user_agent or self._DEFAULT_USER_AGENT
|
| +
|
| + self._request_id = os.getenv('REQUEST_LOG_ID')
|
| +
|
| + def __eq__(self, other):
|
| + if not isinstance(other, self.__class__):
|
| + return False
|
| + return self.__dict__ == other.__dict__
|
| +
|
| + def __ne__(self, other):
|
| + return not self.__eq__(other)
|
| +
|
| + @classmethod
|
| + def _check(cls, name, val, can_be_zero=False, val_type=float):
|
| + """Check init arguments.
|
| +
|
| + Args:
|
| + name: name of the argument. For logging purpose.
|
| + val: value. Value has to be non negative number.
|
| + can_be_zero: whether value can be zero.
|
| + val_type: Python type of the value.
|
| +
|
| + Returns:
|
| + The value.
|
| +
|
| + Raises:
|
| + ValueError: when invalid value is passed in.
|
| + TypeError: when invalid value type is passed in.
|
| + """
|
| + valid_types = [val_type]
|
| + if val_type is float:
|
| + valid_types.append(int)
|
| +
|
| + if type(val) not in valid_types:
|
| + raise TypeError(
|
| + 'Expect type %s for parameter %s' % (val_type.__name__, name))
|
| + if val < 0:
|
| + raise ValueError(
|
| + 'Value for parameter %s has to be greater than 0' % name)
|
| + if not can_be_zero and val == 0:
|
| + raise ValueError(
|
| + 'Value for parameter %s can not be 0' % name)
|
| + return val
|
| +
|
| + def belong_to_current_request(self):
|
| + return os.getenv('REQUEST_LOG_ID') == self._request_id
|
| +
|
| + def delay(self, n, start_time):
|
| + """Calculate delay before the next retry.
|
| +
|
| + Args:
|
| + n: the number of current attempt. The first attempt should be 1.
|
| + start_time: the time when retry started in unix time.
|
| +
|
| + Returns:
|
| + Number of seconds to wait before next retry. -1 if retry should give up.
|
| + """
|
| + if (n > self.max_retries or
|
| + (n > self.min_retries and
|
| + time.time() - start_time > self.max_retry_period)):
|
| + return -1
|
| + return min(
|
| + math.pow(self.backoff_factor, n-1) * self.initial_delay,
|
| + self.max_delay)
|
| +
|
| +
|
| +def _run_until_rpc():
|
| + """Eagerly evaluate tasklets until it is blocking on some RPC.
|
| +
|
| + Usually ndb eventloop el isn't run until some code calls future.get_result().
|
| +
|
| + When an async tasklet is called, the tasklet wrapper evaluates the tasklet
|
| + code into a generator, enqueues a callback _help_tasklet_along onto
|
| + the el.current queue, and returns a future.
|
| +
|
| + _help_tasklet_along, when called by the el, will
|
| + get one yielded value from the generator. If the value if another future,
|
| + set up a callback _on_future_complete to invoke _help_tasklet_along
|
| + when the dependent future fulfills. If the value if a RPC, set up a
|
| + callback _on_rpc_complete to invoke _help_tasklet_along when the RPC fulfills.
|
| + Thus _help_tasklet_along drills down
|
| + the chain of futures until some future is blocked by RPC. El runs
|
| + all callbacks and constantly check pending RPC status.
|
| + """
|
| + el = eventloop.get_event_loop()
|
| + while el.current:
|
| + el.run0()
|
| +
|
| +
|
| +def _eager_tasklet(tasklet):
|
| + """Decorator to turn tasklet to run eagerly."""
|
| +
|
| + @utils.wrapping(tasklet)
|
| + def eager_wrapper(*args, **kwds):
|
| + fut = tasklet(*args, **kwds)
|
| + _run_until_rpc()
|
| + return fut
|
| +
|
| + return eager_wrapper
|
|
|