Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(78)

Side by Side Diff: third_party/gsutil/boto/s3/resumable_download_handler.py

Issue 12042069: Scripts to download files from google storage based on sha1 sums (Closed) Base URL: https://chromium.googlesource.com/chromium/tools/depot_tools.git@master
Patch Set: Removed gsutil/tests and gsutil/docs Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2010 Google Inc.
2 #
3 # Permission is hereby granted, free of charge, to any person obtaining a
4 # copy of this software and associated documentation files (the
5 # "Software"), to deal in the Software without restriction, including
6 # without limitation the rights to use, copy, modify, merge, publish, dis-
7 # tribute, sublicense, and/or sell copies of the Software, and to permit
8 # persons to whom the Software is furnished to do so, subject to the fol-
9 # lowing conditions:
10 #
11 # The above copyright notice and this permission notice shall be included
12 # in all copies or substantial portions of the Software.
13 #
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20 # IN THE SOFTWARE.
21
22 import errno
23 import httplib
24 import os
25 import re
26 import socket
27 import time
28 import boto
29 from boto import config, storage_uri_for_key
30 from boto.connection import AWSAuthConnection
31 from boto.exception import ResumableDownloadException
32 from boto.exception import ResumableTransferDisposition
33
34 """
35 Resumable download handler.
36
37 Resumable downloads will retry failed downloads, resuming at the byte count
38 completed by the last download attempt. If too many retries happen with no
39 progress (per configurable num_retries param), the download will be aborted.
40
41 The caller can optionally specify a tracker_file_name param in the
42 ResumableDownloadHandler constructor. If you do this, that file will
43 save the state needed to allow retrying later, in a separate process
44 (e.g., in a later run of gsutil).
45
46 Note that resumable downloads work across providers (they depend only
47 on support for Range GETs), but this code is in the boto.s3 package
48 because it is the wrong abstraction level to go in the top-level boto
49 package.
50
51 TODO: At some point we should refactor the code to have a storage_service
52 package where all these provider-independent files go.
53 """
54
55
class ByteTranslatingCallbackHandler(object):
    """
    Proxy class that translates progress callbacks made by
    boto.s3.Key.get_file(), taking into account that we're resuming
    a download.

    The byte count reported for the current attempt is shifted by
    download_start_point before being forwarded to the proxied callback,
    so the proxied callback sees progress relative to the whole object.
    """
    def __init__(self, proxied_cb, download_start_point):
        """
        :type proxied_cb: function
        :param proxied_cb: callback taking (bytes_so_far, total_size), or
            None if the caller did not ask for progress reports.

        :type download_start_point: int
        :param download_start_point: number of bytes already present
            before the current attempt started.
        """
        self.proxied_cb = proxied_cb
        self.download_start_point = download_start_point

    def call(self, total_bytes_uploaded, total_size):
        """
        Forwards a progress report to the proxied callback.

        :type total_bytes_uploaded: int
        :param total_bytes_uploaded: bytes transferred so far in the
            current attempt (the name is historical; for this class these
            are downloaded bytes).

        :type total_size: int
        :param total_size: total size, passed through unchanged.
        """
        # A handler may be built around a missing callback (cb=None);
        # treat that as "no progress reporting" rather than raising
        # TypeError in the middle of a transfer.
        if self.proxied_cb is None:
            return
        self.proxied_cb(self.download_start_point + total_bytes_uploaded,
                        total_size)
69
70
def get_cur_file_size(fp, position_to_eof=False):
    """
    Returns the size of the file behind fp.

    The size is measured by seeking to the end of the file. When
    position_to_eof is true, fp is left positioned there; otherwise the
    position fp had on entry is restored before returning.
    """
    if position_to_eof:
        fp.seek(0, os.SEEK_END)
        return fp.tell()

    # Remember where the caller was so the measurement is invisible to it.
    saved_offset = fp.tell()
    fp.seek(0, os.SEEK_END)
    size = fp.tell()
    fp.seek(saved_offset, os.SEEK_SET)
    return size
