Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(713)

Side by Side Diff: third_party/cloud_storage/cloudstorage/storage_api.py

Issue 1031663002: Increase maximum file upload to 100MB, use cloudstorage python library (Closed) Base URL: https://github.com/dart-lang/pub-dartlang.git@master
Patch Set: Add deprecation comment to old cloud_storage.py:open() function Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2012 Google Inc. All Rights Reserved.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing,
10 # software distributed under the License is distributed on an
11 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
12 # either express or implied. See the License for the specific
13 # language governing permissions and limitations under the License.
14
15 """Python wrappers for the Google Storage RESTful API."""
16
17
18
19
20
21 __all__ = ['ReadBuffer',
22 'StreamingBuffer',
23 ]
24
25 import collections
26 import os
27 import urlparse
28
29 from . import api_utils
30 from . import common
31 from . import errors
32 from . import rest_api
33
34 try:
35 from google.appengine.api import urlfetch
36 from google.appengine.ext import ndb
37 except ImportError:
38 from google.appengine.api import urlfetch
39 from google.appengine.ext import ndb
40
41
42
43 def _get_storage_api(retry_params, account_id=None):
44 """Returns storage_api instance for API methods.
45
46 Args:
47 retry_params: An instance of api_utils.RetryParams. If none,
48 thread's default will be used.
49 account_id: Internal-use only.
50
51 Returns:
52 A storage_api instance to handle urlfetch work to GCS.
53 On dev appserver, this instance by default will talk to a local stub
54 unless common.ACCESS_TOKEN is set. That token will be used to talk
55 to the real GCS.
56 """
57
58
59 api = _StorageApi(_StorageApi.full_control_scope,
60 service_account_id=account_id,
61 retry_params=retry_params)
62 if common.local_run() and not common.get_access_token():
63 api.api_url = common.local_api_url()
64 if common.get_access_token():
65 api.token = common.get_access_token()
66 return api
67
68
69 class _StorageApi(rest_api._RestApi):
70 """A simple wrapper for the Google Storage RESTful API.
71
72 WARNING: Do NOT directly use this api. It's an implementation detail
73 and is subject to change at any release.
74
75 All async methods have similar args and returns.
76
77 Args:
78 path: The path to the Google Storage object or bucket, e.g.
79 '/mybucket/myfile' or '/mybucket'.
80 **kwd: Options for urlfetch. e.g.
81 headers={'content-type': 'text/plain'}, payload='blah'.
82
83 Returns:
84 A ndb Future. When fulfilled, future.get_result() should return
85 a tuple of (status, headers, content) that represents a HTTP response
86 of Google Cloud Storage XML API.
87 """
88
89 api_url = 'https://storage.googleapis.com'
90 read_only_scope = 'https://www.googleapis.com/auth/devstorage.read_only'
91 read_write_scope = 'https://www.googleapis.com/auth/devstorage.read_write'
92 full_control_scope = 'https://www.googleapis.com/auth/devstorage.full_control'
93
94 def __getstate__(self):
95 """Store state as part of serialization/pickling.
96
97 Returns:
98 A tuple (of dictionaries) with the state of this object
99 """
100 return (super(_StorageApi, self).__getstate__(), {'api_url': self.api_url})
101
102 def __setstate__(self, state):
103 """Restore state as part of deserialization/unpickling.
104
105 Args:
106 state: the tuple from a __getstate__ call
107 """
108 superstate, localstate = state
109 super(_StorageApi, self).__setstate__(superstate)
110 self.api_url = localstate['api_url']
111
112 @api_utils._eager_tasklet
113 @ndb.tasklet
114 def do_request_async(self, url, method='GET', headers=None, payload=None,
115 deadline=None, callback=None):
116 """Inherit docs.
117
118 This method translates urlfetch exceptions to more service specific ones.
119 """
120 if headers is None:
121 headers = {}
122 if 'x-goog-api-version' not in headers:
123 headers['x-goog-api-version'] = '2'
124 headers['accept-encoding'] = 'gzip, *'
125 try:
126 resp_tuple = yield super(_StorageApi, self).do_request_async(
127 url, method=method, headers=headers, payload=payload,
128 deadline=deadline, callback=callback)
129 except urlfetch.DownloadError, e:
130 raise errors.TimeoutError(
131 'Request to Google Cloud Storage timed out.', e)
132
133 raise ndb.Return(resp_tuple)
134
135
136 def post_object_async(self, path, **kwds):
137 """POST to an object."""
138 return self.do_request_async(self.api_url + path, 'POST', **kwds)
139
140 def put_object_async(self, path, **kwds):
141 """PUT an object."""
142 return self.do_request_async(self.api_url + path, 'PUT', **kwds)
143
144 def get_object_async(self, path, **kwds):
145 """GET an object.
146
147 Note: No payload argument is supported.
148 """
149 return self.do_request_async(self.api_url + path, 'GET', **kwds)
150
151 def delete_object_async(self, path, **kwds):
152 """DELETE an object.
153
154 Note: No payload argument is supported.
155 """
156 return self.do_request_async(self.api_url + path, 'DELETE', **kwds)
157
158 def head_object_async(self, path, **kwds):
159 """HEAD an object.
160
161 Depending on request headers, HEAD returns various object properties,
162 e.g. Content-Length, Last-Modified, and ETag.
163
164 Note: No payload argument is supported.
165 """
166 return self.do_request_async(self.api_url + path, 'HEAD', **kwds)
167
168 def get_bucket_async(self, path, **kwds):
169 """GET a bucket."""
170 return self.do_request_async(self.api_url + path, 'GET', **kwds)
171
172
173 _StorageApi = rest_api.add_sync_methods(_StorageApi)
174
175
176 class ReadBuffer(object):
177 """A class for reading Google storage files."""
178
179 DEFAULT_BUFFER_SIZE = 1024 * 1024
180 MAX_REQUEST_SIZE = 30 * DEFAULT_BUFFER_SIZE
181
182 def __init__(self,
183 api,
184 path,
185 buffer_size=DEFAULT_BUFFER_SIZE,
186 max_request_size=MAX_REQUEST_SIZE):
187 """Constructor.
188
189 Args:
190 api: A StorageApi instance.
191 path: Quoted/escaped path to the object, e.g. /mybucket/myfile
192 buffer_size: buffer size. The ReadBuffer keeps
193 one buffer. But there may be a pending future that contains
194 a second buffer. This size must be less than max_request_size.
195 max_request_size: Max bytes to request in one urlfetch.
196 """
197 self._api = api
198 self._path = path
199 self.name = api_utils._unquote_filename(path)
200 self.closed = False
201
202 assert buffer_size <= max_request_size
203 self._buffer_size = buffer_size
204 self._max_request_size = max_request_size
205 self._offset = 0
206 self._buffer = _Buffer()
207 self._etag = None
208
209 get_future = self._get_segment(0, self._buffer_size, check_response=False)
210
211 status, headers, content = self._api.head_object(path)
212 errors.check_status(status, [200], path, resp_headers=headers, body=content)
213 self._file_size = long(common.get_stored_content_length(headers))
214 self._check_etag(headers.get('etag'))
215
216 self._buffer_future = None
217
218 if self._file_size != 0:
219 content, check_response_closure = get_future.get_result()
220 check_response_closure()
221 self._buffer.reset(content)
222 self._request_next_buffer()
223
224 def __getstate__(self):
225 """Store state as part of serialization/pickling.
226
227 The contents of the read buffer are not stored, only the current offset for
228 data read by the client. A new read buffer is established at unpickling.
229 The head information for the object (file size and etag) are stored to
230 reduce startup and ensure the file has not changed.
231
232 Returns:
233 A dictionary with the state of this object
234 """
235 return {'api': self._api,
236 'path': self._path,
237 'buffer_size': self._buffer_size,
238 'request_size': self._max_request_size,
239 'etag': self._etag,
240 'size': self._file_size,
241 'offset': self._offset,
242 'closed': self.closed}
243
244 def __setstate__(self, state):
245 """Restore state as part of deserialization/unpickling.
246
247 Args:
248 state: the dictionary from a __getstate__ call
249
250 Along with restoring the state, pre-fetch the next read buffer.
251 """
252 self._api = state['api']
253 self._path = state['path']
254 self.name = api_utils._unquote_filename(self._path)
255 self._buffer_size = state['buffer_size']
256 self._max_request_size = state['request_size']
257 self._etag = state['etag']
258 self._file_size = state['size']
259 self._offset = state['offset']
260 self._buffer = _Buffer()
261 self.closed = state['closed']
262 self._buffer_future = None
263 if self._remaining() and not self.closed:
264 self._request_next_buffer()
265
266 def __iter__(self):
267 """Iterator interface.
268
269 Note the ReadBuffer container itself is the iterator. It's
270 (quote PEP0234)
271 'destructive: they consumes all the values and a second iterator
272 cannot easily be created that iterates independently over the same values.
273 You could open the file for the second time, or seek() to the beginning.'
274
275 Returns:
276 Self.
277 """
278 return self
279
280 def next(self):
281 line = self.readline()
282 if not line:
283 raise StopIteration()
284 return line
285
286 def readline(self, size=-1):
287 """Read one line delimited by '\n' from the file.
288
289 A trailing newline character is kept in the string. It may be absent when a
290 file ends with an incomplete line. If the size argument is non-negative,
291 it specifies the maximum string size (counting the newline) to return.
292 A negative size is the same as unspecified. Empty string is returned
293 only when EOF is encountered immediately.
294
295 Args:
296 size: Maximum number of bytes to read. If not specified, readline stops
297 only on '\n' or EOF.
298
299 Returns:
300 The data read as a string.
301
302 Raises:
303 IOError: When this buffer is closed.
304 """
305 self._check_open()
306 if size == 0 or not self._remaining():
307 return ''
308
309 data_list = []
310 newline_offset = self._buffer.find_newline(size)
311 while newline_offset < 0:
312 data = self._buffer.read(size)
313 size -= len(data)
314 self._offset += len(data)
315 data_list.append(data)
316 if size == 0 or not self._remaining():
317 return ''.join(data_list)
318 self._buffer.reset(self._buffer_future.get_result())
319 self._request_next_buffer()
320 newline_offset = self._buffer.find_newline(size)
321
322 data = self._buffer.read_to_offset(newline_offset + 1)
323 self._offset += len(data)
324 data_list.append(data)
325
326 return ''.join(data_list)
327
328 def read(self, size=-1):
329 """Read data from RAW file.
330
331 Args:
332 size: Number of bytes to read as integer. Actual number of bytes
333 read is always equal to size unless EOF is reached. If size is
334 negative or unspecified, read the entire file.
335
336 Returns:
337 data read as str.
338
339 Raises:
340 IOError: When this buffer is closed.
341 """
342 self._check_open()
343 if not self._remaining():
344 return ''
345
346 data_list = []
347 while True:
348 remaining = self._buffer.remaining()
349 if size >= 0 and size < remaining:
350 data_list.append(self._buffer.read(size))
351 self._offset += size
352 break
353 else:
354 size -= remaining
355 self._offset += remaining
356 data_list.append(self._buffer.read())
357
358 if self._buffer_future is None:
359 if size < 0 or size >= self._remaining():
360 needs = self._remaining()
361 else:
362 needs = size
363 data_list.extend(self._get_segments(self._offset, needs))
364 self._offset += needs
365 break
366
367 if self._buffer_future:
368 self._buffer.reset(self._buffer_future.get_result())
369 self._buffer_future = None
370
371 if self._buffer_future is None:
372 self._request_next_buffer()
373 return ''.join(data_list)
374
375 def _remaining(self):
376 return self._file_size - self._offset
377
378 def _request_next_buffer(self):
379 """Request next buffer.
380
381 Requires self._offset and self._buffer are in consistent state.
382 """
383 self._buffer_future = None
384 next_offset = self._offset + self._buffer.remaining()
385 if next_offset != self._file_size:
386 self._buffer_future = self._get_segment(next_offset,
387 self._buffer_size)
388
389 def _get_segments(self, start, request_size):
390 """Get segments of the file from Google Storage as a list.
391
392 A large request is broken into segments to avoid hitting urlfetch
393 response size limit. Each segment is returned from a separate urlfetch.
394
395 Args:
396 start: start offset to request. Inclusive. Have to be within the
397 range of the file.
398 request_size: number of bytes to request.
399
400 Returns:
401 A list of file segments in order
402 """
403 if not request_size:
404 return []
405
406 end = start + request_size
407 futures = []
408
409 while request_size > self._max_request_size:
410 futures.append(self._get_segment(start, self._max_request_size))
411 request_size -= self._max_request_size
412 start += self._max_request_size
413 if start < end:
414 futures.append(self._get_segment(start, end-start))
415 return [fut.get_result() for fut in futures]
416
417 @ndb.tasklet
418 def _get_segment(self, start, request_size, check_response=True):
419 """Get a segment of the file from Google Storage.
420
421 Args:
422 start: start offset of the segment. Inclusive. Have to be within the
423 range of the file.
424 request_size: number of bytes to request. Have to be small enough
425 for a single urlfetch request. May go over the logical range of the
426 file.
427 check_response: True to check the validity of GCS response automatically
428 before the future returns. False otherwise. See Yields section.
429
430 Yields:
431 If check_response is True, the segment [start, start + request_size)
432 of the file.
433 Otherwise, a tuple. The first element is the unverified file segment.
434 The second element is a closure that checks response. Caller should
435 first invoke the closure before consuing the file segment.
436
437 Raises:
438 ValueError: if the file has changed while reading.
439 """
440 end = start + request_size - 1
441 content_range = '%d-%d' % (start, end)
442 headers = {'Range': 'bytes=' + content_range}
443 status, resp_headers, content = yield self._api.get_object_async(
444 self._path, headers=headers)
445 def _checker():
446 errors.check_status(status, [200, 206], self._path, headers,
447 resp_headers, body=content)
448 self._check_etag(resp_headers.get('etag'))
449 if check_response:
450 _checker()
451 raise ndb.Return(content)
452 raise ndb.Return(content, _checker)
453
454 def _check_etag(self, etag):
455 """Check if etag is the same across requests to GCS.
456
457 If self._etag is None, set it. If etag is set, check that the new
458 etag equals the old one.
459
460 In the __init__ method, we fire one HEAD and one GET request using
461 ndb tasklet. One of them would return first and set the first value.
462
463 Args:
464 etag: etag from a GCS HTTP response. None if etag is not part of the
465 response header. It could be None for example in the case of GCS
466 composite file.
467
468 Raises:
469 ValueError: if two etags are not equal.
470 """
471 if etag is None:
472 return
473 elif self._etag is None:
474 self._etag = etag
475 elif self._etag != etag:
476 raise ValueError('File on GCS has changed while reading.')
477
478 def close(self):
479 self.closed = True
480 self._buffer = None
481 self._buffer_future = None
482
483 def __enter__(self):
484 return self
485
486 def __exit__(self, atype, value, traceback):
487 self.close()
488 return False
489
490 def seek(self, offset, whence=os.SEEK_SET):
491 """Set the file's current offset.
492
493 Note if the new offset is out of bound, it is adjusted to either 0 or EOF.
494
495 Args:
496 offset: seek offset as number.
497 whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
498 os.SEEK_CUR (seek relative to the current position), and os.SEEK_END
499 (seek relative to the end, offset should be negative).
500
501 Raises:
502 IOError: When this buffer is closed.
503 ValueError: When whence is invalid.
504 """
505 self._check_open()
506
507 self._buffer.reset()
508 self._buffer_future = None
509
510 if whence == os.SEEK_SET:
511 self._offset = offset
512 elif whence == os.SEEK_CUR:
513 self._offset += offset
514 elif whence == os.SEEK_END:
515 self._offset = self._file_size + offset
516 else:
517 raise ValueError('Whence mode %s is invalid.' % str(whence))
518
519 self._offset = min(self._offset, self._file_size)
520 self._offset = max(self._offset, 0)
521 if self._remaining():
522 self._request_next_buffer()
523
524 def tell(self):
525 """Tell the file's current offset.
526
527 Returns:
528 current offset in reading this file.
529
530 Raises:
531 IOError: When this buffer is closed.
532 """
533 self._check_open()
534 return self._offset
535
536 def _check_open(self):
537 if self.closed:
538 raise IOError('Buffer is closed.')
539
540 def seekable(self):
541 return True
542
543 def readable(self):
544 return True
545
546 def writable(self):
547 return False
548
549
550 class _Buffer(object):
551 """In memory buffer."""
552
553 def __init__(self):
554 self.reset()
555
556 def reset(self, content='', offset=0):
557 self._buffer = content
558 self._offset = offset
559
560 def read(self, size=-1):
561 """Returns bytes from self._buffer and update related offsets.
562
563 Args:
564 size: number of bytes to read starting from current offset.
565 Read the entire buffer if negative.
566
567 Returns:
568 Requested bytes from buffer.
569 """
570 if size < 0:
571 offset = len(self._buffer)
572 else:
573 offset = self._offset + size
574 return self.read_to_offset(offset)
575
576 def read_to_offset(self, offset):
577 """Returns bytes from self._buffer and update related offsets.
578
579 Args:
580 offset: read from current offset to this offset, exclusive.
581
582 Returns:
583 Requested bytes from buffer.
584 """
585 assert offset >= self._offset
586 result = self._buffer[self._offset: offset]
587 self._offset += len(result)
588 return result
589
590 def remaining(self):
591 return len(self._buffer) - self._offset
592
593 def find_newline(self, size=-1):
594 """Search for newline char in buffer starting from current offset.
595
596 Args:
597 size: number of bytes to search. -1 means all.
598
599 Returns:
600 offset of newline char in buffer. -1 if doesn't exist.
601 """
602 if size < 0:
603 return self._buffer.find('\n', self._offset)
604 return self._buffer.find('\n', self._offset, self._offset + size)
605
606
607 class StreamingBuffer(object):
608 """A class for creating large objects using the 'resumable' API.
609
610 The API is a subset of the Python writable stream API sufficient to
611 support writing zip files using the zipfile module.
612
613 The exact sequence of calls and use of headers is documented at
614 https://developers.google.com/storage/docs/developer-guide#unknownresumables
615 """
616
617 _blocksize = 256 * 1024
618
619 _flushsize = 8 * _blocksize
620
621 _maxrequestsize = 9 * 4 * _blocksize
622
623 def __init__(self,
624 api,
625 path,
626 content_type=None,
627 gcs_headers=None):
628 """Constructor.
629
630 Args:
631 api: A StorageApi instance.
632 path: Quoted/escaped path to the object, e.g. /mybucket/myfile
633 content_type: Optional content-type; Default value is
634 delegate to Google Cloud Storage.
635 gcs_headers: additional gs headers as a str->str dict, e.g
636 {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}.
637 Raises:
638 IOError: When this location can not be found.
639 """
640 assert self._maxrequestsize > self._blocksize
641 assert self._maxrequestsize % self._blocksize == 0
642 assert self._maxrequestsize >= self._flushsize
643
644 self._api = api
645 self._path = path
646
647 self.name = api_utils._unquote_filename(path)
648 self.closed = False
649
650 self._buffer = collections.deque()
651 self._buffered = 0
652 self._written = 0
653 self._offset = 0
654
655 headers = {'x-goog-resumable': 'start'}
656 if content_type:
657 headers['content-type'] = content_type
658 if gcs_headers:
659 headers.update(gcs_headers)
660 status, resp_headers, content = self._api.post_object(path, headers=headers)
661 errors.check_status(status, [201], path, headers, resp_headers,
662 body=content)
663 loc = resp_headers.get('location')
664 if not loc:
665 raise IOError('No location header found in 201 response')
666 parsed = urlparse.urlparse(loc)
667 self._path_with_token = '%s?%s' % (self._path, parsed.query)
668
669 def __getstate__(self):
670 """Store state as part of serialization/pickling.
671
672 The contents of the write buffer are stored. Writes to the underlying
673 storage are required to be on block boundaries (_blocksize) except for the
674 last write. In the worst case the pickled version of this object may be
675 slightly larger than the blocksize.
676
677 Returns:
678 A dictionary with the state of this object
679
680 """
681 return {'api': self._api,
682 'path': self._path,
683 'path_token': self._path_with_token,
684 'buffer': self._buffer,
685 'buffered': self._buffered,
686 'written': self._written,
687 'offset': self._offset,
688 'closed': self.closed}
689
690 def __setstate__(self, state):
691 """Restore state as part of deserialization/unpickling.
692
693 Args:
694 state: the dictionary from a __getstate__ call
695 """
696 self._api = state['api']
697 self._path_with_token = state['path_token']
698 self._buffer = state['buffer']
699 self._buffered = state['buffered']
700 self._written = state['written']
701 self._offset = state['offset']
702 self.closed = state['closed']
703 self._path = state['path']
704 self.name = api_utils._unquote_filename(self._path)
705
706 def write(self, data):
707 """Write some bytes.
708
709 Args:
710 data: data to write. str.
711
712 Raises:
713 TypeError: if data is not of type str.
714 """
715 self._check_open()
716 if not isinstance(data, str):
717 raise TypeError('Expected str but got %s.' % type(data))
718 if not data:
719 return
720 self._buffer.append(data)
721 self._buffered += len(data)
722 self._offset += len(data)
723 if self._buffered >= self._flushsize:
724 self._flush()
725
726 def flush(self):
727 """Flush as much as possible to GCS.
728
729 GCS *requires* that all writes except for the final one align on
730 256KB boundaries. So the internal buffer may still have < 256KB bytes left
731 after flush.
732 """
733 self._check_open()
734 self._flush(finish=False)
735
736 def tell(self):
737 """Return the total number of bytes passed to write() so far.
738
739 (There is no seek() method.)
740 """
741 return self._offset
742
743 def close(self):
744 """Flush the buffer and finalize the file.
745
746 When this returns the new file is available for reading.
747 """
748 if not self.closed:
749 self.closed = True
750 self._flush(finish=True)
751 self._buffer = None
752
753 def __enter__(self):
754 return self
755
756 def __exit__(self, atype, value, traceback):
757 self.close()
758 return False
759
760 def _flush(self, finish=False):
761 """Internal API to flush.
762
763 Buffer is flushed to GCS only when the total amount of buffered data is at
764 least self._blocksize, or to flush the final (incomplete) block of
765 the file with finish=True.
766 """
767 while ((finish and self._buffered >= 0) or
768 (not finish and self._buffered >= self._blocksize)):
769 tmp_buffer = []
770 tmp_buffer_len = 0
771
772 excess = 0
773 while self._buffer:
774 buf = self._buffer.popleft()
775 size = len(buf)
776 self._buffered -= size
777 tmp_buffer.append(buf)
778 tmp_buffer_len += size
779 if tmp_buffer_len >= self._maxrequestsize:
780 excess = tmp_buffer_len - self._maxrequestsize
781 break
782 if not finish and (
783 tmp_buffer_len % self._blocksize + self._buffered <
784 self._blocksize):
785 excess = tmp_buffer_len % self._blocksize
786 break
787
788 if excess:
789 over = tmp_buffer.pop()
790 size = len(over)
791 assert size >= excess
792 tmp_buffer_len -= size
793 head, tail = over[:-excess], over[-excess:]
794 self._buffer.appendleft(tail)
795 self._buffered += len(tail)
796 if head:
797 tmp_buffer.append(head)
798 tmp_buffer_len += len(head)
799
800 data = ''.join(tmp_buffer)
801 file_len = '*'
802 if finish and not self._buffered:
803 file_len = self._written + len(data)
804 self._send_data(data, self._written, file_len)
805 self._written += len(data)
806 if file_len != '*':
807 break
808
809 def _send_data(self, data, start_offset, file_len):
810 """Send the block to the storage service.
811
812 This is a utility method that does not modify self.
813
814 Args:
815 data: data to send in str.
816 start_offset: start offset of the data in relation to the file.
817 file_len: an int if this is the last data to append to the file.
818 Otherwise '*'.
819 """
820 headers = {}
821 end_offset = start_offset + len(data) - 1
822
823 if data:
824 headers['content-range'] = ('bytes %d-%d/%s' %
825 (start_offset, end_offset, file_len))
826 else:
827 headers['content-range'] = ('bytes */%s' % file_len)
828
829 status, response_headers, content = self._api.put_object(
830 self._path_with_token, payload=data, headers=headers)
831 if file_len == '*':
832 expected = 308
833 else:
834 expected = 200
835 errors.check_status(status, [expected], self._path, headers,
836 response_headers, content,
837 {'upload_path': self._path_with_token})
838
839 def _get_offset_from_gcs(self):
840 """Get the last offset that has been written to GCS.
841
842 This is a utility method that does not modify self.
843
844 Returns:
845 an int of the last offset written to GCS by this upload, inclusive.
846 -1 means nothing has been written.
847 """
848 headers = {'content-range': 'bytes */*'}
849 status, response_headers, content = self._api.put_object(
850 self._path_with_token, headers=headers)
851 errors.check_status(status, [308], self._path, headers,
852 response_headers, content,
853 {'upload_path': self._path_with_token})
854 val = response_headers.get('range')
855 if val is None:
856 return -1
857 _, offset = val.rsplit('-', 1)
858 return int(offset)
859
860 def _force_close(self, file_length=None):
861 """Close this buffer on file_length.
862
863 Finalize this upload immediately on file_length.
864 Contents that are still in memory will not be uploaded.
865
866 This is a utility method that does not modify self.
867
868 Args:
869 file_length: file length. Must match what has been uploaded. If None,
870 it will be queried from GCS.
871 """
872 if file_length is None:
873 file_length = self._get_offset_from_gcs() + 1
874 self._send_data('', 0, file_length)
875
876 def _check_open(self):
877 if self.closed:
878 raise IOError('Buffer is closed.')
879
880 def seekable(self):
881 return False
882
883 def readable(self):
884 return False
885
886 def writable(self):
887 return True
OLDNEW
« no previous file with comments | « third_party/cloud_storage/cloudstorage/rest_api.py ('k') | third_party/cloud_storage/cloudstorage/test_utils.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698