Index: third_party/cloud_storage/cloudstorage/storage_api.py |
diff --git a/third_party/cloud_storage/cloudstorage/storage_api.py b/third_party/cloud_storage/cloudstorage/storage_api.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..910c365be348e8454ee2848500af8aa57cb05943 |
--- /dev/null |
+++ b/third_party/cloud_storage/cloudstorage/storage_api.py |
@@ -0,0 +1,887 @@ |
+# Copyright 2012 Google Inc. All Rights Reserved. |
+# |
+# Licensed under the Apache License, Version 2.0 (the "License"); |
+# you may not use this file except in compliance with the License. |
+# You may obtain a copy of the License at |
+# |
+# http://www.apache.org/licenses/LICENSE-2.0 |
+# |
+# Unless required by applicable law or agreed to in writing, |
+# software distributed under the License is distributed on an |
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, |
+# either express or implied. See the License for the specific |
+# language governing permissions and limitations under the License. |
+ |
+"""Python wrappers for the Google Storage RESTful API.""" |
+ |
+ |
+ |
+ |
+ |
# Public API of this module. The RESTful wrapper (_StorageApi) and its
# factory (_get_storage_api) are implementation details and stay private.
__all__ = ['ReadBuffer',
           'StreamingBuffer',
           ]
+ |
+import collections |
+import os |
+import urlparse |
+ |
+from . import api_utils |
+from . import common |
+from . import errors |
+from . import rest_api |
+ |
+try: |
+ from google.appengine.api import urlfetch |
+ from google.appengine.ext import ndb |
+except ImportError: |
+ from google.appengine.api import urlfetch |
+ from google.appengine.ext import ndb |
+ |
+ |
+ |
def _get_storage_api(retry_params, account_id=None):
  """Create a storage_api instance for API methods.

  Args:
    retry_params: An instance of api_utils.RetryParams. If none,
      thread's default will be used.
    account_id: Internal-use only.

  Returns:
    A storage_api instance to handle urlfetch work to GCS.
    On dev appserver, this instance by default will talk to a local stub
    unless common.ACCESS_TOKEN is set. That token will be used to talk
    to the real GCS.
  """
  storage = _StorageApi(_StorageApi.full_control_scope,
                        service_account_id=account_id,
                        retry_params=retry_params)
  token = common.get_access_token()
  if token:
    # An explicit access token always targets the real GCS endpoint.
    storage.token = token
  elif common.local_run():
    # Dev appserver without a token: talk to the local stub instead.
    storage.api_url = common.local_api_url()
  return storage
