Index: third_party/cloud_storage/cloudstorage/cloudstorage_api.py |
diff --git a/third_party/cloud_storage/cloudstorage/cloudstorage_api.py b/third_party/cloud_storage/cloudstorage/cloudstorage_api.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..ba8be862fb5210e65d1b0525069679f58d5bfb12 |
--- /dev/null |
+++ b/third_party/cloud_storage/cloudstorage/cloudstorage_api.py |
@@ -0,0 +1,451 @@ |
+# Copyright 2012 Google Inc. All Rights Reserved. |
+# |
+# Licensed under the Apache License, Version 2.0 (the "License"); |
+# you may not use this file except in compliance with the License. |
+# You may obtain a copy of the License at |
+# |
+# http://www.apache.org/licenses/LICENSE-2.0 |
+# |
+# Unless required by applicable law or agreed to in writing, |
+# software distributed under the License is distributed on an |
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, |
+# either express or implied. See the License for the specific |
+# language governing permissions and limitations under the License. |
+ |
+"""File Interface for Google Cloud Storage.""" |
+ |
+ |
+ |
+from __future__ import with_statement |
+ |
+ |
+ |
+__all__ = ['delete', |
+ 'listbucket', |
+ 'open', |
+ 'stat', |
+ ] |
+ |
+import logging |
+import StringIO |
+import urllib |
+import xml.etree.cElementTree as ET |
+from . import api_utils |
+from . import common |
+from . import errors |
+from . import storage_api |
+ |
+ |
+ |
def open(filename,
         mode='r',
         content_type=None,
         options=None,
         read_buffer_size=storage_api.ReadBuffer.DEFAULT_BUFFER_SIZE,
         retry_params=None,
         _account_id=None):
  """Open a Google Cloud Storage file and return a File-like object.

  Args:
    filename: A Google Cloud Storage filename of form '/bucket/filename'.
    mode: 'r' for reading mode. 'w' for writing mode.
      In reading mode, the file must exist. In writing mode, a file will
      be created or be overwritten.
    content_type: The MIME type of the file. str. Only valid in writing mode.
    options: A str->basestring dict to specify additional headers to pass to
      GCS e.g. {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}.
      Supported options are x-goog-acl, x-goog-meta-, cache-control,
      content-disposition, and content-encoding.
      Only valid in writing mode.
      See https://developers.google.com/storage/docs/reference-headers
      for details.
    read_buffer_size: The buffer size for read. Read keeps a buffer
      and prefetches another one. To minimize blocking for large files,
      always read by buffer size. To minimize number of RPC requests for
      small files, set a large buffer size. Max is 30MB.
    retry_params: An instance of api_utils.RetryParams for subsequent calls
      to GCS from this file handle. If None, the default one is used.
    _account_id: Internal-use only.

  Returns:
    A reading or writing buffer that supports a File-like interface. The
    buffer must be closed after operations are done.

  Raises:
    errors.AuthorizationError: if authorization failed.
    errors.NotFoundError: if an object that's expected to exist doesn't.
    ValueError: invalid open mode or if content_type or options are specified
      in reading mode.
  """
  # Validate and normalize the path before dispatching on mode, so path
  # errors surface the same way regardless of mode.
  common.validate_file_path(filename)
  client = storage_api._get_storage_api(retry_params=retry_params,
                                        account_id=_account_id)
  quoted_filename = api_utils._quote_filename(filename)

  if mode not in ('r', 'w'):
    raise ValueError('Invalid mode %s.' % mode)

  if mode == 'w':
    common.validate_options(options)
    return storage_api.StreamingBuffer(client, quoted_filename,
                                       content_type, options)

  # Reading mode: write-only arguments are rejected.
  if content_type or options:
    raise ValueError('Options and content_type can only be specified '
                     'for writing mode.')
  return storage_api.ReadBuffer(client,
                                quoted_filename,
                                buffer_size=read_buffer_size)
+ |
+ |
def delete(filename, retry_params=None, _account_id=None):
  """Delete a Google Cloud Storage file.

  Args:
    filename: A Google Cloud Storage filename of form '/bucket/filename'.
    retry_params: An api_utils.RetryParams for this call to GCS. If None,
      the default one is used.
    _account_id: Internal-use only.

  Raises:
    errors.NotFoundError: if the file doesn't exist prior to deletion.
  """
  # Validate the path before constructing the API client, consistent with
  # open() and stat(), so an invalid filename never triggers client setup.
  common.validate_file_path(filename)
  api = storage_api._get_storage_api(retry_params=retry_params,
                                     account_id=_account_id)
  filename = api_utils._quote_filename(filename)
  status, resp_headers, content = api.delete_object(filename)
  # A successful GCS DELETE returns 204 No Content; anything else raises.
  errors.check_status(status, [204], filename, resp_headers=resp_headers,
                      body=content)
