OLD | NEW |
(Empty) | |
| 1 # Copyright 2012 Google Inc. All Rights Reserved. |
| 2 # |
| 3 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 # you may not use this file except in compliance with the License. |
| 5 # You may obtain a copy of the License at |
| 6 # |
| 7 # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 # |
| 9 # Unless required by applicable law or agreed to in writing, |
| 10 # software distributed under the License is distributed on an |
| 11 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, |
| 12 # either express or implied. See the License for the specific |
| 13 # language governing permissions and limitations under the License. |
| 14 |
| 15 """File Interface for Google Cloud Storage.""" |
| 16 |
| 17 |
| 18 |
| 19 from __future__ import with_statement |
| 20 |
| 21 |
| 22 |
| 23 __all__ = ['delete', |
| 24 'listbucket', |
| 25 'open', |
| 26 'stat', |
| 27 ] |
| 28 |
| 29 import logging |
| 30 import StringIO |
| 31 import urllib |
| 32 import xml.etree.cElementTree as ET |
| 33 from . import api_utils |
| 34 from . import common |
| 35 from . import errors |
| 36 from . import storage_api |
| 37 |
| 38 |
| 39 |
def open(filename,
         mode='r',
         content_type=None,
         options=None,
         read_buffer_size=storage_api.ReadBuffer.DEFAULT_BUFFER_SIZE,
         retry_params=None,
         _account_id=None):
  """Open a Google Cloud Storage file as a File-like object.

  Args:
    filename: a GCS filename of form '/bucket/filename'.
    mode: 'r' to read an existing file; 'w' to create (or overwrite) one.
    content_type: MIME type of the file (str). Write mode only.
    options: str->basestring dict of extra headers to send to GCS,
      e.g. {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}.
      Supported options are x-goog-acl, x-goog-meta-, cache-control,
      content-disposition, and content-encoding. Write mode only.
      See https://developers.google.com/storage/docs/reference-headers
      for details.
    read_buffer_size: read-ahead buffer size. Reads keep one buffer and
      prefetch another; read in multiples of this size to minimize
      blocking on large files, or raise it to cut RPC count on small
      files. Max is 30MB.
    retry_params: api_utils.RetryParams governing all calls made through
      the returned handle. None selects the default.
    _account_id: Internal-use only.

  Returns:
    A File-like read or write buffer. It must be closed when done.

  Raises:
    errors.AuthorizationError: if authorization failed.
    errors.NotFoundError: if an object that's expected to exist doesn't.
    ValueError: invalid open mode or if content_type or options are specified
      in reading mode.
  """
  common.validate_file_path(filename)
  gcs_api = storage_api._get_storage_api(retry_params=retry_params,
                                         account_id=_account_id)
  quoted_name = api_utils._quote_filename(filename)

  if mode == 'w':
    # Write path: header options are validated, then uploads stream out.
    common.validate_options(options)
    return storage_api.StreamingBuffer(gcs_api, quoted_name, content_type,
                                       options)
  if mode == 'r':
    # content_type/options make no sense when reading; reject them.
    if content_type or options:
      raise ValueError('Options and content_type can only be specified '
                       'for writing mode.')
    return storage_api.ReadBuffer(gcs_api,
                                  quoted_name,
                                  buffer_size=read_buffer_size)
  raise ValueError('Invalid mode %s.' % mode)
| 97 |
| 98 |
def delete(filename, retry_params=None, _account_id=None):
  """Delete a Google Cloud Storage file.

  Args:
    filename: A Google Cloud Storage filename of form '/bucket/filename'.
    retry_params: An api_utils.RetryParams for this call to GCS. If None,
      the default one is used.
    _account_id: Internal-use only.

  Raises:
    errors.NotFoundError: if the file doesn't exist prior to deletion.
  """
  # Validate before building the API handle so an invalid path fails
  # fast; this matches the check order used by open() and stat().
  common.validate_file_path(filename)
  api = storage_api._get_storage_api(retry_params=retry_params,
                                     account_id=_account_id)
  filename = api_utils._quote_filename(filename)
  status, resp_headers, content = api.delete_object(filename)
  # GCS answers a successful delete with 204 No Content.
  errors.check_status(status, [204], filename, resp_headers=resp_headers,
                      body=content)
