| Index: third_party/boto/gs/resumable_upload_handler.py
 | 
| diff --git a/third_party/boto/gs/resumable_upload_handler.py b/third_party/boto/gs/resumable_upload_handler.py
 | 
| new file mode 100644
 | 
| index 0000000000000000000000000000000000000000..bb775934c435036cc223f50d3cd3b743208cf018
 | 
| --- /dev/null
 | 
| +++ b/third_party/boto/gs/resumable_upload_handler.py
 | 
| @@ -0,0 +1,659 @@
 | 
| +# Copyright 2010 Google Inc.
 | 
| +#
 | 
| +# Permission is hereby granted, free of charge, to any person obtaining a
 | 
| +# copy of this software and associated documentation files (the
 | 
| +# "Software"), to deal in the Software without restriction, including
 | 
| +# without limitation the rights to use, copy, modify, merge, publish, dis-
 | 
| +# tribute, sublicense, and/or sell copies of the Software, and to permit
 | 
| +# persons to whom the Software is furnished to do so, subject to the fol-
 | 
| +# lowing conditions:
 | 
| +#
 | 
| +# The above copyright notice and this permission notice shall be included
 | 
| +# in all copies or substantial portions of the Software.
 | 
| +#
 | 
| +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 | 
| +# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
 | 
| +# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
 | 
| +# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
 | 
| +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 | 
| +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 | 
| +# IN THE SOFTWARE.
 | 
| +
 | 
| +import cgi
 | 
| +import errno
 | 
| +import httplib
 | 
| +import os
 | 
| +import random
 | 
| +import re
 | 
| +import socket
 | 
| +import time
 | 
| +import urlparse
 | 
| +import boto
 | 
| +from boto import config, UserAgent
 | 
| +from boto.connection import AWSAuthConnection
 | 
| +from boto.exception import InvalidUriError
 | 
| +from boto.exception import ResumableTransferDisposition
 | 
| +from boto.exception import ResumableUploadException
 | 
| +from boto.s3.keyfile import KeyFile
 | 
| +try:
 | 
| +    from hashlib import md5
 | 
| +except ImportError:
 | 
| +    from md5 import md5
 | 
| +
 | 
| +"""
 | 
| +Handler for Google Cloud Storage resumable uploads. See
 | 
| +http://code.google.com/apis/storage/docs/developer-guide.html#resumable
 | 
| +for details.
 | 
| +
 | 
| +Resumable uploads will retry failed uploads, resuming at the byte
 | 
| +count completed by the last upload attempt. If too many retries happen with
 | 
| +no progress (per configurable num_retries param), the upload will be
 | 
| +aborted in the current process.
 | 
| +
 | 
| +The caller can optionally specify a tracker_file_name param in the
 | 
| +ResumableUploadHandler constructor. If you do this, that file will
 | 
| +save the state needed to allow retrying later, in a separate process
 | 
| +(e.g., in a later run of gsutil).
 | 
| +"""
 | 
| +
 | 
| +
 | 
class ResumableUploadHandler(object):
    """Drives a Google Cloud Storage resumable upload for one file.

    Keeps the upload state (tracker URI, byte counts, retry counters)
    across retries within a process and, via an optional tracker file,
    across processes.
    """

    # Size of each chunk read from the source file and sent on the wire.
    BUFFER_SIZE = 8192
    # Low-level transport failures that are safe to retry.
    RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error,
                            socket.gaierror)

    # (start, end) pair meaning the server holds no bytes yet; the upload
    # protocol uses inclusive byte numbering, so (0, -1) is the empty range.
    SERVER_HAS_NOTHING = (0, -1)
 | 
| +
 | 