+ |
+ |
def stat(filename, retry_params=None, _account_id=None):
  """Get the GCSFileStat of a Google Cloud Storage file.

  Issues a HEAD request for the object and translates the response headers
  into a GCSFileStat.

  Args:
    filename: A Google Cloud Storage filename of form '/bucket/filename'.
    retry_params: An api_utils.RetryParams for this call to GCS. If None,
      the default one is used.
    _account_id: Internal-use only.

  Returns:
    A GCSFileStat object containing info about this file.

  Raises:
    errors.AuthorizationError: if authorization failed.
    errors.NotFoundError: if an object that's expected to exist doesn't.
  """
  common.validate_file_path(filename)
  client = storage_api._get_storage_api(retry_params=retry_params,
                                        account_id=_account_id)
  status, headers, content = client.head_object(
      api_utils._quote_filename(filename))
  errors.check_status(status, [200], filename, resp_headers=headers,
                      body=content)
  # All stat fields come straight from the HEAD response headers.
  return common.GCSFileStat(
      filename=filename,
      st_size=common.get_stored_content_length(headers),
      st_ctime=common.http_time_to_posix(headers.get('last-modified')),
      etag=headers.get('etag'),
      content_type=headers.get('content-type'),
      metadata=common.get_metadata(headers))
+ |
+ |
def _copy2(src, dst, metadata=None, retry_params=None):
  """Copy the file content from src to dst.

  Internal use only!

  Args:
    src: /bucket/filename
    dst: /bucket/filename
    metadata: a dict of metadata for this copy. If None, old metadata is
      copied. For example, {'x-goog-meta-foo': 'bar'}.
    retry_params: An api_utils.RetryParams for this call to GCS. If None,
      the default one is used.

  Raises:
    errors.AuthorizationError: if authorization failed.
    errors.NotFoundError: if an object that's expected to exist doesn't.
  """
  common.validate_file_path(src)
  common.validate_file_path(dst)

  # Build a private header dict instead of mutating the caller's metadata
  # dict in place: the original implementation leaked the
  # x-goog-copy-source/x-goog-metadata-directive keys into the argument.
  if metadata is None:
    # COPY directive: preserve the source object's existing metadata.
    copy_headers = {'x-goog-metadata-directive': 'COPY'}
  else:
    # REPLACE directive: the supplied metadata replaces the old metadata.
    copy_headers = dict(metadata)
    copy_headers['x-goog-metadata-directive'] = 'REPLACE'
  copy_headers['x-goog-copy-source'] = src

  api = storage_api._get_storage_api(retry_params=retry_params)
  status, resp_headers, content = api.put_object(
      api_utils._quote_filename(dst), headers=copy_headers)
  errors.check_status(status, [200], src, copy_headers, resp_headers,
                      body=content)
