# -*- coding: utf-8 -*-
# Copyright (c) 2012 Thomas Parslow http://almostobsolete.net/
# Copyright (c) 2012 Robie Basak <robie@justgohome.co.uk>
# Tree hash implementation from Aaron Brady bradya@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
import hashlib

from boto.glacier.utils import chunk_hashes, tree_hash, bytes_to_hex
# This import is provided for backwards compatibility. This function is
# now in boto.glacier.utils, but any existing code can still import
# this directly from this module.
from boto.glacier.utils import compute_hashes_from_fileobj

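# Amazon defines the Glacier tree hash over 1 MiB chunks of the payload,
# which is why 1 MiB is the default chunk size for hashing in this module.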
_ONE_MEGABYTE = 1024 * 1024


class _Partitioner(object):
    """Convert variable-size writes into part-sized writes

    Call write(data) with variable sized data as needed to write all data. Call
    flush() after all data is written.

    This instance will call send_fn(part_data) as needed in part_size pieces,
    except for the final part which may be shorter than part_size. Make sure to
    call flush() to ensure that a short final part results in a final send_fn
    call.

    """
    def __init__(self, part_size, send_fn):
        self.part_size = part_size
        self.send_fn = send_fn
        self._buffer = []
        self._buffer_size = 0

    def write(self, data):
        if data == '':
            return
        self._buffer.append(data)
        self._buffer_size += len(data)
        while self._buffer_size > self.part_size:
            self._send_part()

    def _send_part(self):
        data = ''.join(self._buffer)
        # Put back any data remaining over the part size into the
        # buffer
        if len(data) > self.part_size:
            self._buffer = [data[self.part_size:]]
            self._buffer_size = len(self._buffer[0])
        else:
            self._buffer = []
            self._buffer_size = 0
        # The part we will send
        part = data[:self.part_size]
        self.send_fn(part)

    def flush(self):
        if self._buffer_size > 0:
            self._send_part()

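# A minimal usage sketch (not part of the original module) showing how
# _Partitioner coalesces variable-size writes into fixed-size parts; the
# 4-byte part_size and the `parts` list are purely illustrative.
#
#   parts = []
#   partitioner = _Partitioner(part_size=4, send_fn=parts.append)
#   partitioner.write('abc')    # buffered; not yet a full part
#   partitioner.write('defgh')  # 'abcd' is sent, 'efgh' stays buffered
#   partitioner.flush()         # the short final part 'efgh' is sent
#   # parts == ['abcd', 'efgh']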

class _Uploader(object):
    """Upload to a Glacier upload_id.

    Call upload_part for each part (in any order) and then close to complete
    the upload.

    """
    def __init__(self, vault, upload_id, part_size, chunk_size=_ONE_MEGABYTE):
        self.vault = vault
        self.upload_id = upload_id
        self.part_size = part_size
        self.chunk_size = chunk_size
        self.archive_id = None

        self._uploaded_size = 0
        self._tree_hashes = []

        self.closed = False

    def _insert_tree_hash(self, index, raw_tree_hash):
        list_length = len(self._tree_hashes)
        if index >= list_length:
            # Pad with None placeholders so the hash can be stored at its
            # part index even when parts arrive out of order.
            self._tree_hashes.extend([None] * (index - list_length + 1))
        self._tree_hashes[index] = raw_tree_hash

    def upload_part(self, part_index, part_data):
        """Upload a part to Glacier.

        :param part_index: part number where 0 is the first part
        :param part_data: data to upload corresponding to this part

        """
        if self.closed:
            raise ValueError("I/O operation on closed file")
        # Compute the hashes Glacier requires for this part: its tree hash
        # and the linear SHA-256 hash of the entire part.
        part_tree_hash = tree_hash(chunk_hashes(part_data, self.chunk_size))
        self._insert_tree_hash(part_index, part_tree_hash)

        hex_tree_hash = bytes_to_hex(part_tree_hash)
        linear_hash = hashlib.sha256(part_data).hexdigest()
        start = self.part_size * part_index
        content_range = (start,
                         (start + len(part_data)) - 1)
        response = self.vault.layer1.upload_part(self.vault.name,
                                                 self.upload_id,
                                                 linear_hash,
                                                 hex_tree_hash,
                                                 content_range, part_data)
        response.read()
        self._uploaded_size += len(part_data)

    def skip_part(self, part_index, part_tree_hash, part_length):
        """Skip uploading of a part.

        The final close call needs to calculate the tree hash and total size
        of all uploaded data, so this is the mechanism for resume
        functionality to provide it without actually uploading the data again.

        :param part_index: part number where 0 is the first part
        :param part_tree_hash: binary tree_hash of part being skipped
        :param part_length: length of part being skipped

        """
        if self.closed:
            raise ValueError("I/O operation on closed file")
        self._insert_tree_hash(part_index, part_tree_hash)
        self._uploaded_size += part_length

    def close(self):
        if self.closed:
            return
        if None in self._tree_hashes:
            raise RuntimeError("Some parts were not uploaded.")
        # Complete the multipart Glacier upload
        hex_tree_hash = bytes_to_hex(tree_hash(self._tree_hashes))
        response = self.vault.layer1.complete_multipart_upload(
            self.vault.name, self.upload_id, hex_tree_hash,
            self._uploaded_size)
        self.archive_id = response['ArchiveId']
        self.closed = True

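# An illustrative call sequence (not part of the original module) for the
# _Uploader protocol used by Writer and resume_file_upload below. The
# upload_id and part data names are hypothetical; an upload id would normally
# come from layer1.initiate_multipart_upload().
#
#   uploader = _Uploader(vault, upload_id, part_size=4 * _ONE_MEGABYTE)
#   uploader.upload_part(0, first_part_data)
#   uploader.upload_part(1, second_part_data)  # parts may be sent in any order
#   uploader.close()                           # completes the multipart upload
#   archive_id = uploader.archive_id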

def generate_parts_from_fobj(fobj, part_size):
    data = fobj.read(part_size)
    while data:
        yield data
        data = fobj.read(part_size)


def resume_file_upload(vault, upload_id, part_size, fobj, part_hash_map,
                       chunk_size=_ONE_MEGABYTE):
    """Resume upload of a file already part-uploaded to Glacier.

    The resumption of an upload where the part-uploaded section is empty is a
    valid degenerate case that this function can handle. In this case,
    part_hash_map should be an empty dict.

    :param vault: boto.glacier.vault.Vault object.
    :param upload_id: existing Glacier upload id of upload being resumed.
    :param part_size: part size of existing upload.
    :param fobj: file object containing local data to resume. This must read
        from the start of the entire upload, not just from the point being
        resumed. Use fobj.seek(0) to achieve this if necessary.
    :param part_hash_map: {part_index: part_tree_hash, ...} of data already
        uploaded. Each supplied part_tree_hash will be verified and the part
        re-uploaded if there is a mismatch.
    :param chunk_size: chunk size of tree hash calculation. This must be
        1 MiB for Amazon.

    """
    uploader = _Uploader(vault, upload_id, part_size, chunk_size)
    for part_index, part_data in enumerate(
            generate_parts_from_fobj(fobj, part_size)):
        part_tree_hash = tree_hash(chunk_hashes(part_data, chunk_size))
        if (part_index not in part_hash_map or
                part_hash_map[part_index] != part_tree_hash):
            uploader.upload_part(part_index, part_data)
        else:
            uploader.skip_part(part_index, part_tree_hash, len(part_data))
    uploader.close()
    return uploader.archive_id

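# A minimal usage sketch (not part of the original module) for resuming an
# interrupted upload. my_vault, saved_upload_id and saved_part_hashes are
# hypothetical values recorded from the earlier attempt, and the part size
# must match the one used when the upload was initiated.
#
#   with open('backup.tar', 'rb') as fobj:
#       archive_id = resume_file_upload(
#           my_vault, saved_upload_id, 4 * _ONE_MEGABYTE, fobj,
#           saved_part_hashes)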

class Writer(object):
    """
    Presents a file-like object for writing to an Amazon Glacier
    archive. The data is written using the multi-part upload API.
    """
    def __init__(self, vault, upload_id, part_size, chunk_size=_ONE_MEGABYTE):
        self.uploader = _Uploader(vault, upload_id, part_size, chunk_size)
        self.partitioner = _Partitioner(part_size, self._upload_part)
        self.closed = False
        self.next_part_index = 0

    def write(self, data):
        if self.closed:
            raise ValueError("I/O operation on closed file")
        self.partitioner.write(data)

    def _upload_part(self, part_data):
        self.uploader.upload_part(self.next_part_index, part_data)
        self.next_part_index += 1

    def close(self):
        if self.closed:
            return
        self.partitioner.flush()
        self.uploader.close()
        self.closed = True

    def get_archive_id(self):
        self.close()
        return self.uploader.archive_id

    @property
    def upload_id(self):
        return self.uploader.upload_id

    @property
    def vault(self):
        return self.uploader.vault
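

# A minimal usage sketch (not part of the original module), assuming boto's
# higher-level helpers (Layer2.get_vault and Vault.create_archive_writer)
# behave as in boto 2.x; the vault name and archive description are
# hypothetical.
#
#   import boto.glacier.layer2
#
#   layer2 = boto.glacier.layer2.Layer2()
#   vault = layer2.get_vault('my-vault')
#   writer = vault.create_archive_writer(description='nightly backup')
#   with open('backup.tar', 'rb') as fobj:
#       for chunk in iter(lambda: fobj.read(_ONE_MEGABYTE), ''):
#           writer.write(chunk)
#   writer.close()
#   archive_id = writer.get_archive_id()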