82
83
class ResumableDownloadHandler(object):
    """
    Handler for resumable downloads.

    A download is resumed (rather than restarted) when the destination
    file is non-empty and the etag remembered for the download in
    progress matches the etag of the key being fetched. The etag is kept
    in memory and, if a tracker file name was supplied, in that file so
    a later process can pick the download up again.
    """

    # A tracker file holds one line: a 32-character lowercase
    # alphanumeric etag followed by a newline.
    ETAG_REGEX = '([a-z0-9]{32})\n'

    # Failures of these types are treated as transient and retried.
    RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error,
                            socket.gaierror)

    def __init__(self, tracker_file_name=None, num_retries=None):
        """
        Constructor. Instantiate once for each downloaded file.

        :type tracker_file_name: string
        :param tracker_file_name: optional file name to save tracking info
            about this download. If supplied and the current process fails
            the download, it can be retried in a new process. If called
            with an existing file containing a readable etag that matches
            the key's etag, we'll resume the transfer for this file; else
            we'll start a new resumable download.

        :type num_retries: int
        :param num_retries: the number of times we'll re-try a resumable
            download making no progress. (Count resets every time we get
            progress, so download can span many more than this number of
            retries.)
        """
        self.tracker_file_name = tracker_file_name
        self.num_retries = num_retries
        self.etag_value_for_current_download = None
        if tracker_file_name:
            self._load_tracker_file_etag()
        # Save download_start_point in instance state so caller can
        # find how much was transferred by this ResumableDownloadHandler
        # (across retries).
        self.download_start_point = None

    def _load_tracker_file_etag(self):
        """
        Reads the etag saved in the tracker file, if there is one.

        On success the etag is stored in
        self.etag_value_for_current_download. A missing tracker file is
        silently ignored; any other read problem, or a first line that
        does not look like an etag, prints a warning and leaves the
        stored etag unchanged, which makes the next attempt start over.
        """
        try:
            with open(self.tracker_file_name, 'r') as f:
                etag_line = f.readline()
            m = re.search(self.ETAG_REGEX, etag_line)
            if m:
                self.etag_value_for_current_download = m.group(1)
            else:
                print('Couldn\'t read etag in tracker file (%s). Restarting '
                      'download from scratch.' % self.tracker_file_name)
        except IOError as e:
            # Ignore non-existent file (happens first time a download
            # is attempted on an object), but warn user for other errors.
            if e.errno != errno.ENOENT:
                # Will restart because
                # self.etag_value_for_current_download == None.
                print('Couldn\'t read URI tracker file (%s): %s. Restarting '
                      'download from scratch.' %
                      (self.tracker_file_name, e.strerror))

    def _save_tracker_info(self, key):
        """
        Remembers the key's etag in memory and, when a tracker file name
        was configured, writes it to that file (one line).

        Raises ResumableDownloadException (disposition ABORT) if the
        tracker file cannot be written.
        """
        self.etag_value_for_current_download = key.etag.strip('"\'')
        if not self.tracker_file_name:
            return
        try:
            with open(self.tracker_file_name, 'w') as f:
                f.write('%s\n' % self.etag_value_for_current_download)
        except IOError as e:
            raise ResumableDownloadException(
                'Couldn\'t write tracker file (%s): %s.\nThis can happen '
                'if you\'re using an incorrectly configured download tool\n'
                '(e.g., gsutil configured to save tracker files to an '
                'unwritable directory)' %
                (self.tracker_file_name, e.strerror),
                ResumableTransferDisposition.ABORT)

    def _remove_tracker_file(self):
        """
        Deletes the tracker file if one was configured and it exists.
        """
        if not self.tracker_file_name:
            return
        try:
            os.unlink(self.tracker_file_name)
        except OSError as e:
            # The file being gone already is the desired end state;
            # anything else is a real failure.
            if e.errno != errno.ENOENT:
                raise

    def _attempt_resumable_download(self, key, fp, headers, cb, num_cb,
                                    torrent, version_id):
        """
        Attempts a resumable download.

        Resumes with a Range request when fp already holds data and the
        remembered etag matches key.etag; otherwise records the etag,
        empties fp and downloads from the beginning. Returns immediately
        if fp already holds exactly key.size bytes of a matching download.

        Raises ResumableDownloadException if any problems occur.
        """
        cur_file_size = get_cur_file_size(fp, position_to_eof=True)

        if (cur_file_size and
            self.etag_value_for_current_download and
            self.etag_value_for_current_download == key.etag.strip('"\'')):
            # Try to resume existing transfer.
            if cur_file_size > key.size:
                raise ResumableDownloadException(
                    '%s is larger (%d) than %s (%d).\nDeleting tracker file, so '
                    'if you re-try this download it will start from scratch' %
                    (fp.name, cur_file_size, str(storage_uri_for_key(key)),
                     key.size), ResumableTransferDisposition.ABORT)
            elif cur_file_size == key.size:
                if key.bucket.connection.debug >= 1:
                    print('Download complete.')
                return
            if key.bucket.connection.debug >= 1:
                print('Resuming download.')
            # Copy so the caller's dict is not modified by the Range header.
            headers = headers.copy()
            headers['Range'] = 'bytes=%d-%d' % (cur_file_size, key.size - 1)
            # Only wrap a real callback: wrapping None would hand
            # key.get_file() a callable that fails as soon as it is used.
            if cb is not None:
                cb = ByteTranslatingCallbackHandler(cb, cur_file_size).call
            self.download_start_point = cur_file_size
        else:
            if key.bucket.connection.debug >= 1:
                print('Starting new resumable download.')
            self._save_tracker_info(key)
            self.download_start_point = 0
            # Truncate the file, in case a new resumable download is being
            # started atop an existing file. truncate() does not move the
            # file position (left at the old EOF by the size check above),
            # so rewind explicitly to avoid writing after a hole.
            fp.truncate(0)
            fp.seek(0)

        # Disable AWSAuthConnection-level retry behavior, since that would
        # cause downloads to restart from scratch.
        key.get_file(fp, headers, cb, num_cb, torrent, version_id,
                     override_num_retries=0)
        fp.flush()

    def get_file(self, key, fp, headers, cb=None, num_cb=10, torrent=False,
                 version_id=None):
        """
        Retrieves a file from a Key, retrying for as long as attempts
        keep adding bytes to fp.

        :type key: :class:`boto.s3.key.Key` or subclass
        :param key: The Key object from which the data is to be downloaded

        :type fp: file
        :param fp: File pointer into which data should be downloaded

        :type headers: dict
        :param headers: headers to send when retrieving the file; a false
            value is treated as an empty dict

        :type cb: function
        :param cb: (optional) a callback function that will be called to report
            progress on the download. The callback should accept two integer
            parameters, the first representing the number of bytes that have
            been successfully transmitted from the storage service and
            the second representing the total number of bytes that need
            to be transmitted.

        :type num_cb: int
        :param num_cb: (optional) If a callback is specified with the cb
            parameter this parameter determines the granularity of the callback
            by defining the maximum number of times the callback will be
            called during the file transfer.

        :type torrent: bool
        :param torrent: Flag for whether to get a torrent for the file

        :type version_id: string
        :param version_id: The version ID (optional)

        Raises ResumableDownloadException if a problem occurs during
        the transfer.
        """

        debug = key.bucket.connection.debug
        if not headers:
            headers = {}

        # Use num-retries from constructor if one was provided; else check
        # for a value specified in the boto config file; else default to 5.
        if self.num_retries is None:
            self.num_retries = config.getint('Boto', 'num_retries', 5)
        progress_less_iterations = 0

        while True:  # Retry as long as we're making progress.
            had_file_bytes_before_attempt = get_cur_file_size(fp)
            try:
                self._attempt_resumable_download(key, fp, headers, cb, num_cb,
                                                 torrent, version_id)
                # Download succeeded, so remove the tracker file (if have one).
                self._remove_tracker_file()
                # Previously, check_final_md5() was called here to validate
                # downloaded file's checksum, however, to be consistent with
                # non-resumable downloads, this call was removed. Checksum
                # validation of file contents should be done by the caller.
                if debug >= 1:
                    print('Resumable download complete.')
                return
            except self.RETRYABLE_EXCEPTIONS as e:
                if debug >= 1:
                    print('Caught exception (%s)' % e.__repr__())
                if isinstance(e, IOError) and e.errno == errno.EPIPE:
                    # Broken pipe error causes httplib to immediately
                    # close the socket (http://bugs.python.org/issue5542),
                    # so we need to close and reopen the key before resuming
                    # the download.
                    # NOTE(review): this issues a plain key.get_file() with
                    # the caller's original headers rather than a close and
                    # reopen, and an exception from it is not caught by this
                    # loop - confirm this is intended.
                    key.get_file(fp, headers, cb, num_cb, torrent, version_id,
                                 override_num_retries=0)
            except ResumableDownloadException as e:
                if (e.disposition ==
                    ResumableTransferDisposition.ABORT_CUR_PROCESS):
                    # Give up in this process but keep the tracker file so
                    # a later process can resume.
                    if debug >= 1:
                        print('Caught non-retryable ResumableDownloadException '
                              '(%s)' % e.message)
                    raise
                elif (e.disposition ==
                      ResumableTransferDisposition.ABORT):
                    if debug >= 1:
                        print('Caught non-retryable ResumableDownloadException '
                              '(%s); aborting and removing tracker file' %
                              e.message)
                    self._remove_tracker_file()
                    raise
                else:
                    if debug >= 1:
                        print('Caught ResumableDownloadException (%s) - will '
                              'retry' % e.message)

            # At this point we had a re-tryable failure; see if made progress.
            if get_cur_file_size(fp) > had_file_bytes_before_attempt:
                progress_less_iterations = 0
            else:
                progress_less_iterations += 1

            if progress_less_iterations > self.num_retries:
                # Don't retry any longer in the current process.
                raise ResumableDownloadException(
                    'Too many resumable download attempts failed without '
                    'progress. You might try this download again later',
                    ResumableTransferDisposition.ABORT_CUR_PROCESS)

            # Close the key, in case a previous download died partway
            # through and left data in the underlying key HTTP buffer.
            # Do this within a try/except block in case the connection is
            # closed (since key.close() attempts to do a final read, in which
            # case this read attempt would get an IncompleteRead exception,
            # which we can safely ignore).
            try:
                key.close()
            except httplib.IncompleteRead:
                pass

            # Exponential backoff keyed on consecutive progress-less tries.
            sleep_time_secs = 2**progress_less_iterations
            if debug >= 1:
                print('Got retryable failure (%d progress-less in a row).\n'
                      'Sleeping %d seconds before re-trying' %
                      (progress_less_iterations, sleep_time_secs))
            time.sleep(sleep_time_secs)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698