OLD | NEW |
(Empty) | |
| 1 import tempfile |
| 2 import unittest |
| 3 try: |
| 4 import simplejson as json |
| 5 except ImportError: |
| 6 import json |
| 7 from textwrap import dedent |
| 8 |
| 9 from boto.cloudfront.distribution import Distribution |
| 10 |
| 11 class CloudfrontSignedUrlsTest(unittest.TestCase): |
| 12 |
| 13 cloudfront = True |
| 14 notdefault = True |
| 15 |
| 16 def setUp(self): |
| 17 self.pk_str = dedent(""" |
| 18 -----BEGIN RSA PRIVATE KEY----- |
| 19 MIICXQIBAAKBgQDA7ki9gI/lRygIoOjV1yymgx6FYFlzJ+z1ATMaLo57nL57AavW |
| 20 hb68HYY8EA0GJU9xQdMVaHBogF3eiCWYXSUZCWM/+M5+ZcdQraRRScucmn6g4EvY |
| 21 2K4W2pxbqH8vmUikPxir41EeBPLjMOzKvbzzQy9e/zzIQVREKSp/7y1mywIDAQAB |
| 22 AoGABc7mp7XYHynuPZxChjWNJZIq+A73gm0ASDv6At7F8Vi9r0xUlQe/v0AQS3yc |
| 23 N8QlyR4XMbzMLYk3yjxFDXo4ZKQtOGzLGteCU2srANiLv26/imXA8FVidZftTAtL |
| 24 viWQZBVPTeYIA69ATUYPEq0a5u5wjGyUOij9OWyuy01mbPkCQQDluYoNpPOekQ0Z |
| 25 WrPgJ5rxc8f6zG37ZVoDBiexqtVShIF5W3xYuWhW5kYb0hliYfkq15cS7t9m95h3 |
| 26 1QJf/xI/AkEA1v9l/WN1a1N3rOK4VGoCokx7kR2SyTMSbZgF9IWJNOugR/WZw7HT |
| 27 njipO3c9dy1Ms9pUKwUF46d7049ck8HwdQJARgrSKuLWXMyBH+/l1Dx/I4tXuAJI |
| 28 rlPyo+VmiOc7b5NzHptkSHEPfR9s1OK0VqjknclqCJ3Ig86OMEtEFBzjZQJBAKYz |
| 29 470hcPkaGk7tKYAgP48FvxRsnzeooptURW5E+M+PQ2W9iDPPOX9739+Xi02hGEWF |
| 30 B0IGbQoTRFdE4VVcPK0CQQCeS84lODlC0Y2BZv2JxW3Osv/WkUQ4dslfAQl1T303 |
| 31 7uwwr7XTroMv8dIFQIPreoPhRKmd/SbJzbiKfS/4QDhU |
| 32 -----END RSA PRIVATE KEY----- |
| 33 """) |
| 34 self.pk_id = "PK123456789754" |
| 35 self.dist = Distribution() |
| 36 self.canned_policy = ( |
| 37 '{"Statement":[{"Resource":' |
| 38 '"http://d604721fxaaqy9.cloudfront.net/horizon.jpg' |
| 39 '?large=yes&license=yes",' |
| 40 '"Condition":{"DateLessThan":{"AWS:EpochTime":1258237200}}}]}') |
| 41 self.custom_policy_1 = ( |
| 42 '{ \n' |
| 43 ' "Statement": [{ \n' |
| 44 ' "Resource":"http://d604721fxaaqy9.cloudfront.net/training/*",
\n' |
| 45 ' "Condition":{ \n' |
| 46 ' "IpAddress":{"AWS:SourceIp":"145.168.143.0/24"}, \n' |
| 47 ' "DateLessThan":{"AWS:EpochTime":1258237200} \n' |
| 48 ' } \n' |
| 49 ' }] \n' |
| 50 '}\n') |
| 51 self.custom_policy_2 = ( |
| 52 '{ \n' |
| 53 ' "Statement": [{ \n' |
| 54 ' "Resource":"http://*", \n' |
| 55 ' "Condition":{ \n' |
| 56 ' "IpAddress":{"AWS:SourceIp":"216.98.35.1/32"},\n' |
| 57 ' "DateGreaterThan":{"AWS:EpochTime":1241073790},\n' |
| 58 ' "DateLessThan":{"AWS:EpochTime":1255674716}\n' |
| 59 ' } \n' |
| 60 ' }] \n' |
| 61 '}\n') |
| 62 |
| 63 def test_encode_custom_policy_1(self): |
| 64 """ |
| 65 Test base64 encoding custom policy 1 from Amazon's documentation. |
| 66 """ |
| 67 expected = ("eyAKICAgIlN0YXRlbWVudCI6IFt7IAogICAgICAiUmVzb3VyY2Ui" |
| 68 "OiJodHRwOi8vZDYwNDcyMWZ4YWFxeTkuY2xvdWRmcm9udC5uZXQv" |
| 69 "dHJhaW5pbmcvKiIsIAogICAgICAiQ29uZGl0aW9uIjp7IAogICAg" |
| 70 "ICAgICAiSXBBZGRyZXNzIjp7IkFXUzpTb3VyY2VJcCI6IjE0NS4x" |
| 71 "NjguMTQzLjAvMjQifSwgCiAgICAgICAgICJEYXRlTGVzc1RoYW4i" |
| 72 "OnsiQVdTOkVwb2NoVGltZSI6MTI1ODIzNzIwMH0gICAgICAKICAg" |
| 73 "ICAgfSAKICAgfV0gCn0K") |
| 74 encoded = self.dist._url_base64_encode(self.custom_policy_1) |
| 75 self.assertEqual(expected, encoded) |
| 76 |
| 77 def test_encode_custom_policy_2(self): |
| 78 """ |
| 79 Test base64 encoding custom policy 2 from Amazon's documentation. |
| 80 """ |
| 81 expected = ("eyAKICAgIlN0YXRlbWVudCI6IFt7IAogICAgICAiUmVzb3VyY2Ui" |
| 82 "OiJodHRwOi8vKiIsIAogICAgICAiQ29uZGl0aW9uIjp7IAogICAg" |
| 83 "ICAgICAiSXBBZGRyZXNzIjp7IkFXUzpTb3VyY2VJcCI6IjIxNi45" |
| 84 "OC4zNS4xLzMyIn0sCiAgICAgICAgICJEYXRlR3JlYXRlclRoYW4i" |
| 85 "OnsiQVdTOkVwb2NoVGltZSI6MTI0MTA3Mzc5MH0sCiAgICAgICAg" |
| 86 "ICJEYXRlTGVzc1RoYW4iOnsiQVdTOkVwb2NoVGltZSI6MTI1NTY3" |
| 87 "NDcxNn0KICAgICAgfSAKICAgfV0gCn0K") |
| 88 encoded = self.dist._url_base64_encode(self.custom_policy_2) |
| 89 self.assertEqual(expected, encoded) |
| 90 |
| 91 def test_sign_canned_policy(self): |
| 92 """ |
| 93 Test signing the canned policy from amazon's cloudfront documentation. |
| 94 """ |
| 95 expected = ("Nql641NHEUkUaXQHZINK1FZ~SYeUSoBJMxjdgqrzIdzV2gyEXPDN" |
| 96 "v0pYdWJkflDKJ3xIu7lbwRpSkG98NBlgPi4ZJpRRnVX4kXAJK6td" |
| 97 "Nx6FucDB7OVqzcxkxHsGFd8VCG1BkC-Afh9~lOCMIYHIaiOB6~5j" |
| 98 "t9w2EOwi6sIIqrg_") |
| 99 sig = self.dist._sign_string(self.canned_policy, private_key_string=self
.pk_str) |
| 100 encoded_sig = self.dist._url_base64_encode(sig) |
| 101 self.assertEqual(expected, encoded_sig) |
| 102 |
| 103 def test_sign_canned_policy_pk_file(self): |
| 104 """ |
| 105 Test signing the canned policy from amazon's cloudfront documentation |
| 106 with a file object. |
| 107 """ |
| 108 expected = ("Nql641NHEUkUaXQHZINK1FZ~SYeUSoBJMxjdgqrzIdzV2gyEXPDN" |
| 109 "v0pYdWJkflDKJ3xIu7lbwRpSkG98NBlgPi4ZJpRRnVX4kXAJK6td" |
| 110 "Nx6FucDB7OVqzcxkxHsGFd8VCG1BkC-Afh9~lOCMIYHIaiOB6~5j" |
| 111 "t9w2EOwi6sIIqrg_") |
| 112 pk_file = tempfile.TemporaryFile() |
| 113 pk_file.write(self.pk_str) |
| 114 pk_file.seek(0) |
| 115 sig = self.dist._sign_string(self.canned_policy, private_key_file=pk_fil
e) |
| 116 encoded_sig = self.dist._url_base64_encode(sig) |
| 117 self.assertEqual(expected, encoded_sig) |
| 118 |
| 119 def test_sign_canned_policy_pk_file_name(self): |
| 120 """ |
| 121 Test signing the canned policy from amazon's cloudfront documentation |
| 122 with a file name. |
| 123 """ |
| 124 expected = ("Nql641NHEUkUaXQHZINK1FZ~SYeUSoBJMxjdgqrzIdzV2gyEXPDN" |
| 125 "v0pYdWJkflDKJ3xIu7lbwRpSkG98NBlgPi4ZJpRRnVX4kXAJK6td" |
| 126 "Nx6FucDB7OVqzcxkxHsGFd8VCG1BkC-Afh9~lOCMIYHIaiOB6~5j" |
| 127 "t9w2EOwi6sIIqrg_") |
| 128 pk_file = tempfile.NamedTemporaryFile() |
| 129 pk_file.write(self.pk_str) |
| 130 pk_file.flush() |
| 131 sig = self.dist._sign_string(self.canned_policy, private_key_file=pk_fil
e.name) |
| 132 encoded_sig = self.dist._url_base64_encode(sig) |
| 133 self.assertEqual(expected, encoded_sig) |
| 134 |
| 135 def test_sign_canned_policy_unicode(self): |
| 136 """ |
| 137 Test signing the canned policy from amazon's cloudfront documentation. |
| 138 """ |
| 139 expected = ("Nql641NHEUkUaXQHZINK1FZ~SYeUSoBJMxjdgqrzIdzV2gyEXPDN" |
| 140 "v0pYdWJkflDKJ3xIu7lbwRpSkG98NBlgPi4ZJpRRnVX4kXAJK6td" |
| 141 "Nx6FucDB7OVqzcxkxHsGFd8VCG1BkC-Afh9~lOCMIYHIaiOB6~5j" |
| 142 "t9w2EOwi6sIIqrg_") |
| 143 unicode_policy = unicode(self.canned_policy) |
| 144 sig = self.dist._sign_string(unicode_policy, private_key_string=self.pk_
str) |
| 145 encoded_sig = self.dist._url_base64_encode(sig) |
| 146 self.assertEqual(expected, encoded_sig) |
| 147 |
| 148 def test_sign_custom_policy_1(self): |
| 149 """ |
| 150 Test signing custom policy 1 from amazon's cloudfront documentation. |
| 151 """ |
| 152 expected = ("cPFtRKvUfYNYmxek6ZNs6vgKEZP6G3Cb4cyVt~FjqbHOnMdxdT7e" |
| 153 "T6pYmhHYzuDsFH4Jpsctke2Ux6PCXcKxUcTIm8SO4b29~1QvhMl-" |
| 154 "CIojki3Hd3~Unxjw7Cpo1qRjtvrimW0DPZBZYHFZtiZXsaPt87yB" |
| 155 "P9GWnTQoaVysMxQ_") |
| 156 sig = self.dist._sign_string(self.custom_policy_1, private_key_string=se
lf.pk_str) |
| 157 encoded_sig = self.dist._url_base64_encode(sig) |
| 158 self.assertEqual(expected, encoded_sig) |
| 159 |
| 160 def test_sign_custom_policy_2(self): |
| 161 """ |
| 162 Test signing custom policy 2 from amazon's cloudfront documentation. |
| 163 """ |
| 164 expected = ("rc~5Qbbm8EJXjUTQ6Cn0LAxR72g1DOPrTmdtfbWVVgQNw0q~KHUA" |
| 165 "mBa2Zv1Wjj8dDET4XSL~Myh44CLQdu4dOH~N9huH7QfPSR~O4tIO" |
| 166 "S1WWcP~2JmtVPoQyLlEc8YHRCuN3nVNZJ0m4EZcXXNAS-0x6Zco2" |
| 167 "SYx~hywTRxWR~5Q_") |
| 168 sig = self.dist._sign_string(self.custom_policy_2, private_key_string=se
lf.pk_str) |
| 169 encoded_sig = self.dist._url_base64_encode(sig) |
| 170 self.assertEqual(expected, encoded_sig) |
| 171 |
| 172 def test_create_canned_policy(self): |
| 173 """ |
| 174 Test that a canned policy is generated correctly. |
| 175 """ |
| 176 url = "http://1234567.cloudfront.com/test_resource.mp3?dog=true" |
| 177 expires = 999999 |
| 178 policy = self.dist._canned_policy(url, expires) |
| 179 policy = json.loads(policy) |
| 180 |
| 181 self.assertEqual(1, len(policy.keys())) |
| 182 statements = policy["Statement"] |
| 183 self.assertEqual(1, len(statements)) |
| 184 statement = statements[0] |
| 185 resource = statement["Resource"] |
| 186 self.assertEqual(url, resource) |
| 187 condition = statement["Condition"] |
| 188 self.assertEqual(1, len(condition.keys())) |
| 189 date_less_than = condition["DateLessThan"] |
| 190 self.assertEqual(1, len(date_less_than.keys())) |
| 191 aws_epoch_time = date_less_than["AWS:EpochTime"] |
| 192 self.assertEqual(expires, aws_epoch_time) |
| 193 |
| 194 def test_custom_policy_expires_and_policy_url(self): |
| 195 """ |
| 196 Test that a custom policy can be created with an expire time and an |
| 197 arbitrary URL. |
| 198 """ |
| 199 url = "http://1234567.cloudfront.com/*" |
| 200 expires = 999999 |
| 201 policy = self.dist._custom_policy(url, expires=expires) |
| 202 policy = json.loads(policy) |
| 203 |
| 204 self.assertEqual(1, len(policy.keys())) |
| 205 statements = policy["Statement"] |
| 206 self.assertEqual(1, len(statements)) |
| 207 statement = statements[0] |
| 208 resource = statement["Resource"] |
| 209 self.assertEqual(url, resource) |
| 210 condition = statement["Condition"] |
| 211 self.assertEqual(1, len(condition.keys())) |
| 212 date_less_than = condition["DateLessThan"] |
| 213 self.assertEqual(1, len(date_less_than.keys())) |
| 214 aws_epoch_time = date_less_than["AWS:EpochTime"] |
| 215 self.assertEqual(expires, aws_epoch_time) |
| 216 |
| 217 def test_custom_policy_valid_after(self): |
| 218 """ |
| 219 Test that a custom policy can be created with a valid-after time and |
| 220 an arbitrary URL. |
| 221 """ |
| 222 url = "http://1234567.cloudfront.com/*" |
| 223 valid_after = 999999 |
| 224 policy = self.dist._custom_policy(url, valid_after=valid_after) |
| 225 policy = json.loads(policy) |
| 226 |
| 227 self.assertEqual(1, len(policy.keys())) |
| 228 statements = policy["Statement"] |
| 229 self.assertEqual(1, len(statements)) |
| 230 statement = statements[0] |
| 231 resource = statement["Resource"] |
| 232 self.assertEqual(url, resource) |
| 233 condition = statement["Condition"] |
| 234 self.assertEqual(2, len(condition.keys())) |
| 235 date_less_than = condition["DateLessThan"] |
| 236 date_greater_than = condition["DateGreaterThan"] |
| 237 self.assertEqual(1, len(date_greater_than.keys())) |
| 238 aws_epoch_time = date_greater_than["AWS:EpochTime"] |
| 239 self.assertEqual(valid_after, aws_epoch_time) |
| 240 |
| 241 def test_custom_policy_ip_address(self): |
| 242 """ |
| 243 Test that a custom policy can be created with an IP address and |
| 244 an arbitrary URL. |
| 245 """ |
| 246 url = "http://1234567.cloudfront.com/*" |
| 247 ip_range = "192.168.0.1" |
| 248 policy = self.dist._custom_policy(url, ip_address=ip_range) |
| 249 policy = json.loads(policy) |
| 250 |
| 251 self.assertEqual(1, len(policy.keys())) |
| 252 statements = policy["Statement"] |
| 253 self.assertEqual(1, len(statements)) |
| 254 statement = statements[0] |
| 255 resource = statement["Resource"] |
| 256 self.assertEqual(url, resource) |
| 257 condition = statement["Condition"] |
| 258 self.assertEqual(2, len(condition.keys())) |
| 259 ip_address = condition["IpAddress"] |
| 260 self.assertTrue("DateLessThan" in condition) |
| 261 self.assertEqual(1, len(ip_address.keys())) |
| 262 source_ip = ip_address["AWS:SourceIp"] |
| 263 self.assertEqual("%s/32" % ip_range, source_ip) |
| 264 |
| 265 def test_custom_policy_ip_range(self): |
| 266 """ |
| 267 Test that a custom policy can be created with an IP address and |
| 268 an arbitrary URL. |
| 269 """ |
| 270 url = "http://1234567.cloudfront.com/*" |
| 271 ip_range = "192.168.0.0/24" |
| 272 policy = self.dist._custom_policy(url, ip_address=ip_range) |
| 273 policy = json.loads(policy) |
| 274 |
| 275 self.assertEqual(1, len(policy.keys())) |
| 276 statements = policy["Statement"] |
| 277 self.assertEqual(1, len(statements)) |
| 278 statement = statements[0] |
| 279 resource = statement["Resource"] |
| 280 self.assertEqual(url, resource) |
| 281 condition = statement["Condition"] |
| 282 self.assertEqual(2, len(condition.keys())) |
| 283 self.assertTrue("DateLessThan" in condition) |
| 284 ip_address = condition["IpAddress"] |
| 285 self.assertEqual(1, len(ip_address.keys())) |
| 286 source_ip = ip_address["AWS:SourceIp"] |
| 287 self.assertEqual(ip_range, source_ip) |
| 288 |
| 289 def test_custom_policy_all(self): |
| 290 """ |
| 291 Test that a custom policy can be created with an IP address and |
| 292 an arbitrary URL. |
| 293 """ |
| 294 url = "http://1234567.cloudfront.com/test.txt" |
| 295 expires = 999999 |
| 296 valid_after = 111111 |
| 297 ip_range = "192.168.0.0/24" |
| 298 policy = self.dist._custom_policy(url, expires=expires, |
| 299 valid_after=valid_after, |
| 300 ip_address=ip_range) |
| 301 policy = json.loads(policy) |
| 302 |
| 303 self.assertEqual(1, len(policy.keys())) |
| 304 statements = policy["Statement"] |
| 305 self.assertEqual(1, len(statements)) |
| 306 statement = statements[0] |
| 307 resource = statement["Resource"] |
| 308 self.assertEqual(url, resource) |
| 309 condition = statement["Condition"] |
| 310 self.assertEqual(3, len(condition.keys())) |
| 311 #check expires condition |
| 312 date_less_than = condition["DateLessThan"] |
| 313 self.assertEqual(1, len(date_less_than.keys())) |
| 314 aws_epoch_time = date_less_than["AWS:EpochTime"] |
| 315 self.assertEqual(expires, aws_epoch_time) |
| 316 #check valid_after condition |
| 317 date_greater_than = condition["DateGreaterThan"] |
| 318 self.assertEqual(1, len(date_greater_than.keys())) |
| 319 aws_epoch_time = date_greater_than["AWS:EpochTime"] |
| 320 self.assertEqual(valid_after, aws_epoch_time) |
| 321 #check source ip address condition |
| 322 ip_address = condition["IpAddress"] |
| 323 self.assertEqual(1, len(ip_address.keys())) |
| 324 source_ip = ip_address["AWS:SourceIp"] |
| 325 self.assertEqual(ip_range, source_ip) |
| 326 |
| 327 def test_params_canned_policy(self): |
| 328 """ |
| 329 Test the correct params are generated for a canned policy. |
| 330 """ |
| 331 url = "http://d604721fxaaqy9.cloudfront.net/horizon.jpg?large=yes&licens
e=yes" |
| 332 expire_time = 1258237200 |
| 333 expected_sig = ("Nql641NHEUkUaXQHZINK1FZ~SYeUSoBJMxjdgqrzIdzV2gyE" |
| 334 "XPDNv0pYdWJkflDKJ3xIu7lbwRpSkG98NBlgPi4ZJpRRnVX4" |
| 335 "kXAJK6tdNx6FucDB7OVqzcxkxHsGFd8VCG1BkC-Afh9~lOCM" |
| 336 "IYHIaiOB6~5jt9w2EOwi6sIIqrg_") |
| 337 signed_url_params = self.dist._create_signing_params(url, self.pk_id, ex
pire_time, private_key_string=self.pk_str) |
| 338 self.assertEqual(3, len(signed_url_params)) |
| 339 self.assertEqual(signed_url_params["Expires"], "1258237200") |
| 340 self.assertEqual(signed_url_params["Signature"], expected_sig) |
| 341 self.assertEqual(signed_url_params["Key-Pair-Id"], "PK123456789754") |
| 342 |
| 343 def test_canned_policy(self): |
| 344 """ |
| 345 Generate signed url from the Example Canned Policy in Amazon's |
| 346 documentation. |
| 347 """ |
| 348 url = "http://d604721fxaaqy9.cloudfront.net/horizon.jpg?large=yes&licens
e=yes" |
| 349 expire_time = 1258237200 |
| 350 expected_url = "http://d604721fxaaqy9.cloudfront.net/horizon.jpg?large=y
es&license=yes&Expires=1258237200&Signature=Nql641NHEUkUaXQHZINK1FZ~SYeUSoBJMxjd
gqrzIdzV2gyEXPDNv0pYdWJkflDKJ3xIu7lbwRpSkG98NBlgPi4ZJpRRnVX4kXAJK6tdNx6FucDB7OVq
zcxkxHsGFd8VCG1BkC-Afh9~lOCMIYHIaiOB6~5jt9w2EOwi6sIIqrg_&Key-Pair-Id=PK123456789
754" |
| 351 signed_url = self.dist.create_signed_url( |
| 352 url, self.pk_id, expire_time, private_key_string=self.pk_str) |
| 353 self.assertEqual(expected_url, signed_url) |
| 354 |
OLD | NEW |