Index: third_party/gsutil/boto/boto/s3/resumable_download_handler.py |
diff --git a/third_party/gsutil/boto/boto/s3/resumable_download_handler.py b/third_party/gsutil/boto/boto/s3/resumable_download_handler.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..32f704165b1a55cf5b3f09b387c5c92be7b701fc |
--- /dev/null |
+++ b/third_party/gsutil/boto/boto/s3/resumable_download_handler.py |
@@ -0,0 +1,339 @@ |
+# Copyright 2010 Google Inc. |
+# |
+# Permission is hereby granted, free of charge, to any person obtaining a |
+# copy of this software and associated documentation files (the |
+# "Software"), to deal in the Software without restriction, including |
+# without limitation the rights to use, copy, modify, merge, publish, dis- |
+# tribute, sublicense, and/or sell copies of the Software, and to permit |
+# persons to whom the Software is furnished to do so, subject to the fol- |
+# lowing conditions: |
+# |
+# The above copyright notice and this permission notice shall be included |
+# in all copies or substantial portions of the Software. |
+# |
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS |
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- |
+# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT |
+# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, |
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
+# IN THE SOFTWARE. |
+ |
+import errno |
+import httplib |
+import os |
+import re |
+import socket |
+import time |
+import boto |
+from boto import config, storage_uri_for_key |
+from boto.connection import AWSAuthConnection |
+from boto.exception import ResumableDownloadException |
+from boto.exception import ResumableTransferDisposition |
+from boto.s3.keyfile import KeyFile |
+ |
+""" |
+Resumable download handler. |
+ |
+Resumable downloads will retry failed downloads, resuming at the byte count |
+completed by the last download attempt. If too many retries happen with no |
+progress (per configurable num_retries param), the download will be aborted. |
+ |
+The caller can optionally specify a tracker_file_name param in the |
+ResumableDownloadHandler constructor. If you do this, that file will |
+save the state needed to allow retrying later, in a separate process |
+(e.g., in a later run of gsutil). |
+ |
+Note that resumable downloads work across providers (they depend only |
+on support Range GETs), but this code is in the boto.s3 package |
+because it is the wrong abstraction level to go in the top-level boto |
+package. |
+ |
+TODO: At some point we should refactor the code to have a storage_service |
+package where all these provider-independent files go. |
+""" |
+ |
+ |
+class ByteTranslatingCallbackHandler(object): |
+ """ |
+ Proxy class that translates progress callbacks made by |
+ boto.s3.Key.get_file(), taking into account that we're resuming |
+ a download. |
+ """ |
+ def __init__(self, proxied_cb, download_start_point): |
+ self.proxied_cb = proxied_cb |
+ self.download_start_point = download_start_point |
+ |
+ def call(self, total_bytes_uploaded, total_size): |
+ self.proxied_cb(self.download_start_point + total_bytes_uploaded, |
+ total_size) |
+ |
+ |
+def get_cur_file_size(fp, position_to_eof=False): |
+ """ |
+ Returns size of file, optionally leaving fp positioned at EOF. |
+ """ |
+ if isinstance(fp, KeyFile) and not position_to_eof: |
+ # Avoid EOF seek for KeyFile case as it's very inefficient. |
+ return fp.getkey().size |
+ if not position_to_eof: |
+ cur_pos = fp.tell() |
+ fp.seek(0, os.SEEK_END) |
+ cur_file_size = fp.tell() |
+ if not position_to_eof: |
+ fp.seek(cur_pos, os.SEEK_SET) |
+ return cur_file_size |
+ |
+ |
+class ResumableDownloadHandler(object): |
+ """ |
+ Handler for resumable downloads. |
+ """ |
+ |
+ ETAG_REGEX = '([a-z0-9]{32})\n' |
+ |
+ RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error, |
+ socket.gaierror) |
+ |
+ def __init__(self, tracker_file_name=None, num_retries=None): |
+ """ |
+ Constructor. Instantiate once for each downloaded file. |
+ |
+ :type tracker_file_name: string |
+ :param tracker_file_name: optional file name to save tracking info |
+ about this download. If supplied and the current process fails |
+ the download, it can be retried in a new process. If called |
+ with an existing file containing an unexpired timestamp, |
+ we'll resume the transfer for this file; else we'll start a |
+ new resumable download. |
+ |
+ :type num_retries: int |
+ :param num_retries: the number of times we'll re-try a resumable |
+ download making no progress. (Count resets every time we get |
+ progress, so download can span many more than this number of |
+ retries.) |
+ """ |
+ self.tracker_file_name = tracker_file_name |
+ self.num_retries = num_retries |
+ self.etag_value_for_current_download = None |
+ if tracker_file_name: |
+ self._load_tracker_file_etag() |
+ # Save download_start_point in instance state so caller can |
+ # find how much was transferred by this ResumableDownloadHandler |
+ # (across retries). |
+ self.download_start_point = None |
+ |
+ def _load_tracker_file_etag(self): |
+ f = None |
+ try: |
+ f = open(self.tracker_file_name, 'r') |
+ etag_line = f.readline() |
+ m = re.search(self.ETAG_REGEX, etag_line) |
+ if m: |
+ self.etag_value_for_current_download = m.group(1) |
+ else: |
+ print('Couldn\'t read etag in tracker file (%s). Restarting ' |
+ 'download from scratch.' % self.tracker_file_name) |
+ except IOError, e: |
+ # Ignore non-existent file (happens first time a download |
+ # is attempted on an object), but warn user for other errors. |
+ if e.errno != errno.ENOENT: |
+ # Will restart because |
+ # self.etag_value_for_current_download == None. |
+ print('Couldn\'t read URI tracker file (%s): %s. Restarting ' |
+ 'download from scratch.' % |
+ (self.tracker_file_name, e.strerror)) |
+ finally: |
+ if f: |
+ f.close() |
+ |
+ def _save_tracker_info(self, key): |
+ self.etag_value_for_current_download = key.etag.strip('"\'') |
+ if not self.tracker_file_name: |
+ return |
+ f = None |
+ try: |
+ f = open(self.tracker_file_name, 'w') |
+ f.write('%s\n' % self.etag_value_for_current_download) |
+ except IOError, e: |
+ raise ResumableDownloadException( |
+ 'Couldn\'t write tracker file (%s): %s.\nThis can happen' |
+ 'if you\'re using an incorrectly configured download tool\n' |
+ '(e.g., gsutil configured to save tracker files to an ' |
+ 'unwritable directory)' % |
+ (self.tracker_file_name, e.strerror), |
+ ResumableTransferDisposition.ABORT) |
+ finally: |
+ if f: |
+ f.close() |
+ |
+ def _remove_tracker_file(self): |
+ if (self.tracker_file_name and |
+ os.path.exists(self.tracker_file_name)): |
+ os.unlink(self.tracker_file_name) |
+ |
+ def _attempt_resumable_download(self, key, fp, headers, cb, num_cb, |
+ torrent, version_id): |
+ """ |
+ Attempts a resumable download. |
+ |
+ Raises ResumableDownloadException if any problems occur. |
+ """ |
+ cur_file_size = get_cur_file_size(fp, position_to_eof=True) |
+ |
+ if (cur_file_size and |
+ self.etag_value_for_current_download and |
+ self.etag_value_for_current_download == key.etag.strip('"\'')): |
+ # Try to resume existing transfer. |
+ if cur_file_size > key.size: |
+ raise ResumableDownloadException( |
+ '%s is larger (%d) than %s (%d).\nDeleting tracker file, so ' |
+ 'if you re-try this download it will start from scratch' % |
+ (fp.name, cur_file_size, str(storage_uri_for_key(key)), |
+ key.size), ResumableTransferDisposition.ABORT) |
+ elif cur_file_size == key.size: |
+ if key.bucket.connection.debug >= 1: |
+ print 'Download complete.' |
+ return |
+ if key.bucket.connection.debug >= 1: |
+ print 'Resuming download.' |
+ headers = headers.copy() |
+ headers['Range'] = 'bytes=%d-%d' % (cur_file_size, key.size - 1) |
+ cb = ByteTranslatingCallbackHandler(cb, cur_file_size).call |
+ self.download_start_point = cur_file_size |
+ else: |
+ if key.bucket.connection.debug >= 1: |
+ print 'Starting new resumable download.' |
+ self._save_tracker_info(key) |
+ self.download_start_point = 0 |
+ # Truncate the file, in case a new resumable download is being |
+ # started atop an existing file. |
+ fp.truncate(0) |
+ |
+ # Disable AWSAuthConnection-level retry behavior, since that would |
+ # cause downloads to restart from scratch. |
+ key.get_file(fp, headers, cb, num_cb, torrent, version_id, |
+ override_num_retries=0) |
+ fp.flush() |
+ |
+ def get_file(self, key, fp, headers, cb=None, num_cb=10, torrent=False, |
+ version_id=None): |
+ """ |
+ Retrieves a file from a Key |
+ :type key: :class:`boto.s3.key.Key` or subclass |
+ :param key: The Key object from which upload is to be downloaded |
+ |
+ :type fp: file |
+ :param fp: File pointer into which data should be downloaded |
+ |
+ :type headers: string |
+ :param: headers to send when retrieving the files |
+ |
+ :type cb: function |
+ :param cb: (optional) a callback function that will be called to report |
+ progress on the download. The callback should accept two integer |
+ parameters, the first representing the number of bytes that have |
+ been successfully transmitted from the storage service and |
+ the second representing the total number of bytes that need |
+ to be transmitted. |
+ |
+ :type num_cb: int |
+ :param num_cb: (optional) If a callback is specified with the cb |
+ parameter this parameter determines the granularity of the callback |
+ by defining the maximum number of times the callback will be |
+ called during the file transfer. |
+ |
+ :type torrent: bool |
+ :param torrent: Flag for whether to get a torrent for the file |
+ |
+ :type version_id: string |
+ :param version_id: The version ID (optional) |
+ |
+ Raises ResumableDownloadException if a problem occurs during |
+ the transfer. |
+ """ |
+ |
+ debug = key.bucket.connection.debug |
+ if not headers: |
+ headers = {} |
+ |
+ # Use num-retries from constructor if one was provided; else check |
+ # for a value specified in the boto config file; else default to 5. |
+ if self.num_retries is None: |
+ self.num_retries = config.getint('Boto', 'num_retries', 5) |
+ progress_less_iterations = 0 |
+ |
+ while True: # Retry as long as we're making progress. |
+ had_file_bytes_before_attempt = get_cur_file_size(fp) |
+ try: |
+ self._attempt_resumable_download(key, fp, headers, cb, num_cb, |
+ torrent, version_id) |
+ # Download succceded, so remove the tracker file (if have one). |
+ self._remove_tracker_file() |
+ # Previously, check_final_md5() was called here to validate |
+ # downloaded file's checksum, however, to be consistent with |
+ # non-resumable downloads, this call was removed. Checksum |
+ # validation of file contents should be done by the caller. |
+ if debug >= 1: |
+ print 'Resumable download complete.' |
+ return |
+ except self.RETRYABLE_EXCEPTIONS, e: |
+ if debug >= 1: |
+ print('Caught exception (%s)' % e.__repr__()) |
+ if isinstance(e, IOError) and e.errno == errno.EPIPE: |
+ # Broken pipe error causes httplib to immediately |
+ # close the socket (http://bugs.python.org/issue5542), |
+ # so we need to close and reopen the key before resuming |
+ # the download. |
+ key.get_file(fp, headers, cb, num_cb, torrent, version_id, |
+ override_num_retries=0) |
+ except ResumableDownloadException, e: |
+ if (e.disposition == |
+ ResumableTransferDisposition.ABORT_CUR_PROCESS): |
+ if debug >= 1: |
+ print('Caught non-retryable ResumableDownloadException ' |
+ '(%s)' % e.message) |
+ raise |
+ elif (e.disposition == |
+ ResumableTransferDisposition.ABORT): |
+ if debug >= 1: |
+ print('Caught non-retryable ResumableDownloadException ' |
+ '(%s); aborting and removing tracker file' % |
+ e.message) |
+ self._remove_tracker_file() |
+ raise |
+ else: |
+ if debug >= 1: |
+ print('Caught ResumableDownloadException (%s) - will ' |
+ 'retry' % e.message) |
+ |
+ # At this point we had a re-tryable failure; see if made progress. |
+ if get_cur_file_size(fp) > had_file_bytes_before_attempt: |
+ progress_less_iterations = 0 |
+ else: |
+ progress_less_iterations += 1 |
+ |
+ if progress_less_iterations > self.num_retries: |
+ # Don't retry any longer in the current process. |
+ raise ResumableDownloadException( |
+ 'Too many resumable download attempts failed without ' |
+ 'progress. You might try this download again later', |
+ ResumableTransferDisposition.ABORT_CUR_PROCESS) |
+ |
+ # Close the key, in case a previous download died partway |
+ # through and left data in the underlying key HTTP buffer. |
+ # Do this within a try/except block in case the connection is |
+ # closed (since key.close() attempts to do a final read, in which |
+ # case this read attempt would get an IncompleteRead exception, |
+ # which we can safely ignore. |
+ try: |
+ key.close() |
+ except httplib.IncompleteRead: |
+ pass |
+ |
+ sleep_time_secs = 2**progress_less_iterations |
+ if debug >= 1: |
+ print('Got retryable failure (%d progress-less in a row).\n' |
+ 'Sleeping %d seconds before re-trying' % |
+ (progress_less_iterations, sleep_time_secs)) |
+ time.sleep(sleep_time_secs) |