OLD | NEW |
(Empty) | |
| 1 # Copyright 2010 Google Inc. |
| 2 # |
| 3 # Permission is hereby granted, free of charge, to any person obtaining a |
| 4 # copy of this software and associated documentation files (the |
| 5 # "Software"), to deal in the Software without restriction, including |
| 6 # without limitation the rights to use, copy, modify, merge, publish, dis- |
| 7 # tribute, sublicense, and/or sell copies of the Software, and to permit |
| 8 # persons to whom the Software is furnished to do so, subject to the fol- |
| 9 # lowing conditions: |
| 10 # |
| 11 # The above copyright notice and this permission notice shall be included |
| 12 # in all copies or substantial portions of the Software. |
| 13 # |
| 14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS |
| 15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- |
| 16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT |
| 17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, |
| 18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
| 20 # IN THE SOFTWARE. |
| 21 |
| 22 import cgi |
| 23 import errno |
| 24 import httplib |
| 25 import os |
| 26 import random |
| 27 import re |
| 28 import socket |
| 29 import time |
| 30 import urlparse |
| 31 import boto |
| 32 from boto import config, UserAgent |
| 33 from boto.connection import AWSAuthConnection |
| 34 from boto.exception import InvalidUriError |
| 35 from boto.exception import ResumableTransferDisposition |
| 36 from boto.exception import ResumableUploadException |
| 37 from boto.s3.keyfile import KeyFile |
| 38 try: |
| 39 from hashlib import md5 |
| 40 except ImportError: |
| 41 from md5 import md5 |
| 42 |
| 43 """ |
| 44 Handler for Google Cloud Storage resumable uploads. See |
| 45 http://code.google.com/apis/storage/docs/developer-guide.html#resumable |
| 46 for details. |
| 47 |
| 48 Resumable uploads will retry failed uploads, resuming at the byte |
| 49 count completed by the last upload attempt. If too many retries happen with |
| 50 no progress (per configurable num_retries param), the upload will be |
| 51 aborted in the current process. |
| 52 |
| 53 The caller can optionally specify a tracker_file_name param in the |
| 54 ResumableUploadHandler constructor. If you do this, that file will |
| 55 save the state needed to allow retrying later, in a separate process |
| 56 (e.g., in a later run of gsutil). |
| 57 """ |
| 58 |
| 59 |
class ResumableUploadHandler(object):
    """
    Handler for a Google Cloud Storage resumable upload of a single file.

    Holds the tracker URI and retry bookkeeping for one upload; see
    send_file() for the entry point.
    """

    # Number of bytes read from the source file (and sent) per chunk.
    BUFFER_SIZE = 8192

    # Exception types that send_file() catches and treats as retryable.
    RETRYABLE_EXCEPTIONS = (
        httplib.HTTPException,
        IOError,
        socket.error,
        socket.gaierror,
    )

    # (start, end) response indicating server has nothing (upload protocol
    # uses inclusive numbering).
    SERVER_HAS_NOTHING = (0, -1)