+ |
+ |
def listbucket(path_prefix, marker=None, prefix=None, max_keys=None,
               delimiter=None, retry_params=None, _account_id=None):
  """Return a GCSFileStat iterator over a bucket.

  Optional arguments can limit the result to a subset of files under bucket.

  This function has two modes:
  1. List bucket mode: Lists all files in the bucket without any concept of
     hierarchy. GCS doesn't have real directory hierarchies.
  2. Directory emulation mode: If you specify the 'delimiter' argument,
     it is used as a path separator to emulate a hierarchy of directories.
     In this mode, the "path_prefix" argument should end in the delimiter
     specified (thus designates a logical directory). The logical
     directory's contents, both files and subdirectories, are listed. The
     names of subdirectories returned will end with the delimiter. So
     listbucket can be called with the subdirectory name to list the
     subdirectory's contents.

  Args:
    path_prefix: A Google Cloud Storage path of format "/bucket" or
      "/bucket/prefix". Only objects whose fullpath starts with the
      path_prefix will be returned.
    marker: Another path prefix. Only objects whose fullpath starts
      lexicographically after marker will be returned (exclusive).
    prefix: Deprecated. Use path_prefix.
    max_keys: The limit on the number of objects to return. int.
      For best performance, specify max_keys only if you know how many
      objects you want. Otherwise, this method requests large batches and
      handles pagination for you.
    delimiter: Use to turn on directory mode. str of one or multiple chars
      that your bucket uses as its directory separator.
    retry_params: An api_utils.RetryParams for this call to GCS. If None,
      the default one is used.
    _account_id: Internal-use only.

  Examples:
    For files "/bucket/a",
              "/bucket/bar/1"
              "/bucket/foo",
              "/bucket/foo/1", "/bucket/foo/2/1", "/bucket/foo/3/1",

    Regular mode:
      listbucket("/bucket/f", marker="/bucket/foo/1")
      will match "/bucket/foo/2/1", "/bucket/foo/3/1".

    Directory mode:
      listbucket("/bucket/", delimiter="/")
      will match "/bucket/a, "/bucket/bar/" "/bucket/foo", "/bucket/foo/".
      listbucket("/bucket/foo/", delimiter="/")
      will match "/bucket/foo/1", "/bucket/foo/2/", "/bucket/foo/3/"

  Returns:
    Regular mode:
      A GCSFileStat iterator over matched files ordered by filename.
      The iterator returns GCSFileStat objects. filename, etag, st_size,
      st_ctime, and is_dir are set.

    Directory emulation mode:
      A GCSFileStat iterator over matched files and directories ordered by
      name. The iterator returns GCSFileStat objects. For directories,
      only the filename and is_dir fields are set.

    The last name yielded can be used as next call's marker.
  """
  # Legacy calling convention: an explicit (deprecated) prefix argument
  # means path_prefix is a bare bucket path.
  if prefix:
    common.validate_bucket_path(path_prefix)
    bucket = path_prefix
  else:
    bucket, prefix = common._process_path_prefix(path_prefix)

  # The GCS XML API expects a marker relative to the bucket, so strip the
  # leading "/bucket/" if the caller passed a full path.
  if marker and marker.startswith(bucket):
    marker = marker[len(bucket) + 1:]

  client = storage_api._get_storage_api(retry_params=retry_params,
                                        account_id=_account_id)

  # Collect only the options the caller actually supplied.
  request_options = {}
  for option_name, option_value in (('marker', marker),
                                    ('max-keys', max_keys),
                                    ('prefix', prefix),
                                    ('delimiter', delimiter)):
    if option_value:
      request_options[option_name] = option_value

  return _Bucket(client, bucket, request_options)