def __init__(self, tracker_file_name=None, num_retries=None):
    """Create a handler; instantiate one per uploaded file.

    :type tracker_file_name: string
    :param tracker_file_name: optional path used to persist the tracker
        URI. If the file already contains a valid tracker URI, the upload
        resumes from it; otherwise a new resumable upload is started and
        its URI written there, so a later process (e.g. another gsutil
        run) can retry the transfer.

    :type num_retries: int
    :param num_retries: number of consecutive progress-less attempts
        allowed before giving up. The counter resets every time progress
        is made, so an upload can span far more attempts overall.
    """
    self.tracker_file_name = tracker_file_name
    self.num_retries = num_retries
    # Byte count the server reported at the last state check.
    self.server_has_bytes = 0
    self.tracker_uri = None
    if tracker_file_name:
        self._load_tracker_uri_from_file()
    # Byte offset at which this handler began uploading (across retries);
    # exposed so callers can compute throughput correctly.
    self.upload_start_point = None
 | 
| +
 | 
def _load_tracker_uri_from_file(self):
    """Load a previously saved tracker URI from the tracker file.

    A missing file is normal (first attempt for this upload) and is
    silently ignored. Other read errors, or a syntactically invalid URI
    in the file, produce a warning and leave self.tracker_uri as None so
    the upload restarts from scratch.
    """
    uri = None
    try:
        with open(self.tracker_file_name, 'r') as tracker_file:
            uri = tracker_file.readline().strip()
        self._set_tracker_uri(uri)
    except IOError as e:
        # ENOENT is expected the first time an upload is attempted on a
        # file; anything else is worth warning the user about. Restart
        # will happen because self.tracker_uri is still None.
        if e.errno != errno.ENOENT:
            print('Couldn\'t read URI tracker file (%s): %s. Restarting '
                  'upload from scratch.' %
                  (self.tracker_file_name, e.strerror))
    except InvalidUriError as e:
        # Warn but proceed; self.tracker_uri is still None, so we
        # restart from scratch.
        print('Invalid tracker URI (%s) found in URI tracker file '
              '(%s). Restarting upload from scratch.' %
              (uri, self.tracker_file_name))
 | 
| +
 | 
def _save_tracker_uri_to_file(self):
    """Persist self.tracker_uri to the tracker file, if one was configured.

    No-op when no tracker file name was passed to the constructor.

    Raises:
      ResumableUploadException (disposition ABORT): if the tracker file
        cannot be written.
    """
    if not self.tracker_file_name:
        return
    try:
        with open(self.tracker_file_name, 'w') as tracker_file:
            tracker_file.write(self.tracker_uri)
    except IOError as e:
        # BUG FIX: the first two message fragments previously concatenated
        # to "can happenif you're" (missing space).
        raise ResumableUploadException(
            'Couldn\'t write URI tracker file (%s): %s.\nThis can happen '
            'if you\'re using an incorrectly configured upload tool\n'
            '(e.g., gsutil configured to save tracker files to an '
            'unwritable directory)' %
            (self.tracker_file_name, e.strerror),
            ResumableTransferDisposition.ABORT)
 | 
| +
 | 
def _set_tracker_uri(self, uri):
    """Record a (new) tracker URI and reset per-upload progress state.

    Called when a new resumable upload is started or when a fresh tracker
    URI is obtained for this upload.

    Raises:
      InvalidUriError: if uri is not a syntactically valid http(s) URI.
    """
    parsed = urlparse.urlparse(uri)
    scheme_ok = parsed.scheme.lower() in ('http', 'https')
    if not (scheme_ok and parsed.netloc):
        raise InvalidUriError('Invalid tracker URI (%s)' % uri)
    self.tracker_uri = uri
    self.tracker_uri_host = parsed.netloc
    self.tracker_uri_path = '%s?%s' % (parsed.path, parsed.query)
    self.server_has_bytes = 0
 | 
| +
 | 
def get_tracker_uri(self):
    """Return this upload's tracker URI, or None if not yet started."""
    return self.tracker_uri
 | 
| +
 | 
def _remove_tracker_file(self):
    """Delete the tracker file, if one is configured and present."""
    if self.tracker_file_name and os.path.exists(self.tracker_file_name):
        os.unlink(self.tracker_file_name)
 | 
| +
 | 