| 118 |
| 119 |
def stat(filename, retry_params=None, _account_id=None):
  """Fetch the GCSFileStat of a Google Cloud Storage file.

  Issues a HEAD request for the object and builds the stat result from
  the response headers.

  Args:
    filename: A Google Cloud Storage filename of form '/bucket/filename'.
    retry_params: An api_utils.RetryParams for this call to GCS. If None,
      the default one is used.
    _account_id: Internal-use only.

  Returns:
    a GCSFileStat object containing info about this file.

  Raises:
    errors.AuthorizationError: if authorization failed.
    errors.NotFoundError: if an object that's expected to exist doesn't.
  """
  common.validate_file_path(filename)
  api = storage_api._get_storage_api(retry_params=retry_params,
                                     account_id=_account_id)
  status, resp_headers, content = api.head_object(
      api_utils._quote_filename(filename))
  errors.check_status(status, [200], filename, resp_headers=resp_headers,
                      body=content)
  # All stat fields come straight out of the HEAD response headers.
  return common.GCSFileStat(
      filename=filename,
      st_size=common.get_stored_content_length(resp_headers),
      st_ctime=common.http_time_to_posix(resp_headers.get('last-modified')),
      etag=resp_headers.get('etag'),
      content_type=resp_headers.get('content-type'),
      metadata=common.get_metadata(resp_headers))
| 152 |
| 153 |
def _copy2(src, dst, metadata=None, retry_params=None):
  """Copy the file content from src to dst.

  Internal use only!

  Args:
    src: /bucket/filename
    dst: /bucket/filename
    metadata: a dict of metadata for this copy. If None, old metadata is copied.
      For example, {'x-goog-meta-foo': 'bar'}. The caller's dict is not
      modified.
    retry_params: An api_utils.RetryParams for this call to GCS. If None,
      the default one is used.

  Raises:
    errors.AuthorizationError: if authorization failed.
    errors.NotFoundError: if an object that's expected to exist doesn't.
  """
  common.validate_file_path(src)
  common.validate_file_path(dst)

  # Build the request headers in a fresh dict. The previous version
  # called metadata.update(...), which leaked the x-goog-copy-source and
  # x-goog-metadata-directive keys back into the caller's dict.
  if metadata is None:
    # No replacement metadata: tell GCS to carry the source metadata over.
    copy_headers = {'x-goog-metadata-directive': 'COPY'}
  else:
    copy_headers = dict(metadata)
    copy_headers['x-goog-metadata-directive'] = 'REPLACE'
  copy_headers['x-goog-copy-source'] = src

  api = storage_api._get_storage_api(retry_params=retry_params)
  status, resp_headers, content = api.put_object(
      api_utils._quote_filename(dst), headers=copy_headers)
  errors.check_status(status, [200], src, copy_headers, resp_headers,
                      body=content)
