OLD | NEW |
(Empty) | |
| 1 # Copyright (c) 2012 Mitch Garnaat http://garnaat.org/ |
| 2 # All rights reserved. |
| 3 # |
| 4 # Permission is hereby granted, free of charge, to any person obtaining a |
| 5 # copy of this software and associated documentation files (the |
| 6 # "Software"), to deal in the Software without restriction, including |
| 7 # without limitation the rights to use, copy, modify, merge, publish, dis- |
| 8 # tribute, sublicense, and/or sell copies of the Software, and to permit |
| 9 # persons to whom the Software is furnished to do so, subject to the fol- |
| 10 # lowing conditions: |
| 11 # |
| 12 # The above copyright notice and this permission notice shall be included |
| 13 # in all copies or substantial portions of the Software. |
| 14 # |
| 15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS |
| 16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- |
| 17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT |
| 18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, |
| 19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
| 21 # IN THE SOFTWARE. |
| 22 |
| 23 """ |
| 24 Some unit tests for S3 Key |
| 25 """ |
| 26 |
| 27 from tests.unit import unittest |
| 28 import time |
| 29 import StringIO |
| 30 from boto.s3.connection import S3Connection |
| 31 from boto.s3.key import Key |
| 32 from boto.exception import S3ResponseError |
| 33 |
| 34 |
| 35 class S3KeyTest (unittest.TestCase): |
| 36 s3 = True |
| 37 |
| 38 def setUp(self): |
| 39 self.conn = S3Connection() |
| 40 self.bucket_name = 'keytest-%d' % int(time.time()) |
| 41 self.bucket = self.conn.create_bucket(self.bucket_name) |
| 42 |
| 43 def tearDown(self): |
| 44 for key in self.bucket: |
| 45 key.delete() |
| 46 self.bucket.delete() |
| 47 |
| 48 def test_set_contents_from_file_dataloss(self): |
| 49 # Create an empty stringio and write to it. |
| 50 content = "abcde" |
| 51 sfp = StringIO.StringIO() |
| 52 sfp.write(content) |
| 53 # Try set_contents_from_file() without rewinding sfp |
| 54 k = self.bucket.new_key("k") |
| 55 try: |
| 56 k.set_contents_from_file(sfp) |
| 57 self.fail("forgot to rewind so should fail.") |
| 58 except AttributeError: |
| 59 pass |
| 60 # call with rewind and check if we wrote 5 bytes |
| 61 k.set_contents_from_file(sfp, rewind=True) |
| 62 self.assertEqual(k.size, 5) |
| 63 # check actual contents by getting it. |
| 64 kn = self.bucket.new_key("k") |
| 65 ks = kn.get_contents_as_string() |
| 66 self.assertEqual(ks, content) |
| 67 |
| 68 # finally, try with a 0 length string |
| 69 sfp = StringIO.StringIO() |
| 70 k = self.bucket.new_key("k") |
| 71 k.set_contents_from_file(sfp) |
| 72 self.assertEqual(k.size, 0) |
| 73 # check actual contents by getting it. |
| 74 kn = self.bucket.new_key("k") |
| 75 ks = kn.get_contents_as_string() |
| 76 self.assertEqual(ks, "") |
| 77 |
| 78 def test_set_contents_as_file(self): |
| 79 content="01234567890123456789" |
| 80 sfp = StringIO.StringIO(content) |
| 81 |
| 82 # fp is set at 0 for just opened (for read) files. |
| 83 # set_contents should write full content to key. |
| 84 k = self.bucket.new_key("k") |
| 85 k.set_contents_from_file(sfp) |
| 86 self.assertEqual(k.size, 20) |
| 87 kn = self.bucket.new_key("k") |
| 88 ks = kn.get_contents_as_string() |
| 89 self.assertEqual(ks, content) |
| 90 |
| 91 # set fp to 5 and set contents. this should |
| 92 # set "567890123456789" to the key |
| 93 sfp.seek(5) |
| 94 k = self.bucket.new_key("k") |
| 95 k.set_contents_from_file(sfp) |
| 96 self.assertEqual(k.size, 15) |
| 97 kn = self.bucket.new_key("k") |
| 98 ks = kn.get_contents_as_string() |
| 99 self.assertEqual(ks, content[5:]) |
| 100 |
| 101 # set fp to 5 and only set 5 bytes. this should |
| 102 # write the value "56789" to the key. |
| 103 sfp.seek(5) |
| 104 k = self.bucket.new_key("k") |
| 105 k.set_contents_from_file(sfp, size=5) |
| 106 self.assertEqual(k.size, 5) |
| 107 self.assertEqual(sfp.tell(), 10) |
| 108 kn = self.bucket.new_key("k") |
| 109 ks = kn.get_contents_as_string() |
| 110 self.assertEqual(ks, content[5:10]) |
| 111 |
| 112 def test_set_contents_with_md5(self): |
| 113 content="01234567890123456789" |
| 114 sfp = StringIO.StringIO(content) |
| 115 |
| 116 # fp is set at 0 for just opened (for read) files. |
| 117 # set_contents should write full content to key. |
| 118 k = self.bucket.new_key("k") |
| 119 good_md5 = k.compute_md5(sfp) |
| 120 k.set_contents_from_file(sfp, md5=good_md5) |
| 121 kn = self.bucket.new_key("k") |
| 122 ks = kn.get_contents_as_string() |
| 123 self.assertEqual(ks, content) |
| 124 |
| 125 # set fp to 5 and only set 5 bytes. this should |
| 126 # write the value "56789" to the key. |
| 127 sfp.seek(5) |
| 128 k = self.bucket.new_key("k") |
| 129 good_md5 = k.compute_md5(sfp, size=5) |
| 130 k.set_contents_from_file(sfp, size=5, md5=good_md5) |
| 131 self.assertEqual(sfp.tell(), 10) |
| 132 kn = self.bucket.new_key("k") |
| 133 ks = kn.get_contents_as_string() |
| 134 self.assertEqual(ks, content[5:10]) |
| 135 |
| 136 # let's try a wrong md5 by just altering it. |
| 137 k = self.bucket.new_key("k") |
| 138 sfp.seek(0) |
| 139 hexdig, base64 = k.compute_md5(sfp) |
| 140 bad_md5 = (hexdig, base64[3:]) |
| 141 try: |
| 142 k.set_contents_from_file(sfp, md5=bad_md5) |
| 143 self.fail("should fail with bad md5") |
| 144 except S3ResponseError: |
| 145 pass |
| 146 |
| 147 def test_get_contents_with_md5(self): |
| 148 content="01234567890123456789" |
| 149 sfp = StringIO.StringIO(content) |
| 150 |
| 151 k = self.bucket.new_key("k") |
| 152 k.set_contents_from_file(sfp) |
| 153 kn = self.bucket.new_key("k") |
| 154 s = kn.get_contents_as_string() |
| 155 self.assertEqual(kn.md5, k.md5) |
| 156 self.assertEqual(s, content) |
| 157 |
| 158 def test_file_callback(self): |
| 159 def callback(wrote, total): |
| 160 self.my_cb_cnt += 1 |
| 161 self.assertNotEqual(wrote, self.my_cb_last, "called twice with same
value") |
| 162 self.my_cb_last = wrote |
| 163 |
| 164 # Zero bytes written => 1 call |
| 165 self.my_cb_cnt = 0 |
| 166 self.my_cb_last = None |
| 167 k = self.bucket.new_key("k") |
| 168 k.BufferSize = 2 |
| 169 sfp = StringIO.StringIO("") |
| 170 k.set_contents_from_file(sfp, cb=callback, num_cb=10) |
| 171 self.assertEqual(self.my_cb_cnt, 1) |
| 172 self.assertEqual(self.my_cb_last, 0) |
| 173 sfp.close() |
| 174 |
| 175 # Read back zero bytes => 1 call |
| 176 self.my_cb_cnt = 0 |
| 177 self.my_cb_last = None |
| 178 s = k.get_contents_as_string(cb=callback) |
| 179 self.assertEqual(self.my_cb_cnt, 1) |
| 180 self.assertEqual(self.my_cb_last, 0) |
| 181 |
| 182 content="01234567890123456789" |
| 183 sfp = StringIO.StringIO(content) |
| 184 |
| 185 # expect 2 calls due start/finish |
| 186 self.my_cb_cnt = 0 |
| 187 self.my_cb_last = None |
| 188 k = self.bucket.new_key("k") |
| 189 k.set_contents_from_file(sfp, cb=callback, num_cb=10) |
| 190 self.assertEqual(self.my_cb_cnt, 2) |
| 191 self.assertEqual(self.my_cb_last, 20) |
| 192 |
| 193 # Read back all bytes => 2 calls |
| 194 self.my_cb_cnt = 0 |
| 195 self.my_cb_last = None |
| 196 s = k.get_contents_as_string(cb=callback) |
| 197 self.assertEqual(self.my_cb_cnt, 2) |
| 198 self.assertEqual(self.my_cb_last, 20) |
| 199 self.assertEqual(s, content) |
| 200 |
| 201 # rewind sfp and try upload again. -1 should call |
| 202 # for every read/write so that should make 11 when bs=2 |
| 203 sfp.seek(0) |
| 204 self.my_cb_cnt = 0 |
| 205 self.my_cb_last = None |
| 206 k = self.bucket.new_key("k") |
| 207 k.BufferSize = 2 |
| 208 k.set_contents_from_file(sfp, cb=callback, num_cb=-1) |
| 209 self.assertEqual(self.my_cb_cnt, 11) |
| 210 self.assertEqual(self.my_cb_last, 20) |
| 211 |
| 212 # Read back all bytes => 11 calls |
| 213 self.my_cb_cnt = 0 |
| 214 self.my_cb_last = None |
| 215 s = k.get_contents_as_string(cb=callback, num_cb=-1) |
| 216 self.assertEqual(self.my_cb_cnt, 11) |
| 217 self.assertEqual(self.my_cb_last, 20) |
| 218 self.assertEqual(s, content) |
| 219 |
| 220 # no more than 1 times => 2 times |
| 221 # last time always 20 bytes |
| 222 sfp.seek(0) |
| 223 self.my_cb_cnt = 0 |
| 224 self.my_cb_last = None |
| 225 k = self.bucket.new_key("k") |
| 226 k.BufferSize = 2 |
| 227 k.set_contents_from_file(sfp, cb=callback, num_cb=1) |
| 228 self.assertTrue(self.my_cb_cnt <= 2) |
| 229 self.assertEqual(self.my_cb_last, 20) |
| 230 |
| 231 # no more than 1 times => 2 times |
| 232 self.my_cb_cnt = 0 |
| 233 self.my_cb_last = None |
| 234 s = k.get_contents_as_string(cb=callback, num_cb=1) |
| 235 self.assertTrue(self.my_cb_cnt <= 2) |
| 236 self.assertEqual(self.my_cb_last, 20) |
| 237 self.assertEqual(s, content) |
| 238 |
| 239 # no more than 2 times |
| 240 # last time always 20 bytes |
| 241 sfp.seek(0) |
| 242 self.my_cb_cnt = 0 |
| 243 self.my_cb_last = None |
| 244 k = self.bucket.new_key("k") |
| 245 k.BufferSize = 2 |
| 246 k.set_contents_from_file(sfp, cb=callback, num_cb=2) |
| 247 self.assertTrue(self.my_cb_cnt <= 2) |
| 248 self.assertEqual(self.my_cb_last, 20) |
| 249 |
| 250 # no more than 2 times |
| 251 self.my_cb_cnt = 0 |
| 252 self.my_cb_last = None |
| 253 s = k.get_contents_as_string(cb=callback, num_cb=2) |
| 254 self.assertTrue(self.my_cb_cnt <= 2) |
| 255 self.assertEqual(self.my_cb_last, 20) |
| 256 self.assertEqual(s, content) |
| 257 |
| 258 # no more than 3 times |
| 259 # last time always 20 bytes |
| 260 sfp.seek(0) |
| 261 self.my_cb_cnt = 0 |
| 262 self.my_cb_last = None |
| 263 k = self.bucket.new_key("k") |
| 264 k.BufferSize = 2 |
| 265 k.set_contents_from_file(sfp, cb=callback, num_cb=3) |
| 266 self.assertTrue(self.my_cb_cnt <= 3) |
| 267 self.assertEqual(self.my_cb_last, 20) |
| 268 |
| 269 # no more than 3 times |
| 270 self.my_cb_cnt = 0 |
| 271 self.my_cb_last = None |
| 272 s = k.get_contents_as_string(cb=callback, num_cb=3) |
| 273 self.assertTrue(self.my_cb_cnt <= 3) |
| 274 self.assertEqual(self.my_cb_last, 20) |
| 275 self.assertEqual(s, content) |
| 276 |
| 277 # no more than 4 times |
| 278 # last time always 20 bytes |
| 279 sfp.seek(0) |
| 280 self.my_cb_cnt = 0 |
| 281 self.my_cb_last = None |
| 282 k = self.bucket.new_key("k") |
| 283 k.BufferSize = 2 |
| 284 k.set_contents_from_file(sfp, cb=callback, num_cb=4) |
| 285 self.assertTrue(self.my_cb_cnt <= 4) |
| 286 self.assertEqual(self.my_cb_last, 20) |
| 287 |
| 288 # no more than 4 times |
| 289 self.my_cb_cnt = 0 |
| 290 self.my_cb_last = None |
| 291 s = k.get_contents_as_string(cb=callback, num_cb=4) |
| 292 self.assertTrue(self.my_cb_cnt <= 4) |
| 293 self.assertEqual(self.my_cb_last, 20) |
| 294 self.assertEqual(s, content) |
| 295 |
| 296 # no more than 6 times |
| 297 # last time always 20 bytes |
| 298 sfp.seek(0) |
| 299 self.my_cb_cnt = 0 |
| 300 self.my_cb_last = None |
| 301 k = self.bucket.new_key("k") |
| 302 k.BufferSize = 2 |
| 303 k.set_contents_from_file(sfp, cb=callback, num_cb=6) |
| 304 self.assertTrue(self.my_cb_cnt <= 6) |
| 305 self.assertEqual(self.my_cb_last, 20) |
| 306 |
| 307 # no more than 6 times |
| 308 self.my_cb_cnt = 0 |
| 309 self.my_cb_last = None |
| 310 s = k.get_contents_as_string(cb=callback, num_cb=6) |
| 311 self.assertTrue(self.my_cb_cnt <= 6) |
| 312 self.assertEqual(self.my_cb_last, 20) |
| 313 self.assertEqual(s, content) |
| 314 |
| 315 # no more than 10 times |
| 316 # last time always 20 bytes |
| 317 sfp.seek(0) |
| 318 self.my_cb_cnt = 0 |
| 319 self.my_cb_last = None |
| 320 k = self.bucket.new_key("k") |
| 321 k.BufferSize = 2 |
| 322 k.set_contents_from_file(sfp, cb=callback, num_cb=10) |
| 323 self.assertTrue(self.my_cb_cnt <= 10) |
| 324 self.assertEqual(self.my_cb_last, 20) |
| 325 |
| 326 # no more than 10 times |
| 327 self.my_cb_cnt = 0 |
| 328 self.my_cb_last = None |
| 329 s = k.get_contents_as_string(cb=callback, num_cb=10) |
| 330 self.assertTrue(self.my_cb_cnt <= 10) |
| 331 self.assertEqual(self.my_cb_last, 20) |
| 332 self.assertEqual(s, content) |
| 333 |
| 334 # no more than 1000 times |
| 335 # last time always 20 bytes |
| 336 sfp.seek(0) |
| 337 self.my_cb_cnt = 0 |
| 338 self.my_cb_last = None |
| 339 k = self.bucket.new_key("k") |
| 340 k.BufferSize = 2 |
| 341 k.set_contents_from_file(sfp, cb=callback, num_cb=1000) |
| 342 self.assertTrue(self.my_cb_cnt <= 1000) |
| 343 self.assertEqual(self.my_cb_last, 20) |
| 344 |
| 345 # no more than 1000 times |
| 346 self.my_cb_cnt = 0 |
| 347 self.my_cb_last = None |
| 348 s = k.get_contents_as_string(cb=callback, num_cb=1000) |
| 349 self.assertTrue(self.my_cb_cnt <= 1000) |
| 350 self.assertEqual(self.my_cb_last, 20) |
| 351 self.assertEqual(s, content) |
| 352 |
| 353 def test_website_redirects(self): |
| 354 self.bucket.configure_website('index.html') |
| 355 key = self.bucket.new_key('redirect-key') |
| 356 self.assertTrue(key.set_redirect('http://www.amazon.com/')) |
| 357 self.assertEqual(key.get_redirect(), 'http://www.amazon.com/') |
| 358 |
| 359 self.assertTrue(key.set_redirect('http://aws.amazon.com/')) |
| 360 self.assertEqual(key.get_redirect(), 'http://aws.amazon.com/') |
| 361 |
| 362 def test_website_redirect_none_configured(self): |
| 363 key = self.bucket.new_key('redirect-key') |
| 364 key.set_contents_from_string('') |
| 365 self.assertEqual(key.get_redirect(), None) |
| 366 |
| 367 def test_website_redirect_with_bad_value(self): |
| 368 self.bucket.configure_website('index.html') |
| 369 key = self.bucket.new_key('redirect-key') |
| 370 with self.assertRaises(key.provider.storage_response_error): |
| 371 # Must start with a / or http |
| 372 key.set_redirect('ftp://ftp.example.org') |
| 373 with self.assertRaises(key.provider.storage_response_error): |
| 374 # Must start with a / or http |
| 375 key.set_redirect('') |
OLD | NEW |