OLD | NEW |
(Empty) | |
| 1 # Copyright 2010 Google Inc. |
| 2 # |
| 3 # Permission is hereby granted, free of charge, to any person obtaining a |
| 4 # copy of this software and associated documentation files (the |
| 5 # "Software"), to deal in the Software without restriction, including |
| 6 # without limitation the rights to use, copy, modify, merge, publish, dis- |
| 7 # tribute, sublicense, and/or sell copies of the Software, and to permit |
| 8 # persons to whom the Software is furnished to do so, subject to the fol- |
| 9 # lowing conditions: |
| 10 # |
| 11 # The above copyright notice and this permission notice shall be included |
| 12 # in all copies or substantial portions of the Software. |
| 13 # |
| 14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS |
| 15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- |
| 16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT |
| 17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, |
| 18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
| 20 # IN THE SOFTWARE. |
| 21 |
| 22 import errno |
| 23 import httplib |
| 24 import os |
| 25 import re |
| 26 import socket |
| 27 import time |
| 28 import boto |
| 29 from boto import config, storage_uri_for_key |
| 30 from boto.connection import AWSAuthConnection |
| 31 from boto.exception import ResumableDownloadException |
| 32 from boto.exception import ResumableTransferDisposition |
| 33 from boto.s3.keyfile import KeyFile |
| 34 |
| 35 """ |
| 36 Resumable download handler. |
| 37 |
| 38 Resumable downloads will retry failed downloads, resuming at the byte count |
| 39 completed by the last download attempt. If too many retries happen with no |
| 40 progress (per configurable num_retries param), the download will be aborted. |
| 41 |
| 42 The caller can optionally specify a tracker_file_name param in the |
| 43 ResumableDownloadHandler constructor. If you do this, that file will |
| 44 save the state needed to allow retrying later, in a separate process |
| 45 (e.g., in a later run of gsutil). |
| 46 |
| 47 Note that resumable downloads work across providers (they depend only |
| 48 on support for Range GETs), but this code is in the boto.s3 package |
| 49 because it is the wrong abstraction level to go in the top-level boto |
| 50 package. |
| 51 |
| 52 TODO: At some point we should refactor the code to have a storage_service |
| 53 package where all these provider-independent files go. |
| 54 """ |
| 55 |
| 56 |
class ByteTranslatingCallbackHandler(object):
    """
    Proxy class that translates progress callbacks made by
    boto.s3.Key.get_file(), taking into account that we're resuming
    a download.

    The byte count reported to the proxied callback is shifted by
    download_start_point, so the callback sees progress relative to the
    whole file rather than to the resumed portion only.
    """
    def __init__(self, proxied_cb, download_start_point):
        """
        :type proxied_cb: function
        :param proxied_cb: callback to forward translated progress to; it
            is called with two arguments (bytes so far, total size). May
            be None, in which case call() does nothing.

        :type download_start_point: int
        :param download_start_point: number of bytes that were already
            present before the current attempt started; added to every
            reported byte count.
        """
        self.proxied_cb = proxied_cb
        self.download_start_point = download_start_point

    def call(self, total_bytes_uploaded, total_size):
        """
        Forwards a progress report to the proxied callback.

        :type total_bytes_uploaded: int
        :param total_bytes_uploaded: bytes transferred in the current
            attempt (the parameter name is kept for interface
            compatibility even though this class is used for downloads).

        :type total_size: int
        :param total_size: total size value, passed through unchanged.
        """
        # A download may be resumed without any progress callback; in that
        # case there is nothing to forward to, and calling None would raise
        # TypeError.
        if self.proxied_cb is None:
            return
        self.proxied_cb(self.download_start_point + total_bytes_uploaded,
                        total_size)
| 70 |
| 71 |
def get_cur_file_size(fp, position_to_eof=False):
    """
    Returns size of file, optionally leaving fp positioned at EOF.

    :type fp: file
    :param fp: seekable file-like object (or KeyFile) to measure.

    :type position_to_eof: bool
    :param position_to_eof: if True, fp is left positioned at end of file;
        if False, fp's position is restored before returning.
    """
    if position_to_eof:
        # Caller wants fp left at EOF, so there is no position to restore.
        fp.seek(0, os.SEEK_END)
        return fp.tell()

    if isinstance(fp, KeyFile):
        # Avoid EOF seek for KeyFile case as it's very inefficient.
        return fp.getkey().size

    # Measure by seeking to EOF, then put fp back where we found it.
    saved_offset = fp.tell()
    fp.seek(0, os.SEEK_END)
    size_in_bytes = fp.tell()
    fp.seek(saved_offset, os.SEEK_SET)
    return size_in_bytes
