Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(19)

Side by Side Diff: third_party/boto/s3/resumable_download_handler.py

Issue 12633019: Added boto/ to depot_tools/third_party (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/depot_tools
Patch Set: Moved boto down by one Created 7 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « third_party/boto/s3/prefix.py ('k') | third_party/boto/s3/tagging.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 # Copyright 2010 Google Inc.
2 #
3 # Permission is hereby granted, free of charge, to any person obtaining a
4 # copy of this software and associated documentation files (the
5 # "Software"), to deal in the Software without restriction, including
6 # without limitation the rights to use, copy, modify, merge, publish, dis-
7 # tribute, sublicense, and/or sell copies of the Software, and to permit
8 # persons to whom the Software is furnished to do so, subject to the fol-
9 # lowing conditions:
10 #
11 # The above copyright notice and this permission notice shall be included
12 # in all copies or substantial portions of the Software.
13 #
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20 # IN THE SOFTWARE.
21
22 import errno
23 import httplib
24 import os
25 import re
26 import socket
27 import time
28 import boto
29 from boto import config, storage_uri_for_key
30 from boto.connection import AWSAuthConnection
31 from boto.exception import ResumableDownloadException
32 from boto.exception import ResumableTransferDisposition
33 from boto.s3.keyfile import KeyFile
34
35 """
36 Resumable download handler.
37
38 Resumable downloads will retry failed downloads, resuming at the byte count
39 completed by the last download attempt. If too many retries happen with no
40 progress (per configurable num_retries param), the download will be aborted.
41
42 The caller can optionally specify a tracker_file_name param in the
43 ResumableDownloadHandler constructor. If you do this, that file will
44 save the state needed to allow retrying later, in a separate process
45 (e.g., in a later run of gsutil).
46
47 Note that resumable downloads work across providers (they depend only
48 on support for Range GETs), but this code is in the boto.s3 package
49 because it is the wrong abstraction level to go in the top-level boto
50 package.
51
52 TODO: At some point we should refactor the code to have a storage_service
53 package where all these provider-independent files go.
54 """
55
56
class ByteTranslatingCallbackHandler(object):
    """
    Progress-callback adapter used when a download is being resumed.

    The wrapped callback is invoked with the byte count reported for the
    current attempt shifted by the offset at which that attempt started,
    so the caller sees progress relative to the whole object.
    """

    def __init__(self, proxied_cb, download_start_point):
        # Offset (in bytes) added to every progress report.
        self.download_start_point = download_start_point
        # Callable that receives the translated progress reports.
        self.proxied_cb = proxied_cb

    def call(self, total_bytes_uploaded, total_size):
        """
        Forward one progress report to the proxied callback.

        :type total_bytes_uploaded: int
        :param total_bytes_uploaded: bytes reported so far for the current
            attempt (the name is historical; this handler is used for
            downloads).

        :type total_size: int
        :param total_size: passed through to the proxied callback unchanged.
        """
        absolute_bytes = self.download_start_point + total_bytes_uploaded
        self.proxied_cb(absolute_bytes, total_size)
70
71
def get_cur_file_size(fp, position_to_eof=False):
    """
    Return the size of the file behind fp.

    :type fp: file-like object or KeyFile
    :param fp: object supporting seek()/tell(), or a KeyFile.

    :type position_to_eof: bool
    :param position_to_eof: if True, fp is left positioned at end of file;
        otherwise its position on return is the same as on entry.
    """
    if position_to_eof:
        # Caller wants fp parked at EOF, so just seek there and report.
        fp.seek(0, os.SEEK_END)
        return fp.tell()

    if isinstance(fp, KeyFile):
        # Avoid EOF seek for KeyFile case as it's very inefficient.
        return fp.getkey().size

    # Measure by seeking to the end, then put the position back.
    saved_offset = fp.tell()
    fp.seek(0, os.SEEK_END)
    size = fp.tell()
    fp.seek(saved_offset, os.SEEK_SET)
    return size
86
87
class ResumableDownloadHandler(object):
    """
    Handler for resumable downloads.

    A download is retried for as long as each attempt adds bytes to the
    destination file. The ETag of the object being downloaded is kept in
    memory, and optionally in a tracker file, so that an interrupted
    download is only resumed against the same object contents.
    """

    # Pattern for the single line stored in a tracker file.
    ETAG_REGEX = '([a-z0-9]{32})\n'

    # Exceptions that cause another download attempt instead of failing.
    RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error,
                            socket.gaierror)

    def __init__(self, tracker_file_name=None, num_retries=None):
        """
        Constructor. Instantiate once for each downloaded file.

        :type tracker_file_name: string
        :param tracker_file_name: optional file name to save tracking info
            about this download. If supplied and the current process fails
            the download, it can be retried in a new process. If called
            with an existing file containing a readable ETag, we'll try to
            resume the transfer for this file; else we'll start a new
            resumable download.

        :type num_retries: int
        :param num_retries: the number of times we'll re-try a resumable
            download making no progress. (Count resets every time we get
            progress, so download can span many more than this number of
            retries.)
        """
        self.tracker_file_name = tracker_file_name
        self.num_retries = num_retries
        self.etag_value_for_current_download = None
        if tracker_file_name:
            self._load_tracker_file_etag()
        # Save download_start_point in instance state so caller can
        # find how much was transferred by this ResumableDownloadHandler
        # (across retries).
        self.download_start_point = None

    def _load_tracker_file_etag(self):
        """
        Read the saved ETag from the tracker file, if there is one.

        On success self.etag_value_for_current_download is set. A missing
        file is silently ignored; any other problem is reported on stdout
        and leaves the ETag unset, so the download restarts from scratch.
        """
        try:
            with open(self.tracker_file_name, 'r') as f:
                etag_line = f.readline()
        except IOError as e:
            # Ignore non-existent file (happens first time a download
            # is attempted on an object), but warn user for other errors.
            if e.errno != errno.ENOENT:
                # Will restart because
                # self.etag_value_for_current_download == None.
                print('Couldn\'t read URI tracker file (%s): %s. Restarting '
                      'download from scratch.' %
                      (self.tracker_file_name, e.strerror))
            return

        m = re.search(self.ETAG_REGEX, etag_line)
        if m:
            self.etag_value_for_current_download = m.group(1)
        else:
            print('Couldn\'t read etag in tracker file (%s). Restarting '
                  'download from scratch.' % self.tracker_file_name)

    def _save_tracker_info(self, key):
        """
        Remember key's ETag (quotes stripped) in memory and, when a tracker
        file name was configured, write it to that file.

        Raises ResumableDownloadException (disposition ABORT) if the
        tracker file cannot be written.
        """
        self.etag_value_for_current_download = key.etag.strip('"\'')
        if not self.tracker_file_name:
            return
        try:
            # The with-block closes the file inside the try, so a failure
            # that only shows up when buffered data is flushed on close is
            # reported the same way as a failed write.
            with open(self.tracker_file_name, 'w') as f:
                f.write('%s\n' % self.etag_value_for_current_download)
        except IOError as e:
            raise ResumableDownloadException(
                'Couldn\'t write tracker file (%s): %s.\nThis can happen '
                'if you\'re using an incorrectly configured download tool\n'
                '(e.g., gsutil configured to save tracker files to an '
                'unwritable directory)' %
                (self.tracker_file_name, e.strerror),
                ResumableTransferDisposition.ABORT)

    def _remove_tracker_file(self):
        """
        Delete the tracker file if one is configured and present.
        """
        if not self.tracker_file_name:
            return
        try:
            os.unlink(self.tracker_file_name)
        except OSError as e:
            # A file that is already gone is fine (and unlinking directly
            # avoids an exists()/unlink() race); anything else is reported.
            if e.errno != errno.ENOENT:
                raise

    def _attempt_resumable_download(self, key, fp, headers, cb, num_cb,
                                    torrent, version_id):
        """
        Attempts a resumable download.

        If fp already holds data and the saved ETag matches key's ETag, a
        Range request is made for the remaining bytes; otherwise the
        tracker info is saved, fp is truncated and the whole object is
        fetched.

        Raises ResumableDownloadException if any problems occur.
        """
        # Leaves fp positioned at EOF so resumed data is appended.
        cur_file_size = get_cur_file_size(fp, position_to_eof=True)

        if (cur_file_size and
            self.etag_value_for_current_download and
            self.etag_value_for_current_download == key.etag.strip('"\'')):
            # Try to resume existing transfer.
            if cur_file_size > key.size:
                raise ResumableDownloadException(
                    '%s is larger (%d) than %s (%d).\nDeleting tracker file, so '
                    'if you re-try this download it will start from scratch' %
                    (fp.name, cur_file_size, str(storage_uri_for_key(key)),
                     key.size), ResumableTransferDisposition.ABORT)
            elif cur_file_size == key.size:
                if key.bucket.connection.debug >= 1:
                    print('Download complete.')
                return
            if key.bucket.connection.debug >= 1:
                print('Resuming download.')
            # Copy so the caller's dict is not modified by the Range header.
            headers = headers.copy()
            headers['Range'] = 'bytes=%d-%d' % (cur_file_size, key.size - 1)
            if cb:
                # Only wrap a real callback: wrapping None would yield a
                # truthy bound method that fails as soon as it is invoked.
                cb = ByteTranslatingCallbackHandler(cb, cur_file_size).call
            self.download_start_point = cur_file_size
        else:
            if key.bucket.connection.debug >= 1:
                print('Starting new resumable download.')
            self._save_tracker_info(key)
            self.download_start_point = 0
            # Truncate the file, in case a new resumable download is being
            # started atop an existing file.
            fp.truncate(0)

        # Disable AWSAuthConnection-level retry behavior, since that would
        # cause downloads to restart from scratch.
        key.get_file(fp, headers, cb, num_cb, torrent, version_id,
                     override_num_retries=0)
        fp.flush()

    def get_file(self, key, fp, headers, cb=None, num_cb=10, torrent=False,
                 version_id=None):
        """
        Retrieves a file from a Key, retrying while progress is being made.

        :type key: :class:`boto.s3.key.Key` or subclass
        :param key: The Key object from which the data is to be downloaded

        :type fp: file
        :param fp: File pointer into which data should be downloaded

        :type headers: dict
        :param headers: headers to send when retrieving the file; a false
            value is treated as an empty dict

        :type cb: function
        :param cb: (optional) a callback function that will be called to report
            progress on the download. The callback should accept two integer
            parameters, the first representing the number of bytes that have
            been successfully transmitted from the storage service and
            the second representing the total number of bytes that need
            to be transmitted.

        :type num_cb: int
        :param num_cb: (optional) If a callback is specified with the cb
            parameter this parameter determines the granularity of the callback
            by defining the maximum number of times the callback will be
            called during the file transfer.

        :type torrent: bool
        :param torrent: Flag for whether to get a torrent for the file

        :type version_id: string
        :param version_id: The version ID (optional)

        Raises ResumableDownloadException if a problem occurs during
        the transfer.
        """

        debug = key.bucket.connection.debug
        if not headers:
            headers = {}

        # Use num-retries from constructor if one was provided; else check
        # for a value specified in the boto config file; else default to 5.
        if self.num_retries is None:
            self.num_retries = config.getint('Boto', 'num_retries', 5)
        progress_less_iterations = 0

        while True:  # Retry as long as we're making progress.
            had_file_bytes_before_attempt = get_cur_file_size(fp)
            try:
                self._attempt_resumable_download(key, fp, headers, cb, num_cb,
                                                 torrent, version_id)
                # Download succeeded, so remove the tracker file (if have one).
                self._remove_tracker_file()
                # Previously, check_final_md5() was called here to validate
                # downloaded file's checksum, however, to be consistent with
                # non-resumable downloads, this call was removed. Checksum
                # validation of file contents should be done by the caller.
                if debug >= 1:
                    print('Resumable download complete.')
                return
            except self.RETRYABLE_EXCEPTIONS as e:
                if debug >= 1:
                    print('Caught exception (%s)' % repr(e))
                if isinstance(e, IOError) and e.errno == errno.EPIPE:
                    # Broken pipe error causes httplib to immediately
                    # close the socket (http://bugs.python.org/issue5542),
                    # so we need to close and reopen the key before resuming
                    # the download.
                    # NOTE(review): this re-issues the download rather than
                    # visibly closing/reopening the key, and an exception
                    # raised here is not caught by this retry loop -- confirm
                    # that this is intended.
                    key.get_file(fp, headers, cb, num_cb, torrent, version_id,
                                 override_num_retries=0)
            except ResumableDownloadException as e:
                if (e.disposition ==
                    ResumableTransferDisposition.ABORT_CUR_PROCESS):
                    # Give up in this process but keep the tracker file.
                    if debug >= 1:
                        print('Caught non-retryable ResumableDownloadException '
                              '(%s)' % e.message)
                    raise
                elif (e.disposition ==
                      ResumableTransferDisposition.ABORT):
                    if debug >= 1:
                        print('Caught non-retryable ResumableDownloadException '
                              '(%s); aborting and removing tracker file' %
                              e.message)
                    self._remove_tracker_file()
                    raise
                else:
                    if debug >= 1:
                        print('Caught ResumableDownloadException (%s) - will '
                              'retry' % e.message)

            # At this point we had a re-tryable failure; see if made progress.
            if get_cur_file_size(fp) > had_file_bytes_before_attempt:
                progress_less_iterations = 0
            else:
                progress_less_iterations += 1

            if progress_less_iterations > self.num_retries:
                # Don't retry any longer in the current process.
                raise ResumableDownloadException(
                    'Too many resumable download attempts failed without '
                    'progress. You might try this download again later',
                    ResumableTransferDisposition.ABORT_CUR_PROCESS)

            # Close the key, in case a previous download died partway
            # through and left data in the underlying key HTTP buffer.
            # Do this within a try/except block in case the connection is
            # closed (since key.close() attempts to do a final read, in which
            # case this read attempt would get an IncompleteRead exception,
            # which we can safely ignore).
            try:
                key.close()
            except httplib.IncompleteRead:
                pass

            # Exponential back-off based on the count of progress-less tries.
            sleep_time_secs = 2**progress_less_iterations
            if debug >= 1:
                print('Got retryable failure (%d progress-less in a row).\n'
                      'Sleeping %d seconds before re-trying' %
                      (progress_less_iterations, sleep_time_secs))
            time.sleep(sleep_time_secs)
OLDNEW
« no previous file with comments | « third_party/boto/s3/prefix.py ('k') | third_party/boto/s3/tagging.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698