Chromium Code Reviews

Unified Diff: third_party/cloud_storage/cloudstorage/api_utils.py

Issue 1031663002: Increase maximum file upload to 100MB, use cloudstorage python library (Closed) Base URL: https://github.com/dart-lang/pub-dartlang.git@master
Patch Set: Add deprecation comment to old cloud_storage.py:open() function Created 5 years, 9 months ago
Index: third_party/cloud_storage/cloudstorage/api_utils.py
diff --git a/third_party/cloud_storage/cloudstorage/api_utils.py b/third_party/cloud_storage/cloudstorage/api_utils.py
new file mode 100644
index 0000000000000000000000000000000000000000..680ac6bcbfadb5eae4582851a22d63d67b92363c
--- /dev/null
+++ b/third_party/cloud_storage/cloudstorage/api_utils.py
@@ -0,0 +1,353 @@
+# Copyright 2013 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+# either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+
+"""Util functions and classes for cloudstorage_api."""
+
+
+
+__all__ = ['set_default_retry_params',
+ 'RetryParams',
+ ]
+
+import copy
+import httplib
+import logging
+import math
+import os
+import threading
+import time
+import urllib
+
+
+try:
+ from google.appengine.api import app_identity
+ from google.appengine.api import urlfetch
+ from google.appengine.datastore import datastore_rpc
+ from google.appengine.ext import ndb
+ from google.appengine.ext.ndb import eventloop
+ from google.appengine.ext.ndb import tasklets
+ from google.appengine.ext.ndb import utils
+ from google.appengine import runtime
+ from google.appengine.runtime import apiproxy_errors
+except ImportError:
+ from google.appengine.api import app_identity
+ from google.appengine.api import urlfetch
+ from google.appengine.datastore import datastore_rpc
+ from google.appengine import runtime
+ from google.appengine.runtime import apiproxy_errors
+ from google.appengine.ext import ndb
+ from google.appengine.ext.ndb import eventloop
+ from google.appengine.ext.ndb import tasklets
+ from google.appengine.ext.ndb import utils
+
+
+_RETRIABLE_EXCEPTIONS = (urlfetch.DownloadError,
+ apiproxy_errors.Error,
+ app_identity.InternalError,
+ app_identity.BackendDeadlineExceeded)
+
+_thread_local_settings = threading.local()
+_thread_local_settings.default_retry_params = None
+
+
+def set_default_retry_params(retry_params):
+ """Set a default RetryParams for current thread current request."""
+ _thread_local_settings.default_retry_params = copy.copy(retry_params)
+
+
+def _get_default_retry_params():
+ """Get default RetryParams for current request and current thread.
+
+ Returns:
+ A new instance of the default RetryParams.
+ """
+ default = getattr(_thread_local_settings, 'default_retry_params', None)
+ if default is None or not default.belong_to_current_request():
+ return RetryParams()
+ else:
+ return copy.copy(default)
+
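+# Illustrative usage (not part of this module): a request handler can
+# install per-request retry defaults before making any cloudstorage calls.
+# `cloudstorage` here refers to the package this file is vendored into,
+# which re-exports RetryParams and set_default_retry_params.
+#
+#   import cloudstorage
+#
+#   retry_params = cloudstorage.RetryParams(initial_delay=0.2,
+#                                           max_retries=5)
+#   cloudstorage.set_default_retry_params(retry_params)
+#
+# Subsequent calls on the same thread during the same request pick this up
+# via _get_default_retry_params().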
+
+def _quote_filename(filename):
+ """Quotes filename to use as a valid URI path.
+
+ Args:
+ filename: user-provided filename, e.g. /bucket/filename.
+
+ Returns:
+ The filename properly quoted to use as URI's path component.
+ """
+ return urllib.quote(filename)
+
+
+def _unquote_filename(filename):
+ """Unquotes a valid URI path back to its filename.
+
+ This is the opposite of _quote_filename.
+
+ Args:
+ filename: a quoted filename, e.g. /bucket/some%20filename.
+
+ Returns:
+ The filename unquoted.
+ """
+ return urllib.unquote(filename)
+
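+# For example (illustrative), _quote_filename('/bucket/some file') returns
+# '/bucket/some%20file', and _unquote_filename() reverses it; note that
+# urllib.quote leaves '/' unescaped by default, so the path structure of
+# /bucket/filename is preserved.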
+
+def _should_retry(resp):
+ """Given a urlfetch response, decide whether to retry that request."""
+ return (resp.status_code == httplib.REQUEST_TIMEOUT or
+ (resp.status_code >= 500 and
+ resp.status_code < 600))
+
+
+class _RetryWrapper(object):
+ """A wrapper that wraps retry logic around any tasklet."""
+
+ def __init__(self,
+ retry_params,
+ retriable_exceptions=_RETRIABLE_EXCEPTIONS,
+ should_retry=lambda r: False):
+ """Init.
+
+ Args:
+ retry_params: a RetryParams instance.
+ retriable_exceptions: a list of exception classes that are retriable.
+ should_retry: a function that takes a result from the tasklet and returns
+ a boolean. True if the result should be retried.
+ """
+ self.retry_params = retry_params
+ self.retriable_exceptions = retriable_exceptions
+ self.should_retry = should_retry
+
+ @ndb.tasklet
+ def run(self, tasklet, **kwds):
+ """Run a tasklet with retry.
+
+ Retries are transparent to the caller: if no attempt succeeds, the
+ exception or result from the last attempt is raised or returned to
+ the caller.
+
+ Args:
+ tasklet: the tasklet to run.
+ **kwds: keyword arguments used to run the tasklet.
+
+ Raises:
+ The exception from running the tasklet.
+
+ Returns:
+ The result from running the tasklet.
+ """
+ start_time = time.time()
+ n = 1
+
+ while True:
+ e = None
+ result = None
+ got_result = False
+
+ try:
+ result = yield tasklet(**kwds)
+ got_result = True
+ if not self.should_retry(result):
+ raise ndb.Return(result)
+ except runtime.DeadlineExceededError:
+ logging.debug(
+ 'Tasklet has exceeded request deadline after %s seconds total',
+ time.time() - start_time)
+ raise
+ except self.retriable_exceptions, e:
+ pass
+
+ if n == 1:
+ logging.debug('Tasklet is %r', tasklet)
+
+ delay = self.retry_params.delay(n, start_time)
+
+ if delay <= 0:
+ logging.debug(
+ 'Tasklet failed after %s attempts and %s seconds in total',
+ n, time.time() - start_time)
+ if got_result:
+ raise ndb.Return(result)
+ elif e is not None:
+ raise e
+ else:
+ assert False, 'Should never reach here.'
+
+ if got_result:
+ logging.debug(
+ 'Got result %r from tasklet.', result)
+ else:
+ logging.debug(
+ 'Got exception "%r" from tasklet.', e)
+ logging.debug('Retry in %s seconds.', delay)
+ n += 1
+ yield tasklets.sleep(delay)
+
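+# Illustrative sketch (hypothetical call site, not in this module): the
+# library's REST layer wraps its urlfetch tasklet roughly like this,
+# retrying on _RETRIABLE_EXCEPTIONS and on 408/5xx responses:
+#
+#   retry_wrapper = _RetryWrapper(
+#       retry_params,
+#       retriable_exceptions=_RETRIABLE_EXCEPTIONS,
+#       should_retry=_should_retry)
+#   resp = yield retry_wrapper.run(urlfetch_tasklet, url=url)
+#
+# where urlfetch_tasklet is a hypothetical ndb tasklet that performs the
+# actual fetch and whose result is a urlfetch response.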
+
+class RetryParams(object):
+ """Retry configuration parameters."""
+
+ _DEFAULT_USER_AGENT = 'App Engine Python GCS Client'
+
+ @datastore_rpc._positional(1)
+ def __init__(self,
+ backoff_factor=2.0,
+ initial_delay=0.1,
+ max_delay=10.0,
+ min_retries=3,
+ max_retries=6,
+ max_retry_period=30.0,
+ urlfetch_timeout=None,
+ save_access_token=False,
+ _user_agent=None):
+ """Init.
+
+ This object is unique per request per thread.
+
+ The library will retry according to these settings when the App Engine
+ server can't reach urlfetch, when urlfetch times out, or when urlfetch
+ receives a 408 or 5xx response.
+
+ Args:
+ backoff_factor: exponential backoff multiplier.
+ initial_delay: seconds to delay for the first retry.
+ max_delay: max seconds to delay for every retry.
+ min_retries: min number of times to retry. This value is automatically
+ capped by max_retries.
+ max_retries: max number of times to retry. Set this to 0 for no retry.
+ max_retry_period: max total seconds spent on retry. Retry stops when
+ this period has passed AND min_retries has been attempted.
+ urlfetch_timeout: timeout for urlfetch in seconds. Could be None,
+ in which case the value will be chosen by the urlfetch module.
+ save_access_token: persist access token to datastore to avoid
+ excessive usage of GetAccessToken API. Usually the token is cached
+ in process and in memcache. In some cases, memcache isn't very
+ reliable.
+ _user_agent: The user agent string that you want to use in your requests.
+ """
+ self.backoff_factor = self._check('backoff_factor', backoff_factor)
+ self.initial_delay = self._check('initial_delay', initial_delay)
+ self.max_delay = self._check('max_delay', max_delay)
+ self.max_retry_period = self._check('max_retry_period', max_retry_period)
+ self.max_retries = self._check('max_retries', max_retries, True, int)
+ self.min_retries = self._check('min_retries', min_retries, True, int)
+ if self.min_retries > self.max_retries:
+ self.min_retries = self.max_retries
+
+ self.urlfetch_timeout = None
+ if urlfetch_timeout is not None:
+ self.urlfetch_timeout = self._check('urlfetch_timeout', urlfetch_timeout)
+ self.save_access_token = self._check('save_access_token', save_access_token,
+ True, bool)
+ self._user_agent = _user_agent or self._DEFAULT_USER_AGENT
+
+ self._request_id = os.getenv('REQUEST_LOG_ID')
+
+ def __eq__(self, other):
+ if not isinstance(other, self.__class__):
+ return False
+ return self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ @classmethod
+ def _check(cls, name, val, can_be_zero=False, val_type=float):
+ """Check init arguments.
+
+ Args:
+ name: name of the argument. For logging purpose.
+ val: the value. Must be a non-negative number.
+ can_be_zero: whether value can be zero.
+ val_type: Python type of the value.
+
+ Returns:
+ The value.
+
+ Raises:
+ ValueError: when an invalid value is passed in.
+ TypeError: when a value of an invalid type is passed in.
+ """
+ valid_types = [val_type]
+ if val_type is float:
+ valid_types.append(int)
+
+ if type(val) not in valid_types:
+ raise TypeError(
+ 'Expect type %s for parameter %s' % (val_type.__name__, name))
+ if val < 0:
+ raise ValueError(
+ 'Value for parameter %s has to be greater than 0' % name)
+ if not can_be_zero and val == 0:
+ raise ValueError(
+ 'Value for parameter %s can not be 0' % name)
+ return val
+
+ def belong_to_current_request(self):
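+ """Returns True if this instance was created during the current request.
+
+ REQUEST_LOG_ID is unique per incoming request on App Engine, so comparing
+ it against the value captured in __init__ detects stale thread-local
+ defaults left over from a previous request.
+ """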
+ return os.getenv('REQUEST_LOG_ID') == self._request_id
+
+ def delay(self, n, start_time):
+ """Calculate delay before the next retry.
+
+ Args:
+ n: the current attempt number. The first attempt is 1.
+ start_time: the time when retry started in unix time.
+
+ Returns:
+ Number of seconds to wait before the next retry; -1 if retry should give up.
+ """
+ if (n > self.max_retries or
+ (n > self.min_retries and
+ time.time() - start_time > self.max_retry_period)):
+ return -1
+ return min(
+ math.pow(self.backoff_factor, n-1) * self.initial_delay,
+ self.max_delay)
+
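+# Worked example (with the defaults above): delay(n, start) for n = 1..6
+# returns min(2.0**(n-1) * 0.1, 10.0), i.e. 0.1, 0.2, 0.4, 0.8, 1.6 and
+# 3.2 seconds; delay(7, start) returns -1, so a call is attempted at most
+# 1 + max_retries times unless max_retry_period stops it sooner.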
+
+def _run_until_rpc():
+ """Eagerly evaluate tasklets until one blocks on an RPC.
+
+ Usually the ndb eventloop el isn't run until some code calls
+ future.get_result().
+
+ When an async tasklet is called, the tasklet wrapper evaluates the tasklet
+ code into a generator, enqueues a callback _help_tasklet_along onto
+ the el.current queue, and returns a future.
+
+ _help_tasklet_along, when called by the el, gets one yielded value from the
+ generator. If the value is another future, it sets up a callback
+ _on_future_complete to invoke _help_tasklet_along when the dependent future
+ fulfills. If the value is an RPC, it sets up a callback _on_rpc_complete to
+ invoke _help_tasklet_along when the RPC fulfills. Thus _help_tasklet_along
+ drills down the chain of futures until some future is blocked on an RPC,
+ while el runs all callbacks and continually checks pending RPC status.
+ """
+ el = eventloop.get_event_loop()
+ while el.current:
+ el.run0()
+
+
+def _eager_tasklet(tasklet):
+ """Decorator to turn tasklet to run eagerly."""
+
+ @utils.wrapping(tasklet)
+ def eager_wrapper(*args, **kwds):
+ fut = tasklet(*args, **kwds)
+ _run_until_rpc()
+ return fut
+
+ return eager_wrapper
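+
+
+# Illustrative use (hypothetical tasklet, not in this module): decorating a
+# tasklet so its RPC is kicked off as soon as it is called, instead of when
+# get_result() is first invoked:
+#
+#   @_eager_tasklet
+#   @ndb.tasklet
+#   def fetch(url):
+#     rpc = urlfetch.create_rpc()
+#     urlfetch.make_fetch_call(rpc, url)
+#     result = yield rpc
+#     raise ndb.Return(result)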