def _build_content_range_header(self, range_spec='*', length_spec='*'):
    """Return a Content-Range header value, e.g. 'bytes 0-99/1234'."""
    return 'bytes {0}/{1}'.format(range_spec, length_spec)
 | 
| +
 | 
def _query_server_state(self, conn, file_length):
    """Ask the upload server for the current state of this transfer.

    The resumable protocol returns the current (start, end) state whenever
    a PUT does not complete, so an empty PUT carrying a '*' Content-Range
    is the canonical state query.

    Returns the HTTP response from the request.

    Raises ResumableUploadException if the server can't be queried.
    """
    state_headers = {
        'Content-Range': self._build_content_range_header('*', file_length),
        'Content-Length': '0',
    }
    return AWSAuthConnection.make_request(conn, 'PUT',
                                          path=self.tracker_uri_path,
                                          auth_path=self.tracker_uri_path,
                                          headers=state_headers,
                                          host=self.tracker_uri_host)
 | 
| +
 | 
def _query_server_pos(self, conn, file_length):
    """Return (server_start, server_end) for this upload, inclusive.

    For example (0, 2) means the server holds bytes 0, 1 *and* 2.

    Raises ResumableUploadException if the server state can't be
    determined or parsed.
    """
    resp = self._query_server_state(conn, file_length)
    if resp.status == 200:
        # Completed upload. Report (0, file_length - 1) so callers can
        # always read straight through server_end without special-casing
        # the "server already has the whole file" boundary.
        return (0, file_length - 1)
    if resp.status != 308:
        # The server has no state for this upload ID. This happens, for
        # example, when a tracker URI saved to a file has gone stale by
        # the time the transfer is restarted; the caller must then start
        # a new transfer (and save its new tracker URI).
        raise ResumableUploadException(
            'Got non-308 response (%s) from server state query' %
            resp.status, ResumableTransferDisposition.START_OVER)
    range_spec = resp.getheader('range')
    if not range_spec:
        # No Range header means the server has no bytes yet. Range uses
        # inclusive from/to values, so 0-0 would claim byte 0; omitting
        # the header is how "nothing yet" is expressed.
        return self.SERVER_HAS_NOTHING
    # Parse 'bytes=<from>-<to>'.
    match = re.search(r'bytes=(\d+)-(\d+)', range_spec)
    if not match:
        raise ResumableUploadException(
            'Couldn\'t parse upload server state query response (%s)' %
            str(resp.getheaders()), ResumableTransferDisposition.START_OVER)
    server_start = long(match.group(1))
    server_end = long(match.group(2))
    if conn.debug >= 1:
        print('Server has: Range: %d - %d.' % (server_start, server_end))
    return (server_start, server_end)
 | 
| +
 | 
def _start_new_resumable_upload(self, key, headers=None):
    """Initiate a brand-new resumable upload and record its tracker URI.

    Raises ResumableUploadException if any errors occur.
    """
    conn = key.bucket.connection
    if conn.debug >= 1:
        print('Starting new resumable upload.')
    self.server_has_bytes = 0

    # A resumable upload is created by a POST with an empty body and the
    # "X-Goog-Resumable: start" header. Caller-provided headers (e.g.
    # Content-Type) are forwarded EXCEPT Content-Length: the POST carries
    # no file bytes, so supplying one is a semantic error (the server
    # would then expect that many bytes). The real Content-Length is set
    # on the subsequent PUT from the uploaded file size.
    post_headers = {}
    for header_name in headers:
        if header_name.lower() == 'content-length':
            raise ResumableUploadException(
                'Attempt to specify Content-Length header (disallowed)',
                ResumableTransferDisposition.ABORT)
        post_headers[header_name] = headers[header_name]
    post_headers[conn.provider.resumable_upload_header] = 'start'

    resp = conn.make_request(
        'POST', key.bucket.name, key.name, post_headers)
    # Drain the body up front; the tracker URI itself arrives in the
    # 'Location' response header.
    body = resp.read()

    if resp.status in (500, 503):
        # Transient server errors: retry after a delay.
        raise ResumableUploadException(
            'Got status %d from attempt to start resumable upload. '
            'Will wait/retry' % resp.status,
            ResumableTransferDisposition.WAIT_BEFORE_RETRY)
    if resp.status not in (200, 201):
        raise ResumableUploadException(
            'Got status %d from attempt to start resumable upload. '
            'Aborting' % resp.status,
            ResumableTransferDisposition.ABORT)

    # 200/201 means the resumable upload session was created.
    tracker_uri = resp.getheader('Location')
    if not tracker_uri:
        raise ResumableUploadException(
            'No resumable tracker URI found in resumable initiation '
            'POST response (%s)' % body,
            ResumableTransferDisposition.WAIT_BEFORE_RETRY)
    self._set_tracker_uri(tracker_uri)
    self._save_tracker_uri_to_file()
 | 