+ |
+ |
class _StorageApi(rest_api._RestApi):
  """A simple wrapper for the Google Storage RESTful API.

  WARNING: Do NOT directly use this api. It's an implementation detail
  and is subject to change at any release.

  All async methods have similar args and returns.

  Args:
    path: The path to the Google Storage object or bucket, e.g.
      '/mybucket/myfile' or '/mybucket'.
    **kwd: Options for urlfetch. e.g.
      headers={'content-type': 'text/plain'}, payload='blah'.

  Returns:
    A ndb Future. When fulfilled, future.get_result() should return
    a tuple of (status, headers, content) that represents a HTTP response
    of Google Cloud Storage XML API.
  """

  api_url = 'https://storage.googleapis.com'
  read_only_scope = 'https://www.googleapis.com/auth/devstorage.read_only'
  read_write_scope = 'https://www.googleapis.com/auth/devstorage.read_write'
  full_control_scope = 'https://www.googleapis.com/auth/devstorage.full_control'

  def __getstate__(self):
    """Store state as part of serialization/pickling.

    Returns:
      A tuple (of dictionaries) with the state of this object
    """
    return (super(_StorageApi, self).__getstate__(), {'api_url': self.api_url})

  def __setstate__(self, state):
    """Restore state as part of deserialization/unpickling.

    Args:
      state: the tuple from a __getstate__ call
    """
    superstate, localstate = state
    super(_StorageApi, self).__setstate__(superstate)
    self.api_url = localstate['api_url']

  @api_utils._eager_tasklet
  @ndb.tasklet
  def do_request_async(self, url, method='GET', headers=None, payload=None,
                       deadline=None, callback=None):
    """Inherit docs.

    This method translates urlfetch exceptions to more service specific ones.
    """
    if headers is None:
      headers = {}
    if 'x-goog-api-version' not in headers:
      headers['x-goog-api-version'] = '2'
    headers['accept-encoding'] = 'gzip, *'
    try:
      resp_tuple = yield super(_StorageApi, self).do_request_async(
          url, method=method, headers=headers, payload=payload,
          deadline=deadline, callback=callback)
    # PEP 3110 'as' form instead of the deprecated 'except X, e' comma
    # syntax; valid on Python 2.6+ and required for Python 3 compatibility.
    except urlfetch.DownloadError as e:
      raise errors.TimeoutError(
          'Request to Google Cloud Storage timed out.', e)

    raise ndb.Return(resp_tuple)

  def post_object_async(self, path, **kwds):
    """POST to an object."""
    return self.do_request_async(self.api_url + path, 'POST', **kwds)

  def put_object_async(self, path, **kwds):
    """PUT an object."""
    return self.do_request_async(self.api_url + path, 'PUT', **kwds)

  def get_object_async(self, path, **kwds):
    """GET an object.

    Note: No payload argument is supported.
    """
    return self.do_request_async(self.api_url + path, 'GET', **kwds)

  def delete_object_async(self, path, **kwds):
    """DELETE an object.

    Note: No payload argument is supported.
    """
    return self.do_request_async(self.api_url + path, 'DELETE', **kwds)

  def head_object_async(self, path, **kwds):
    """HEAD an object.

    Depending on request headers, HEAD returns various object properties,
    e.g. Content-Length, Last-Modified, and ETag.

    Note: No payload argument is supported.
    """
    return self.do_request_async(self.api_url + path, 'HEAD', **kwds)

  def get_bucket_async(self, path, **kwds):
    """GET a bucket."""
    return self.do_request_async(self.api_url + path, 'GET', **kwds)
