Chromium Code Reviews

Unified Diff: third_party/gsutil/boto/boto/glacier/concurrent.py

Issue 12042069: Scripts to download files from google storage based on sha1 sums (Closed) Base URL: https://chromium.googlesource.com/chromium/tools/depot_tools.git@master
Patch Set: Review fixes, updated gsutil Created 7 years, 10 months ago
Index: third_party/gsutil/boto/boto/glacier/concurrent.py
diff --git a/third_party/gsutil/boto/boto/glacier/concurrent.py b/third_party/gsutil/boto/boto/glacier/concurrent.py
new file mode 100644
index 0000000000000000000000000000000000000000..a956f0660a31ce06d5674d98f76a2c594867f3b4
--- /dev/null
+++ b/third_party/gsutil/boto/boto/glacier/concurrent.py
@@ -0,0 +1,409 @@
+# Copyright (c) 2012 Amazon.com, Inc. or its affiliates. All Rights Reserved
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish, dis-
+# tribute, sublicense, and/or sell copies of the Software, and to permit
+# persons to whom the Software is furnished to do so, subject to the fol-
+# lowing conditions:
+#
+# The above copyright notice and this permission notice shall be included
+# in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
+# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+import os
+import math
+import threading
+import hashlib
+import time
+import logging
+from Queue import Queue, Empty
+import binascii
+
+from .utils import DEFAULT_PART_SIZE, minimum_part_size, chunk_hashes, \
+ tree_hash, bytes_to_hex
+from .exceptions import UploadArchiveError, DownloadArchiveError, \
+ TreeHashDoesNotMatchError
+
+
+_END_SENTINEL = object()
+log = logging.getLogger('boto.glacier.concurrent')
+
+
+class ConcurrentTransferer(object):
+ def __init__(self, part_size=DEFAULT_PART_SIZE, num_threads=10):
+ self._part_size = part_size
+ self._num_threads = num_threads
+ self._threads = []
+
+ def _calculate_required_part_size(self, total_size):
+ min_part_size_required = minimum_part_size(total_size)
+ if self._part_size >= min_part_size_required:
+ part_size = self._part_size
+ else:
+ part_size = min_part_size_required
+ log.debug("The part size specified (%s) is smaller than "
+ "the minimum required part size. Using a part "
+ "size of: %s", self._part_size, part_size)
+ total_parts = int(math.ceil(total_size / float(part_size)))
+ return total_parts, part_size
+
+ def _shutdown_threads(self):
+ log.debug("Shutting down threads.")
+ for thread in self._threads:
+ thread.should_continue = False
+ for thread in self._threads:
+ thread.join()
+ log.debug("Threads have exited.")
+
+ def _add_work_items_to_queue(self, total_parts, worker_queue, part_size):
+ log.debug("Adding work items to queue.")
+ for i in xrange(total_parts):
+ worker_queue.put((i, part_size))
+ for i in xrange(self._num_threads):
+ worker_queue.put(_END_SENTINEL)
+
+
+class ConcurrentUploader(ConcurrentTransferer):
+ """Concurrently upload an archive to glacier.
+
+ This class uses a thread pool to concurrently upload an archive
+ to glacier using the multipart upload API.
+
+ The threadpool is completely managed by this class and is
+ transparent to the users of this class.
+
+ """
+ def __init__(self, api, vault_name, part_size=DEFAULT_PART_SIZE,
+ num_threads=10):
+ """
+ :type api: :class:`boto.glacier.layer1.Layer1`
+ :param api: A layer1 glacier object.
+
+ :type vault_name: str
+ :param vault_name: The name of the vault.
+
+ :type part_size: int
+ :param part_size: The size, in bytes, of the chunks to use when uploading
+ the archive parts. The part size must be a megabyte multiplied by
+ a power of two.
+
+ """
+ super(ConcurrentUploader, self).__init__(part_size, num_threads)
+ self._api = api
+ self._vault_name = vault_name
+
+ def upload(self, filename, description=None):
+ """Concurrently create an archive.
+
+ The part_size value specified when the class was constructed
+ will be used *unless* it is smaller than the minimum required
+ part size needed for the size of the given file. In that case,
+ the part size used will be the minimum part size required
+ to properly upload the given file.
+
+        :type filename: str
+        :param filename: The filename to upload.
+
+ :type description: str
+ :param description: The description of the archive.
+
+ :rtype: str
+ :return: The archive id of the newly created archive.
+
+ """
+ total_size = os.stat(filename).st_size
+ total_parts, part_size = self._calculate_required_part_size(total_size)
+ hash_chunks = [None] * total_parts
+ worker_queue = Queue()
+ result_queue = Queue()
+ response = self._api.initiate_multipart_upload(self._vault_name,
+ part_size,
+ description)
+ upload_id = response['UploadId']
+        # The basic idea is to add the chunks (the offsets, not the actual
+        # contents) to a work queue, start up a thread pool, let the threads
+        # crank through the items in the work queue, and then place their results
+ # in a result queue which we use to complete the multipart upload.
+ self._add_work_items_to_queue(total_parts, worker_queue, part_size)
+ self._start_upload_threads(result_queue, upload_id,
+ worker_queue, filename)
+ try:
+ self._wait_for_upload_threads(hash_chunks, result_queue,
+ total_parts)
+ except UploadArchiveError, e:
+ log.debug("An error occurred while uploading an archive, "
+ "aborting multipart upload.")
+ self._api.abort_multipart_upload(self._vault_name, upload_id)
+ raise e
+ log.debug("Completing upload.")
+ response = self._api.complete_multipart_upload(
+ self._vault_name, upload_id, bytes_to_hex(tree_hash(hash_chunks)),
+ total_size)
+ log.debug("Upload finished.")
+ return response['ArchiveId']
+
+ def _wait_for_upload_threads(self, hash_chunks, result_queue, total_parts):
+ for _ in xrange(total_parts):
+ result = result_queue.get()
+ if isinstance(result, Exception):
+ log.debug("An error was found in the result queue, terminating "
+ "threads: %s", result)
+ self._shutdown_threads()
+ raise UploadArchiveError("An error occurred while uploading "
+ "an archive: %s" % result)
+ # Each unit of work returns the tree hash for the given part
+ # number, which we use at the end to compute the tree hash of
+ # the entire archive.
+ part_number, tree_sha256 = result
+ hash_chunks[part_number] = tree_sha256
+ self._shutdown_threads()
+
+ def _start_upload_threads(self, result_queue, upload_id, worker_queue,
+ filename):
+ log.debug("Starting threads.")
+ for _ in xrange(self._num_threads):
+ thread = UploadWorkerThread(self._api, self._vault_name, filename,
+ upload_id, worker_queue, result_queue)
+ time.sleep(0.2)
+ thread.start()
+ self._threads.append(thread)
+
+
+class TransferThread(threading.Thread):
+ def __init__(self, worker_queue, result_queue):
+ super(TransferThread, self).__init__()
+ self._worker_queue = worker_queue
+ self._result_queue = result_queue
+ # This value can be set externally by other objects
+ # to indicate that the thread should be shut down.
+ self.should_continue = True
+
+ def run(self):
+ while self.should_continue:
+ try:
+ work = self._worker_queue.get(timeout=1)
+ except Empty:
+ continue
+ if work is _END_SENTINEL:
+ return
+ result = self._process_chunk(work)
+ self._result_queue.put(result)
+
+ def _process_chunk(self, work):
+ pass
+
+
+class UploadWorkerThread(TransferThread):
+ def __init__(self, api, vault_name, filename, upload_id,
+ worker_queue, result_queue, num_retries=5,
+ time_between_retries=5,
+ retry_exceptions=Exception):
+ super(UploadWorkerThread, self).__init__(worker_queue, result_queue)
+ self._api = api
+ self._vault_name = vault_name
+ self._filename = filename
+ self._fileobj = open(filename, 'rb')
+ self._upload_id = upload_id
+ self._num_retries = num_retries
+ self._time_between_retries = time_between_retries
+ self._retry_exceptions = retry_exceptions
+
+ def _process_chunk(self, work):
+ result = None
+ for _ in xrange(self._num_retries):
+ try:
+ result = self._upload_chunk(work)
+ break
+ except self._retry_exceptions, e:
+ log.error("Exception caught uploading part number %s for "
+ "vault %s, filename: %s", work[0], self._vault_name,
+ self._filename)
+ time.sleep(self._time_between_retries)
+ result = e
+ return result
+
+ def _upload_chunk(self, work):
+ part_number, part_size = work
+ start_byte = part_number * part_size
+ self._fileobj.seek(start_byte)
+ contents = self._fileobj.read(part_size)
+ linear_hash = hashlib.sha256(contents).hexdigest()
+ tree_hash_bytes = tree_hash(chunk_hashes(contents))
+ byte_range = (start_byte, start_byte + len(contents) - 1)
+ log.debug("Uploading chunk %s of size %s", part_number, part_size)
+ response = self._api.upload_part(self._vault_name, self._upload_id,
+ linear_hash,
+ bytes_to_hex(tree_hash_bytes),
+ byte_range, contents)
+ # Reading the response allows the connection to be reused.
+ response.read()
+ return (part_number, tree_hash_bytes)
+
+
+class ConcurrentDownloader(ConcurrentTransferer):
+ """
+ Concurrently download an archive from glacier.
+
+ This class uses a thread pool to concurrently download an archive
+ from glacier.
+
+ The threadpool is completely managed by this class and is
+ transparent to the users of this class.
+
+ """
+ def __init__(self, job, part_size=DEFAULT_PART_SIZE,
+ num_threads=10):
+ """
+        :param job: A layer2 job object for an archive-retrieval job.
+
+        :param part_size: The size, in bytes, of the chunks to use when downloading
+            the archive parts. The part size must be a megabyte multiplied by
+            a power of two.
+
+ """
+ super(ConcurrentDownloader, self).__init__(part_size, num_threads)
+ self._job = job
+
+ def download(self, filename):
+ """
+ Concurrently download an archive.
+
+ :param filename: The filename to download the archive to
+ :type filename: str
+
+ """
+ total_size = self._job.archive_size
+ total_parts, part_size = self._calculate_required_part_size(total_size)
+ worker_queue = Queue()
+ result_queue = Queue()
+ self._add_work_items_to_queue(total_parts, worker_queue, part_size)
+ self._start_download_threads(result_queue, worker_queue)
+ try:
+ self._wait_for_download_threads(filename, result_queue, total_parts)
+ except DownloadArchiveError, e:
+ log.debug("An error occurred while downloading an archive: %s", e)
+ raise e
+ log.debug("Download completed.")
+
+ def _wait_for_download_threads(self, filename, result_queue, total_parts):
+ """
+        Waits until the result_queue is filled with all the downloaded parts.
+        This indicates that all part downloads have completed.
+
+        Saves the downloaded parts into filename.
+
+        :param filename: The file to write the reassembled archive to.
+        :param result_queue: Queue of (part_number, part_size, tree hash,
+            data) tuples produced by the download threads.
+        :param total_parts: The total number of parts to wait for.
+ """
+ hash_chunks = [None] * total_parts
+ with open(filename, "wb") as f:
+ for _ in xrange(total_parts):
+ result = result_queue.get()
+ if isinstance(result, Exception):
+ log.debug("An error was found in the result queue, "
+ "terminating threads: %s", result)
+ self._shutdown_threads()
+ raise DownloadArchiveError(
+                        "An error occurred while downloading "
+ "an archive: %s" % result)
+ part_number, part_size, actual_hash, data = result
+ hash_chunks[part_number] = actual_hash
+ start_byte = part_number * part_size
+ f.seek(start_byte)
+ f.write(data)
+ f.flush()
+ final_hash = bytes_to_hex(tree_hash(hash_chunks))
+ log.debug("Verifying final tree hash of archive, expecting: %s, "
+ "actual: %s", self._job.sha256_treehash, final_hash)
+ if self._job.sha256_treehash != final_hash:
+ self._shutdown_threads()
+ raise TreeHashDoesNotMatchError(
+ "Tree hash for entire archive does not match, "
+ "expected: %s, got: %s" % (self._job.sha256_treehash,
+ final_hash))
+ self._shutdown_threads()
+
+ def _start_download_threads(self, result_queue, worker_queue):
+ log.debug("Starting threads.")
+ for _ in xrange(self._num_threads):
+ thread = DownloadWorkerThread(self._job, worker_queue, result_queue)
+ time.sleep(0.2)
+ thread.start()
+ self._threads.append(thread)
+
+
+class DownloadWorkerThread(TransferThread):
+ def __init__(self, job,
+ worker_queue, result_queue,
+ num_retries=5,
+ time_between_retries=5,
+ retry_exceptions=Exception):
+ """
+        Individual download thread that downloads parts of the archive from
+        Glacier. The parts to download are read from the worker queue.
+
+        Each downloaded part is kept in memory and handed back through the
+        result queue.
+
+        :param job: Glacier job object
+        :param worker_queue: A queue of tuples which include the part_number
+            and part_size
+        :param result_queue: A queue of tuples which include the part_number,
+            part_size, the binary tree hash of the part, and the part's data.
+
+ """
+ super(DownloadWorkerThread, self).__init__(worker_queue, result_queue)
+ self._job = job
+ self._num_retries = num_retries
+ self._time_between_retries = time_between_retries
+ self._retry_exceptions = retry_exceptions
+
+ def _process_chunk(self, work):
+ """
+        Attempt to download a part of the archive from Glacier, retrying up
+        to num_retries times on failure. The return value (or the last
+        exception caught) is handed back to run(), which places it on the
+        result queue.
+
+        :param work: A (part_number, part_size) tuple from the worker queue.
+ """
+ result = None
+ for _ in xrange(self._num_retries):
+ try:
+ result = self._download_chunk(work)
+ break
+ except self._retry_exceptions, e:
+ log.error("Exception caught downloading part number %s for "
+ "job %s", work[0], self._job,)
+ time.sleep(self._time_between_retries)
+ result = e
+ return result
+
+ def _download_chunk(self, work):
+ """
+        Downloads a chunk of the archive from Glacier and verifies its tree
+        hash against the TreeHash returned in the response.
+
+        Returns a (part_number, part_size, binary tree hash, data) tuple.
+
+        :param work: A (part_number, part_size) tuple from the worker queue.
+ """
+ part_number, part_size = work
+ start_byte = part_number * part_size
+ byte_range = (start_byte, start_byte + part_size - 1)
+ log.debug("Downloading chunk %s of size %s", part_number, part_size)
+ response = self._job.get_output(byte_range)
+ data = response.read()
+ actual_hash = bytes_to_hex(tree_hash(chunk_hashes(data)))
+ if response['TreeHash'] != actual_hash:
+ raise TreeHashDoesNotMatchError(
+ "Tree hash for part number %s does not match, "
+ "expected: %s, got: %s" % (part_number, response['TreeHash'],
+ actual_hash))
+ return (part_number, part_size, binascii.unhexlify(actual_hash), data)
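Reviewer note: for those unfamiliar with this vendored boto module, a minimal usage sketch of the upload path follows. It is illustrative only and not part of this patch; the credentials, vault name, and file name are hypothetical. Layer1 is boto's existing low-level Glacier client, which ConcurrentUploader takes as its api argument.

# Illustrative usage of ConcurrentUploader (not part of this patch).
# The credentials, vault name, and file name are hypothetical.
from boto.glacier.layer1 import Layer1
from boto.glacier.concurrent import ConcurrentUploader

api = Layer1(aws_access_key_id='AKIA...',
             aws_secret_access_key='...')
# part_size must be a megabyte multiplied by a power of two (8 MiB here).
uploader = ConcurrentUploader(api, 'my-vault',
                              part_size=8 * 1024 * 1024, num_threads=4)
archive_id = uploader.upload('backup.tar.gz', description='nightly backup')
print 'Created archive:', archive_id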

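A corresponding sketch for the download path, also illustrative and not part of this patch. It assumes an archive-retrieval job has already been initiated and has completed; the vault name and job id are hypothetical. Layer2.get_vault and Vault.get_job return the layer2 job object that ConcurrentDownloader expects.

# Illustrative usage of ConcurrentDownloader (not part of this patch).
# Assumes a completed archive-retrieval job; names below are hypothetical.
from boto.glacier.layer2 import Layer2
from boto.glacier.concurrent import ConcurrentDownloader

layer2 = Layer2(aws_access_key_id='AKIA...',
                aws_secret_access_key='...')
vault = layer2.get_vault('my-vault')
job = vault.get_job('EXAMPLE-JOB-ID')   # a finished retrieval job
downloader = ConcurrentDownloader(job, num_threads=4)
downloader.download('restored.tar.gz')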