| +
 | 
def _upload_file_bytes(self, conn, http_conn, fp, file_length,
                       total_bytes_uploaded, cb, num_cb, md5sum, headers):
    """Make one attempt to PUT the remaining file bytes.

    Uses an already-established resumable-upload HTTP connection. On
    success returns (etag, generation, meta_generation) from the server
    response; on failure raises ResumableUploadException whose
    disposition tells the caller whether to retry.
    """
    chunk = fp.read(self.BUFFER_SIZE)
    if cb:
        # cb_count is the number of full buffers to send between progress
        # callbacks; num_cb < 0 means "call back on every buffer read".
        if num_cb > 2:
            cb_count = file_length / self.BUFFER_SIZE / (num_cb - 2)
        elif num_cb < 0:
            cb_count = -1
        else:
            cb_count = 0
        buffers_sent = 0
        cb(total_bytes_uploaded, file_length)

    # Assemble the PUT headers. A zero-length file gets no Content-Range:
    # the resumable protocol's end-range is *inclusive*, so sending
    # 'bytes 0-0/1' would actually claim a one-byte payload.
    put_headers = headers.copy() if headers else {}
    if file_length:
        if total_bytes_uploaded == file_length:
            range_header = self._build_content_range_header(
                '*', file_length)
        else:
            range_header = self._build_content_range_header(
                '%d-%d' % (total_bytes_uploaded, file_length - 1),
                file_length)
        put_headers['Content-Range'] = range_header
    # Content-Length covers only the bytes this PUT will carry.
    put_headers['Content-Length'] = str(file_length - total_bytes_uploaded)

    http_request = AWSAuthConnection.build_base_http_request(
        conn, 'PUT', path=self.tracker_uri_path, auth_path=None,
        headers=put_headers, host=self.tracker_uri_host)
    http_conn.putrequest('PUT', http_request.path)
    for header_name in put_headers:
        http_conn.putheader(header_name, put_headers[header_name])
    http_conn.endheaders()

    # Silence debug output while streaming so the upload content doesn't
    # end up in the debug stream.
    http_conn.set_debuglevel(0)
    while chunk:
        http_conn.send(chunk)
        md5sum.update(chunk)
        total_bytes_uploaded += len(chunk)
        if cb:
            buffers_sent += 1
            if buffers_sent == cb_count or cb_count == -1:
                cb(total_bytes_uploaded, file_length)
                buffers_sent = 0
        chunk = fp.read(self.BUFFER_SIZE)
    http_conn.set_debuglevel(conn.debug)
    if cb:
        cb(total_bytes_uploaded, file_length)
    if total_bytes_uploaded != file_length:
        # Abort (and delete the tracker file) so that a user retry starts
        # a fresh resumable upload rather than resuming against a file
        # that changed underneath us.
        raise ResumableUploadException(
            'File changed during upload: EOF at %d bytes of %d byte file.' %
            (total_bytes_uploaded, file_length),
            ResumableTransferDisposition.ABORT)
    resp = http_conn.getresponse()
    body = resp.read()
    # Restore http connection debug level.
    http_conn.set_debuglevel(conn.debug)

    if resp.status == 200:
        # Success.
        return (resp.getheader('etag'),
                resp.getheader('x-goog-generation'),
                resp.getheader('x-goog-metageneration'))
    # Timeouts (408) and transient server errors (500/503) are retryable
    # after a delay; anything else aborts.
    if resp.status in (408, 500, 503):
        disposition = ResumableTransferDisposition.WAIT_BEFORE_RETRY
    else:
        disposition = ResumableTransferDisposition.ABORT
    raise ResumableUploadException('Got response code %d while attempting '
                                   'upload (%s)' %
                                   (resp.status, resp.reason), disposition)
 | 