+ |
+ |
+_StorageApi = rest_api.add_sync_methods(_StorageApi) |
+ |
+ |
class ReadBuffer(object):
  """A class for reading Google storage files."""

  # Size of the in-memory read-ahead buffer, and the cap on how many bytes
  # a single range request may ask for.
  DEFAULT_BUFFER_SIZE = 1024 * 1024
  MAX_REQUEST_SIZE = 30 * DEFAULT_BUFFER_SIZE

  def __init__(self,
               api,
               path,
               buffer_size=DEFAULT_BUFFER_SIZE,
               max_request_size=MAX_REQUEST_SIZE):
    """Constructor.

    Args:
      api: A StorageApi instance.
      path: Quoted/escaped path to the object, e.g. /mybucket/myfile
      buffer_size: buffer size. The ReadBuffer keeps
        one buffer. But there may be a pending future that contains
        a second buffer. This size must be less than max_request_size.
      max_request_size: Max bytes to request in one urlfetch.
    """
    self._api = api
    self._path = path
    self.name = api_utils._unquote_filename(path)
    self.closed = False

    assert buffer_size <= max_request_size
    self._buffer_size = buffer_size
    self._max_request_size = max_request_size
    self._offset = 0
    self._buffer = _Buffer()
    self._etag = None

    # Fire the first range GET asynchronously so it runs in parallel with
    # the blocking HEAD below (see _check_etag's docstring).
    get_future = self._get_segment(0, self._buffer_size, check_response=False)

    status, headers, content = self._api.head_object(path)
    errors.check_status(status, [200], path, resp_headers=headers, body=content)
    # long(): this module targets Python 2.
    self._file_size = long(common.get_stored_content_length(headers))
    self._check_etag(headers.get('etag'))

    self._buffer_future = None

    if self._file_size != 0:
      content, check_response_closure = get_future.get_result()
      # The GET above skipped response checking; validate it now that the
      # HEAD has established the etag to compare against.
      check_response_closure()
      self._buffer.reset(content)
      self._request_next_buffer()

  def __getstate__(self):
    """Store state as part of serialization/pickling.

    The contents of the read buffer are not stored, only the current offset for
    data read by the client. A new read buffer is established at unpickling.
    The head information for the object (file size and etag) are stored to
    reduce startup and ensure the file has not changed.

    Returns:
      A dictionary with the state of this object
    """
    return {'api': self._api,
            'path': self._path,
            'buffer_size': self._buffer_size,
            'request_size': self._max_request_size,
            'etag': self._etag,
            'size': self._file_size,
            'offset': self._offset,
            'closed': self.closed}

  def __setstate__(self, state):
    """Restore state as part of deserialization/unpickling.

    Args:
      state: the dictionary from a __getstate__ call

    Along with restoring the state, pre-fetch the next read buffer.
    """
    self._api = state['api']
    self._path = state['path']
    self.name = api_utils._unquote_filename(self._path)
    self._buffer_size = state['buffer_size']
    self._max_request_size = state['request_size']
    self._etag = state['etag']
    self._file_size = state['size']
    self._offset = state['offset']
    self._buffer = _Buffer()
    self.closed = state['closed']
    self._buffer_future = None
    if self._remaining() and not self.closed:
      self._request_next_buffer()

  def __iter__(self):
    """Iterator interface.

    Note the ReadBuffer container itself is the iterator. It's
    (quote PEP0234)
    'destructive: they consumes all the values and a second iterator
    cannot easily be created that iterates independently over the same values.
    You could open the file for the second time, or seek() to the beginning.'

    Returns:
      Self.
    """
    return self

  def next(self):
    # Python 2 iterator protocol: yield one line per iteration until EOF.
    line = self.readline()
    if not line:
      raise StopIteration()
    return line

  def readline(self, size=-1):
    """Read one line delimited by '\n' from the file.

    A trailing newline character is kept in the string. It may be absent when a
    file ends with an incomplete line. If the size argument is non-negative,
    it specifies the maximum string size (counting the newline) to return.
    A negative size is the same as unspecified. Empty string is returned
    only when EOF is encountered immediately.

    Args:
      size: Maximum number of bytes to read. If not specified, readline stops
        only on '\n' or EOF.

    Returns:
      The data read as a string.

    Raises:
      IOError: When this buffer is closed.
    """
    self._check_open()
    if size == 0 or not self._remaining():
      return ''

    data_list = []
    newline_offset = self._buffer.find_newline(size)
    while newline_offset < 0:
      # No newline within the current buffer (or within the size budget):
      # consume what we have and keep scanning in the prefetched segment.
      data = self._buffer.read(size)
      size -= len(data)
      self._offset += len(data)
      data_list.append(data)
      if size == 0 or not self._remaining():
        return ''.join(data_list)
      # Block on the prefetched buffer and immediately request the next one.
      self._buffer.reset(self._buffer_future.get_result())
      self._request_next_buffer()
      newline_offset = self._buffer.find_newline(size)

    # Include the newline character itself (+1).
    data = self._buffer.read_to_offset(newline_offset + 1)
    self._offset += len(data)
    data_list.append(data)

    return ''.join(data_list)

  def read(self, size=-1):
    """Read data from RAW file.

    Args:
      size: Number of bytes to read as integer. Actual number of bytes
        read is always equal to size unless EOF is reached. If size is
        negative or unspecified, read the entire file.

    Returns:
      data read as str.

    Raises:
      IOError: When this buffer is closed.
    """
    self._check_open()
    if not self._remaining():
      return ''

    data_list = []
    while True:
      remaining = self._buffer.remaining()
      if size >= 0 and size < remaining:
        # Request fits entirely inside the current buffer.
        data_list.append(self._buffer.read(size))
        self._offset += size
        break
      else:
        # Drain the current buffer and fall through to the prefetched data
        # (or to direct segment fetches when nothing is prefetched).
        size -= remaining
        self._offset += remaining
        data_list.append(self._buffer.read())

        if self._buffer_future is None:
          if size < 0 or size >= self._remaining():
            needs = self._remaining()
          else:
            needs = size
          # Fetch the rest in (possibly several) range requests.
          data_list.extend(self._get_segments(self._offset, needs))
          self._offset += needs
          break

        if self._buffer_future:
          self._buffer.reset(self._buffer_future.get_result())
          self._buffer_future = None

    if self._buffer_future is None:
      self._request_next_buffer()
    return ''.join(data_list)

  def _remaining(self):
    # Bytes between the logical read offset and EOF.
    return self._file_size - self._offset

  def _request_next_buffer(self):
    """Request next buffer.

    Requires self._offset and self._buffer are in consistent state.
    """
    self._buffer_future = None
    next_offset = self._offset + self._buffer.remaining()
    if next_offset != self._file_size:
      self._buffer_future = self._get_segment(next_offset,
                                              self._buffer_size)

  def _get_segments(self, start, request_size):
    """Get segments of the file from Google Storage as a list.

    A large request is broken into segments to avoid hitting urlfetch
    response size limit. Each segment is returned from a separate urlfetch.

    Args:
      start: start offset to request. Inclusive. Have to be within the
        range of the file.
      request_size: number of bytes to request.

    Returns:
      A list of file segments in order
    """
    if not request_size:
      return []

    end = start + request_size
    futures = []

    # Issue all range requests first, then collect results, so the
    # urlfetches run concurrently.
    while request_size > self._max_request_size:
      futures.append(self._get_segment(start, self._max_request_size))
      request_size -= self._max_request_size
      start += self._max_request_size
    if start < end:
      futures.append(self._get_segment(start, end-start))
    return [fut.get_result() for fut in futures]

  @ndb.tasklet
  def _get_segment(self, start, request_size, check_response=True):
    """Get a segment of the file from Google Storage.

    Args:
      start: start offset of the segment. Inclusive. Have to be within the
        range of the file.
      request_size: number of bytes to request. Have to be small enough
        for a single urlfetch request. May go over the logical range of the
        file.
      check_response: True to check the validity of GCS response automatically
        before the future returns. False otherwise. See Yields section.

    Yields:
      If check_response is True, the segment [start, start + request_size)
      of the file.
      Otherwise, a tuple. The first element is the unverified file segment.
      The second element is a closure that checks response. Caller should
      first invoke the closure before consuming the file segment.

    Raises:
      ValueError: if the file has changed while reading.
    """
    end = start + request_size - 1
    content_range = '%d-%d' % (start, end)
    headers = {'Range': 'bytes=' + content_range}
    status, resp_headers, content = yield self._api.get_object_async(
        self._path, headers=headers)
    def _checker():
      errors.check_status(status, [200, 206], self._path, headers,
                          resp_headers, body=content)
      self._check_etag(resp_headers.get('etag'))
    if check_response:
      _checker()
      raise ndb.Return(content)
    raise ndb.Return(content, _checker)

  def _check_etag(self, etag):
    """Check if etag is the same across requests to GCS.

    If self._etag is None, set it. If etag is set, check that the new
    etag equals the old one.

    In the __init__ method, we fire one HEAD and one GET request using
    ndb tasklet. One of them would return first and set the first value.

    Args:
      etag: etag from a GCS HTTP response. None if etag is not part of the
        response header. It could be None for example in the case of GCS
        composite file.

    Raises:
      ValueError: if two etags are not equal.
    """
    if etag is None:
      return
    elif self._etag is None:
      self._etag = etag
    elif self._etag != etag:
      raise ValueError('File on GCS has changed while reading.')

  def close(self):
    # Drop buffers so pending data can be garbage collected.
    self.closed = True
    self._buffer = None
    self._buffer_future = None

  def __enter__(self):
    return self

  def __exit__(self, atype, value, traceback):
    self.close()
    return False

  def seek(self, offset, whence=os.SEEK_SET):
    """Set the file's current offset.

    Note if the new offset is out of bound, it is adjusted to either 0 or EOF.

    Args:
      offset: seek offset as number.
      whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
        os.SEEK_CUR (seek relative to the current position), and os.SEEK_END
        (seek relative to the end, offset should be negative).

    Raises:
      IOError: When this buffer is closed.
      ValueError: When whence is invalid.
    """
    self._check_open()

    # Any buffered/prefetched data is invalid after a seek.
    self._buffer.reset()
    self._buffer_future = None

    if whence == os.SEEK_SET:
      self._offset = offset
    elif whence == os.SEEK_CUR:
      self._offset += offset
    elif whence == os.SEEK_END:
      self._offset = self._file_size + offset
    else:
      raise ValueError('Whence mode %s is invalid.' % str(whence))

    # Clamp to [0, file_size].
    self._offset = min(self._offset, self._file_size)
    self._offset = max(self._offset, 0)
    if self._remaining():
      self._request_next_buffer()

  def tell(self):
    """Tell the file's current offset.

    Returns:
      current offset in reading this file.

    Raises:
      IOError: When this buffer is closed.
    """
    self._check_open()
    return self._offset

  def _check_open(self):
    if self.closed:
      raise IOError('Buffer is closed.')

  def seekable(self):
    return True

  def readable(self):
    return True

  def writable(self):
    return False