| 69 |
| 70 def __init__(self, tracker_file_name=None, num_retries=None): |
| 71 """ |
| 72 Constructor. Instantiate once for each uploaded file. |
| 73 |
| 74 :type tracker_file_name: string |
| 75 :param tracker_file_name: optional file name to save tracker URI. |
| 76 If supplied and the current process fails the upload, it can be |
| 77 retried in a new process. If called with an existing file containing |
| 78 a valid tracker URI, we'll resume the upload from this URI; else |
| 79 we'll start a new resumable upload (and write the URI to this |
| 80 tracker file). |
| 81 |
| 82 :type num_retries: int |
| 83 :param num_retries: the number of times we'll re-try a resumable upload |
| 84 making no progress. (Count resets every time we get progress, so |
| 85 upload can span many more than this number of retries.) |
| 86 """ |
| 87 self.tracker_file_name = tracker_file_name |
| 88 self.num_retries = num_retries |
| 89 self.server_has_bytes = 0 # Byte count at last server check. |
| 90 self.tracker_uri = None |
| 91 if tracker_file_name: |
| 92 self._load_tracker_uri_from_file() |
| 93 # Save upload_start_point in instance state so caller can find how |
| 94 # much was transferred by this ResumableUploadHandler (across retries). |
| 95 self.upload_start_point = None |
| 96 |
| 97 def _load_tracker_uri_from_file(self): |
| 98 f = None |
| 99 try: |
| 100 f = open(self.tracker_file_name, 'r') |
| 101 uri = f.readline().strip() |
| 102 self._set_tracker_uri(uri) |
| 103 except IOError, e: |
| 104 # Ignore non-existent file (happens first time an upload |
| 105 # is attempted on a file), but warn user for other errors. |
| 106 if e.errno != errno.ENOENT: |
| 107 # Will restart because self.tracker_uri == None. |
| 108 print('Couldn\'t read URI tracker file (%s): %s. Restarting ' |
| 109 'upload from scratch.' % |
| 110 (self.tracker_file_name, e.strerror)) |
| 111 except InvalidUriError, e: |
| 112 # Warn user, but proceed (will restart because |
| 113 # self.tracker_uri == None). |
| 114 print('Invalid tracker URI (%s) found in URI tracker file ' |
| 115 '(%s). Restarting upload from scratch.' % |
| 116 (uri, self.tracker_file_name)) |
| 117 finally: |
| 118 if f: |
| 119 f.close() |
| 120 |
| 121 def _save_tracker_uri_to_file(self): |
| 122 """ |
| 123 Saves URI to tracker file if one was passed to constructor. |
| 124 """ |
| 125 if not self.tracker_file_name: |
| 126 return |
| 127 f = None |
| 128 try: |
| 129 f = open(self.tracker_file_name, 'w') |
| 130 f.write(self.tracker_uri) |
| 131 except IOError, e: |
| 132 raise ResumableUploadException( |
| 133 'Couldn\'t write URI tracker file (%s): %s.\nThis can happen' |
| 134 'if you\'re using an incorrectly configured upload tool\n' |
| 135 '(e.g., gsutil configured to save tracker files to an ' |
| 136 'unwritable directory)' % |
| 137 (self.tracker_file_name, e.strerror), |
| 138 ResumableTransferDisposition.ABORT) |
| 139 finally: |
| 140 if f: |
| 141 f.close() |
| 142 |
| 143 def _set_tracker_uri(self, uri): |
| 144 """ |
| 145 Called when we start a new resumable upload or get a new tracker |
| 146 URI for the upload. Saves URI and resets upload state. |
| 147 |
| 148 Raises InvalidUriError if URI is syntactically invalid. |
| 149 """ |
| 150 parse_result = urlparse.urlparse(uri) |
| 151 if (parse_result.scheme.lower() not in ['http', 'https'] or |
| 152 not parse_result.netloc): |
| 153 raise InvalidUriError('Invalid tracker URI (%s)' % uri) |
| 154 self.tracker_uri = uri |
| 155 self.tracker_uri_host = parse_result.netloc |
| 156 self.tracker_uri_path = '%s?%s' % ( |
| 157 parse_result.path, parse_result.query) |
| 158 self.server_has_bytes = 0 |
| 159 |
def get_tracker_uri(self):
    """
    Returns the tracker URI currently held by this handler, or None if
    the upload has not yet started.
    """
    return self.tracker_uri
