Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(249)

Side by Side Diff: third_party/boto/gs/resumable_upload_handler.py

Issue 12633019: Added boto/ to depot_tools/third_party (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/depot_tools
Patch Set: Moved boto down by one Created 7 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « third_party/boto/gs/key.py ('k') | third_party/boto/gs/user.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 # Copyright 2010 Google Inc.
2 #
3 # Permission is hereby granted, free of charge, to any person obtaining a
4 # copy of this software and associated documentation files (the
5 # "Software"), to deal in the Software without restriction, including
6 # without limitation the rights to use, copy, modify, merge, publish, dis-
7 # tribute, sublicense, and/or sell copies of the Software, and to permit
8 # persons to whom the Software is furnished to do so, subject to the fol-
9 # lowing conditions:
10 #
11 # The above copyright notice and this permission notice shall be included
12 # in all copies or substantial portions of the Software.
13 #
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20 # IN THE SOFTWARE.
21
22 import cgi
23 import errno
24 import httplib
25 import os
26 import random
27 import re
28 import socket
29 import time
30 import urlparse
31 import boto
32 from boto import config, UserAgent
33 from boto.connection import AWSAuthConnection
34 from boto.exception import InvalidUriError
35 from boto.exception import ResumableTransferDisposition
36 from boto.exception import ResumableUploadException
37 from boto.s3.keyfile import KeyFile
38 try:
39 from hashlib import md5
40 except ImportError:
41 from md5 import md5
42
43 """
44 Handler for Google Cloud Storage resumable uploads. See
45 http://code.google.com/apis/storage/docs/developer-guide.html#resumable
46 for details.
47
48 Resumable uploads will retry failed uploads, resuming at the byte
49 count completed by the last upload attempt. If too many retries happen with
50 no progress (per configurable num_retries param), the upload will be
51 aborted in the current process.
52
53 The caller can optionally specify a tracker_file_name param in the
54 ResumableUploadHandler constructor. If you do this, that file will
55 save the state needed to allow retrying later, in a separate process
56 (e.g., in a later run of gsutil).
57 """
58
59
60 class ResumableUploadHandler(object):
61
62 BUFFER_SIZE = 8192
63 RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error,
64 socket.gaierror)
65
66 # (start, end) response indicating server has nothing (upload protocol uses
67 # inclusive numbering).
68 SERVER_HAS_NOTHING = (0, -1)
69
70 def __init__(self, tracker_file_name=None, num_retries=None):
71 """
72 Constructor. Instantiate once for each uploaded file.
73
74 :type tracker_file_name: string
75 :param tracker_file_name: optional file name to save tracker URI.
76 If supplied and the current process fails the upload, it can be
77 retried in a new process. If called with an existing file containing
78 a valid tracker URI, we'll resume the upload from this URI; else
79 we'll start a new resumable upload (and write the URI to this
80 tracker file).
81
82 :type num_retries: int
83 :param num_retries: the number of times we'll re-try a resumable upload
84 making no progress. (Count resets every time we get progress, so
85 upload can span many more than this number of retries.)
86 """
87 self.tracker_file_name = tracker_file_name
88 self.num_retries = num_retries
89 self.server_has_bytes = 0 # Byte count at last server check.
90 self.tracker_uri = None
91 if tracker_file_name:
92 self._load_tracker_uri_from_file()
93 # Save upload_start_point in instance state so caller can find how
94 # much was transferred by this ResumableUploadHandler (across retries).
95 self.upload_start_point = None
96
97 def _load_tracker_uri_from_file(self):
98 f = None
99 try:
100 f = open(self.tracker_file_name, 'r')
101 uri = f.readline().strip()
102 self._set_tracker_uri(uri)
103 except IOError, e:
104 # Ignore non-existent file (happens first time an upload
105 # is attempted on a file), but warn user for other errors.
106 if e.errno != errno.ENOENT:
107 # Will restart because self.tracker_uri == None.
108 print('Couldn\'t read URI tracker file (%s): %s. Restarting '
109 'upload from scratch.' %
110 (self.tracker_file_name, e.strerror))
111 except InvalidUriError, e:
112 # Warn user, but proceed (will restart because
113 # self.tracker_uri == None).
114 print('Invalid tracker URI (%s) found in URI tracker file '
115 '(%s). Restarting upload from scratch.' %
116 (uri, self.tracker_file_name))
117 finally:
118 if f:
119 f.close()
120
121 def _save_tracker_uri_to_file(self):
122 """
123 Saves URI to tracker file if one was passed to constructor.
124 """
125 if not self.tracker_file_name:
126 return
127 f = None
128 try:
129 f = open(self.tracker_file_name, 'w')
130 f.write(self.tracker_uri)
131 except IOError, e:
132 raise ResumableUploadException(
133 'Couldn\'t write URI tracker file (%s): %s.\nThis can happen'
134 'if you\'re using an incorrectly configured upload tool\n'
135 '(e.g., gsutil configured to save tracker files to an '
136 'unwritable directory)' %
137 (self.tracker_file_name, e.strerror),
138 ResumableTransferDisposition.ABORT)
139 finally:
140 if f:
141 f.close()
142
143 def _set_tracker_uri(self, uri):
144 """
145 Called when we start a new resumable upload or get a new tracker
146 URI for the upload. Saves URI and resets upload state.
147
148 Raises InvalidUriError if URI is syntactically invalid.
149 """
150 parse_result = urlparse.urlparse(uri)
151 if (parse_result.scheme.lower() not in ['http', 'https'] or
152 not parse_result.netloc):
153 raise InvalidUriError('Invalid tracker URI (%s)' % uri)
154 self.tracker_uri = uri
155 self.tracker_uri_host = parse_result.netloc
156 self.tracker_uri_path = '%s?%s' % (
157 parse_result.path, parse_result.query)
158 self.server_has_bytes = 0
159
160 def get_tracker_uri(self):
161 """
162 Returns upload tracker URI, or None if the upload has not yet started.
163 """
164 return self.tracker_uri
165
166 def _remove_tracker_file(self):
167 if (self.tracker_file_name and
168 os.path.exists(self.tracker_file_name)):
169 os.unlink(self.tracker_file_name)
170
171 def _build_content_range_header(self, range_spec='*', length_spec='*'):
172 return 'bytes %s/%s' % (range_spec, length_spec)
173
174 def _query_server_state(self, conn, file_length):
175 """
176 Queries server to find out state of given upload.
177
178 Note that this method really just makes special case use of the
179 fact that the upload server always returns the current start/end
180 state whenever a PUT doesn't complete.
181
182 Returns HTTP response from sending request.
183
184 Raises ResumableUploadException if problem querying server.
185 """
186 # Send an empty PUT so that server replies with this resumable
187 # transfer's state.
188 put_headers = {}
189 put_headers['Content-Range'] = (
190 self._build_content_range_header('*', file_length))
191 put_headers['Content-Length'] = '0'
192 return AWSAuthConnection.make_request(conn, 'PUT',
193 path=self.tracker_uri_path,
194 auth_path=self.tracker_uri_path,
195 headers=put_headers,
196 host=self.tracker_uri_host)
197
198 def _query_server_pos(self, conn, file_length):
199 """
200 Queries server to find out what bytes it currently has.
201
202 Returns (server_start, server_end), where the values are inclusive.
203 For example, (0, 2) would mean that the server has bytes 0, 1, *and* 2.
204
205 Raises ResumableUploadException if problem querying server.
206 """
207 resp = self._query_server_state(conn, file_length)
208 if resp.status == 200:
209 # To handle the boundary condition where the server has the complete
210 # file, we return (server_start, file_length-1). That way the
211 # calling code can always simply read up through server_end. (If we
212 # didn't handle this boundary condition here, the caller would have
213 # to check whether server_end == file_length and read one fewer byte
214 # in that case.)
215 return (0, file_length - 1) # Completed upload.
216 if resp.status != 308:
217 # This means the server didn't have any state for the given
218 # upload ID, which can happen (for example) if the caller saved
219 # the tracker URI to a file and then tried to restart the transfer
220 # after that upload ID has gone stale. In that case we need to
221 # start a new transfer (and the caller will then save the new
222 # tracker URI to the tracker file).
223 raise ResumableUploadException(
224 'Got non-308 response (%s) from server state query' %
225 resp.status, ResumableTransferDisposition.START_OVER)
226 got_valid_response = False
227 range_spec = resp.getheader('range')
228 if range_spec:
229 # Parse 'bytes=<from>-<to>' range_spec.
230 m = re.search('bytes=(\d+)-(\d+)', range_spec)
231 if m:
232 server_start = long(m.group(1))
233 server_end = long(m.group(2))
234 got_valid_response = True
235 else:
236 # No Range header, which means the server does not yet have
237 # any bytes. Note that the Range header uses inclusive 'from'
238 # and 'to' values. Since Range 0-0 would mean that the server
239 # has byte 0, omitting the Range header is used to indicate that
240 # the server doesn't have any bytes.
241 return self.SERVER_HAS_NOTHING
242 if not got_valid_response:
243 raise ResumableUploadException(
244 'Couldn\'t parse upload server state query response (%s)' %
245 str(resp.getheaders()), ResumableTransferDisposition.START_OVER)
246 if conn.debug >= 1:
247 print 'Server has: Range: %d - %d.' % (server_start, server_end)
248 return (server_start, server_end)
249
250 def _start_new_resumable_upload(self, key, headers=None):
251 """
252 Starts a new resumable upload.
253
254 Raises ResumableUploadException if any errors occur.
255 """
256 conn = key.bucket.connection
257 if conn.debug >= 1:
258 print 'Starting new resumable upload.'
259 self.server_has_bytes = 0
260
261 # Start a new resumable upload by sending a POST request with an
262 # empty body and the "X-Goog-Resumable: start" header. Include any
263 # caller-provided headers (e.g., Content-Type) EXCEPT Content-Length
264 # (and raise an exception if they tried to pass one, since it's
265 # a semantic error to specify it at this point, and if we were to
266 # include one now it would cause the server to expect that many
267 # bytes; the POST doesn't include the actual file bytes We set
268 # the Content-Length in the subsequent PUT, based on the uploaded
269 # file size.
270 post_headers = {}
271 for k in headers:
272 if k.lower() == 'content-length':
273 raise ResumableUploadException(
274 'Attempt to specify Content-Length header (disallowed)',
275 ResumableTransferDisposition.ABORT)
276 post_headers[k] = headers[k]
277 post_headers[conn.provider.resumable_upload_header] = 'start'
278
279 resp = conn.make_request(
280 'POST', key.bucket.name, key.name, post_headers)
281 # Get tracker URI from response 'Location' header.
282 body = resp.read()
283
284 # Check for various status conditions.
285 if resp.status in [500, 503]:
286 # Retry status 500 and 503 errors after a delay.
287 raise ResumableUploadException(
288 'Got status %d from attempt to start resumable upload. '
289 'Will wait/retry' % resp.status,
290 ResumableTransferDisposition.WAIT_BEFORE_RETRY)
291 elif resp.status != 200 and resp.status != 201:
292 raise ResumableUploadException(
293 'Got status %d from attempt to start resumable upload. '
294 'Aborting' % resp.status,
295 ResumableTransferDisposition.ABORT)
296
297 # Else we got 200 or 201 response code, indicating the resumable
298 # upload was created.
299 tracker_uri = resp.getheader('Location')
300 if not tracker_uri:
301 raise ResumableUploadException(
302 'No resumable tracker URI found in resumable initiation '
303 'POST response (%s)' % body,
304 ResumableTransferDisposition.WAIT_BEFORE_RETRY)
305 self._set_tracker_uri(tracker_uri)
306 self._save_tracker_uri_to_file()
307
308 def _upload_file_bytes(self, conn, http_conn, fp, file_length,
309 total_bytes_uploaded, cb, num_cb, md5sum, headers):
310 """
311 Makes one attempt to upload file bytes, using an existing resumable
312 upload connection.
313
314 Returns (etag, generation, meta_generation) from server upon success.
315
316 Raises ResumableUploadException if any problems occur.
317 """
318 buf = fp.read(self.BUFFER_SIZE)
319 if cb:
320 # The cb_count represents the number of full buffers to send between
321 # cb executions.
322 if num_cb > 2:
323 cb_count = file_length / self.BUFFER_SIZE / (num_cb-2)
324 elif num_cb < 0:
325 cb_count = -1
326 else:
327 cb_count = 0
328 i = 0
329 cb(total_bytes_uploaded, file_length)
330
331 # Build resumable upload headers for the transfer. Don't send a
332 # Content-Range header if the file is 0 bytes long, because the
333 # resumable upload protocol uses an *inclusive* end-range (so, sending
334 # 'bytes 0-0/1' would actually mean you're sending a 1-byte file).
335 if not headers:
336 put_headers = {}
337 else:
338 put_headers = headers.copy()
339 if file_length:
340 if total_bytes_uploaded == file_length:
341 range_header = self._build_content_range_header(
342 '*', file_length)
343 else:
344 range_header = self._build_content_range_header(
345 '%d-%d' % (total_bytes_uploaded, file_length - 1),
346 file_length)
347 put_headers['Content-Range'] = range_header
348 # Set Content-Length to the total bytes we'll send with this PUT.
349 put_headers['Content-Length'] = str(file_length - total_bytes_uploaded)
350 http_request = AWSAuthConnection.build_base_http_request(
351 conn, 'PUT', path=self.tracker_uri_path, auth_path=None,
352 headers=put_headers, host=self.tracker_uri_host)
353 http_conn.putrequest('PUT', http_request.path)
354 for k in put_headers:
355 http_conn.putheader(k, put_headers[k])
356 http_conn.endheaders()
357
358 # Turn off debug on http connection so upload content isn't included
359 # in debug stream.
360 http_conn.set_debuglevel(0)
361 while buf:
362 http_conn.send(buf)
363 md5sum.update(buf)
364 total_bytes_uploaded += len(buf)
365 if cb:
366 i += 1
367 if i == cb_count or cb_count == -1:
368 cb(total_bytes_uploaded, file_length)
369 i = 0
370 buf = fp.read(self.BUFFER_SIZE)
371 http_conn.set_debuglevel(conn.debug)
372 if cb:
373 cb(total_bytes_uploaded, file_length)
374 if total_bytes_uploaded != file_length:
375 # Abort (and delete the tracker file) so if the user retries
376 # they'll start a new resumable upload rather than potentially
377 # attempting to pick back up later where we left off.
378 raise ResumableUploadException(
379 'File changed during upload: EOF at %d bytes of %d byte file.' %
380 (total_bytes_uploaded, file_length),
381 ResumableTransferDisposition.ABORT)
382 resp = http_conn.getresponse()
383 body = resp.read()
384 # Restore http connection debug level.
385 http_conn.set_debuglevel(conn.debug)
386
387 if resp.status == 200:
388 # Success.
389 return (resp.getheader('etag'),
390 resp.getheader('x-goog-generation'),
391 resp.getheader('x-goog-metageneration'))
392 # Retry timeout (408) and status 500 and 503 errors after a delay.
393 elif resp.status in [408, 500, 503]:
394 disposition = ResumableTransferDisposition.WAIT_BEFORE_RETRY
395 else:
396 # Catch all for any other error codes.
397 disposition = ResumableTransferDisposition.ABORT
398 raise ResumableUploadException('Got response code %d while attempting '
399 'upload (%s)' %
400 (resp.status, resp.reason), disposition)
401
402 def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,
403 num_cb, md5sum):
404 """
405 Attempts a resumable upload.
406
407 Returns (etag, generation, meta_generation) from server upon success.
408
409 Raises ResumableUploadException if any problems occur.
410 """
411 (server_start, server_end) = self.SERVER_HAS_NOTHING
412 conn = key.bucket.connection
413 if self.tracker_uri:
414 # Try to resume existing resumable upload.
415 try:
416 (server_start, server_end) = (
417 self._query_server_pos(conn, file_length))
418 self.server_has_bytes = server_start
419
420 if server_end:
421 # If the server already has some of the content, we need to
422 # update the md5 with the bytes that have already been
423 # uploaded to ensure we get a complete hash in the end.
424 print 'Catching up md5 for resumed upload'
425 fp.seek(0)
426 # Read local file's bytes through position server has. For
427 # example, if server has (0, 3) we want to read 3-0+1=4 bytes.
428 bytes_to_go = server_end + 1
429 while bytes_to_go:
430 chunk = fp.read(min(key.BufferSize, bytes_to_go))
431 if not chunk:
432 raise ResumableUploadException(
433 'Hit end of file during resumable upload md5 '
434 'catchup. This should not happen under\n'
435 'normal circumstances, as it indicates the '
436 'server has more bytes of this transfer\nthan'
437 ' the current file size. Restarting upload.',
438 ResumableTransferDisposition.START_OVER)
439 md5sum.update(chunk)
440 bytes_to_go -= len(chunk)
441
442 if conn.debug >= 1:
443 print 'Resuming transfer.'
444 except ResumableUploadException, e:
445 if conn.debug >= 1:
446 print 'Unable to resume transfer (%s).' % e.message
447 self._start_new_resumable_upload(key, headers)
448 else:
449 self._start_new_resumable_upload(key, headers)
450
451 # upload_start_point allows the code that instantiated the
452 # ResumableUploadHandler to find out the point from which it started
453 # uploading (e.g., so it can correctly compute throughput).
454 if self.upload_start_point is None:
455 self.upload_start_point = server_end
456
457 total_bytes_uploaded = server_end + 1
458 # Corner case: Don't attempt to seek if we've already uploaded the
459 # entire file, because if the file is a stream (e.g., the KeyFile
460 # wrapper around input key when copying between providers), attempting
461 # to seek to the end of file would result in an InvalidRange error.
462 if file_length < total_bytes_uploaded:
463 fp.seek(total_bytes_uploaded)
464 conn = key.bucket.connection
465
466 # Get a new HTTP connection (vs conn.get_http_connection(), which reuses
467 # pool connections) because httplib requires a new HTTP connection per
468 # transaction. (Without this, calling http_conn.getresponse() would get
469 # "ResponseNotReady".)
470 http_conn = conn.new_http_connection(self.tracker_uri_host,
471 conn.is_secure)
472 http_conn.set_debuglevel(conn.debug)
473
474 # Make sure to close http_conn at end so if a local file read
475 # failure occurs partway through server will terminate current upload
476 # and can report that progress on next attempt.
477 try:
478 return self._upload_file_bytes(conn, http_conn, fp, file_length,
479 total_bytes_uploaded, cb, num_cb, md5 sum,
480 headers)
481 except (ResumableUploadException, socket.error):
482 resp = self._query_server_state(conn, file_length)
483 if resp.status == 400:
484 raise ResumableUploadException('Got 400 response from server '
485 'state query after failed resumable upload attempt. This '
486 'can happen for various reasons, including specifying an '
487 'invalid request (e.g., an invalid canned ACL) or if the '
488 'file size changed between upload attempts',
489 ResumableTransferDisposition.ABORT)
490 else:
491 raise
492 finally:
493 http_conn.close()
494
495 def _check_final_md5(self, key, etag):
496 """
497 Checks that etag from server agrees with md5 computed before upload.
498 This is important, since the upload could have spanned a number of
499 hours and multiple processes (e.g., gsutil runs), and the user could
500 change some of the file and not realize they have inconsistent data.
501 """
502 if key.bucket.connection.debug >= 1:
503 print 'Checking md5 against etag.'
504 if key.md5 != etag.strip('"\''):
505 # Call key.open_read() before attempting to delete the
506 # (incorrect-content) key, so we perform that request on a
507 # different HTTP connection. This is neededb because httplib
508 # will return a "Response not ready" error if you try to perform
509 # a second transaction on the connection.
510 key.open_read()
511 key.close()
512 key.delete()
513 raise ResumableUploadException(
514 'File changed during upload: md5 signature doesn\'t match etag '
515 '(incorrect uploaded object deleted)',
516 ResumableTransferDisposition.ABORT)
517
518 def handle_resumable_upload_exception(self, e, debug):
519 if (e.disposition == ResumableTransferDisposition.ABORT_CUR_PROCESS):
520 if debug >= 1:
521 print('Caught non-retryable ResumableUploadException (%s); '
522 'aborting but retaining tracker file' % e.message)
523 raise
524 elif (e.disposition == ResumableTransferDisposition.ABORT):
525 if debug >= 1:
526 print('Caught non-retryable ResumableUploadException (%s); '
527 'aborting and removing tracker file' % e.message)
528 self._remove_tracker_file()
529 raise
530 else:
531 if debug >= 1:
532 print('Caught ResumableUploadException (%s) - will retry' %
533 e.message)
534
535 def track_progress_less_iterations(self, server_had_bytes_before_attempt,
536 roll_back_md5=True, debug=0):
537 # At this point we had a re-tryable failure; see if made progress.
538 if self.server_has_bytes > server_had_bytes_before_attempt:
539 self.progress_less_iterations = 0 # If progress, reset counter.
540 else:
541 self.progress_less_iterations += 1
542 if roll_back_md5:
543 # Rollback any potential md5sum updates, as we did not
544 # make any progress in this iteration.
545 self.md5sum = self.md5sum_before_attempt
546
547 if self.progress_less_iterations > self.num_retries:
548 # Don't retry any longer in the current process.
549 raise ResumableUploadException(
550 'Too many resumable upload attempts failed without '
551 'progress. You might try this upload again later',
552 ResumableTransferDisposition.ABORT_CUR_PROCESS)
553
554 # Use binary exponential backoff to desynchronize client requests.
555 sleep_time_secs = random.random() * (2**self.progress_less_iterations)
556 if debug >= 1:
557 print ('Got retryable failure (%d progress-less in a row).\n'
558 'Sleeping %3.1f seconds before re-trying' %
559 (self.progress_less_iterations, sleep_time_secs))
560 time.sleep(sleep_time_secs)
561
562 def send_file(self, key, fp, headers, cb=None, num_cb=10):
563 """
564 Upload a file to a key into a bucket on GS, using GS resumable upload
565 protocol.
566
567 :type key: :class:`boto.s3.key.Key` or subclass
568 :param key: The Key object to which data is to be uploaded
569
570 :type fp: file-like object
571 :param fp: The file pointer to upload
572
573 :type headers: dict
574 :param headers: The headers to pass along with the PUT request
575
576 :type cb: function
577 :param cb: a callback function that will be called to report progress on
578 the upload. The callback should accept two integer parameters, the
579 first representing the number of bytes that have been successfully
580 transmitted to GS, and the second representing the total number of
581 bytes that need to be transmitted.
582
583 :type num_cb: int
584 :param num_cb: (optional) If a callback is specified with the cb
585 parameter, this parameter determines the granularity of the callback
586 by defining the maximum number of times the callback will be called
587 during the file transfer. Providing a negative integer will cause
588 your callback to be called with each buffer read.
589
590 Raises ResumableUploadException if a problem occurs during the transfer.
591 """
592
593 if not headers:
594 headers = {}
595 # If Content-Type header is present and set to None, remove it.
596 # This is gsutil's way of asking boto to refrain from auto-generating
597 # that header.
598 CT = 'Content-Type'
599 if CT in headers and headers[CT] is None:
600 del headers[CT]
601
602 headers['User-Agent'] = UserAgent
603
604 # Determine file size different ways for case where fp is actually a
605 # wrapper around a Key vs an actual file.
606 if isinstance(fp, KeyFile):
607 file_length = fp.getkey().size
608 else:
609 fp.seek(0, os.SEEK_END)
610 file_length = fp.tell()
611 fp.seek(0)
612 debug = key.bucket.connection.debug
613
614 # Compute the MD5 checksum on the fly.
615 self.md5sum = md5()
616
617 # Use num-retries from constructor if one was provided; else check
618 # for a value specified in the boto config file; else default to 5.
619 if self.num_retries is None:
620 self.num_retries = config.getint('Boto', 'num_retries', 6)
621 self.progress_less_iterations = 0
622
623 while True: # Retry as long as we're making progress.
624 server_had_bytes_before_attempt = self.server_has_bytes
625 self.md5sum_before_attempt = self.md5sum.copy()
626 try:
627 # Save generation and meta_generation in class state so caller
628 # can find these values, for use in preconditions of future
629 # operations on the uploaded object.
630 (etag, self.generation, self.meta_generation) = (
631 self._attempt_resumable_upload(key, fp, file_length,
632 headers, cb, num_cb,
633 self.md5sum))
634
635 # Get the final md5 for the uploaded content.
636 hd = self.md5sum.hexdigest()
637 key.md5, key.base64md5 = key.get_md5_from_hexdigest(hd)
638
639 # Upload succceded, so remove the tracker file (if have one).
640 self._remove_tracker_file()
641 self._check_final_md5(key, etag)
642 if debug >= 1:
643 print 'Resumable upload complete.'
644 return
645 except self.RETRYABLE_EXCEPTIONS, e:
646 if debug >= 1:
647 print('Caught exception (%s)' % e.__repr__())
648 if isinstance(e, IOError) and e.errno == errno.EPIPE:
649 # Broken pipe error causes httplib to immediately
650 # close the socket (http://bugs.python.org/issue5542),
651 # so we need to close the connection before we resume
652 # the upload (which will cause a new connection to be
653 # opened the next time an HTTP request is sent).
654 key.bucket.connection.connection.close()
655 except ResumableUploadException, e:
656 self.handle_resumable_upload_exception(e, debug)
657
658 self.track_progress_less_iterations(server_had_bytes_before_attempt,
659 True, debug)
OLDNEW
« no previous file with comments | « third_party/boto/gs/key.py ('k') | third_party/boto/gs/user.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698