+ |
+ |
+class _Buffer(object): |
+ """In memory buffer.""" |
+ |
+ def __init__(self): |
+ self.reset() |
+ |
+ def reset(self, content='', offset=0): |
+ self._buffer = content |
+ self._offset = offset |
+ |
+ def read(self, size=-1): |
+ """Returns bytes from self._buffer and update related offsets. |
+ |
+ Args: |
+ size: number of bytes to read starting from current offset. |
+ Read the entire buffer if negative. |
+ |
+ Returns: |
+ Requested bytes from buffer. |
+ """ |
+ if size < 0: |
+ offset = len(self._buffer) |
+ else: |
+ offset = self._offset + size |
+ return self.read_to_offset(offset) |
+ |
+ def read_to_offset(self, offset): |
+ """Returns bytes from self._buffer and update related offsets. |
+ |
+ Args: |
+ offset: read from current offset to this offset, exclusive. |
+ |
+ Returns: |
+ Requested bytes from buffer. |
+ """ |
+ assert offset >= self._offset |
+ result = self._buffer[self._offset: offset] |
+ self._offset += len(result) |
+ return result |
+ |
+ def remaining(self): |
+ return len(self._buffer) - self._offset |
+ |
+ def find_newline(self, size=-1): |
+ """Search for newline char in buffer starting from current offset. |
+ |
+ Args: |
+ size: number of bytes to search. -1 means all. |
+ |
+ Returns: |
+ offset of newline char in buffer. -1 if doesn't exist. |
+ """ |
+ if size < 0: |
+ return self._buffer.find('\n', self._offset) |
+ return self._buffer.find('\n', self._offset, self._offset + size) |
+ |
+ |
class StreamingBuffer(object):
  """A class for creating large objects using the 'resumable' API.

  The API is a subset of the Python writable stream API sufficient to
  support writing zip files using the zipfile module.

  The exact sequence of calls and use of headers is documented at
  https://developers.google.com/storage/docs/developer-guide#unknownresumables
  """

  # GCS requires every non-final upload chunk to align on this block size
  # (see flush() docstring).
  _blocksize = 256 * 1024

  # Amount of buffered data that triggers an automatic flush in write().
  _flushsize = 8 * _blocksize

  # Cap on the payload of a single PUT issued by _flush().
  _maxrequestsize = 9 * 4 * _blocksize

  def __init__(self,
               api,
               path,
               content_type=None,
               gcs_headers=None):
    """Constructor.

    Args:
      api: A StorageApi instance.
      path: Quoted/escaped path to the object, e.g. /mybucket/myfile
      content_type: Optional content-type; Default value is
        delegate to Google Cloud Storage.
      gcs_headers: additional gs headers as a str->str dict, e.g
        {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}.
    Raises:
      IOError: When this location can not be found.
    """
    assert self._maxrequestsize > self._blocksize
    assert self._maxrequestsize % self._blocksize == 0
    assert self._maxrequestsize >= self._flushsize

    self._api = api
    self._path = path

    self.name = api_utils._unquote_filename(path)
    self.closed = False

    # Deque of pending str chunks; _buffered tracks their total length,
    # _written the bytes already sent to GCS, _offset the logical file size.
    self._buffer = collections.deque()
    self._buffered = 0
    self._written = 0
    self._offset = 0

    # Start a resumable upload session: GCS answers 201 with an upload
    # token in the Location header's query string.
    headers = {'x-goog-resumable': 'start'}
    if content_type:
      headers['content-type'] = content_type
    if gcs_headers:
      headers.update(gcs_headers)
    status, resp_headers, content = self._api.post_object(path, headers=headers)
    errors.check_status(status, [201], path, headers, resp_headers,
                        body=content)
    loc = resp_headers.get('location')
    if not loc:
      raise IOError('No location header found in 201 response')
    parsed = urlparse.urlparse(loc)
    self._path_with_token = '%s?%s' % (self._path, parsed.query)

  def __getstate__(self):
    """Store state as part of serialization/pickling.

    The contents of the write buffer are stored. Writes to the underlying
    storage are required to be on block boundaries (_blocksize) except for the
    last write. In the worst case the pickled version of this object may be
    slightly larger than the blocksize.

    Returns:
      A dictionary with the state of this object

    """
    return {'api': self._api,
            'path': self._path,
            'path_token': self._path_with_token,
            'buffer': self._buffer,
            'buffered': self._buffered,
            'written': self._written,
            'offset': self._offset,
            'closed': self.closed}

  def __setstate__(self, state):
    """Restore state as part of deserialization/unpickling.

    Args:
      state: the dictionary from a __getstate__ call
    """
    self._api = state['api']
    self._path_with_token = state['path_token']
    self._buffer = state['buffer']
    self._buffered = state['buffered']
    self._written = state['written']
    self._offset = state['offset']
    self.closed = state['closed']
    self._path = state['path']
    self.name = api_utils._unquote_filename(self._path)

  def write(self, data):
    """Write some bytes.

    Args:
      data: data to write. str.

    Raises:
      TypeError: if data is not of type str.
    """
    self._check_open()
    if not isinstance(data, str):
      raise TypeError('Expected str but got %s.' % type(data))
    if not data:
      return
    self._buffer.append(data)
    self._buffered += len(data)
    self._offset += len(data)
    if self._buffered >= self._flushsize:
      self._flush()

  def flush(self):
    """Flush as much as possible to GCS.

    GCS *requires* that all writes except for the final one align on
    256KB boundaries. So the internal buffer may still have < 256KB bytes left
    after flush.
    """
    self._check_open()
    self._flush(finish=False)

  def tell(self):
    """Return the total number of bytes passed to write() so far.

    (There is no seek() method.)
    """
    return self._offset

  def close(self):
    """Flush the buffer and finalize the file.

    When this returns the new file is available for reading.
    """
    if not self.closed:
      self.closed = True
      self._flush(finish=True)
      self._buffer = None

  def __enter__(self):
    return self

  def __exit__(self, atype, value, traceback):
    self.close()
    return False

  def _flush(self, finish=False):
    """Internal API to flush.

    Buffer is flushed to GCS only when the total amount of buffered data is at
    least self._blocksize, or to flush the final (incomplete) block of
    the file with finish=True.
    """
    # With finish=True the loop always runs at least once (even with an
    # empty buffer) so the upload is finalized with an exact file length.
    while ((finish and self._buffered >= 0) or
           (not finish and self._buffered >= self._blocksize)):
      tmp_buffer = []
      tmp_buffer_len = 0

      excess = 0
      while self._buffer:
        buf = self._buffer.popleft()
        size = len(buf)
        self._buffered -= size
        tmp_buffer.append(buf)
        tmp_buffer_len += size
        if tmp_buffer_len >= self._maxrequestsize:
          # Collected more than one PUT may carry; trim back below.
          excess = tmp_buffer_len - self._maxrequestsize
          break
        if not finish and (
            tmp_buffer_len % self._blocksize + self._buffered <
            self._blocksize):
          # Remaining data cannot complete another full block; keep the
          # unaligned tail buffered so this PUT stays block-aligned.
          excess = tmp_buffer_len % self._blocksize
          break

      if excess:
        # Split the last chunk: send 'head', push 'tail' back for later.
        over = tmp_buffer.pop()
        size = len(over)
        assert size >= excess
        tmp_buffer_len -= size
        head, tail = over[:-excess], over[-excess:]
        self._buffer.appendleft(tail)
        self._buffered += len(tail)
        if head:
          tmp_buffer.append(head)
          tmp_buffer_len += len(head)

      data = ''.join(tmp_buffer)
      file_len = '*'
      if finish and not self._buffered:
        # Last request of the upload: declare the final object size.
        file_len = self._written + len(data)
      self._send_data(data, self._written, file_len)
      self._written += len(data)
      if file_len != '*':
        break

  def _send_data(self, data, start_offset, file_len):
    """Send the block to the storage service.

    This is a utility method that does not modify self.

    Args:
      data: data to send in str.
      start_offset: start offset of the data in relation to the file.
      file_len: an int if this is the last data to append to the file.
        Otherwise '*'.
    """
    headers = {}
    end_offset = start_offset + len(data) - 1

    if data:
      headers['content-range'] = ('bytes %d-%d/%s' %
                                  (start_offset, end_offset, file_len))
    else:
      headers['content-range'] = ('bytes */%s' % file_len)

    status, response_headers, content = self._api.put_object(
        self._path_with_token, payload=data, headers=headers)
    # GCS answers 308 for an accepted intermediate chunk, 200 when the
    # upload is finalized with a definite file_len.
    if file_len == '*':
      expected = 308
    else:
      expected = 200
    errors.check_status(status, [expected], self._path, headers,
                        response_headers, content,
                        {'upload_path': self._path_with_token})

  def _get_offset_from_gcs(self):
    """Get the last offset that has been written to GCS.

    This is a utility method that does not modify self.

    Returns:
      an int of the last offset written to GCS by this upload, inclusive.
      -1 means nothing has been written.
    """
    # An empty PUT with 'bytes */*' queries upload progress; GCS reports
    # the persisted range in the 'range' response header.
    headers = {'content-range': 'bytes */*'}
    status, response_headers, content = self._api.put_object(
        self._path_with_token, headers=headers)
    errors.check_status(status, [308], self._path, headers,
                        response_headers, content,
                        {'upload_path': self._path_with_token})
    val = response_headers.get('range')
    if val is None:
      return -1
    _, offset = val.rsplit('-', 1)
    return int(offset)

  def _force_close(self, file_length=None):
    """Close this buffer on file_length.

    Finalize this upload immediately on file_length.
    Contents that are still in memory will not be uploaded.

    This is a utility method that does not modify self.

    Args:
      file_length: file length. Must match what has been uploaded. If None,
        it will be queried from GCS.
    """
    if file_length is None:
      file_length = self._get_offset_from_gcs() + 1
    self._send_data('', 0, file_length)

  def _check_open(self):
    if self.closed:
      raise IOError('Buffer is closed.')

  def seekable(self):
    return False

  def readable(self):
    return False

  def writable(self):
    return True