| 165 |
| 166 def _remove_tracker_file(self): |
| 167 if (self.tracker_file_name and |
| 168 os.path.exists(self.tracker_file_name)): |
| 169 os.unlink(self.tracker_file_name) |
| 170 |
| 171 def _build_content_range_header(self, range_spec='*', length_spec='*'): |
| 172 return 'bytes %s/%s' % (range_spec, length_spec) |
| 173 |
def _query_server_state(self, conn, file_length):
    """
    Asks the server for the state of the current upload.

    This relies on the upload server replying with the current start/end
    state whenever a PUT doesn't complete: an empty PUT (Content-Length 0,
    Content-Range 'bytes */<file_length>') is sent to the tracker URI.

    Returns HTTP response from sending request. Exceptions raised while
    making the request propagate to the caller.
    """
    put_headers = {
        'Content-Range': self._build_content_range_header('*', file_length),
        'Content-Length': '0',
    }
    return AWSAuthConnection.make_request(
        conn, 'PUT',
        path=self.tracker_uri_path,
        auth_path=self.tracker_uri_path,
        headers=put_headers,
        host=self.tracker_uri_host)
| 197 |
| 198 def _query_server_pos(self, conn, file_length): |
| 199 """ |
| 200 Queries server to find out what bytes it currently has. |
| 201 |
| 202 Returns (server_start, server_end), where the values are inclusive. |
| 203 For example, (0, 2) would mean that the server has bytes 0, 1, *and* 2. |
| 204 |
| 205 Raises ResumableUploadException if problem querying server. |
| 206 """ |
| 207 resp = self._query_server_state(conn, file_length) |
| 208 if resp.status == 200: |
| 209 # To handle the boundary condition where the server has the complete |
| 210 # file, we return (server_start, file_length-1). That way the |
| 211 # calling code can always simply read up through server_end. (If we |
| 212 # didn't handle this boundary condition here, the caller would have |
| 213 # to check whether server_end == file_length and read one fewer byte |
| 214 # in that case.) |
| 215 return (0, file_length - 1) # Completed upload. |
| 216 if resp.status != 308: |
| 217 # This means the server didn't have any state for the given |
| 218 # upload ID, which can happen (for example) if the caller saved |
| 219 # the tracker URI to a file and then tried to restart the transfer |
| 220 # after that upload ID has gone stale. In that case we need to |
| 221 # start a new transfer (and the caller will then save the new |
| 222 # tracker URI to the tracker file). |
| 223 raise ResumableUploadException( |
| 224 'Got non-308 response (%s) from server state query' % |
| 225 resp.status, ResumableTransferDisposition.START_OVER) |
| 226 got_valid_response = False |
| 227 range_spec = resp.getheader('range') |
| 228 if range_spec: |
| 229 # Parse 'bytes=<from>-<to>' range_spec. |
| 230 m = re.search('bytes=(\d+)-(\d+)', range_spec) |
| 231 if m: |
| 232 server_start = long(m.group(1)) |
| 233 server_end = long(m.group(2)) |
| 234 got_valid_response = True |
| 235 else: |
| 236 # No Range header, which means the server does not yet have |
| 237 # any bytes. Note that the Range header uses inclusive 'from' |
| 238 # and 'to' values. Since Range 0-0 would mean that the server |
| 239 # has byte 0, omitting the Range header is used to indicate that |
| 240 # the server doesn't have any bytes. |
| 241 return self.SERVER_HAS_NOTHING |
| 242 if not got_valid_response: |
| 243 raise ResumableUploadException( |
| 244 'Couldn\'t parse upload server state query response (%s)' % |
| 245 str(resp.getheaders()), ResumableTransferDisposition.START_OVER) |
| 246 if conn.debug >= 1: |
| 247 print 'Server has: Range: %d - %d.' % (server_start, server_end) |
| 248 return (server_start, server_end) |
| 249 |
| 250 def _start_new_resumable_upload(self, key, headers=None): |
| 251 """ |
| 252 Starts a new resumable upload. |
| 253 |
| 254 Raises ResumableUploadException if any errors occur. |
| 255 """ |
| 256 conn = key.bucket.connection |
| 257 if conn.debug >= 1: |
| 258 print 'Starting new resumable upload.' |
| 259 self.server_has_bytes = 0 |
| 260 |
| 261 # Start a new resumable upload by sending a POST request with an |
| 262 # empty body and the "X-Goog-Resumable: start" header. Include any |
| 263 # caller-provided headers (e.g., Content-Type) EXCEPT Content-Length |
| 264 # (and raise an exception if they tried to pass one, since it's |
| 265 # a semantic error to specify it at this point, and if we were to |
| 266 # include one now it would cause the server to expect that many |
| 267 # bytes; the POST doesn't include the actual file bytes We set |
| 268 # the Content-Length in the subsequent PUT, based on the uploaded |
| 269 # file size. |
| 270 post_headers = {} |
| 271 for k in headers: |
| 272 if k.lower() == 'content-length': |
| 273 raise ResumableUploadException( |
| 274 'Attempt to specify Content-Length header (disallowed)', |
| 275 ResumableTransferDisposition.ABORT) |
| 276 post_headers[k] = headers[k] |
| 277 post_headers[conn.provider.resumable_upload_header] = 'start' |
| 278 |
| 279 resp = conn.make_request( |
| 280 'POST', key.bucket.name, key.name, post_headers) |
| 281 # Get tracker URI from response 'Location' header. |
| 282 body = resp.read() |
| 283 |
| 284 # Check for various status conditions. |
| 285 if resp.status in [500, 503]: |
| 286 # Retry status 500 and 503 errors after a delay. |
| 287 raise ResumableUploadException( |
| 288 'Got status %d from attempt to start resumable upload. ' |
| 289 'Will wait/retry' % resp.status, |
| 290 ResumableTransferDisposition.WAIT_BEFORE_RETRY) |
| 291 elif resp.status != 200 and resp.status != 201: |
| 292 raise ResumableUploadException( |
| 293 'Got status %d from attempt to start resumable upload. ' |
| 294 'Aborting' % resp.status, |
| 295 ResumableTransferDisposition.ABORT) |
| 296 |
| 297 # Else we got 200 or 201 response code, indicating the resumable |
| 298 # upload was created. |
| 299 tracker_uri = resp.getheader('Location') |
| 300 if not tracker_uri: |
| 301 raise ResumableUploadException( |
| 302 'No resumable tracker URI found in resumable initiation ' |
| 303 'POST response (%s)' % body, |
| 304 ResumableTransferDisposition.WAIT_BEFORE_RETRY) |
| 305 self._set_tracker_uri(tracker_uri) |
| 306 self._save_tracker_uri_to_file() |
| 307 |
def _upload_file_bytes(self, conn, http_conn, fp, file_length,
                       total_bytes_uploaded, cb, num_cb, md5sum, headers):
    """
    Makes one attempt to upload file bytes, using an existing resumable
    upload connection.

    Bytes are read from fp at its current position in BUFFER_SIZE chunks;
    each chunk is sent on http_conn and fed into md5sum. If cb is given
    it is called as cb(bytes_uploaded, file_length) before the transfer,
    optionally between chunks, and after the last chunk.

    Returns (etag, generation, meta_generation) from server upon success.

    Raises ResumableUploadException if any problems occur.
    """
    chunk = fp.read(self.BUFFER_SIZE)
    if cb:
        # buffers_per_cb is the number of full buffers to send between cb
        # executions: -1 means after every buffer, 0 means never between
        # buffers (only the calls before and after the transfer).
        if num_cb > 2:
            buffers_per_cb = file_length / self.BUFFER_SIZE / (num_cb - 2)
        elif num_cb < 0:
            buffers_per_cb = -1
        else:
            buffers_per_cb = 0
        buffers_since_cb = 0
        cb(total_bytes_uploaded, file_length)

    # Build resumable upload headers for the transfer. Don't send a
    # Content-Range header if the file is 0 bytes long, because the
    # resumable upload protocol uses an *inclusive* end-range (so, sending
    # 'bytes 0-0/1' would actually mean you're sending a 1-byte file).
    put_headers = headers.copy() if headers else {}
    if file_length:
        if total_bytes_uploaded == file_length:
            range_spec = '*'
        else:
            range_spec = '%d-%d' % (total_bytes_uploaded, file_length - 1)
        put_headers['Content-Range'] = (
            self._build_content_range_header(range_spec, file_length))
    # Set Content-Length to the total bytes we'll send with this PUT.
    put_headers['Content-Length'] = str(file_length - total_bytes_uploaded)
    http_request = AWSAuthConnection.build_base_http_request(
        conn, 'PUT', path=self.tracker_uri_path, auth_path=None,
        headers=put_headers, host=self.tracker_uri_host)
    http_conn.putrequest('PUT', http_request.path)
    for header_name in put_headers:
        http_conn.putheader(header_name, put_headers[header_name])
    http_conn.endheaders()

    # Turn off debug on http connection so upload content isn't included
    # in debug stream.
    http_conn.set_debuglevel(0)
    while chunk:
        http_conn.send(chunk)
        md5sum.update(chunk)
        total_bytes_uploaded += len(chunk)
        if cb:
            buffers_since_cb += 1
            if buffers_since_cb == buffers_per_cb or buffers_per_cb == -1:
                cb(total_bytes_uploaded, file_length)
                buffers_since_cb = 0
        chunk = fp.read(self.BUFFER_SIZE)
    http_conn.set_debuglevel(conn.debug)
    if cb:
        cb(total_bytes_uploaded, file_length)
    if total_bytes_uploaded != file_length:
        # Abort (and delete the tracker file) so if the user retries
        # they'll start a new resumable upload rather than potentially
        # attempting to pick back up later where we left off.
        raise ResumableUploadException(
            'File changed during upload: EOF at %d bytes of %d byte file.' %
            (total_bytes_uploaded, file_length),
            ResumableTransferDisposition.ABORT)
    resp = http_conn.getresponse()
    # The response body is read but its content is not used.
    resp.read()
    # Restore http connection debug level.
    http_conn.set_debuglevel(conn.debug)

    if resp.status == 200:
        # Success.
        return (resp.getheader('etag'),
                resp.getheader('x-goog-generation'),
                resp.getheader('x-goog-metageneration'))
    if resp.status in [408, 500, 503]:
        # Retry timeout (408) and status 500 and 503 errors after a delay.
        disposition = ResumableTransferDisposition.WAIT_BEFORE_RETRY
    else:
        # Catch all for any other error codes.
        disposition = ResumableTransferDisposition.ABORT
    raise ResumableUploadException('Got response code %d while attempting '
                                   'upload (%s)' %
                                   (resp.status, resp.reason), disposition)