+ |
+ |
class _Bucket(object):
  """A wrapper for a GCS bucket as the return value of listbucket.

  Iterable and picklable: iteration merges the file and subdirectory
  entries of successive GET-bucket pages, and pickling captures enough
  state (marker of the last yielded name, remaining max-keys budget) to
  resume iteration where it left off.
  """

  def __init__(self, api, path, options):
    """Initialize.

    Args:
      api: storage_api instance.
      path: bucket path of form '/bucket'.
      options: a dict of listbucket options. Please see listbucket doc.
    """
    self._init(api, path, options)

  def _init(self, api, path, options):
    # Shared by __init__ and __setstate__ so unpickling rebuilds the
    # same state, including the in-flight request.
    self._api = api
    self._path = path
    # Copy so later marker/max-keys updates don't mutate the caller's dict.
    self._options = options.copy()
    # Kick off the first GET-bucket request asynchronously right away so
    # the response can arrive while the caller does other work.
    self._get_bucket_fut = self._api.get_bucket_async(
        self._path + '?' + urllib.urlencode(self._options))
    # Full path of the most recently yielded GCSFileStat; used as the
    # resume marker when this object is pickled mid-iteration.
    self._last_yield = None
    # Remaining max-keys budget, decremented per yield; None if unlimited.
    self._new_max_keys = self._options.get('max-keys')

  def __getstate__(self):
    # Serialize as (api, path, options) with the options rewritten so a
    # restored instance resumes right after the last yielded name.
    options = self._options
    if self._last_yield:
      # Markers are bucket-relative; strip the '/bucket/' prefix.
      options['marker'] = self._last_yield.filename[len(self._path) + 1:]
    if self._new_max_keys is not None:
      options['max-keys'] = self._new_max_keys
    return {'api': self._api,
            'path': self._path,
            'options': options}

  def __setstate__(self, state):
    self._init(state['api'], state['path'], state['options'])

  def __iter__(self):
    """Iter over the bucket.

    Yields:
      GCSFileStat: a GCSFileStat for an object in the bucket.
        They are ordered by GCSFileStat.filename.
    """
    total = 0
    max_keys = self._options.get('max-keys')

    while self._get_bucket_fut:
      status, resp_headers, content = self._get_bucket_fut.get_result()
      errors.check_status(status, [200], self._path, resp_headers=resp_headers,
                          body=content, extras=self._options)

      # Issue the next page request before parsing this page, so network
      # latency overlaps with XML processing and yielding.
      if self._should_get_another_batch(content):
        self._get_bucket_fut = self._api.get_bucket_async(
            self._path + '?' + urllib.urlencode(self._options))
      else:
        self._get_bucket_fut = None

      root = ET.fromstring(content)
      dirs = self._next_dir_gen(root)
      files = self._next_file_gen(root)
      # Both generators yield a trailing None sentinel when exhausted.
      next_file = files.next()
      next_dir = dirs.next()

      # Merge the two name-ordered streams (files and subdirectories)
      # so output stays ordered by name, stopping at max_keys.
      while ((max_keys is None or total < max_keys) and
             not (next_file is None and next_dir is None)):
        total += 1
        if next_file is None:
          self._last_yield = next_dir
          next_dir = dirs.next()
        elif next_dir is None:
          self._last_yield = next_file
          next_file = files.next()
        elif next_dir < next_file:
          self._last_yield = next_dir
          next_dir = dirs.next()
        elif next_file < next_dir:
          self._last_yield = next_file
          next_file = files.next()
        else:
          # A name can't be both a file and a directory in one listing.
          logging.error(
              'Should never reach. next file is %r. next dir is %r.',
              next_file, next_dir)
        if self._new_max_keys:
          self._new_max_keys -= 1
        yield self._last_yield

  def _next_file_gen(self, root):
    """Generator for next file element in the document.

    Args:
      root: root element of the XML tree.

    Yields:
      GCSFileStat for the next file, then a final None sentinel.
    """
    for e in root.getiterator(common._T_CONTENTS):
      st_ctime, size, etag, key = None, None, None, None
      for child in e.getiterator('*'):
        if child.tag == common._T_LAST_MODIFIED:
          st_ctime = common.dt_str_to_posix(child.text)
        elif child.tag == common._T_ETAG:
          etag = child.text
        elif child.tag == common._T_SIZE:
          size = child.text
        elif child.tag == common._T_KEY:
          key = child.text
      yield common.GCSFileStat(self._path + '/' + key,
                               size, etag, st_ctime)
      # Drop child elements already consumed to keep memory bounded.
      e.clear()
    yield None

  def _next_dir_gen(self, root):
    """Generator for next directory element in the document.

    Args:
      root: root element in the XML tree.

    Yields:
      GCSFileStat for the next directory, then a final None sentinel.
    """
    for e in root.getiterator(common._T_COMMON_PREFIXES):
      yield common.GCSFileStat(
          self._path + '/' + e.find(common._T_PREFIX).text,
          st_size=None, etag=None, st_ctime=None, is_dir=True)
      e.clear()
    yield None

  def _should_get_another_batch(self, content):
    """Whether to issue another GET bucket call.

    Args:
      content: response XML.

    Returns:
      True if should, also update self._options for the next request.
      False otherwise.
    """
    # If the whole request fits in one server batch, this page is final.
    if ('max-keys' in self._options and
        self._options['max-keys'] <= common._MAX_GET_BUCKET_RESULT):
      return False

    elements = self._find_elements(
        content, set([common._T_IS_TRUNCATED,
                      common._T_NEXT_MARKER]))
    if elements.get(common._T_IS_TRUNCATED, 'false').lower() != 'true':
      return False

    next_marker = elements.get(common._T_NEXT_MARKER)
    if next_marker is None:
      # Truncated but no marker given: cannot continue; clear any stale
      # marker so pickled state doesn't re-request the same page.
      self._options.pop('marker', None)
      return False
    self._options['marker'] = next_marker
    return True

  def _find_elements(self, result, elements):
    """Find interesting elements from XML.

    This function tries to only look for specified elements
    without parsing the entire XML. The specified elements is better
    located near the beginning.

    Args:
      result: response XML.
      elements: a set of interesting element tags.

    Returns:
      A dict from element tag to element value.
    """
    element_mapping = {}
    result = StringIO.StringIO(result)
    # Incremental parse; stop as soon as all wanted tags are found.
    for _, e in ET.iterparse(result, events=('end',)):
      if not elements:
        break
      if e.tag in elements:
        element_mapping[e.tag] = e.text
        elements.remove(e.tag)
    return element_mapping