| +
 | 
def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,
                              num_cb, md5sum):
    """Attempt a resumable upload, resuming when possible.

    If we already hold a tracker URI whose state the server still knows,
    the transfer resumes from the server's position (after catching the
    local md5 up over the bytes the server already has); otherwise a new
    resumable upload session is started.

    Returns (etag, generation, meta_generation) from server upon success.

    Raises ResumableUploadException if any problems occur.
    """
    (server_start, server_end) = self.SERVER_HAS_NOTHING
    conn = key.bucket.connection
    if self.tracker_uri:
        # Try to resume existing resumable upload.
        try:
            (server_start, server_end) = (
                self._query_server_pos(conn, file_length))
            self.server_has_bytes = server_start

            if server_end:
                # The server already has some of the content; feed the
                # locally-stored bytes through md5 so the final hash
                # covers the whole object.
                # NOTE(review): server_end == 0 (server holds exactly
                # byte 0) skips this catch-up — matches upstream boto;
                # confirm whether that range is reachable in practice.
                print('Catching up md5 for resumed upload')
                fp.seek(0)
                # Read local file's bytes through the position the server
                # has. E.g. if server has (0, 3) we must read 3-0+1=4
                # bytes.
                bytes_to_go = server_end + 1
                while bytes_to_go:
                    chunk = fp.read(min(key.BufferSize, bytes_to_go))
                    if not chunk:
                        raise ResumableUploadException(
                            'Hit end of file during resumable upload md5 '
                            'catchup. This should not happen under\n'
                            'normal circumstances, as it indicates the '
                            'server has more bytes of this transfer\nthan'
                            ' the current file size. Restarting upload.',
                            ResumableTransferDisposition.START_OVER)
                    md5sum.update(chunk)
                    bytes_to_go -= len(chunk)

            if conn.debug >= 1:
                print('Resuming transfer.')
        except ResumableUploadException as e:
            if conn.debug >= 1:
                print('Unable to resume transfer (%s).' % e.message)
            self._start_new_resumable_upload(key, headers)
    else:
        self._start_new_resumable_upload(key, headers)

    # upload_start_point allows the code that instantiated the
    # ResumableUploadHandler to find out the point from which it started
    # uploading (e.g., so it can correctly compute throughput).
    if self.upload_start_point is None:
        self.upload_start_point = server_end

    total_bytes_uploaded = server_end + 1
    # Corner case: Don't attempt to seek if we've already uploaded the
    # entire file, because if the file is a stream (e.g., the KeyFile
    # wrapper around input key when copying between providers), attempting
    # to seek to the end of file would result in an InvalidRange error.
    #
    # BUG FIX: the condition was inverted (file_length <
    # total_bytes_uploaded), which skipped the seek on every normal resume
    # (re-sending from offset 0 while claiming a later Content-Range) and
    # performed the seek exactly in the already-complete case the comment
    # warns about.
    if total_bytes_uploaded < file_length:
        fp.seek(total_bytes_uploaded)
    conn = key.bucket.connection

    # Get a new HTTP connection (vs conn.get_http_connection(), which
    # reuses pool connections) because httplib requires a new HTTP
    # connection per transaction. (Without this, calling
    # http_conn.getresponse() would get "ResponseNotReady".)
    http_conn = conn.new_http_connection(self.tracker_uri_host,
                                         conn.is_secure)
    http_conn.set_debuglevel(conn.debug)

    # Make sure to close http_conn at end so if a local file read failure
    # occurs partway through, the server terminates the current upload
    # and can report that progress on the next attempt.
    try:
        return self._upload_file_bytes(conn, http_conn, fp, file_length,
                                       total_bytes_uploaded, cb, num_cb,
                                       md5sum, headers)
    except (ResumableUploadException, socket.error):
        resp = self._query_server_state(conn, file_length)
        if resp.status == 400:
            raise ResumableUploadException('Got 400 response from server '
                'state query after failed resumable upload attempt. This '
                'can happen for various reasons, including specifying an '
                'invalid request (e.g., an invalid canned ACL) or if the '
                'file size changed between upload attempts',
                ResumableTransferDisposition.ABORT)
        else:
            raise
    finally:
        http_conn.close()
 | 