| 401 |
| 402 def _attempt_resumable_upload(self, key, fp, file_length, headers, cb, |
| 403 num_cb, md5sum): |
| 404 """ |
| 405 Attempts a resumable upload. |
| 406 |
| 407 Returns (etag, generation, meta_generation) from server upon success. |
| 408 |
| 409 Raises ResumableUploadException if any problems occur. |
| 410 """ |
| 411 (server_start, server_end) = self.SERVER_HAS_NOTHING |
| 412 conn = key.bucket.connection |
| 413 if self.tracker_uri: |
| 414 # Try to resume existing resumable upload. |
| 415 try: |
| 416 (server_start, server_end) = ( |
| 417 self._query_server_pos(conn, file_length)) |
| 418 self.server_has_bytes = server_start |
| 419 |
| 420 if server_end: |
| 421 # If the server already has some of the content, we need to |
| 422 # update the md5 with the bytes that have already been |
| 423 # uploaded to ensure we get a complete hash in the end. |
| 424 print 'Catching up md5 for resumed upload' |
| 425 fp.seek(0) |
| 426 # Read local file's bytes through position server has. For |
| 427 # example, if server has (0, 3) we want to read 3-0+1=4 bytes. |
| 428 bytes_to_go = server_end + 1 |
| 429 while bytes_to_go: |
| 430 chunk = fp.read(min(key.BufferSize, bytes_to_go)) |
| 431 if not chunk: |
| 432 raise ResumableUploadException( |
| 433 'Hit end of file during resumable upload md5 ' |
| 434 'catchup. This should not happen under\n' |
| 435 'normal circumstances, as it indicates the ' |
| 436 'server has more bytes of this transfer\nthan' |
| 437 ' the current file size. Restarting upload.', |
| 438 ResumableTransferDisposition.START_OVER) |
| 439 md5sum.update(chunk) |
| 440 bytes_to_go -= len(chunk) |
| 441 |
| 442 if conn.debug >= 1: |
| 443 print 'Resuming transfer.' |
| 444 except ResumableUploadException, e: |
| 445 if conn.debug >= 1: |
| 446 print 'Unable to resume transfer (%s).' % e.message |
| 447 self._start_new_resumable_upload(key, headers) |
| 448 else: |
| 449 self._start_new_resumable_upload(key, headers) |
| 450 |
| 451 # upload_start_point allows the code that instantiated the |
| 452 # ResumableUploadHandler to find out the point from which it started |
| 453 # uploading (e.g., so it can correctly compute throughput). |
| 454 if self.upload_start_point is None: |
| 455 self.upload_start_point = server_end |
| 456 |
| 457 total_bytes_uploaded = server_end + 1 |
| 458 # Corner case: Don't attempt to seek if we've already uploaded the |
| 459 # entire file, because if the file is a stream (e.g., the KeyFile |
| 460 # wrapper around input key when copying between providers), attempting |
| 461 # to seek to the end of file would result in an InvalidRange error. |
| 462 if file_length < total_bytes_uploaded: |
| 463 fp.seek(total_bytes_uploaded) |
| 464 conn = key.bucket.connection |
| 465 |
| 466 # Get a new HTTP connection (vs conn.get_http_connection(), which reuses |
| 467 # pool connections) because httplib requires a new HTTP connection per |
| 468 # transaction. (Without this, calling http_conn.getresponse() would get |
| 469 # "ResponseNotReady".) |
| 470 http_conn = conn.new_http_connection(self.tracker_uri_host, |
| 471 conn.is_secure) |
| 472 http_conn.set_debuglevel(conn.debug) |
| 473 |
| 474 # Make sure to close http_conn at end so if a local file read |
| 475 # failure occurs partway through server will terminate current upload |
| 476 # and can report that progress on next attempt. |
| 477 try: |
| 478 return self._upload_file_bytes(conn, http_conn, fp, file_length, |
| 479 total_bytes_uploaded, cb, num_cb, md5
sum, |
| 480 headers) |
| 481 except (ResumableUploadException, socket.error): |
| 482 resp = self._query_server_state(conn, file_length) |
| 483 if resp.status == 400: |
| 484 raise ResumableUploadException('Got 400 response from server ' |
| 485 'state query after failed resumable upload attempt. This ' |
| 486 'can happen for various reasons, including specifying an ' |
| 487 'invalid request (e.g., an invalid canned ACL) or if the ' |
| 488 'file size changed between upload attempts', |
| 489 ResumableTransferDisposition.ABORT) |
| 490 else: |
| 491 raise |
| 492 finally: |
| 493 http_conn.close() |
| 494 |
| 495 def _check_final_md5(self, key, etag): |
| 496 """ |
| 497 Checks that etag from server agrees with md5 computed before upload. |
| 498 This is important, since the upload could have spanned a number of |
| 499 hours and multiple processes (e.g., gsutil runs), and the user could |
| 500 change some of the file and not realize they have inconsistent data. |
| 501 """ |
| 502 if key.bucket.connection.debug >= 1: |
| 503 print 'Checking md5 against etag.' |
| 504 if key.md5 != etag.strip('"\''): |
| 505 # Call key.open_read() before attempting to delete the |
| 506 # (incorrect-content) key, so we perform that request on a |
| 507 # different HTTP connection. This is neededb because httplib |
| 508 # will return a "Response not ready" error if you try to perform |
| 509 # a second transaction on the connection. |
| 510 key.open_read() |
| 511 key.close() |
| 512 key.delete() |
| 513 raise ResumableUploadException( |
| 514 'File changed during upload: md5 signature doesn\'t match etag ' |
| 515 '(incorrect uploaded object deleted)', |
| 516 ResumableTransferDisposition.ABORT) |
| 517 |
def handle_resumable_upload_exception(self, e, debug):
    """
    Decides what to do with a ResumableUploadException based on its
    disposition. Must be called while the exception is being handled,
    because the non-retryable cases use a bare ``raise`` to re-raise it.

    :type e: ResumableUploadException
    :param e: the exception that was caught.

    :type debug: int
    :param debug: debug level; messages are printed when it is >= 1.

    ABORT_CUR_PROCESS re-raises and keeps the tracker file; ABORT removes
    the tracker file and re-raises; any other disposition returns
    normally so the caller can retry.
    """
    disposition = e.disposition
    verbose = debug >= 1
    if disposition == ResumableTransferDisposition.ABORT_CUR_PROCESS:
        if verbose:
            print('Caught non-retryable ResumableUploadException (%s); '
                  'aborting but retaining tracker file' % e.message)
        raise
    if disposition == ResumableTransferDisposition.ABORT:
        if verbose:
            print('Caught non-retryable ResumableUploadException (%s); '
                  'aborting and removing tracker file' % e.message)
        self._remove_tracker_file()
        raise
    if verbose:
        print('Caught ResumableUploadException (%s) - will retry' %
              e.message)