| 186 |
| 187 |
def listbucket(path_prefix, marker=None, prefix=None, max_keys=None,
               delimiter=None, retry_params=None, _account_id=None):
  """Return a GCSFileStat iterator over a bucket.

  Optional arguments can limit the result to a subset of files under bucket.

  Two modes are available:
    1. List bucket mode: without 'delimiter', every object whose full
       path starts with path_prefix is listed flat; GCS has no real
       directory hierarchy.
    2. Directory emulation mode: with 'delimiter' set, it acts as a path
       separator emulating directories. path_prefix should then end with
       the delimiter (naming a logical directory), and both the files
       and the subdirectories of that directory are listed. Subdirectory
       names end with the delimiter, so they can be fed back into
       listbucket to descend further.

  Args:
    path_prefix: A Google Cloud Storage path of format "/bucket" or
      "/bucket/prefix". Only objects whose fullpath starts with the
      path_prefix will be returned.
    marker: Another path prefix. Only objects whose fullpath starts
      lexicographically after marker will be returned (exclusive).
    prefix: Deprecated. Use path_prefix.
    max_keys: The limit on the number of objects to return. int.
      For best performance, specify max_keys only if you know how many objects
      you want. Otherwise, this method requests large batches and handles
      pagination for you.
    delimiter: Use to turn on directory mode. str of one or multiple chars
      that your bucket uses as its directory separator.
    retry_params: An api_utils.RetryParams for this call to GCS. If None,
      the default one is used.
    _account_id: Internal-use only.

  Examples:
    For files "/bucket/a",
              "/bucket/bar/1"
              "/bucket/foo",
              "/bucket/foo/1", "/bucket/foo/2/1", "/bucket/foo/3/1",

    Regular mode:
      listbucket("/bucket/f", marker="/bucket/foo/1")
    will match "/bucket/foo/2/1", "/bucket/foo/3/1".

    Directory mode:
      listbucket("/bucket/", delimiter="/")
    will match "/bucket/a, "/bucket/bar/" "/bucket/foo", "/bucket/foo/".
      listbucket("/bucket/foo/", delimiter="/")
    will match "/bucket/foo/1", "/bucket/foo/2/", "/bucket/foo/3/"

  Returns:
    Regular mode: a GCSFileStat iterator over matched files, ordered by
    filename; filename, etag, st_size, st_ctime, and is_dir are set.

    Directory emulation mode: a GCSFileStat iterator over matched files
    and directories ordered by name; for directories only filename and
    is_dir are set.

    The last name yielded can be used as next call's marker.
  """
  if prefix:
    # Deprecated calling convention: path_prefix is a bare bucket and
    # the object-name prefix arrives via the 'prefix' argument.
    common.validate_bucket_path(path_prefix)
    bucket = path_prefix
  else:
    bucket, prefix = common._process_path_prefix(path_prefix)

  if marker and marker.startswith(bucket):
    # Reduce a full '/bucket/name' marker to the bare object name the
    # GCS list API expects.
    marker = marker[len(bucket) + 1:]

  api = storage_api._get_storage_api(retry_params=retry_params,
                                     account_id=_account_id)

  # Collect only the query parameters the caller actually supplied.
  options = {}
  for option_name, option_value in (('marker', marker),
                                    ('max-keys', max_keys),
                                    ('prefix', prefix),
                                    ('delimiter', delimiter)):
    if option_value:
      options[option_name] = option_value

  return _Bucket(api, bucket, options)
