Chromium Code Reviews

Side by Side Diff: third_party/gsutil/boto/glacier/concurrent.py

Issue 12042069: Scripts to download files from google storage based on sha1 sums (Closed) Base URL: https://chromium.googlesource.com/chromium/tools/depot_tools.git@master
Patch Set: Removed gsutil/tests and gsutil/docs Created 7 years, 10 months ago
1 # Copyright (c) 2012 Amazon.com, Inc. or its affiliates. All Rights Reserved
2 #
3 # Permission is hereby granted, free of charge, to any person obtaining a
4 # copy of this software and associated documentation files (the
5 # "Software"), to deal in the Software without restriction, including
6 # without limitation the rights to use, copy, modify, merge, publish, dis-
7 # tribute, sublicense, and/or sell copies of the Software, and to permit
8 # persons to whom the Software is furnished to do so, subject to the fol-
9 # lowing conditions:
10 #
11 # The above copyright notice and this permission notice shall be included
12 # in all copies or substantial portions of the Software.
13 #
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20 # IN THE SOFTWARE.
21 #
22 import os
23 import math
24 import threading
25 import hashlib
26 import time
27 import logging
28 from Queue import Queue, Empty
29 import binascii
30
31 from .utils import DEFAULT_PART_SIZE, minimum_part_size, chunk_hashes, \
32 tree_hash, bytes_to_hex
33 from .exceptions import UploadArchiveError, DownloadArchiveError, \
34 TreeHashDoesNotMatchError
35
36
37 _END_SENTINEL = object()
38 log = logging.getLogger('boto.glacier.concurrent')
39
40
41 class ConcurrentTransferer(object):
42 def __init__(self, part_size=DEFAULT_PART_SIZE, num_threads=10):
43 self._part_size = part_size
44 self._num_threads = num_threads
45 self._threads = []
46
47 def _calculate_required_part_size(self, total_size):
48 min_part_size_required = minimum_part_size(total_size)
49 if self._part_size >= min_part_size_required:
50 part_size = self._part_size
51 else:
52 part_size = min_part_size_required
53 log.debug("The part size specified (%s) is smaller than "
54 "the minimum required part size. Using a part "
55 "size of: %s", self._part_size, part_size)
56 total_parts = int(math.ceil(total_size / float(part_size)))
57 return total_parts, part_size
58
59 def _shutdown_threads(self):
60 log.debug("Shutting down threads.")
61 for thread in self._threads:
62 thread.should_continue = False
63 for thread in self._threads:
64 thread.join()
65 log.debug("Threads have exited.")
66
67 def _add_work_items_to_queue(self, total_parts, worker_queue, part_size):
68 log.debug("Adding work items to queue.")
69 for i in xrange(total_parts):
70 worker_queue.put((i, part_size))
71 for i in xrange(self._num_threads):
72 worker_queue.put(_END_SENTINEL)
73
74
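For a concrete sense of _calculate_required_part_size: the configured part size is kept unless the archive would not fit within Glacier's cap on the number of parts, in which case the smallest acceptable part size is used instead. A minimal sketch of that arithmetic, assuming the boto defaults of a 4 MiB DEFAULT_PART_SIZE and a 10,000-part limit with power-of-two-megabyte part sizes:

    # Illustrative sketch only; mirrors the part-size selection above under
    # the assumption that DEFAULT_PART_SIZE is 4 MiB and that Glacier allows
    # at most 10,000 parts per multipart upload.
    import math

    MEGABYTE = 1024 * 1024
    DEFAULT_PART_SIZE = 4 * MEGABYTE
    MAX_PARTS = 10000

    def minimum_part_size_sketch(total_size):
        # Smallest power-of-two multiple of 1 MiB that keeps the archive
        # within MAX_PARTS parts.
        part_size = MEGABYTE
        while int(math.ceil(total_size / float(part_size))) > MAX_PARTS:
            part_size *= 2
        return part_size

    # A 50 GiB archive does not fit in 10,000 parts of 4 MiB, so the part
    # size is bumped to 8 MiB, which yields 6,400 parts.
    total_size = 50 * 1024 * MEGABYTE
    part_size = max(DEFAULT_PART_SIZE, minimum_part_size_sketch(total_size))
    total_parts = int(math.ceil(total_size / float(part_size)))
    assert (part_size, total_parts) == (8 * MEGABYTE, 6400)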
75 class ConcurrentUploader(ConcurrentTransferer):
76 """Concurrently upload an archive to glacier.
77
78 This class uses a thread pool to concurrently upload an archive
79 to glacier using the multipart upload API.
80
81 The threadpool is completely managed by this class and is
82 transparent to the users of this class.
83
84 """
85 def __init__(self, api, vault_name, part_size=DEFAULT_PART_SIZE,
86 num_threads=10):
87 """
88 :type api: :class:`boto.glacier.layer1.Layer1`
89 :param api: A layer1 glacier object.
90
91 :type vault_name: str
92 :param vault_name: The name of the vault.
93
94 :type part_size: int
95 :param part_size: The size, in bytes, of the chunks to use when uploading
96 the archive parts. The part size must be a megabyte multiplied by
97 a power of two.
98
99 """
100 super(ConcurrentUploader, self).__init__(part_size, num_threads)
101 self._api = api
102 self._vault_name = vault_name
103
104 def upload(self, filename, description=None):
105 """Concurrently create an archive.
106
107 The part_size value specified when the class was constructed
108 will be used *unless* it is smaller than the minimum required
109 part size needed for the size of the given file. In that case,
110 the part size used will be the minimum part size required
111 to properly upload the given file.
112
113 :type filename: str
114 :param filename: The filename to upload
115
116 :type description: str
117 :param description: The description of the archive.
118
119 :rtype: str
120 :return: The archive id of the newly created archive.
121
122 """
123 total_size = os.stat(filename).st_size
124 total_parts, part_size = self._calculate_required_part_size(total_size)
125 hash_chunks = [None] * total_parts
126 worker_queue = Queue()
127 result_queue = Queue()
128 response = self._api.initiate_multipart_upload(self._vault_name,
129 part_size,
130 description)
131 upload_id = response['UploadId']
132 # The basic idea is to add the chunks (the offsets not the actual
133 # contents) to a work queue, start up a thread pool, let them crank
134 # through the items in the work queue, and then place their results
135 # in a result queue which we use to complete the multipart upload.
136 self._add_work_items_to_queue(total_parts, worker_queue, part_size)
137 self._start_upload_threads(result_queue, upload_id,
138 worker_queue, filename)
139 try:
140 self._wait_for_upload_threads(hash_chunks, result_queue,
141 total_parts)
142 except UploadArchiveError, e:
143 log.debug("An error occurred while uploading an archive, "
144 "aborting multipart upload.")
145 self._api.abort_multipart_upload(self._vault_name, upload_id)
146 raise e
147 log.debug("Completing upload.")
148 response = self._api.complete_multipart_upload(
149 self._vault_name, upload_id, bytes_to_hex(tree_hash(hash_chunks)),
150 total_size)
151 log.debug("Upload finished.")
152 return response['ArchiveId']
153
154 def _wait_for_upload_threads(self, hash_chunks, result_queue, total_parts):
155 for _ in xrange(total_parts):
156 result = result_queue.get()
157 if isinstance(result, Exception):
158 log.debug("An error was found in the result queue, terminating "
159 "threads: %s", result)
160 self._shutdown_threads()
161 raise UploadArchiveError("An error occurred while uploading "
162 "an archive: %s" % result)
163 # Each unit of work returns the tree hash for the given part
164 # number, which we use at the end to compute the tree hash of
165 # the entire archive.
166 part_number, tree_sha256 = result
167 hash_chunks[part_number] = tree_sha256
168 self._shutdown_threads()
169
170 def _start_upload_threads(self, result_queue, upload_id, worker_queue,
171 filename):
172 log.debug("Starting threads.")
173 for _ in xrange(self._num_threads):
174 thread = UploadWorkerThread(self._api, self._vault_name, filename,
175 upload_id, worker_queue, result_queue)
176 time.sleep(0.2)
177 thread.start()
178 self._threads.append(thread)
179
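A hypothetical usage sketch for ConcurrentUploader; the vault name and file path are placeholders, and AWS credentials are assumed to come from the usual boto configuration:

    # Hypothetical usage; 'my-vault' and 'backup.tar' are placeholders.
    from boto.glacier.layer1 import Layer1
    from boto.glacier.concurrent import ConcurrentUploader

    api = Layer1()  # picks up credentials from the environment/boto config
    uploader = ConcurrentUploader(api, 'my-vault', num_threads=4)
    archive_id = uploader.upload('backup.tar', description='nightly backup')
    print 'Uploaded archive: %s' % archive_id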
180
181 class TransferThread(threading.Thread):
182 def __init__(self, worker_queue, result_queue):
183 super(TransferThread, self).__init__()
184 self._worker_queue = worker_queue
185 self._result_queue = result_queue
186 # This value can be set externally by other objects
187 # to indicate that the thread should be shut down.
188 self.should_continue = True
189
190 def run(self):
191 while self.should_continue:
192 try:
193 work = self._worker_queue.get(timeout=1)
194 except Empty:
195 continue
196 if work is _END_SENTINEL:
197 return
198 result = self._process_chunk(work)
199 self._result_queue.put(result)
200
201 def _process_chunk(self, work):
202 pass
203
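The run loop above is a small producer/consumer shutdown protocol: each worker polls the queue with a one-second timeout so it can also notice should_continue being cleared, and the producer enqueues one _END_SENTINEL per thread so every worker eventually sees one and exits. A condensed, self-contained sketch of the same idea (illustrative only, not library code):

    # Condensed sketch of the sentinel shutdown protocol used by
    # TransferThread; results collection stands in for _process_chunk.
    from Queue import Queue, Empty
    import threading

    SENTINEL = object()

    def worker(queue, results):
        while True:
            try:
                item = queue.get(timeout=1)
            except Empty:
                continue
            if item is SENTINEL:
                return
            results.append(item * item)  # stand-in for real work

    queue, results = Queue(), []
    threads = [threading.Thread(target=worker, args=(queue, results))
               for _ in range(3)]
    for t in threads:
        t.start()
    for item in [1, 2, 3, 4]:
        queue.put(item)
    for _ in threads:
        queue.put(SENTINEL)  # one sentinel per worker
    for t in threads:
        t.join()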
204
205 class UploadWorkerThread(TransferThread):
206 def __init__(self, api, vault_name, filename, upload_id,
207 worker_queue, result_queue, num_retries=5,
208 time_between_retries=5,
209 retry_exceptions=Exception):
210 super(UploadWorkerThread, self).__init__(worker_queue, result_queue)
211 self._api = api
212 self._vault_name = vault_name
213 self._filename = filename
214 self._fileobj = open(filename, 'rb')
215 self._upload_id = upload_id
216 self._num_retries = num_retries
217 self._time_between_retries = time_between_retries
218 self._retry_exceptions = retry_exceptions
219
220 def _process_chunk(self, work):
221 result = None
222 for _ in xrange(self._num_retries):
223 try:
224 result = self._upload_chunk(work)
225 break
226 except self._retry_exceptions, e:
227 log.error("Exception caught uploading part number %s for "
228 "vault %s, filename: %s", work[0], self._vault_name,
229 self._filename)
230 time.sleep(self._time_between_retries)
231 result = e
232 return result
233
234 def _upload_chunk(self, work):
235 part_number, part_size = work
236 start_byte = part_number * part_size
237 self._fileobj.seek(start_byte)
238 contents = self._fileobj.read(part_size)
239 linear_hash = hashlib.sha256(contents).hexdigest()
240 tree_hash_bytes = tree_hash(chunk_hashes(contents))
241 byte_range = (start_byte, start_byte + len(contents) - 1)
242 log.debug("Uploading chunk %s of size %s", part_number, part_size)
243 response = self._api.upload_part(self._vault_name, self._upload_id,
244 linear_hash,
245 bytes_to_hex(tree_hash_bytes),
246 byte_range, contents)
247 # Reading the response allows the connection to be reused.
248 response.read()
249 return (part_number, tree_hash_bytes)
250
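Each part is checked two ways: a linear SHA-256 over the whole part (linear_hash) and a SHA-256 tree hash over its 1 MiB chunks; the per-part tree hashes collected in the result queue are later combined into the tree hash of the entire archive. A rough standalone sketch of the tree-hash computation, for illustration only (the real helpers are chunk_hashes, tree_hash, and bytes_to_hex in boto.glacier.utils):

    # Rough sketch of the SHA-256 tree hash used for Glacier integrity
    # checks; illustrative, not the library implementation.
    import binascii
    import hashlib

    MEGABYTE = 1024 * 1024

    def tree_hash_sketch(data):
        # Leaf level: SHA-256 digest of each 1 MiB chunk.
        hashes = [hashlib.sha256(data[i:i + MEGABYTE]).digest()
                  for i in range(0, max(len(data), 1), MEGABYTE)]
        # Combine pairwise; an odd leftover digest is promoted unchanged.
        while len(hashes) > 1:
            next_level = []
            for i in range(0, len(hashes), 2):
                pair = hashes[i:i + 2]
                if len(pair) == 2:
                    next_level.append(hashlib.sha256(pair[0] + pair[1]).digest())
                else:
                    next_level.append(pair[0])
            hashes = next_level
        return binascii.hexlify(hashes[0])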
251
252 class ConcurrentDownloader(ConcurrentTransferer):
253 """
254 Concurrently download an archive from glacier.
255
256 This class uses a thread pool to concurrently download an archive
257 from glacier.
258
259 The threadpool is completely managed by this class and is
260 transparent to the users of this class.
261
262 """
263 def __init__(self, job, part_size=DEFAULT_PART_SIZE,
264 num_threads=10):
265 """
266 :param job: A layer2 job object for the archive retrieval.
267
268 :param part_size: The size, in bytes, of the chunks to use when downloading
269 the archive parts. The part size must be a megabyte multiplied by
270 a power of two.
271
272 """
273 super(ConcurrentDownloader, self).__init__(part_size, num_threads)
274 self._job = job
275
276 def download(self, filename):
277 """
278 Concurrently download an archive.
279
280 :param filename: The filename to download the archive to
281 :type filename: str
282
283 """
284 total_size = self._job.archive_size
285 total_parts, part_size = self._calculate_required_part_size(total_size)
286 worker_queue = Queue()
287 result_queue = Queue()
288 self._add_work_items_to_queue(total_parts, worker_queue, part_size)
289 self._start_download_threads(result_queue, worker_queue)
290 try:
291 self._wait_for_download_threads(filename, result_queue, total_parts)
292 except DownloadArchiveError, e:
293 log.debug("An error occurred while downloading an archive: %s", e)
294 raise e
295 log.debug("Download completed.")
296
297 def _wait_for_download_threads(self, filename, result_queue, total_parts):
298 """
299 Waits until the result_queue is filled with all the downloaded parts
300 This indicates that all part downloads have completed
301
302 Saves downloaded parts into filename
303
304 :param filename:
305 :param result_queue:
306 :param total_parts:
307 """
308 hash_chunks = [None] * total_parts
309 with open(filename, "wb") as f:
310 for _ in xrange(total_parts):
311 result = result_queue.get()
312 if isinstance(result, Exception):
313 log.debug("An error was found in the result queue, "
314 "terminating threads: %s", result)
315 self._shutdown_threads()
316 raise DownloadArchiveError(
317 "An error occurred while uploading "
318 "an archive: %s" % result)
319 part_number, part_size, actual_hash, data = result
320 hash_chunks[part_number] = actual_hash
321 start_byte = part_number * part_size
322 f.seek(start_byte)
323 f.write(data)
324 f.flush()
325 final_hash = bytes_to_hex(tree_hash(hash_chunks))
326 log.debug("Verifying final tree hash of archive, expecting: %s, "
327 "actual: %s", self._job.sha256_treehash, final_hash)
328 if self._job.sha256_treehash != final_hash:
329 self._shutdown_threads()
330 raise TreeHashDoesNotMatchError(
331 "Tree hash for entire archive does not match, "
332 "expected: %s, got: %s" % (self._job.sha256_treehash,
333 final_hash))
334 self._shutdown_threads()
335
336 def _start_download_threads(self, result_queue, worker_queue):
337 log.debug("Starting threads.")
338 for _ in xrange(self._num_threads):
339 thread = DownloadWorkerThread(self._job, worker_queue, result_queue)
340 time.sleep(0.2)
341 thread.start()
342 self._threads.append(thread)
343
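A hypothetical usage sketch for ConcurrentDownloader, assuming an archive-retrieval job was initiated through the layer2 API and has already completed; the region, vault name, and archive id are placeholders:

    # Hypothetical usage; assumes the retrieval job has already completed.
    import boto.glacier
    from boto.glacier.concurrent import ConcurrentDownloader

    layer2 = boto.glacier.connect_to_region('us-east-1')
    vault = layer2.get_vault('my-vault')
    job = vault.retrieve_archive('<archive id>')
    # ... archive retrieval typically takes hours; wait for the job to
    # complete before downloading ...
    downloader = ConcurrentDownloader(job, num_threads=4)
    downloader.download('restored.tar')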
344
345 class DownloadWorkerThread(TransferThread):
346 def __init__(self, job,
347 worker_queue, result_queue,
348 num_retries=5,
349 time_between_retries=5,
350 retry_exceptions=Exception):
351 """
352 Individual download thread that will download parts of the file from Glacier. Parts
353 to download are stored in the work queue.
354
355 Each part is downloaded into memory and placed on the result queue.
356
357 :param job: Glacier job object
358 :param worker_queue: A queue of tuples which include the part_number and
359 part_size
360 :param result_queue: A queue of tuples which include the
361 part_number, part_size, tree hash, and data of each
362 downloaded part.
363
364 """
365 super(DownloadWorkerThread, self).__init__(worker_queue, result_queue)
366 self._job = job
367 self._num_retries = num_retries
368 self._time_between_retries = time_between_retries
369 self._retry_exceptions = retry_exceptions
370
371 def _process_chunk(self, work):
372 """
373 Attempt to download a part of the archive from Glacier
374 Store the result in the result_queue
375
376 :param work:
377 """
378 result = None
379 for _ in xrange(self._num_retries):
380 try:
381 result = self._download_chunk(work)
382 break
383 except self._retry_exceptions, e:
384 log.error("Exception caught downloading part number %s for "
385 "job %s", work[0], self._job,)
386 time.sleep(self._time_between_retries)
387 result = e
388 return result
389
390 def _download_chunk(self, work):
391 """
392 Downloads a chunk of the archive from Glacier and verifies its tree hash.
393 Returns the part number, part size, tree hash, and data.
394
395 :param work:
396 """
397 part_number, part_size = work
398 start_byte = part_number * part_size
399 byte_range = (start_byte, start_byte + part_size - 1)
400 log.debug("Downloading chunk %s of size %s", part_number, part_size)
401 response = self._job.get_output(byte_range)
402 data = response.read()
403 actual_hash = bytes_to_hex(tree_hash(chunk_hashes(data)))
404 if response['TreeHash'] != actual_hash:
405 raise TreeHashDoesNotMatchError(
406 "Tree hash for part number %s does not match, "
407 "expected: %s, got: %s" % (part_number, response['TreeHash'],
408 actual_hash))
409 return (part_number, part_size, binascii.unhexlify(actual_hash), data)
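Note that the byte range passed to get_output here (and to upload_part above) is inclusive on both ends. A quick worked example of the range arithmetic, assuming a 4 MiB part size:

    # Worked example of the inclusive byte range for one part (assumed
    # 4 MiB part size); matches the arithmetic in _download_chunk.
    part_number, part_size = 3, 4 * 1024 * 1024
    start_byte = part_number * part_size                    # 12582912
    byte_range = (start_byte, start_byte + part_size - 1)   # (12582912, 16777215)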