| +
 | 
def _check_final_md5(self, key, etag):
    """Verify the server's etag matches the locally computed md5.

    The upload may have spanned hours and several processes (e.g. gsutil
    runs), during which the user could have changed part of the file
    without realizing it. On mismatch the (incorrect-content) uploaded
    object is deleted and ResumableUploadException (ABORT) is raised.
    """
    if key.bucket.connection.debug >= 1:
        print('Checking md5 against etag.')
    if key.md5 != etag.strip('"\''):
        # Call key.open_read() before attempting to delete the
        # (incorrect-content) key, so the delete runs on a different HTTP
        # connection; httplib raises "Response not ready" when a second
        # transaction is attempted on the same connection.
        key.open_read()
        key.close()
        key.delete()
        raise ResumableUploadException(
            'File changed during upload: md5 signature doesn\'t match etag '
            '(incorrect uploaded object deleted)',
            ResumableTransferDisposition.ABORT)
 | 
| +
 | 
def handle_resumable_upload_exception(self, e, debug):
    """React to a ResumableUploadException based on its disposition.

    ABORT_CUR_PROCESS re-raises but keeps the tracker file so a later
    process may still resume; ABORT removes the tracker file and
    re-raises; any other disposition is treated as retryable and is only
    logged (when debug >= 1).
    """
    disposition = e.disposition
    if disposition == ResumableTransferDisposition.ABORT_CUR_PROCESS:
        if debug >= 1:
            print('Caught non-retryable ResumableUploadException (%s); '
                  'aborting but retaining tracker file' % e.message)
        raise
    if disposition == ResumableTransferDisposition.ABORT:
        if debug >= 1:
            print('Caught non-retryable ResumableUploadException (%s); '
                  'aborting and removing tracker file' % e.message)
        self._remove_tracker_file()
        raise
    if debug >= 1:
        print('Caught ResumableUploadException (%s) - will retry' %
              e.message)
 | 
| +
 | 
def track_progress_less_iterations(self, server_had_bytes_before_attempt,
                                   roll_back_md5=True, debug=0):
    """Bookkeeping after a retryable failure.

    Resets the progress-less counter if the server gained bytes during
    the last attempt; otherwise increments it and (optionally) rolls the
    md5 state back to its pre-attempt snapshot. Once the counter exceeds
    num_retries, raises ResumableUploadException (ABORT_CUR_PROCESS);
    otherwise sleeps with randomized binary exponential backoff before
    the caller retries.
    """
    made_progress = (
        self.server_has_bytes > server_had_bytes_before_attempt)
    if made_progress:
        self.progress_less_iterations = 0
    else:
        self.progress_less_iterations += 1
        if roll_back_md5:
            # No progress this iteration, so discard any md5sum updates
            # performed during the failed attempt.
            self.md5sum = self.md5sum_before_attempt

    if self.progress_less_iterations > self.num_retries:
        # Don't retry any longer in the current process.
        raise ResumableUploadException(
                'Too many resumable upload attempts failed without '
                'progress. You might try this upload again later',
                ResumableTransferDisposition.ABORT_CUR_PROCESS)

    # Randomized binary exponential backoff desynchronizes client retries.
    sleep_time_secs = random.random() * (2 ** self.progress_less_iterations)
    if debug >= 1:
        print('Got retryable failure (%d progress-less in a row).\n'
              'Sleeping %3.1f seconds before re-trying' %
              (self.progress_less_iterations, sleep_time_secs))
    time.sleep(sleep_time_secs)
 | 