| 534 |
def track_progress_less_iterations(self, server_had_bytes_before_attempt,
                                   roll_back_md5=True, debug=0):
    """
    Updates the count of consecutive attempts that made no progress and
    sleeps before the next retry. Called after a re-tryable failure.

    :type server_had_bytes_before_attempt: int
    :param server_had_bytes_before_attempt: value of self.server_has_bytes
        recorded before the failed attempt.

    :type roll_back_md5: bool
    :param roll_back_md5: when no progress was made, restore self.md5sum
        from self.md5sum_before_attempt.

    :type debug: int
    :param debug: debug level; a message is printed when it is >= 1.

    Raises ResumableUploadException (disposition ABORT_CUR_PROCESS) once
    the progress-less count exceeds self.num_retries.
    """
    # At this point we had a re-tryable failure; see if made progress.
    made_progress = self.server_has_bytes > server_had_bytes_before_attempt
    if made_progress:
        # If progress, reset counter.
        self.progress_less_iterations = 0
    else:
        self.progress_less_iterations += 1
        if roll_back_md5:
            # Rollback any potential md5sum updates, as we did not
            # make any progress in this iteration.
            self.md5sum = self.md5sum_before_attempt

    if self.progress_less_iterations > self.num_retries:
        # Don't retry any longer in the current process.
        raise ResumableUploadException(
            'Too many resumable upload attempts failed without '
            'progress. You might try this upload again later',
            ResumableTransferDisposition.ABORT_CUR_PROCESS)

    # Use binary exponential backoff to desynchronize client requests.
    backoff_limit = 2**self.progress_less_iterations
    sleep_time_secs = random.random() * backoff_limit
    if debug >= 1:
        print('Got retryable failure (%d progress-less in a row).\n'
              'Sleeping %3.1f seconds before re-trying' %
              (self.progress_less_iterations, sleep_time_secs))
    time.sleep(sleep_time_secs)
