OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/env python |
| 2 from datetime import datetime, timedelta |
| 3 |
| 4 from tests.unit import unittest |
| 5 import mock |
| 6 |
| 7 from boto import provider |
| 8 |
| 9 |
| 10 class TestProvider(unittest.TestCase): |
| 11 def setUp(self): |
| 12 self.environ = {} |
| 13 self.config = {} |
| 14 |
| 15 self.metadata_patch = mock.patch('boto.utils.get_instance_metadata') |
| 16 self.config_patch = mock.patch('boto.provider.config.get', |
| 17 self.get_config) |
| 18 self.has_config_patch = mock.patch('boto.provider.config.has_option', |
| 19 self.has_config) |
| 20 self.environ_patch = mock.patch('os.environ', self.environ) |
| 21 |
| 22 self.get_instance_metadata = self.metadata_patch.start() |
| 23 self.config_patch.start() |
| 24 self.has_config_patch.start() |
| 25 self.environ_patch.start() |
| 26 |
| 27 |
| 28 def tearDown(self): |
| 29 self.metadata_patch.stop() |
| 30 self.config_patch.stop() |
| 31 self.has_config_patch.stop() |
| 32 self.environ_patch.stop() |
| 33 |
| 34 def has_config(self, section_name, key): |
| 35 try: |
| 36 self.config[section_name][key] |
| 37 return True |
| 38 except KeyError: |
| 39 return False |
| 40 |
| 41 def get_config(self, section_name, key): |
| 42 try: |
| 43 return self.config[section_name][key] |
| 44 except KeyError: |
| 45 return None |
| 46 |
| 47 def test_passed_in_values_are_used(self): |
| 48 p = provider.Provider('aws', 'access_key', 'secret_key', 'security_token
') |
| 49 self.assertEqual(p.access_key, 'access_key') |
| 50 self.assertEqual(p.secret_key, 'secret_key') |
| 51 self.assertEqual(p.security_token, 'security_token') |
| 52 |
| 53 def test_environment_variables_are_used(self): |
| 54 self.environ['AWS_ACCESS_KEY_ID'] = 'env_access_key' |
| 55 self.environ['AWS_SECRET_ACCESS_KEY'] = 'env_secret_key' |
| 56 p = provider.Provider('aws') |
| 57 self.assertEqual(p.access_key, 'env_access_key') |
| 58 self.assertEqual(p.secret_key, 'env_secret_key') |
| 59 self.assertIsNone(p.security_token) |
| 60 |
| 61 def test_config_values_are_used(self): |
| 62 self.config = { |
| 63 'Credentials': { |
| 64 'aws_access_key_id': 'cfg_access_key', |
| 65 'aws_secret_access_key': 'cfg_secret_key', |
| 66 } |
| 67 } |
| 68 p = provider.Provider('aws') |
| 69 self.assertEqual(p.access_key, 'cfg_access_key') |
| 70 self.assertEqual(p.secret_key, 'cfg_secret_key') |
| 71 self.assertIsNone(p.security_token) |
| 72 |
| 73 def test_keyring_is_used(self): |
| 74 self.config = { |
| 75 'Credentials': { |
| 76 'aws_access_key_id': 'cfg_access_key', |
| 77 'keyring': 'test', |
| 78 } |
| 79 } |
| 80 import sys |
| 81 try: |
| 82 import keyring |
| 83 imported = True |
| 84 except ImportError: |
| 85 sys.modules['keyring'] = keyring = type(mock)('keyring', '') |
| 86 imported = False |
| 87 |
| 88 try: |
| 89 with mock.patch('keyring.get_password', create=True): |
| 90 keyring.get_password.side_effect = ( |
| 91 lambda kr, login: kr+login+'pw') |
| 92 p = provider.Provider('aws') |
| 93 self.assertEqual(p.access_key, 'cfg_access_key') |
| 94 self.assertEqual(p.secret_key, 'testcfg_access_keypw') |
| 95 self.assertIsNone(p.security_token) |
| 96 finally: |
| 97 if not imported: |
| 98 del sys.modules['keyring'] |
| 99 |
| 100 def test_env_vars_beat_config_values(self): |
| 101 self.environ['AWS_ACCESS_KEY_ID'] = 'env_access_key' |
| 102 self.environ['AWS_SECRET_ACCESS_KEY'] = 'env_secret_key' |
| 103 self.config = { |
| 104 'Credentials': { |
| 105 'aws_access_key_id': 'cfg_access_key', |
| 106 'aws_secret_access_key': 'cfg_secret_key', |
| 107 } |
| 108 } |
| 109 p = provider.Provider('aws') |
| 110 self.assertEqual(p.access_key, 'env_access_key') |
| 111 self.assertEqual(p.secret_key, 'env_secret_key') |
| 112 self.assertIsNone(p.security_token) |
| 113 |
| 114 def test_metadata_server_credentials(self): |
| 115 instance_config = { |
| 116 'iam': { |
| 117 'security-credentials': { |
| 118 'allowall': {u'AccessKeyId': u'iam_access_key', |
| 119 u'Code': u'Success', |
| 120 u'Expiration': u'2012-09-01T03:57:34Z', |
| 121 u'LastUpdated': u'2012-08-31T21:43:40Z', |
| 122 u'SecretAccessKey': u'iam_secret_key', |
| 123 u'Token': u'iam_token', |
| 124 u'Type': u'AWS-HMAC'} |
| 125 } |
| 126 } |
| 127 } |
| 128 self.get_instance_metadata.return_value = instance_config |
| 129 p = provider.Provider('aws') |
| 130 self.assertEqual(p.access_key, 'iam_access_key') |
| 131 self.assertEqual(p.secret_key, 'iam_secret_key') |
| 132 self.assertEqual(p.security_token, 'iam_token') |
| 133 |
| 134 def test_refresh_credentials(self): |
| 135 now = datetime.now() |
| 136 first_expiration = (now + timedelta(seconds=10)).strftime( |
| 137 "%Y-%m-%dT%H:%M:%SZ") |
| 138 credentials = { |
| 139 u'AccessKeyId': u'first_access_key', |
| 140 u'Code': u'Success', |
| 141 u'Expiration': first_expiration, |
| 142 u'LastUpdated': u'2012-08-31T21:43:40Z', |
| 143 u'SecretAccessKey': u'first_secret_key', |
| 144 u'Token': u'first_token', |
| 145 u'Type': u'AWS-HMAC' |
| 146 } |
| 147 instance_config = { |
| 148 'iam': { |
| 149 'security-credentials': { |
| 150 'allowall': credentials |
| 151 } |
| 152 } |
| 153 } |
| 154 self.get_instance_metadata.return_value = instance_config |
| 155 p = provider.Provider('aws') |
| 156 self.assertEqual(p.access_key, 'first_access_key') |
| 157 self.assertEqual(p.secret_key, 'first_secret_key') |
| 158 self.assertEqual(p.security_token, 'first_token') |
| 159 self.assertIsNotNone(p._credential_expiry_time) |
| 160 |
| 161 # Now set the expiration to something in the past. |
| 162 expired = now - timedelta(seconds=20) |
| 163 p._credential_expiry_time = expired |
| 164 credentials['AccessKeyId'] = 'second_access_key' |
| 165 credentials['SecretAccessKey'] = 'second_secret_key' |
| 166 credentials['Token'] = 'second_token' |
| 167 self.get_instance_metadata.return_value = instance_config |
| 168 |
| 169 # Now upon attribute access, the credentials should be updated. |
| 170 self.assertEqual(p.access_key, 'second_access_key') |
| 171 self.assertEqual(p.secret_key, 'second_secret_key') |
| 172 self.assertEqual(p.security_token, 'second_token') |
| 173 |
| 174 |
| 175 if __name__ == '__main__': |
| 176 unittest.main() |
OLD | NEW |