| OLD | NEW |
| (Empty) | |
| 1 # Copyright 2010 Google Inc. |
| 2 # |
| 3 # Permission is hereby granted, free of charge, to any person obtaining a |
| 4 # copy of this software and associated documentation files (the |
| 5 # "Software"), to deal in the Software without restriction, including |
| 6 # without limitation the rights to use, copy, modify, merge, publish, dis- |
| 7 # tribute, sublicense, and/or sell copies of the Software, and to permit |
| 8 # persons to whom the Software is furnished to do so, subject to the fol- |
| 9 # lowing conditions: |
| 10 # |
| 11 # The above copyright notice and this permission notice shall be included |
| 12 # in all copies or substantial portions of the Software. |
| 13 # |
| 14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS |
| 15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- |
| 16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT |
| 17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, |
| 18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
| 20 # IN THE SOFTWARE. |
| 21 |
| 22 import errno |
| 23 import httplib |
| 24 import os |
| 25 import re |
| 26 import socket |
| 27 import time |
| 28 import boto |
| 29 from boto import config, storage_uri_for_key |
| 30 from boto.connection import AWSAuthConnection |
| 31 from boto.exception import ResumableDownloadException |
| 32 from boto.exception import ResumableTransferDisposition |
| 33 |
| 34 """ |
| 35 Resumable download handler. |
| 36 |
| 37 Resumable downloads will retry failed downloads, resuming at the byte count |
| 38 completed by the last download attempt. If too many retries happen with no |
| 39 progress (per configurable num_retries param), the download will be aborted. |
| 40 |
| 41 The caller can optionally specify a tracker_file_name param in the |
| 42 ResumableDownloadHandler constructor. If you do this, that file will |
| 43 save the state needed to allow retrying later, in a separate process |
| 44 (e.g., in a later run of gsutil). |
| 45 |
| 46 Note that resumable downloads work across providers (they depend only |
| 47 on support Range GETs), but this code is in the boto.s3 package |
| 48 because it is the wrong abstraction level to go in the top-level boto |
| 49 package. |
| 50 |
| 51 TODO: At some point we should refactor the code to have a storage_service |
| 52 package where all these provider-independent files go. |
| 53 """ |
| 54 |
| 55 |
| 56 class ByteTranslatingCallbackHandler(object): |
| 57 """ |
| 58 Proxy class that translates progress callbacks made by |
| 59 boto.s3.Key.get_file(), taking into account that we're resuming |
| 60 a download. |
| 61 """ |
| 62 def __init__(self, proxied_cb, download_start_point): |
| 63 self.proxied_cb = proxied_cb |
| 64 self.download_start_point = download_start_point |
| 65 |
| 66 def call(self, total_bytes_uploaded, total_size): |
| 67 self.proxied_cb(self.download_start_point + total_bytes_uploaded, |
| 68 total_size) |
| 69 |
| 70 |
| 71 def get_cur_file_size(fp, position_to_eof=False): |
| 72 """ |
| 73 Returns size of file, optionally leaving fp positioned at EOF. |
| 74 """ |
| 75 if not position_to_eof: |
| 76 cur_pos = fp.tell() |
| 77 fp.seek(0, os.SEEK_END) |
| 78 cur_file_size = fp.tell() |
| 79 if not position_to_eof: |
| 80 fp.seek(cur_pos, os.SEEK_SET) |
| 81 return cur_file_size |
| 82 |
| 83 |
| 84 class ResumableDownloadHandler(object): |
| 85 """ |
| 86 Handler for resumable downloads. |
| 87 """ |
| 88 |
| 89 ETAG_REGEX = '([a-z0-9]{32})\n' |
| 90 |
| 91 RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error, |
| 92 socket.gaierror) |
| 93 |
| 94 def __init__(self, tracker_file_name=None, num_retries=None): |
| 95 """ |
| 96 Constructor. Instantiate once for each downloaded file. |
| 97 |
| 98 :type tracker_file_name: string |
| 99 :param tracker_file_name: optional file name to save tracking info |
| 100 about this download. If supplied and the current process fails |
| 101 the download, it can be retried in a new process. If called |
| 102 with an existing file containing an unexpired timestamp, |
| 103 we'll resume the transfer for this file; else we'll start a |
| 104 new resumable download. |
| 105 |
| 106 :type num_retries: int |
| 107 :param num_retries: the number of times we'll re-try a resumable |
| 108 download making no progress. (Count resets every time we get |
| 109 progress, so download can span many more than this number of |
| 110 retries.) |
| 111 """ |
| 112 self.tracker_file_name = tracker_file_name |
| 113 self.num_retries = num_retries |
| 114 self.etag_value_for_current_download = None |
| 115 if tracker_file_name: |
| 116 self._load_tracker_file_etag() |
| 117 # Save download_start_point in instance state so caller can |
| 118 # find how much was transferred by this ResumableDownloadHandler |
| 119 # (across retries). |
| 120 self.download_start_point = None |
| 121 |
| 122 def _load_tracker_file_etag(self): |
| 123 f = None |
| 124 try: |
| 125 f = open(self.tracker_file_name, 'r') |
| 126 etag_line = f.readline() |
| 127 m = re.search(self.ETAG_REGEX, etag_line) |
| 128 if m: |
| 129 self.etag_value_for_current_download = m.group(1) |
| 130 else: |
| 131 print('Couldn\'t read etag in tracker file (%s). Restarting ' |
| 132 'download from scratch.' % self.tracker_file_name) |
| 133 except IOError, e: |
| 134 # Ignore non-existent file (happens first time a download |
| 135 # is attempted on an object), but warn user for other errors. |
| 136 if e.errno != errno.ENOENT: |
| 137 # Will restart because |
| 138 # self.etag_value_for_current_download == None. |
| 139 print('Couldn\'t read URI tracker file (%s): %s. Restarting ' |
| 140 'download from scratch.' % |
| 141 (self.tracker_file_name, e.strerror)) |
| 142 finally: |
| 143 if f: |
| 144 f.close() |
| 145 |
| 146 def _save_tracker_info(self, key): |
| 147 self.etag_value_for_current_download = key.etag.strip('"\'') |
| 148 if not self.tracker_file_name: |
| 149 return |
| 150 f = None |
| 151 try: |
| 152 f = open(self.tracker_file_name, 'w') |
| 153 f.write('%s\n' % self.etag_value_for_current_download) |
| 154 except IOError, e: |
| 155 raise ResumableDownloadException( |
| 156 'Couldn\'t write tracker file (%s): %s.\nThis can happen' |
| 157 'if you\'re using an incorrectly configured download tool\n' |
| 158 '(e.g., gsutil configured to save tracker files to an ' |
| 159 'unwritable directory)' % |
| 160 (self.tracker_file_name, e.strerror), |
| 161 ResumableTransferDisposition.ABORT) |
| 162 finally: |
| 163 if f: |
| 164 f.close() |
| 165 |
| 166 def _remove_tracker_file(self): |
| 167 if (self.tracker_file_name and |
| 168 os.path.exists(self.tracker_file_name)): |
| 169 os.unlink(self.tracker_file_name) |
| 170 |
| 171 def _attempt_resumable_download(self, key, fp, headers, cb, num_cb, |
| 172 torrent, version_id): |
| 173 """ |
| 174 Attempts a resumable download. |
| 175 |
| 176 Raises ResumableDownloadException if any problems occur. |
| 177 """ |
| 178 cur_file_size = get_cur_file_size(fp, position_to_eof=True) |
| 179 |
| 180 if (cur_file_size and |
| 181 self.etag_value_for_current_download and |
| 182 self.etag_value_for_current_download == key.etag.strip('"\'')): |
| 183 # Try to resume existing transfer. |
| 184 if cur_file_size > key.size: |
| 185 raise ResumableDownloadException( |
| 186 '%s is larger (%d) than %s (%d).\nDeleting tracker file, so ' |
| 187 'if you re-try this download it will start from scratch' % |
| 188 (fp.name, cur_file_size, str(storage_uri_for_key(key)), |
| 189 key.size), ResumableTransferDisposition.ABORT) |
| 190 elif cur_file_size == key.size: |
| 191 if key.bucket.connection.debug >= 1: |
| 192 print 'Download complete.' |
| 193 return |
| 194 if key.bucket.connection.debug >= 1: |
| 195 print 'Resuming download.' |
| 196 headers = headers.copy() |
| 197 headers['Range'] = 'bytes=%d-%d' % (cur_file_size, key.size - 1) |
| 198 cb = ByteTranslatingCallbackHandler(cb, cur_file_size).call |
| 199 self.download_start_point = cur_file_size |
| 200 else: |
| 201 if key.bucket.connection.debug >= 1: |
| 202 print 'Starting new resumable download.' |
| 203 self._save_tracker_info(key) |
| 204 self.download_start_point = 0 |
| 205 # Truncate the file, in case a new resumable download is being |
| 206 # started atop an existing file. |
| 207 fp.truncate(0) |
| 208 |
| 209 # Disable AWSAuthConnection-level retry behavior, since that would |
| 210 # cause downloads to restart from scratch. |
| 211 key.get_file(fp, headers, cb, num_cb, torrent, version_id, |
| 212 override_num_retries=0) |
| 213 fp.flush() |
| 214 |
| 215 def get_file(self, key, fp, headers, cb=None, num_cb=10, torrent=False, |
| 216 version_id=None): |
| 217 """ |
| 218 Retrieves a file from a Key |
| 219 :type key: :class:`boto.s3.key.Key` or subclass |
| 220 :param key: The Key object from which upload is to be downloaded |
| 221 |
| 222 :type fp: file |
| 223 :param fp: File pointer into which data should be downloaded |
| 224 |
| 225 :type headers: string |
| 226 :param: headers to send when retrieving the files |
| 227 |
| 228 :type cb: function |
| 229 :param cb: (optional) a callback function that will be called to report |
| 230 progress on the download. The callback should accept two integer |
| 231 parameters, the first representing the number of bytes that have |
| 232 been successfully transmitted from the storage service and |
| 233 the second representing the total number of bytes that need |
| 234 to be transmitted. |
| 235 |
| 236 :type num_cb: int |
| 237 :param num_cb: (optional) If a callback is specified with the cb |
| 238 parameter this parameter determines the granularity of the callback |
| 239 by defining the maximum number of times the callback will be |
| 240 called during the file transfer. |
| 241 |
| 242 :type torrent: bool |
| 243 :param torrent: Flag for whether to get a torrent for the file |
| 244 |
| 245 :type version_id: string |
| 246 :param version_id: The version ID (optional) |
| 247 |
| 248 Raises ResumableDownloadException if a problem occurs during |
| 249 the transfer. |
| 250 """ |
| 251 |
| 252 debug = key.bucket.connection.debug |
| 253 if not headers: |
| 254 headers = {} |
| 255 |
| 256 # Use num-retries from constructor if one was provided; else check |
| 257 # for a value specified in the boto config file; else default to 5. |
| 258 if self.num_retries is None: |
| 259 self.num_retries = config.getint('Boto', 'num_retries', 5) |
| 260 progress_less_iterations = 0 |
| 261 |
| 262 while True: # Retry as long as we're making progress. |
| 263 had_file_bytes_before_attempt = get_cur_file_size(fp) |
| 264 try: |
| 265 self._attempt_resumable_download(key, fp, headers, cb, num_cb, |
| 266 torrent, version_id) |
| 267 # Download succceded, so remove the tracker file (if have one). |
| 268 self._remove_tracker_file() |
| 269 # Previously, check_final_md5() was called here to validate |
| 270 # downloaded file's checksum, however, to be consistent with |
| 271 # non-resumable downloads, this call was removed. Checksum |
| 272 # validation of file contents should be done by the caller. |
| 273 if debug >= 1: |
| 274 print 'Resumable download complete.' |
| 275 return |
| 276 except self.RETRYABLE_EXCEPTIONS, e: |
| 277 if debug >= 1: |
| 278 print('Caught exception (%s)' % e.__repr__()) |
| 279 if isinstance(e, IOError) and e.errno == errno.EPIPE: |
| 280 # Broken pipe error causes httplib to immediately |
| 281 # close the socket (http://bugs.python.org/issue5542), |
| 282 # so we need to close and reopen the key before resuming |
| 283 # the download. |
| 284 key.get_file(fp, headers, cb, num_cb, torrent, version_id, |
| 285 override_num_retries=0) |
| 286 except ResumableDownloadException, e: |
| 287 if (e.disposition == |
| 288 ResumableTransferDisposition.ABORT_CUR_PROCESS): |
| 289 if debug >= 1: |
| 290 print('Caught non-retryable ResumableDownloadException ' |
| 291 '(%s)' % e.message) |
| 292 raise |
| 293 elif (e.disposition == |
| 294 ResumableTransferDisposition.ABORT): |
| 295 if debug >= 1: |
| 296 print('Caught non-retryable ResumableDownloadException ' |
| 297 '(%s); aborting and removing tracker file' % |
| 298 e.message) |
| 299 self._remove_tracker_file() |
| 300 raise |
| 301 else: |
| 302 if debug >= 1: |
| 303 print('Caught ResumableDownloadException (%s) - will ' |
| 304 'retry' % e.message) |
| 305 |
| 306 # At this point we had a re-tryable failure; see if made progress. |
| 307 if get_cur_file_size(fp) > had_file_bytes_before_attempt: |
| 308 progress_less_iterations = 0 |
| 309 else: |
| 310 progress_less_iterations += 1 |
| 311 |
| 312 if progress_less_iterations > self.num_retries: |
| 313 # Don't retry any longer in the current process. |
| 314 raise ResumableDownloadException( |
| 315 'Too many resumable download attempts failed without ' |
| 316 'progress. You might try this download again later', |
| 317 ResumableTransferDisposition.ABORT_CUR_PROCESS) |
| 318 |
| 319 # Close the key, in case a previous download died partway |
| 320 # through and left data in the underlying key HTTP buffer. |
| 321 # Do this within a try/except block in case the connection is |
| 322 # closed (since key.close() attempts to do a final read, in which |
| 323 # case this read attempt would get an IncompleteRead exception, |
| 324 # which we can safely ignore. |
| 325 try: |
| 326 key.close() |
| 327 except httplib.IncompleteRead: |
| 328 pass |
| 329 |
| 330 sleep_time_secs = 2**progress_less_iterations |
| 331 if debug >= 1: |
| 332 print('Got retryable failure (%d progress-less in a row).\n' |
| 333 'Sleeping %d seconds before re-trying' % |
| 334 (progress_less_iterations, sleep_time_secs)) |
| 335 time.sleep(sleep_time_secs) |
| OLD | NEW |