OLD | NEW |
(Empty) | |
| 1 # -*- coding: utf-8 -*- |
| 2 # Copyright (c) 2006-2011 Mitch Garnaat http://garnaat.org/ |
| 3 # |
| 4 # Permission is hereby granted, free of charge, to any person obtaining a |
| 5 # copy of this software and associated documentation files (the |
| 6 # "Software"), to deal in the Software without restriction, including |
| 7 # without limitation the rights to use, copy, modify, merge, publish, dis- |
| 8 # tribute, sublicense, and/or sell copies of the Software, and to permit |
| 9 # persons to whom the Software is furnished to do so, subject to the fol- |
| 10 # lowing conditions: |
| 11 # |
| 12 # The above copyright notice and this permission notice shall be included |
| 13 # in all copies or substantial portions of the Software. |
| 14 # |
| 15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS |
| 16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- |
| 17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT |
| 18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, |
| 19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
| 21 # IN THE SOFTWARE. |
| 22 |
| 23 """ |
| 24 Some unit tests for the S3Connection |
| 25 """ |
| 26 |
| 27 import unittest |
| 28 import time |
| 29 import os |
| 30 import urllib |
| 31 import urlparse |
| 32 import httplib |
| 33 from boto.s3.connection import S3Connection |
| 34 from boto.s3.bucket import Bucket |
| 35 from boto.exception import S3PermissionsError, S3ResponseError |
| 36 |
| 37 |
| 38 class S3ConnectionTest (unittest.TestCase): |
| 39 s3 = True |
| 40 |
| 41 def test_1_basic(self): |
| 42 print '--- running S3Connection tests ---' |
| 43 c = S3Connection() |
| 44 # create a new, empty bucket |
| 45 bucket_name = 'test-%d' % int(time.time()) |
| 46 bucket = c.create_bucket(bucket_name) |
| 47 # now try a get_bucket call and see if it's really there |
| 48 bucket = c.get_bucket(bucket_name) |
| 49 # test logging |
| 50 logging_bucket = c.create_bucket(bucket_name + '-log') |
| 51 logging_bucket.set_as_logging_target() |
| 52 bucket.enable_logging(target_bucket=logging_bucket, target_prefix=bucket
.name) |
| 53 bucket.disable_logging() |
| 54 c.delete_bucket(logging_bucket) |
| 55 k = bucket.new_key('foobar') |
| 56 s1 = 'This is a test of file upload and download' |
| 57 s2 = 'This is a second string to test file upload and download' |
| 58 k.set_contents_from_string(s1) |
| 59 fp = open('foobar', 'wb') |
| 60 # now get the contents from s3 to a local file |
| 61 k.get_contents_to_file(fp) |
| 62 fp.close() |
| 63 fp = open('foobar') |
| 64 # check to make sure content read from s3 is identical to original |
| 65 assert s1 == fp.read(), 'corrupted file' |
| 66 fp.close() |
| 67 # test generated URLs |
| 68 url = k.generate_url(3600) |
| 69 file = urllib.urlopen(url) |
| 70 assert s1 == file.read(), 'invalid URL %s' % url |
| 71 url = k.generate_url(3600, force_http=True) |
| 72 file = urllib.urlopen(url) |
| 73 assert s1 == file.read(), 'invalid URL %s' % url |
| 74 url = k.generate_url(3600, force_http=True, headers={'x-amz-x-token' : '
XYZ'}) |
| 75 file = urllib.urlopen(url) |
| 76 assert s1 == file.read(), 'invalid URL %s' % url |
| 77 rh = {'response-content-disposition': 'attachment; filename="foo.txt"'} |
| 78 url = k.generate_url(60, response_headers=rh) |
| 79 file = urllib.urlopen(url) |
| 80 assert s1 == file.read(), 'invalid URL %s' % url |
| 81 #test whether amperands and to-be-escaped characters work in header file
name |
| 82 rh = {'response-content-disposition': 'attachment; filename="foo&z%20ar&
ar&zar&bar.txt"'} |
| 83 url = k.generate_url(60, response_headers=rh, force_http=True) |
| 84 file = urllib.urlopen(url) |
| 85 assert s1 == file.read(), 'invalid URL %s' % url |
| 86 # overwrite foobar contents with a PUT |
| 87 url = k.generate_url(3600, 'PUT', force_http=True, policy='private', red
uced_redundancy=True) |
| 88 up = urlparse.urlsplit(url) |
| 89 con = httplib.HTTPConnection(up.hostname, up.port) |
| 90 con.request("PUT", up.path + '?' + up.query, body="hello there") |
| 91 resp = con.getresponse() |
| 92 assert 200 == resp.status |
| 93 assert "hello there" == k.get_contents_as_string() |
| 94 bucket.delete_key(k) |
| 95 # test a few variations on get_all_keys - first load some data |
| 96 # for the first one, let's override the content type |
| 97 phony_mimetype = 'application/x-boto-test' |
| 98 headers = {'Content-Type': phony_mimetype} |
| 99 k.name = 'foo/bar' |
| 100 k.set_contents_from_string(s1, headers) |
| 101 k.name = 'foo/bas' |
| 102 k.set_contents_from_filename('foobar') |
| 103 k.name = 'foo/bat' |
| 104 k.set_contents_from_string(s1) |
| 105 k.name = 'fie/bar' |
| 106 k.set_contents_from_string(s1) |
| 107 k.name = 'fie/bas' |
| 108 k.set_contents_from_string(s1) |
| 109 k.name = 'fie/bat' |
| 110 k.set_contents_from_string(s1) |
| 111 # try resetting the contents to another value |
| 112 md5 = k.md5 |
| 113 k.set_contents_from_string(s2) |
| 114 assert k.md5 != md5 |
| 115 os.unlink('foobar') |
| 116 all = bucket.get_all_keys() |
| 117 assert len(all) == 6 |
| 118 rs = bucket.get_all_keys(prefix='foo') |
| 119 assert len(rs) == 3 |
| 120 rs = bucket.get_all_keys(prefix='', delimiter='/') |
| 121 assert len(rs) == 2 |
| 122 rs = bucket.get_all_keys(maxkeys=5) |
| 123 assert len(rs) == 5 |
| 124 # test the lookup method |
| 125 k = bucket.lookup('foo/bar') |
| 126 assert isinstance(k, bucket.key_class) |
| 127 assert k.content_type == phony_mimetype |
| 128 k = bucket.lookup('notthere') |
| 129 assert k == None |
| 130 # try some metadata stuff |
| 131 k = bucket.new_key('has_metadata') |
| 132 mdkey1 = 'meta1' |
| 133 mdval1 = 'This is the first metadata value' |
| 134 k.set_metadata(mdkey1, mdval1) |
| 135 mdkey2 = 'meta2' |
| 136 mdval2 = 'This is the second metadata value' |
| 137 k.set_metadata(mdkey2, mdval2) |
| 138 # try a unicode metadata value |
| 139 mdval3 = u'föö' |
| 140 mdkey3 = 'meta3' |
| 141 k.set_metadata(mdkey3, mdval3) |
| 142 k.set_contents_from_string(s1) |
| 143 k = bucket.lookup('has_metadata') |
| 144 assert k.get_metadata(mdkey1) == mdval1 |
| 145 assert k.get_metadata(mdkey2) == mdval2 |
| 146 assert k.get_metadata(mdkey3) == mdval3 |
| 147 k = bucket.new_key('has_metadata') |
| 148 k.get_contents_as_string() |
| 149 assert k.get_metadata(mdkey1) == mdval1 |
| 150 assert k.get_metadata(mdkey2) == mdval2 |
| 151 assert k.get_metadata(mdkey3) == mdval3 |
| 152 bucket.delete_key(k) |
| 153 # test list and iterator |
| 154 rs1 = bucket.list() |
| 155 num_iter = 0 |
| 156 for r in rs1: |
| 157 num_iter = num_iter + 1 |
| 158 rs = bucket.get_all_keys() |
| 159 num_keys = len(rs) |
| 160 assert num_iter == num_keys |
| 161 # try a key with a funny character |
| 162 k = bucket.new_key('testnewline\n') |
| 163 k.set_contents_from_string('This is a test') |
| 164 rs = bucket.get_all_keys() |
| 165 assert len(rs) == num_keys + 1 |
| 166 bucket.delete_key(k) |
| 167 rs = bucket.get_all_keys() |
| 168 assert len(rs) == num_keys |
| 169 # try some acl stuff |
| 170 bucket.set_acl('public-read') |
| 171 policy = bucket.get_acl() |
| 172 assert len(policy.acl.grants) == 2 |
| 173 bucket.set_acl('private') |
| 174 policy = bucket.get_acl() |
| 175 assert len(policy.acl.grants) == 1 |
| 176 k = bucket.lookup('foo/bar') |
| 177 k.set_acl('public-read') |
| 178 policy = k.get_acl() |
| 179 assert len(policy.acl.grants) == 2 |
| 180 k.set_acl('private') |
| 181 policy = k.get_acl() |
| 182 assert len(policy.acl.grants) == 1 |
| 183 # try the convenience methods for grants |
| 184 bucket.add_user_grant('FULL_CONTROL', |
| 185 'c1e724fbfa0979a4448393c59a8c055011f739b6d102fb37a
65f26414653cd67') |
| 186 try: |
| 187 bucket.add_email_grant('foobar', 'foo@bar.com') |
| 188 except S3PermissionsError: |
| 189 pass |
| 190 # now try to create an RRS key |
| 191 k = bucket.new_key('reduced_redundancy') |
| 192 k.set_contents_from_string('This key has reduced redundancy', |
| 193 reduced_redundancy=True) |
| 194 |
| 195 # now try to inject a response header |
| 196 data = k.get_contents_as_string(response_headers={'response-content-type
' : 'foo/bar'}) |
| 197 assert k.content_type == 'foo/bar' |
| 198 |
| 199 # now delete all keys in bucket |
| 200 for k in bucket: |
| 201 if k.name == 'reduced_redundancy': |
| 202 assert k.storage_class == 'REDUCED_REDUNDANCY' |
| 203 bucket.delete_key(k) |
| 204 # now delete bucket |
| 205 time.sleep(5) |
| 206 c.delete_bucket(bucket) |
| 207 print '--- tests completed ---' |
| 208 |
| 209 def test_basic_anon(self): |
| 210 auth_con = S3Connection() |
| 211 # create a new, empty bucket |
| 212 bucket_name = 'test-%d' % int(time.time()) |
| 213 auth_bucket = auth_con.create_bucket(bucket_name) |
| 214 |
| 215 # try read the bucket anonymously |
| 216 anon_con = S3Connection(anon=True) |
| 217 anon_bucket = Bucket(anon_con, bucket_name) |
| 218 try: |
| 219 iter(anon_bucket.list()).next() |
| 220 self.fail("anon bucket list should fail") |
| 221 except S3ResponseError: |
| 222 pass |
| 223 |
| 224 # give bucket anon user access and anon read again |
| 225 auth_bucket.set_acl('public-read') |
| 226 try: |
| 227 iter(anon_bucket.list()).next() |
| 228 self.fail("not expecting contents") |
| 229 except S3ResponseError, e: |
| 230 self.fail("We should have public-read access, but received " |
| 231 "an error: %s" % e) |
| 232 except StopIteration: |
| 233 pass |
| 234 |
| 235 # cleanup |
| 236 auth_con.delete_bucket(auth_bucket) |
| 237 |
| 238 def test_error_code_populated(self): |
| 239 c = S3Connection() |
| 240 try: |
| 241 c.create_bucket('bad$bucket$name') |
| 242 except S3ResponseError, e: |
| 243 self.assertEqual(e.error_code, 'InvalidBucketName') |
| 244 else: |
| 245 self.fail("S3ResponseError not raised.") |
OLD | NEW |