| +
 | 
def send_file(self, key, fp, headers, cb=None, num_cb=10):
    """
    Upload a file to a key into a bucket on GS, using the GS resumable
    upload protocol.

    :type key: :class:`boto.s3.key.Key` or subclass
    :param key: The Key object to which data is to be uploaded

    :type fp: file-like object
    :param fp: The file pointer to upload

    :type headers: dict
    :param headers: The headers to pass along with the PUT request

    :type cb: function
    :param cb: a callback function that will be called to report progress
        on the upload. The callback should accept two integer parameters:
        the number of bytes successfully transmitted to GS so far, and
        the total number of bytes that need to be transmitted.

    :type num_cb: int
    :param num_cb: (optional) If a callback is specified with the cb
        parameter, this parameter determines the granularity of the
        callback by defining the maximum number of times the callback
        will be called during the file transfer. Providing a negative
        integer will cause your callback to be called with each buffer
        read.

    Raises ResumableUploadException if a problem occurs during the
    transfer.
    """
    if not headers:
        headers = {}
    # A Content-Type header explicitly set to None is gsutil's way of
    # asking boto not to auto-generate one; drop it entirely.
    content_type = 'Content-Type'
    if content_type in headers and headers[content_type] is None:
        del headers[content_type]

    headers['User-Agent'] = UserAgent

    # Determine the file size; fp may be a KeyFile wrapper around a Key
    # (which knows its size) rather than an actual seekable file.
    if isinstance(fp, KeyFile):
        file_length = fp.getkey().size
    else:
        fp.seek(0, os.SEEK_END)
        file_length = fp.tell()
        fp.seek(0)
    debug = key.bucket.connection.debug

    # Compute the MD5 checksum on the fly.
    self.md5sum = md5()

    # Use num-retries from constructor if one was provided; else check
    # for a value specified in the boto config file; else default to 6.
    if self.num_retries is None:
        self.num_retries = config.getint('Boto', 'num_retries', 6)
    self.progress_less_iterations = 0

    while True:  # Retry as long as we're making progress.
        server_had_bytes_before_attempt = self.server_has_bytes
        self.md5sum_before_attempt = self.md5sum.copy()
        try:
            # Save generation and meta_generation in class state so the
            # caller can use them in preconditions of future operations
            # on the uploaded object.
            (etag, self.generation, self.meta_generation) = (
                self._attempt_resumable_upload(key, fp, file_length,
                                               headers, cb, num_cb,
                                               self.md5sum))

            # Get the final md5 for the uploaded content.
            hd = self.md5sum.hexdigest()
            key.md5, key.base64md5 = key.get_md5_from_hexdigest(hd)

            # Upload succeeded, so remove the tracker file (if we have
            # one).
            self._remove_tracker_file()
            self._check_final_md5(key, etag)
            if debug >= 1:
                print('Resumable upload complete.')
            return
        except self.RETRYABLE_EXCEPTIONS as e:
            if debug >= 1:
                print('Caught exception (%s)' % e.__repr__())
            if isinstance(e, IOError) and e.errno == errno.EPIPE:
                # A broken pipe makes httplib close the socket immediately
                # (http://bugs.python.org/issue5542), so close the
                # connection now; a fresh one will be opened by the next
                # HTTP request when the upload resumes.
                key.bucket.connection.connection.close()
        except ResumableUploadException as e:
            self.handle_resumable_upload_exception(e, debug)

        self.track_progress_less_iterations(server_had_bytes_before_attempt,
                                            True, debug)
 | 
| 
 |