| 274 |
| 275 |
class _Bucket(object):
  """A wrapper for a GCS bucket as the return value of listbucket.

  Iterable and picklable. Iteration yields GCSFileStat objects ordered
  by filename; pickling captures a marker and the remaining max-keys
  budget so an unpickled instance resumes listing where it left off.
  """

  def __init__(self, api, path, options):
    """Initialize.

    Args:
      api: storage_api instance.
      path: bucket path of form '/bucket'.
      options: a dict of listbucket options. Please see listbucket doc.
    """
    self._init(api, path, options)

  def _init(self, api, path, options):
    # Shared by __init__ and __setstate__ so that unpickling rebuilds
    # the exact same state, including a freshly started batch request.
    self._api = api
    self._path = path
    self._options = options.copy()
    # Start the first GET bucket request eagerly; __iter__ consumes the
    # result lazily.
    self._get_bucket_fut = self._api.get_bucket_async(
        self._path + '?' + urllib.urlencode(self._options))
    # The most recent GCSFileStat yielded; becomes the resume marker
    # when this object is pickled.
    self._last_yield = None
    # Remaining number of results the caller still wants, or None when
    # the caller didn't set 'max-keys'.
    self._new_max_keys = self._options.get('max-keys')

  def __getstate__(self):
    # Serialize just enough to resume: advance the marker past the last
    # yielded name and carry over the shrunken max-keys budget.
    options = self._options
    if self._last_yield:
      # Strip the leading '/bucket/' to get a bare object-name marker.
      options['marker'] = self._last_yield.filename[len(self._path) + 1:]
    if self._new_max_keys is not None:
      options['max-keys'] = self._new_max_keys
    return {'api': self._api,
            'path': self._path,
            'options': options}

  def __setstate__(self, state):
    self._init(state['api'], state['path'], state['options'])

  def __iter__(self):
    """Iter over the bucket.

    Yields:
      GCSFileStat: a GCSFileStat for an object in the bucket.
        They are ordered by GCSFileStat.filename.
    """
    total = 0
    max_keys = self._options.get('max-keys')

    while self._get_bucket_fut:
      status, resp_headers, content = self._get_bucket_fut.get_result()
      errors.check_status(status, [200], self._path, resp_headers=resp_headers,
                          body=content, extras=self._options)

      # Kick off the next batch (if one is needed) before parsing this
      # one, so the RPC overlaps with local XML processing.
      if self._should_get_another_batch(content):
        self._get_bucket_fut = self._api.get_bucket_async(
            self._path + '?' + urllib.urlencode(self._options))
      else:
        self._get_bucket_fut = None

      root = ET.fromstring(content)
      dirs = self._next_dir_gen(root)
      files = self._next_file_gen(root)
      next_file = files.next()
      next_dir = dirs.next()

      # Merge the file and directory streams in filename order (the
      # merge step of mergesort). Each generator yields None once
      # exhausted, which acts as the end-of-stream sentinel.
      while ((max_keys is None or total < max_keys) and
             not (next_file is None and next_dir is None)):
        total += 1
        if next_file is None:
          self._last_yield = next_dir
          next_dir = dirs.next()
        elif next_dir is None:
          self._last_yield = next_file
          next_file = files.next()
        elif next_dir < next_file:
          self._last_yield = next_dir
          next_dir = dirs.next()
        elif next_file < next_dir:
          self._last_yield = next_file
          next_file = files.next()
        else:
          # Identical file and directory names should be impossible in
          # a single listing; log loudly rather than crash.
          logging.error(
              'Should never reach. next file is %r. next dir is %r.',
              next_file, next_dir)
        # Decrement the resume budget only when it is a positive count.
        if self._new_max_keys:
          self._new_max_keys -= 1
        yield self._last_yield

  def _next_file_gen(self, root):
    """Generator for next file element in the document.

    Args:
      root: root element of the XML tree.

    Yields:
      GCSFileStat for the next file, then None as an end sentinel.
    """
    for e in root.getiterator(common._T_CONTENTS):
      st_ctime, size, etag, key = None, None, None, None
      for child in e.getiterator('*'):
        if child.tag == common._T_LAST_MODIFIED:
          st_ctime = common.dt_str_to_posix(child.text)
        elif child.tag == common._T_ETAG:
          etag = child.text
        elif child.tag == common._T_SIZE:
          size = child.text
        elif child.tag == common._T_KEY:
          key = child.text
      yield common.GCSFileStat(self._path + '/' + key,
                               size, etag, st_ctime)
      # Free the parsed element to bound memory on large listings.
      e.clear()
    yield None

  def _next_dir_gen(self, root):
    """Generator for next directory element in the document.

    Args:
      root: root element in the XML tree.

    Yields:
      GCSFileStat for the next directory, then None as an end sentinel.
    """
    for e in root.getiterator(common._T_COMMON_PREFIXES):
      yield common.GCSFileStat(
          self._path + '/' + e.find(common._T_PREFIX).text,
          st_size=None, etag=None, st_ctime=None, is_dir=True)
      # Free the parsed element to bound memory on large listings.
      e.clear()
    yield None

  def _should_get_another_batch(self, content):
    """Whether to issue another GET bucket call.

    Args:
      content: response XML.

    Returns:
      True if should, also update self._options for the next request.
      False otherwise.
    """
    # If the caller's max-keys fits within a single GET bucket result,
    # one request was enough by construction.
    if ('max-keys' in self._options and
        self._options['max-keys'] <= common._MAX_GET_BUCKET_RESULT):
      return False

    elements = self._find_elements(
        content, set([common._T_IS_TRUNCATED,
                      common._T_NEXT_MARKER]))
    if elements.get(common._T_IS_TRUNCATED, 'false').lower() != 'true':
      return False

    next_marker = elements.get(common._T_NEXT_MARKER)
    if next_marker is None:
      # Truncated but no marker to continue from; drop any stale marker
      # and stop paginating.
      self._options.pop('marker', None)
      return False
    self._options['marker'] = next_marker
    return True

  def _find_elements(self, result, elements):
    """Find interesting elements from XML.

    This function tries to only look for specified elements
    without parsing the entire XML. The specified elements is better
    located near the beginning.

    Args:
      result: response XML.
      elements: a set of interesting element tags. Consumed in place:
        tags are removed as they are found.

    Returns:
      A dict from element tag to element value.
    """
    element_mapping = {}
    result = StringIO.StringIO(result)
    # iterparse streams the document; stop as soon as every requested
    # tag has been seen instead of parsing the whole response.
    for _, e in ET.iterparse(result, events=('end',)):
      if not elements:
        break
      if e.tag in elements:
        element_mapping[e.tag] = e.text
        elements.remove(e.tag)
    return element_mapping
OLD | NEW |