| 561 |
def send_file(self, key, fp, headers, cb=None, num_cb=10):
    """
    Upload a file to a key into a bucket on GS, using GS resumable upload
    protocol.

    :type key: :class:`boto.s3.key.Key` or subclass
    :param key: The Key object to which data is to be uploaded

    :type fp: file-like object
    :param fp: The file pointer to upload

    :type headers: dict
    :param headers: The headers to pass along with the PUT request. The
        dict passed in is not modified; a copy is used internally.

    :type cb: function
    :param cb: a callback function that will be called to report progress on
        the upload. The callback should accept two integer parameters, the
        first representing the number of bytes that have been successfully
        transmitted to GS, and the second representing the total number of
        bytes that need to be transmitted.

    :type num_cb: int
    :param num_cb: (optional) If a callback is specified with the cb
        parameter, this parameter determines the granularity of the callback
        by defining the maximum number of times the callback will be called
        during the file transfer. Providing a negative integer will cause
        your callback to be called with each buffer read.

    On success, key.md5 and key.base64md5 are set from the md5 computed
    during the upload, and self.generation / self.meta_generation hold
    the values returned by the server.

    Raises ResumableUploadException if a problem occurs during the transfer.
    """
    # Work on a private copy so the caller's dict is never altered by the
    # Content-Type removal or the User-Agent addition below.
    headers = dict(headers) if headers else {}
    # If Content-Type header is present and set to None, remove it.
    # This is gsutil's way of asking boto to refrain from auto-generating
    # that header.
    CT = 'Content-Type'
    if CT in headers and headers[CT] is None:
        del headers[CT]

    headers['User-Agent'] = UserAgent

    # Determine file size different ways for case where fp is actually a
    # wrapper around a Key vs an actual file.
    if isinstance(fp, KeyFile):
        file_length = fp.getkey().size
    else:
        fp.seek(0, os.SEEK_END)
        file_length = fp.tell()
        fp.seek(0)
    debug = key.bucket.connection.debug

    # Compute the MD5 checksum on the fly.
    self.md5sum = md5()

    # Use num-retries from constructor if one was provided; else check
    # for a value specified in the boto config file; else default to 6.
    if self.num_retries is None:
        self.num_retries = config.getint('Boto', 'num_retries', 6)
    self.progress_less_iterations = 0

    while True:  # Retry as long as we're making progress.
        server_had_bytes_before_attempt = self.server_has_bytes
        # Snapshot the md5 so a progress-less attempt can be rolled back.
        self.md5sum_before_attempt = self.md5sum.copy()
        try:
            # Save generation and meta_generation in class state so caller
            # can find these values, for use in preconditions of future
            # operations on the uploaded object.
            (etag, self.generation, self.meta_generation) = (
                self._attempt_resumable_upload(key, fp, file_length,
                                               headers, cb, num_cb,
                                               self.md5sum))

            # Get the final md5 for the uploaded content.
            hd = self.md5sum.hexdigest()
            key.md5, key.base64md5 = key.get_md5_from_hexdigest(hd)

            # Upload succeeded, so remove the tracker file (if have one).
            self._remove_tracker_file()
            self._check_final_md5(key, etag)
            if debug >= 1:
                print('Resumable upload complete.')
            return
        except self.RETRYABLE_EXCEPTIONS as e:
            if debug >= 1:
                print('Caught exception (%s)' % e.__repr__())
            if isinstance(e, IOError) and e.errno == errno.EPIPE:
                # Broken pipe error causes httplib to immediately
                # close the socket (http://bugs.python.org/issue5542),
                # so we need to close the connection before we resume
                # the upload (which will cause a new connection to be
                # opened the next time an HTTP request is sent).
                key.bucket.connection.connection.close()
        except ResumableUploadException as e:
            # Re-raises for non-retryable dispositions; returns otherwise.
            self.handle_resumable_upload_exception(e, debug)

        self.track_progress_less_iterations(server_had_bytes_before_attempt,
                                            True, debug)
OLD | NEW |