| 86 |
| 87 |
class ResumableDownloadHandler(object):
    """
    Handler for resumable downloads.

    Remembers the ETag of the object being downloaded (optionally persisted
    in a tracker file) and, on retry, requests only the bytes that are not
    yet present in the destination file.
    """

    # A tracker file line holding a 32-character lowercase alphanumeric ETag.
    ETAG_REGEX = '([a-z0-9]{32})\n'

    # Exceptions that cause get_file() to retry instead of giving up.
    RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error,
                            socket.gaierror)

    def __init__(self, tracker_file_name=None, num_retries=None):
        """
        Constructor. Instantiate once for each downloaded file.

        :type tracker_file_name: string
        :param tracker_file_name: optional file name to save tracking info
            about this download. If supplied and the current process fails
            the download, it can be retried in a new process. If called
            with an existing tracker file, the ETag saved in it is loaded;
            the transfer is resumed only if that ETag matches the ETag of
            the key being downloaded, else a new resumable download is
            started.

        :type num_retries: int
        :param num_retries: the number of times we'll re-try a resumable
            download making no progress. (Count resets every time we get
            progress, so download can span many more than this number of
            retries.)
        """
        self.tracker_file_name = tracker_file_name
        self.num_retries = num_retries
        self.etag_value_for_current_download = None
        if tracker_file_name:
            self._load_tracker_file_etag()
        # Save download_start_point in instance state so caller can
        # find how much was transferred by this ResumableDownloadHandler
        # (across retries).
        self.download_start_point = None

    def _load_tracker_file_etag(self):
        """
        Loads the saved ETag from the tracker file, if there is one.

        Leaves self.etag_value_for_current_download untouched when the
        file is missing, unreadable or does not contain a valid ETag line.
        """
        try:
            with open(self.tracker_file_name, 'r') as f:
                etag_line = f.readline()
        except IOError as e:
            # Ignore non-existent file (happens first time a download
            # is attempted on an object), but warn user for other errors.
            if e.errno != errno.ENOENT:
                # Will restart because
                # self.etag_value_for_current_download == None.
                print('Couldn\'t read URI tracker file (%s): %s. Restarting '
                      'download from scratch.' %
                      (self.tracker_file_name, e.strerror))
            return
        m = re.search(self.ETAG_REGEX, etag_line)
        if m:
            self.etag_value_for_current_download = m.group(1)
        else:
            print('Couldn\'t read etag in tracker file (%s). Restarting '
                  'download from scratch.' % self.tracker_file_name)

    def _save_tracker_info(self, key):
        """
        Records key's ETag in memory and, if a tracker file name was
        given, in the tracker file.

        Raises ResumableDownloadException (disposition ABORT) if the
        tracker file cannot be written.
        """
        self.etag_value_for_current_download = key.etag.strip('"\'')
        if not self.tracker_file_name:
            return
        try:
            with open(self.tracker_file_name, 'w') as f:
                f.write('%s\n' % self.etag_value_for_current_download)
        except IOError as e:
            raise ResumableDownloadException(
                'Couldn\'t write tracker file (%s): %s.\nThis can happen '
                'if you\'re using an incorrectly configured download tool\n'
                '(e.g., gsutil configured to save tracker files to an '
                'unwritable directory)' %
                (self.tracker_file_name, e.strerror),
                ResumableTransferDisposition.ABORT)

    def _remove_tracker_file(self):
        """
        Deletes the tracker file if one was configured and it exists.
        """
        if (self.tracker_file_name and
                os.path.exists(self.tracker_file_name)):
            os.unlink(self.tracker_file_name)

    def _attempt_resumable_download(self, key, fp, headers, cb, num_cb,
                                    torrent, version_id):
        """
        Attempts a resumable download.

        If fp already holds data and the saved ETag matches key's ETag,
        only the missing byte range is requested; otherwise the tracker
        info is saved, fp is truncated and the download starts from 0.

        Raises ResumableDownloadException if any problems occur.
        """
        cur_file_size = get_cur_file_size(fp, position_to_eof=True)

        if (cur_file_size and
                self.etag_value_for_current_download and
                self.etag_value_for_current_download ==
                key.etag.strip('"\'')):
            # Try to resume existing transfer.
            if cur_file_size > key.size:
                raise ResumableDownloadException(
                    '%s is larger (%d) than %s (%d).\nDeleting tracker file, '
                    'so if you re-try this download it will start from '
                    'scratch' %
                    (fp.name, cur_file_size, str(storage_uri_for_key(key)),
                     key.size), ResumableTransferDisposition.ABORT)
            elif cur_file_size == key.size:
                if key.bucket.connection.debug >= 1:
                    print('Download complete.')
                return
            if key.bucket.connection.debug >= 1:
                print('Resuming download.')
            # Work on a copy so the caller's headers are not modified.
            headers = headers.copy()
            headers['Range'] = 'bytes=%d-%d' % (cur_file_size, key.size - 1)
            # Only wrap a callback that was actually supplied: wrapping
            # None would produce a callable that fails with TypeError as
            # soon as it is invoked.
            if cb is not None:
                cb = ByteTranslatingCallbackHandler(cb, cur_file_size).call
            self.download_start_point = cur_file_size
        else:
            if key.bucket.connection.debug >= 1:
                print('Starting new resumable download.')
            self._save_tracker_info(key)
            self.download_start_point = 0
            # Truncate the file, in case a new resumable download is being
            # started atop an existing file.
            fp.truncate(0)

        # Disable AWSAuthConnection-level retry behavior, since that would
        # cause downloads to restart from scratch.
        key.get_file(fp, headers, cb, num_cb, torrent, version_id,
                     override_num_retries=0)
        fp.flush()

    def get_file(self, key, fp, headers, cb=None, num_cb=10, torrent=False,
                 version_id=None):
        """
        Retrieves a file from a Key, retrying as long as progress is made.

        :type key: :class:`boto.s3.key.Key` or subclass
        :param key: The Key object from which the data is to be downloaded

        :type fp: file
        :param fp: File pointer into which data should be downloaded

        :type headers: dict
        :param headers: headers to send when retrieving the files; a false
            value is treated as an empty dict

        :type cb: function
        :param cb: (optional) a callback function that will be called to report
             progress on the download.  The callback should accept two integer
             parameters, the first representing the number of bytes that have
             been successfully transmitted from the storage service and
             the second representing the total number of bytes that need
             to be transmitted.

        :type num_cb: int
        :param num_cb: (optional) If a callback is specified with the cb
             parameter this parameter determines the granularity of the callback
             by defining the maximum number of times the callback will be
             called during the file transfer.

        :type torrent: bool
        :param torrent: Flag for whether to get a torrent for the file

        :type version_id: string
        :param version_id: The version ID (optional)

        Raises ResumableDownloadException if a problem occurs during
            the transfer.
        """

        debug = key.bucket.connection.debug
        if not headers:
            headers = {}

        # Use num-retries from constructor if one was provided; else check
        # for a value specified in the boto config file; else default to 5.
        if self.num_retries is None:
            self.num_retries = config.getint('Boto', 'num_retries', 5)
        progress_less_iterations = 0

        while True:  # Retry as long as we're making progress.
            had_file_bytes_before_attempt = get_cur_file_size(fp)
            try:
                self._attempt_resumable_download(key, fp, headers, cb, num_cb,
                                                 torrent, version_id)
                # Download succeeded, so remove the tracker file (if have
                # one).
                self._remove_tracker_file()
                # Previously, check_final_md5() was called here to validate
                # downloaded file's checksum, however, to be consistent with
                # non-resumable downloads, this call was removed. Checksum
                # validation of file contents should be done by the caller.
                if debug >= 1:
                    print('Resumable download complete.')
                return
            except self.RETRYABLE_EXCEPTIONS as e:
                if debug >= 1:
                    print('Caught exception (%s)' % e.__repr__())
                if isinstance(e, IOError) and e.errno == errno.EPIPE:
                    # Broken pipe error causes httplib to immediately
                    # close the socket (http://bugs.python.org/issue5542),
                    # so we need to close and reopen the key before resuming
                    # the download.
                    # NOTE(review): this call runs outside the try block, so
                    # anything it raises propagates to the caller instead of
                    # being retried, and it uses the caller's headers (the
                    # Range header is only added to a copy inside
                    # _attempt_resumable_download) -- confirm this is
                    # intended.
                    key.get_file(fp, headers, cb, num_cb, torrent, version_id,
                                 override_num_retries=0)
            except ResumableDownloadException as e:
                if (e.disposition ==
                        ResumableTransferDisposition.ABORT_CUR_PROCESS):
                    if debug >= 1:
                        print('Caught non-retryable ResumableDownloadException '
                              '(%s)' % e.message)
                    raise
                elif (e.disposition ==
                        ResumableTransferDisposition.ABORT):
                    if debug >= 1:
                        print('Caught non-retryable ResumableDownloadException '
                              '(%s); aborting and removing tracker file' %
                              e.message)
                    self._remove_tracker_file()
                    raise
                else:
                    if debug >= 1:
                        print('Caught ResumableDownloadException (%s) - will '
                              'retry' % e.message)

            # At this point we had a re-tryable failure; see if made progress.
            if get_cur_file_size(fp) > had_file_bytes_before_attempt:
                progress_less_iterations = 0
            else:
                progress_less_iterations += 1

            if progress_less_iterations > self.num_retries:
                # Don't retry any longer in the current process.
                raise ResumableDownloadException(
                    'Too many resumable download attempts failed without '
                    'progress. You might try this download again later',
                    ResumableTransferDisposition.ABORT_CUR_PROCESS)

            # Close the key, in case a previous download died partway
            # through and left data in the underlying key HTTP buffer.
            # Do this within a try/except block in case the connection is
            # closed (since key.close() attempts to do a final read, in which
            # case this read attempt would get an IncompleteRead exception,
            # which we can safely ignore).
            try:
                key.close()
            except httplib.IncompleteRead:
                pass

            # Exponential back-off based on consecutive progress-less tries.
            sleep_time_secs = 2**progress_less_iterations
            if debug >= 1:
                print('Got retryable failure (%d progress-less in a row).\n'
                      'Sleeping %d seconds before re-trying' %
                      (progress_less_iterations, sleep_time_secs))
            time.sleep(sleep_time_secs)
OLD | NEW |