| Index: third_party/cloud_storage/cloudstorage/storage_api.py
|
| diff --git a/third_party/cloud_storage/cloudstorage/storage_api.py b/third_party/cloud_storage/cloudstorage/storage_api.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..910c365be348e8454ee2848500af8aa57cb05943
|
| --- /dev/null
|
| +++ b/third_party/cloud_storage/cloudstorage/storage_api.py
|
| @@ -0,0 +1,887 @@
|
| +# Copyright 2012 Google Inc. All Rights Reserved.
|
| +#
|
| +# Licensed under the Apache License, Version 2.0 (the "License");
|
| +# you may not use this file except in compliance with the License.
|
| +# You may obtain a copy of the License at
|
| +#
|
| +# http://www.apache.org/licenses/LICENSE-2.0
|
| +#
|
| +# Unless required by applicable law or agreed to in writing,
|
| +# software distributed under the License is distributed on an
|
| +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
|
| +# either express or implied. See the License for the specific
|
| +# language governing permissions and limitations under the License.
|
| +
|
| +"""Python wrappers for the Google Storage RESTful API."""
|
| +
|
| +
|
| +
|
| +
|
| +
|
| +__all__ = ['ReadBuffer',
|
| + 'StreamingBuffer',
|
| + ]
|
| +
|
| +import collections
|
| +import os
|
| +import urlparse
|
| +
|
| +from . import api_utils
|
| +from . import common
|
| +from . import errors
|
| +from . import rest_api
|
| +
|
| +from google.appengine.api import urlfetch
|
| +from google.appengine.ext import ndb
|
| +
|
| +
|
| +
|
| +def _get_storage_api(retry_params, account_id=None):
|
| + """Returns storage_api instance for API methods.
|
| +
|
| + Args:
|
| + retry_params: An instance of api_utils.RetryParams. If None,
|
| + the thread's default will be used.
|
| + account_id: Internal-use only.
|
| +
|
| + Returns:
|
| + A storage_api instance to handle urlfetch work to GCS.
|
| + On the dev appserver, this instance talks to a local stub by default;
|
| + if common.ACCESS_TOKEN is set, that token is used to talk to the
|
| + real GCS.
|
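| + Example (illustrative sketch; the bucket and object are hypothetical):
|
| + api = _get_storage_api(retry_params=None)
|
| + status, headers, content = api.get_object('/mybucket/myfile')
|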
| + """
|
| +
|
| +
|
| + api = _StorageApi(_StorageApi.full_control_scope,
|
| + service_account_id=account_id,
|
| + retry_params=retry_params)
|
| + if common.local_run() and not common.get_access_token():
|
| + api.api_url = common.local_api_url()
|
| + if common.get_access_token():
|
| + api.token = common.get_access_token()
|
| + return api
|
| +
|
| +
|
| +class _StorageApi(rest_api._RestApi):
|
| + """A simple wrapper for the Google Storage RESTful API.
|
| +
|
| + WARNING: Do NOT directly use this api. It's an implementation detail
|
| + and is subject to change in any release.
|
| +
|
| + All async methods have similar args and returns.
|
| +
|
| + Args:
|
| + path: The path to the Google Storage object or bucket, e.g.
|
| + '/mybucket/myfile' or '/mybucket'.
|
| + **kwd: Options for urlfetch. e.g.
|
| + headers={'content-type': 'text/plain'}, payload='blah'.
|
| +
|
| + Returns:
|
| + A ndb Future. When fulfilled, future.get_result() should return
|
| + a tuple of (status, headers, content) that represents an HTTP response
|
| + of Google Cloud Storage XML API.
|
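| + Example (illustrative):
|
| + fut = api.get_object_async('/mybucket/myfile')
|
| + status, headers, content = fut.get_result()
|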
| + """
|
| +
|
| + api_url = 'https://storage.googleapis.com'
|
| + read_only_scope = 'https://www.googleapis.com/auth/devstorage.read_only'
|
| + read_write_scope = 'https://www.googleapis.com/auth/devstorage.read_write'
|
| + full_control_scope = 'https://www.googleapis.com/auth/devstorage.full_control'
|
| +
|
| + def __getstate__(self):
|
| + """Store state as part of serialization/pickling.
|
| +
|
| + Returns:
|
| + A tuple (of dictionaries) with the state of this object
|
| + """
|
| + return (super(_StorageApi, self).__getstate__(), {'api_url': self.api_url})
|
| +
|
| + def __setstate__(self, state):
|
| + """Restore state as part of deserialization/unpickling.
|
| +
|
| + Args:
|
| + state: the tuple from a __getstate__ call
|
| + """
|
| + superstate, localstate = state
|
| + super(_StorageApi, self).__setstate__(superstate)
|
| + self.api_url = localstate['api_url']
|
| +
|
| + @api_utils._eager_tasklet
|
| + @ndb.tasklet
|
| + def do_request_async(self, url, method='GET', headers=None, payload=None,
|
| + deadline=None, callback=None):
|
| + """Inherit docs.
|
| +
|
| + This method translates urlfetch exceptions to more service-specific ones.
|
| + """
|
| + if headers is None:
|
| + headers = {}
|
| + if 'x-goog-api-version' not in headers:
|
| + headers['x-goog-api-version'] = '2'
|
| + headers['accept-encoding'] = 'gzip, *'
|
| + try:
|
| + resp_tuple = yield super(_StorageApi, self).do_request_async(
|
| + url, method=method, headers=headers, payload=payload,
|
| + deadline=deadline, callback=callback)
|
| + except urlfetch.DownloadError, e:
|
| + raise errors.TimeoutError(
|
| + 'Request to Google Cloud Storage timed out.', e)
|
| +
|
| + raise ndb.Return(resp_tuple)
|
| +
|
| +
|
| + def post_object_async(self, path, **kwds):
|
| + """POST to an object."""
|
| + return self.do_request_async(self.api_url + path, 'POST', **kwds)
|
| +
|
| + def put_object_async(self, path, **kwds):
|
| + """PUT an object."""
|
| + return self.do_request_async(self.api_url + path, 'PUT', **kwds)
|
| +
|
| + def get_object_async(self, path, **kwds):
|
| + """GET an object.
|
| +
|
| + Note: No payload argument is supported.
|
| + """
|
| + return self.do_request_async(self.api_url + path, 'GET', **kwds)
|
| +
|
| + def delete_object_async(self, path, **kwds):
|
| + """DELETE an object.
|
| +
|
| + Note: No payload argument is supported.
|
| + """
|
| + return self.do_request_async(self.api_url + path, 'DELETE', **kwds)
|
| +
|
| + def head_object_async(self, path, **kwds):
|
| + """HEAD an object.
|
| +
|
| + Depending on request headers, HEAD returns various object properties,
|
| + e.g. Content-Length, Last-Modified, and ETag.
|
| +
|
| + Note: No payload argument is supported.
|
| + """
|
| + return self.do_request_async(self.api_url + path, 'HEAD', **kwds)
|
| +
|
| + def get_bucket_async(self, path, **kwds):
|
| + """GET a bucket."""
|
| + return self.do_request_async(self.api_url + path, 'GET', **kwds)
|
| +
|
| +
|
| +_StorageApi = rest_api.add_sync_methods(_StorageApi)
|
| +
|
| +
|
| +class ReadBuffer(object):
|
| + """A class for reading Google storage files."""
|
| +
|
| + DEFAULT_BUFFER_SIZE = 1024 * 1024
|
| + MAX_REQUEST_SIZE = 30 * DEFAULT_BUFFER_SIZE
|
| +
|
| + def __init__(self,
|
| + api,
|
| + path,
|
| + buffer_size=DEFAULT_BUFFER_SIZE,
|
| + max_request_size=MAX_REQUEST_SIZE):
|
| + """Constructor.
|
| +
|
| + Args:
|
| + api: A StorageApi instance.
|
| + path: Quoted/escaped path to the object, e.g. /mybucket/myfile
|
| + buffer_size: buffer size. The ReadBuffer keeps
|
| + one buffer, but there may be a pending future that contains
|
| + a second buffer. This size must not exceed max_request_size.
|
| + max_request_size: Max bytes to request in one urlfetch.
|
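| +
|
| + Example (illustrative; the object name is hypothetical):
|
| + f = ReadBuffer(api, '/mybucket/myfile')
|
| + first_line = f.readline()
|
| + f.seek(0)
|
| + everything = f.read()
|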
| + """
|
| + self._api = api
|
| + self._path = path
|
| + self.name = api_utils._unquote_filename(path)
|
| + self.closed = False
|
| +
|
| + assert buffer_size <= max_request_size
|
| + self._buffer_size = buffer_size
|
| + self._max_request_size = max_request_size
|
| + self._offset = 0
|
| + self._buffer = _Buffer()
|
| + self._etag = None
|
| +
|
| + get_future = self._get_segment(0, self._buffer_size, check_response=False)
|
| +
|
| + status, headers, content = self._api.head_object(path)
|
| + errors.check_status(status, [200], path, resp_headers=headers, body=content)
|
| + self._file_size = long(common.get_stored_content_length(headers))
|
| + self._check_etag(headers.get('etag'))
|
| +
|
| + self._buffer_future = None
|
| +
|
| + if self._file_size != 0:
|
| + content, check_response_closure = get_future.get_result()
|
| + check_response_closure()
|
| + self._buffer.reset(content)
|
| + self._request_next_buffer()
|
| +
|
| + def __getstate__(self):
|
| + """Store state as part of serialization/pickling.
|
| +
|
| + The contents of the read buffer are not stored, only the current offset for
|
| + data read by the client. A new read buffer is established at unpickling.
|
| + The head information for the object (file size and etag) is stored to
|
| + reduce startup time and to ensure the file has not changed.
|
| +
|
| + Returns:
|
| + A dictionary with the state of this object
|
| + """
|
| + return {'api': self._api,
|
| + 'path': self._path,
|
| + 'buffer_size': self._buffer_size,
|
| + 'request_size': self._max_request_size,
|
| + 'etag': self._etag,
|
| + 'size': self._file_size,
|
| + 'offset': self._offset,
|
| + 'closed': self.closed}
|
| +
|
| + def __setstate__(self, state):
|
| + """Restore state as part of deserialization/unpickling.
|
| +
|
| + Args:
|
| + state: the dictionary from a __getstate__ call
|
| +
|
| + Along with restoring the state, pre-fetch the next read buffer.
|
| + """
|
| + self._api = state['api']
|
| + self._path = state['path']
|
| + self.name = api_utils._unquote_filename(self._path)
|
| + self._buffer_size = state['buffer_size']
|
| + self._max_request_size = state['request_size']
|
| + self._etag = state['etag']
|
| + self._file_size = state['size']
|
| + self._offset = state['offset']
|
| + self._buffer = _Buffer()
|
| + self.closed = state['closed']
|
| + self._buffer_future = None
|
| + if self._remaining() and not self.closed:
|
| + self._request_next_buffer()
|
| +
|
| + def __iter__(self):
|
| + """Iterator interface.
|
| +
|
| + Note the ReadBuffer container itself is the iterator. It's
|
| + (quote PEP0234)
|
| + 'destructive: they consume all the values and a second iterator
|
| + cannot easily be created that iterates independently over the same values.
|
| + You could open the file for the second time, or seek() to the beginning.'
|
| +
|
| + Returns:
|
| + Self.
|
| + """
|
| + return self
|
| +
|
| + def next(self):
|
| + line = self.readline()
|
| + if not line:
|
| + raise StopIteration()
|
| + return line
|
| +
|
| + def readline(self, size=-1):
|
| + """Read one line delimited by '\n' from the file.
|
| +
|
| + A trailing newline character is kept in the string. It may be absent when a
|
| + file ends with an incomplete line. If the size argument is non-negative,
|
| + it specifies the maximum string size (counting the newline) to return.
|
| + A negative size is the same as unspecified. An empty string is returned
|
| + only when EOF is encountered immediately.
|
| +
|
| + Args:
|
| + size: Maximum number of bytes to read. If not specified, readline stops
|
| + only on '\n' or EOF.
|
| +
|
| + Returns:
|
| + The data read as a string.
|
| +
|
| + Raises:
|
| + IOError: When this buffer is closed.
|
| + """
|
| + self._check_open()
|
| + if size == 0 or not self._remaining():
|
| + return ''
|
| +
|
| + data_list = []
|
| + newline_offset = self._buffer.find_newline(size)
|
| + while newline_offset < 0:
|
| + data = self._buffer.read(size)
|
| + size -= len(data)
|
| + self._offset += len(data)
|
| + data_list.append(data)
|
| + if size == 0 or not self._remaining():
|
| + return ''.join(data_list)
|
| + self._buffer.reset(self._buffer_future.get_result())
|
| + self._request_next_buffer()
|
| + newline_offset = self._buffer.find_newline(size)
|
| +
|
| + data = self._buffer.read_to_offset(newline_offset + 1)
|
| + self._offset += len(data)
|
| + data_list.append(data)
|
| +
|
| + return ''.join(data_list)
|
| +
|
| + def read(self, size=-1):
|
| + """Read data from RAW file.
|
| +
|
| + Args:
|
| + size: Number of bytes to read as integer. Actual number of bytes
|
| + read is always equal to size unless EOF is reached. If size is
|
| + negative or unspecified, read the entire file.
|
| +
|
| + Returns:
|
| + data read as str.
|
| +
|
| + Raises:
|
| + IOError: When this buffer is closed.
|
| + """
|
| + self._check_open()
|
| + if not self._remaining():
|
| + return ''
|
| +
|
| + data_list = []
|
| + while True:
|
| + remaining = self._buffer.remaining()
|
| + if 0 <= size < remaining:
|
| + data_list.append(self._buffer.read(size))
|
| + self._offset += size
|
| + break
|
| + else:
|
| + size -= remaining
|
| + self._offset += remaining
|
| + data_list.append(self._buffer.read())
|
| +
|
| + if self._buffer_future is None:
|
| + if size < 0 or size >= self._remaining():
|
| + needs = self._remaining()
|
| + else:
|
| + needs = size
|
| + data_list.extend(self._get_segments(self._offset, needs))
|
| + self._offset += needs
|
| + break
|
| +
|
| + if self._buffer_future:
|
| + self._buffer.reset(self._buffer_future.get_result())
|
| + self._buffer_future = None
|
| +
|
| + if self._buffer_future is None:
|
| + self._request_next_buffer()
|
| + return ''.join(data_list)
|
| +
|
| + def _remaining(self):
|
| + return self._file_size - self._offset
|
| +
|
| + def _request_next_buffer(self):
|
| + """Request next buffer.
|
| +
|
| + Requires that self._offset and self._buffer are in a consistent state.
|
| + """
|
| + self._buffer_future = None
|
| + next_offset = self._offset + self._buffer.remaining()
|
| + if next_offset != self._file_size:
|
| + self._buffer_future = self._get_segment(next_offset,
|
| + self._buffer_size)
|
| +
|
| + def _get_segments(self, start, request_size):
|
| + """Get segments of the file from Google Storage as a list.
|
| +
|
| + A large request is broken into segments to avoid hitting urlfetch
|
| + response size limit. Each segment is returned from a separate urlfetch.
|
| +
|
| + Args:
|
| + start: start offset to request. Inclusive. Must be within the
|
| + range of the file.
|
| + request_size: number of bytes to request.
|
| +
|
| + Returns:
|
| + A list of file segments, in order.
|
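| +
|
| + For example (illustrative): with the default 30MB _max_request_size, a
|
| + 70MB request is served by three urlfetches of 30MB, 30MB and 10MB.
|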
| + """
|
| + if not request_size:
|
| + return []
|
| +
|
| + end = start + request_size
|
| + futures = []
|
| +
|
| + while request_size > self._max_request_size:
|
| + futures.append(self._get_segment(start, self._max_request_size))
|
| + request_size -= self._max_request_size
|
| + start += self._max_request_size
|
| + if start < end:
|
| + futures.append(self._get_segment(start, end-start))
|
| + return [fut.get_result() for fut in futures]
|
| +
|
| + @ndb.tasklet
|
| + def _get_segment(self, start, request_size, check_response=True):
|
| + """Get a segment of the file from Google Storage.
|
| +
|
| + Args:
|
| + start: start offset of the segment. Inclusive. Must be within the
|
| + range of the file.
|
| + request_size: number of bytes to request. Must be small enough
|
| + for a single urlfetch request. May go over the logical range of the
|
| + file.
|
| + check_response: True to check the validity of GCS response automatically
|
| + before the future returns. False otherwise. See Yields section.
|
| +
|
| + Yields:
|
| + If check_response is True, the segment [start, start + request_size)
|
| + of the file.
|
| + Otherwise, a tuple. The first element is the unverified file segment.
|
| + The second element is a closure that checks the response. The caller
|
| + should invoke the closure before consuming the file segment.
|
| +
|
| + Raises:
|
| + ValueError: if the file has changed while reading.
|
| + """
|
| + end = start + request_size - 1
|
| + content_range = '%d-%d' % (start, end)
|
| + headers = {'Range': 'bytes=' + content_range}
|
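| + # Illustrative: start=0, request_size=1024 yields 'Range: bytes=0-1023'.
|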
| + status, resp_headers, content = yield self._api.get_object_async(
|
| + self._path, headers=headers)
|
| + def _checker():
|
| + errors.check_status(status, [200, 206], self._path, headers,
|
| + resp_headers, body=content)
|
| + self._check_etag(resp_headers.get('etag'))
|
| + if check_response:
|
| + _checker()
|
| + raise ndb.Return(content)
|
| + raise ndb.Return(content, _checker)
|
| +
|
| + def _check_etag(self, etag):
|
| + """Check if etag is the same across requests to GCS.
|
| +
|
| + If self._etag is None, set it. If etag is set, check that the new
|
| + etag equals the old one.
|
| +
|
| + In the __init__ method, we fire one HEAD and one GET request using
|
| + ndb tasklets. Whichever returns first sets the initial value.
|
| +
|
| + Args:
|
| + etag: etag from a GCS HTTP response. None if etag is not part of the
|
| + response header. It could be None, for example, in the case of a GCS
|
| + composite file.
|
| +
|
| + Raises:
|
| + ValueError: if two etags are not equal.
|
| + """
|
| + if etag is None:
|
| + return
|
| + elif self._etag is None:
|
| + self._etag = etag
|
| + elif self._etag != etag:
|
| + raise ValueError('File on GCS has changed while reading.')
|
| +
|
| + def close(self):
|
| + self.closed = True
|
| + self._buffer = None
|
| + self._buffer_future = None
|
| +
|
| + def __enter__(self):
|
| + return self
|
| +
|
| + def __exit__(self, atype, value, traceback):
|
| + self.close()
|
| + return False
|
| +
|
| + def seek(self, offset, whence=os.SEEK_SET):
|
| + """Set the file's current offset.
|
| +
|
| + Note if the new offset is out of bounds, it is adjusted to either 0 or EOF.
|
| +
|
| + Args:
|
| + offset: seek offset as number.
|
| + whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
|
| + os.SEEK_CUR (seek relative to the current position), and os.SEEK_END
|
| + (seek relative to the end, offset should be negative).
|
| +
|
| + Raises:
|
| + IOError: When this buffer is closed.
|
| + ValueError: When whence is invalid.
|
| + """
|
| + self._check_open()
|
| +
|
| + self._buffer.reset()
|
| + self._buffer_future = None
|
| +
|
| + if whence == os.SEEK_SET:
|
| + self._offset = offset
|
| + elif whence == os.SEEK_CUR:
|
| + self._offset += offset
|
| + elif whence == os.SEEK_END:
|
| + self._offset = self._file_size + offset
|
| + else:
|
| + raise ValueError('Whence mode %s is invalid.' % str(whence))
|
| +
|
| + self._offset = min(self._offset, self._file_size)
|
| + self._offset = max(self._offset, 0)
|
| + if self._remaining():
|
| + self._request_next_buffer()
|
| +
|
| + def tell(self):
|
| + """Tell the file's current offset.
|
| +
|
| + Returns:
|
| + current offset in reading this file.
|
| +
|
| + Raises:
|
| + IOError: When this buffer is closed.
|
| + """
|
| + self._check_open()
|
| + return self._offset
|
| +
|
| + def _check_open(self):
|
| + if self.closed:
|
| + raise IOError('Buffer is closed.')
|
| +
|
| + def seekable(self):
|
| + return True
|
| +
|
| + def readable(self):
|
| + return True
|
| +
|
| + def writable(self):
|
| + return False
|
| +
|
| +
|
| +class _Buffer(object):
|
| + """In memory buffer."""
|
| +
|
| + def __init__(self):
|
| + self.reset()
|
| +
|
| + def reset(self, content='', offset=0):
|
| + self._buffer = content
|
| + self._offset = offset
|
| +
|
| + def read(self, size=-1):
|
| + """Returns bytes from self._buffer and update related offsets.
|
| +
|
| + Args:
|
| + size: number of bytes to read starting from current offset.
|
| + Read the entire buffer if negative.
|
| +
|
| + Returns:
|
| + Requested bytes from buffer.
|
| + """
|
| + if size < 0:
|
| + offset = len(self._buffer)
|
| + else:
|
| + offset = self._offset + size
|
| + return self.read_to_offset(offset)
|
| +
|
| + def read_to_offset(self, offset):
|
| + """Returns bytes from self._buffer and update related offsets.
|
| +
|
| + Args:
|
| + offset: read from current offset to this offset, exclusive.
|
| +
|
| + Returns:
|
| + Requested bytes from buffer.
|
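| +
|
| + For example (illustrative): with buffer 'abc\ndef' and offset 0,
|
| + read_to_offset(4) returns 'abc\n' and leaves the offset at 4.
|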
| + """
|
| + assert offset >= self._offset
|
| + result = self._buffer[self._offset: offset]
|
| + self._offset += len(result)
|
| + return result
|
| +
|
| + def remaining(self):
|
| + return len(self._buffer) - self._offset
|
| +
|
| + def find_newline(self, size=-1):
|
| + """Search for newline char in buffer starting from current offset.
|
| +
|
| + Args:
|
| + size: number of bytes to search. -1 means all.
|
| +
|
| + Returns:
|
| + offset of newline char in buffer. -1 if it doesn't exist.
|
| + """
|
| + if size < 0:
|
| + return self._buffer.find('\n', self._offset)
|
| + return self._buffer.find('\n', self._offset, self._offset + size)
|
| +
|
| +
|
| +class StreamingBuffer(object):
|
| + """A class for creating large objects using the 'resumable' API.
|
| +
|
| + The API is a subset of the Python writable stream API sufficient to
|
| + support writing zip files using the zipfile module.
|
| +
|
| + The exact sequence of calls and use of headers is documented at
|
| + https://developers.google.com/storage/docs/developer-guide#unknownresumables
|
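| +
|
| + Example (illustrative; the object name is hypothetical):
|
| + f = StreamingBuffer(api, '/mybucket/myfile', content_type='text/plain')
|
| + f.write('hello ')
|
| + f.write('world')
|
| + f.close()
|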
| + """
|
| +
|
| + _blocksize = 256 * 1024
|
| +
|
| + _flushsize = 8 * _blocksize
|
| +
|
| + _maxrequestsize = 9 * 4 * _blocksize
|
| +
|
| + def __init__(self,
|
| + api,
|
| + path,
|
| + content_type=None,
|
| + gcs_headers=None):
|
| + """Constructor.
|
| +
|
| + Args:
|
| + api: A StorageApi instance.
|
| + path: Quoted/escaped path to the object, e.g. /mybucket/myfile
|
| + content_type: Optional content-type; if omitted, the value is
|
| + delegated to Google Cloud Storage.
|
| + gcs_headers: additional gs headers as a str->str dict, e.g.
|
| + {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}.
|
| + Raises:
|
| + IOError: When this location cannot be found.
|
| + """
|
| + assert self._maxrequestsize > self._blocksize
|
| + assert self._maxrequestsize % self._blocksize == 0
|
| + assert self._maxrequestsize >= self._flushsize
|
| +
|
| + self._api = api
|
| + self._path = path
|
| +
|
| + self.name = api_utils._unquote_filename(path)
|
| + self.closed = False
|
| +
|
| + self._buffer = collections.deque()
|
| + self._buffered = 0
|
| + self._written = 0
|
| + self._offset = 0
|
| +
|
| + headers = {'x-goog-resumable': 'start'}
|
| + if content_type:
|
| + headers['content-type'] = content_type
|
| + if gcs_headers:
|
| + headers.update(gcs_headers)
|
| + status, resp_headers, content = self._api.post_object(path, headers=headers)
|
| + errors.check_status(status, [201], path, headers, resp_headers,
|
| + body=content)
|
| + loc = resp_headers.get('location')
|
| + if not loc:
|
| + raise IOError('No location header found in 201 response')
|
| + parsed = urlparse.urlparse(loc)
|
| + self._path_with_token = '%s?%s' % (self._path, parsed.query)
|
| +
|
| + def __getstate__(self):
|
| + """Store state as part of serialization/pickling.
|
| +
|
| + The contents of the write buffer are stored. Writes to the underlying
|
| + storage are required to be on block boundaries (_blocksize) except for the
|
| + last write. In the worst case the pickled version of this object may be
|
| + slightly larger than the blocksize.
|
| +
|
| + Returns:
|
| + A dictionary with the state of this object
|
| +
|
| + """
|
| + return {'api': self._api,
|
| + 'path': self._path,
|
| + 'path_token': self._path_with_token,
|
| + 'buffer': self._buffer,
|
| + 'buffered': self._buffered,
|
| + 'written': self._written,
|
| + 'offset': self._offset,
|
| + 'closed': self.closed}
|
| +
|
| + def __setstate__(self, state):
|
| + """Restore state as part of deserialization/unpickling.
|
| +
|
| + Args:
|
| + state: the dictionary from a __getstate__ call
|
| + """
|
| + self._api = state['api']
|
| + self._path_with_token = state['path_token']
|
| + self._buffer = state['buffer']
|
| + self._buffered = state['buffered']
|
| + self._written = state['written']
|
| + self._offset = state['offset']
|
| + self.closed = state['closed']
|
| + self._path = state['path']
|
| + self.name = api_utils._unquote_filename(self._path)
|
| +
|
| + def write(self, data):
|
| + """Write some bytes.
|
| +
|
| + Args:
|
| + data: data to write. str.
|
| +
|
| + Raises:
|
| + TypeError: if data is not of type str.
|
| + """
|
| + self._check_open()
|
| + if not isinstance(data, str):
|
| + raise TypeError('Expected str but got %s.' % type(data))
|
| + if not data:
|
| + return
|
| + self._buffer.append(data)
|
| + self._buffered += len(data)
|
| + self._offset += len(data)
|
| + if self._buffered >= self._flushsize:
|
| + self._flush()
|
| +
|
| + def flush(self):
|
| + """Flush as much as possible to GCS.
|
| +
|
| + GCS *requires* that all writes except for the final one align on
|
| + 256KB boundaries, so the internal buffer may still hold fewer than 256KB
|
| + after a flush.
|
| + """
|
| + self._check_open()
|
| + self._flush(finish=False)
|
| +
|
| + def tell(self):
|
| + """Return the total number of bytes passed to write() so far.
|
| +
|
| + (There is no seek() method.)
|
| + """
|
| + return self._offset
|
| +
|
| + def close(self):
|
| + """Flush the buffer and finalize the file.
|
| +
|
| + When this returns the new file is available for reading.
|
| + """
|
| + if not self.closed:
|
| + self.closed = True
|
| + self._flush(finish=True)
|
| + self._buffer = None
|
| +
|
| + def __enter__(self):
|
| + return self
|
| +
|
| + def __exit__(self, atype, value, traceback):
|
| + self.close()
|
| + return False
|
| +
|
| + def _flush(self, finish=False):
|
| + """Internal API to flush.
|
| +
|
| + Buffer is flushed to GCS only when the total amount of buffered data is at
|
| + least self._blocksize, or to flush the final (incomplete) block of
|
| + the file with finish=True.
|
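| +
|
| + For example (illustrative): with 700KB buffered and finish=False, two
|
| + 256KB blocks (512KB) are sent and the remaining 188KB stay buffered.
|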
| + """
|
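| + # Note: _buffered >= 0 always holds, so with finish=True this loop runs
|
| + # at least once and a final (possibly zero-byte) PUT finalizes the file.
|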
| + while ((finish and self._buffered >= 0) or
|
| + (not finish and self._buffered >= self._blocksize)):
|
| + tmp_buffer = []
|
| + tmp_buffer_len = 0
|
| +
|
| + excess = 0
|
| + while self._buffer:
|
| + buf = self._buffer.popleft()
|
| + size = len(buf)
|
| + self._buffered -= size
|
| + tmp_buffer.append(buf)
|
| + tmp_buffer_len += size
|
| + if tmp_buffer_len >= self._maxrequestsize:
|
| + excess = tmp_buffer_len - self._maxrequestsize
|
| + break
|
| + if not finish and (
|
| + tmp_buffer_len % self._blocksize + self._buffered <
|
| + self._blocksize):
|
| + excess = tmp_buffer_len % self._blocksize
|
| + break
|
| +
|
| + if excess:
|
| + over = tmp_buffer.pop()
|
| + size = len(over)
|
| + assert size >= excess
|
| + tmp_buffer_len -= size
|
| + head, tail = over[:-excess], over[-excess:]
|
| + self._buffer.appendleft(tail)
|
| + self._buffered += len(tail)
|
| + if head:
|
| + tmp_buffer.append(head)
|
| + tmp_buffer_len += len(head)
|
| +
|
| + data = ''.join(tmp_buffer)
|
| + file_len = '*'
|
| + if finish and not self._buffered:
|
| + file_len = self._written + len(data)
|
| + self._send_data(data, self._written, file_len)
|
| + self._written += len(data)
|
| + if file_len != '*':
|
| + break
|
| +
|
| + def _send_data(self, data, start_offset, file_len):
|
| + """Send the block to the storage service.
|
| +
|
| + This is a utility method that does not modify self.
|
| +
|
| + Args:
|
| + data: data to send in str.
|
| + start_offset: start offset of the data in relation to the file.
|
| + file_len: an int if this is the last data to append to the file.
|
| + Otherwise '*'.
|
| + """
|
| + headers = {}
|
| + end_offset = start_offset + len(data) - 1
|
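| + # Illustrative: a non-final 256KB block at offset 0 sends
|
| + # 'content-range: bytes 0-262143/*'; the final write sends the total size.
|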
| +
|
| + if data:
|
| + headers['content-range'] = ('bytes %d-%d/%s' %
|
| + (start_offset, end_offset, file_len))
|
| + else:
|
| + headers['content-range'] = ('bytes */%s' % file_len)
|
| +
|
| + status, response_headers, content = self._api.put_object(
|
| + self._path_with_token, payload=data, headers=headers)
|
| + if file_len == '*':
|
| + expected = 308
|
| + else:
|
| + expected = 200
|
| + errors.check_status(status, [expected], self._path, headers,
|
| + response_headers, content,
|
| + {'upload_path': self._path_with_token})
|
| +
|
| + def _get_offset_from_gcs(self):
|
| + """Get the last offset that has been written to GCS.
|
| +
|
| + This is a utility method that does not modify self.
|
| +
|
| + Returns:
|
| + an int of the last offset written to GCS by this upload, inclusive.
|
| + -1 means nothing has been written.
|
| + """
|
| + headers = {'content-range': 'bytes */*'}
|
| + status, response_headers, content = self._api.put_object(
|
| + self._path_with_token, headers=headers)
|
| + errors.check_status(status, [308], self._path, headers,
|
| + response_headers, content,
|
| + {'upload_path': self._path_with_token})
|
| + val = response_headers.get('range')
|
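| + # Illustrative: a 'range: bytes=0-262143' response header yields 262143.
|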
| + if val is None:
|
| + return -1
|
| + _, offset = val.rsplit('-', 1)
|
| + return int(offset)
|
| +
|
| + def _force_close(self, file_length=None):
|
| + """Close this buffer on file_length.
|
| +
|
| + Finalize this upload immediately on file_length.
|
| + Contents that are still in memory will not be uploaded.
|
| +
|
| + This is a utility method that does not modify self.
|
| +
|
| + Args:
|
| + file_length: file length. Must match what has been uploaded. If None,
|
| + it will be queried from GCS.
|
| + """
|
| + if file_length is None:
|
| + file_length = self._get_offset_from_gcs() + 1
|
| + self._send_data('', 0, file_length)
|
| +
|
| + def _check_open(self):
|
| + if self.closed:
|
| + raise IOError('Buffer is closed.')
|
| +
|
| + def seekable(self):
|
| + return False
|
| +
|
| + def readable(self):
|
| + return False
|
| +
|
| + def writable(self):
|
| + return True
|
|
|