Index: third_party/gsutil/boto/tests/unit/provider/test_provider.py |
diff --git a/third_party/gsutil/boto/tests/unit/provider/test_provider.py b/third_party/gsutil/boto/tests/unit/provider/test_provider.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..edece9798f841b129d24f3c889799fbd4de94217 |
--- /dev/null |
+++ b/third_party/gsutil/boto/tests/unit/provider/test_provider.py |
@@ -0,0 +1,176 @@ |
+#!/usr/bin/env python |
+from datetime import datetime, timedelta |
+ |
+from tests.unit import unittest |
+import mock |
+ |
+from boto import provider |
+ |
+ |
+class TestProvider(unittest.TestCase): |
+ def setUp(self): |
+ self.environ = {} |
+ self.config = {} |
+ |
+ self.metadata_patch = mock.patch('boto.utils.get_instance_metadata') |
+ self.config_patch = mock.patch('boto.provider.config.get', |
+ self.get_config) |
+ self.has_config_patch = mock.patch('boto.provider.config.has_option', |
+ self.has_config) |
+ self.environ_patch = mock.patch('os.environ', self.environ) |
+ |
+ self.get_instance_metadata = self.metadata_patch.start() |
+ self.config_patch.start() |
+ self.has_config_patch.start() |
+ self.environ_patch.start() |
+ |
+ |
+ def tearDown(self): |
+ self.metadata_patch.stop() |
+ self.config_patch.stop() |
+ self.has_config_patch.stop() |
+ self.environ_patch.stop() |
+ |
+ def has_config(self, section_name, key): |
+ try: |
+ self.config[section_name][key] |
+ return True |
+ except KeyError: |
+ return False |
+ |
+ def get_config(self, section_name, key): |
+ try: |
+ return self.config[section_name][key] |
+ except KeyError: |
+ return None |
+ |
+ def test_passed_in_values_are_used(self): |
+ p = provider.Provider('aws', 'access_key', 'secret_key', 'security_token') |
+ self.assertEqual(p.access_key, 'access_key') |
+ self.assertEqual(p.secret_key, 'secret_key') |
+ self.assertEqual(p.security_token, 'security_token') |
+ |
+ def test_environment_variables_are_used(self): |
+ self.environ['AWS_ACCESS_KEY_ID'] = 'env_access_key' |
+ self.environ['AWS_SECRET_ACCESS_KEY'] = 'env_secret_key' |
+ p = provider.Provider('aws') |
+ self.assertEqual(p.access_key, 'env_access_key') |
+ self.assertEqual(p.secret_key, 'env_secret_key') |
+ self.assertIsNone(p.security_token) |
+ |
+ def test_config_values_are_used(self): |
+ self.config = { |
+ 'Credentials': { |
+ 'aws_access_key_id': 'cfg_access_key', |
+ 'aws_secret_access_key': 'cfg_secret_key', |
+ } |
+ } |
+ p = provider.Provider('aws') |
+ self.assertEqual(p.access_key, 'cfg_access_key') |
+ self.assertEqual(p.secret_key, 'cfg_secret_key') |
+ self.assertIsNone(p.security_token) |
+ |
+ def test_keyring_is_used(self): |
+ self.config = { |
+ 'Credentials': { |
+ 'aws_access_key_id': 'cfg_access_key', |
+ 'keyring': 'test', |
+ } |
+ } |
+ import sys |
+ try: |
+ import keyring |
+ imported = True |
+ except ImportError: |
+ sys.modules['keyring'] = keyring = type(mock)('keyring', '') |
+ imported = False |
+ |
+ try: |
+ with mock.patch('keyring.get_password', create=True): |
+ keyring.get_password.side_effect = ( |
+ lambda kr, login: kr+login+'pw') |
+ p = provider.Provider('aws') |
+ self.assertEqual(p.access_key, 'cfg_access_key') |
+ self.assertEqual(p.secret_key, 'testcfg_access_keypw') |
+ self.assertIsNone(p.security_token) |
+ finally: |
+ if not imported: |
+ del sys.modules['keyring'] |
+ |
+ def test_env_vars_beat_config_values(self): |
+ self.environ['AWS_ACCESS_KEY_ID'] = 'env_access_key' |
+ self.environ['AWS_SECRET_ACCESS_KEY'] = 'env_secret_key' |
+ self.config = { |
+ 'Credentials': { |
+ 'aws_access_key_id': 'cfg_access_key', |
+ 'aws_secret_access_key': 'cfg_secret_key', |
+ } |
+ } |
+ p = provider.Provider('aws') |
+ self.assertEqual(p.access_key, 'env_access_key') |
+ self.assertEqual(p.secret_key, 'env_secret_key') |
+ self.assertIsNone(p.security_token) |
+ |
+ def test_metadata_server_credentials(self): |
+ instance_config = { |
+ 'iam': { |
+ 'security-credentials': { |
+ 'allowall': {u'AccessKeyId': u'iam_access_key', |
+ u'Code': u'Success', |
+ u'Expiration': u'2012-09-01T03:57:34Z', |
+ u'LastUpdated': u'2012-08-31T21:43:40Z', |
+ u'SecretAccessKey': u'iam_secret_key', |
+ u'Token': u'iam_token', |
+ u'Type': u'AWS-HMAC'} |
+ } |
+ } |
+ } |
+ self.get_instance_metadata.return_value = instance_config |
+ p = provider.Provider('aws') |
+ self.assertEqual(p.access_key, 'iam_access_key') |
+ self.assertEqual(p.secret_key, 'iam_secret_key') |
+ self.assertEqual(p.security_token, 'iam_token') |
+ |
+ def test_refresh_credentials(self): |
+ now = datetime.now() |
+ first_expiration = (now + timedelta(seconds=10)).strftime( |
+ "%Y-%m-%dT%H:%M:%SZ") |
+ credentials = { |
+ u'AccessKeyId': u'first_access_key', |
+ u'Code': u'Success', |
+ u'Expiration': first_expiration, |
+ u'LastUpdated': u'2012-08-31T21:43:40Z', |
+ u'SecretAccessKey': u'first_secret_key', |
+ u'Token': u'first_token', |
+ u'Type': u'AWS-HMAC' |
+ } |
+ instance_config = { |
+ 'iam': { |
+ 'security-credentials': { |
+ 'allowall': credentials |
+ } |
+ } |
+ } |
+ self.get_instance_metadata.return_value = instance_config |
+ p = provider.Provider('aws') |
+ self.assertEqual(p.access_key, 'first_access_key') |
+ self.assertEqual(p.secret_key, 'first_secret_key') |
+ self.assertEqual(p.security_token, 'first_token') |
+ self.assertIsNotNone(p._credential_expiry_time) |
+ |
+ # Now set the expiration to something in the past. |
+ expired = now - timedelta(seconds=20) |
+ p._credential_expiry_time = expired |
+ credentials['AccessKeyId'] = 'second_access_key' |
+ credentials['SecretAccessKey'] = 'second_secret_key' |
+ credentials['Token'] = 'second_token' |
+ self.get_instance_metadata.return_value = instance_config |
+ |
+ # Now upon attribute access, the credentials should be updated. |
+ self.assertEqual(p.access_key, 'second_access_key') |
+ self.assertEqual(p.secret_key, 'second_secret_key') |
+ self.assertEqual(p.security_token, 'second_token') |
+ |
+ |
+if __name__ == '__main__': |
+ unittest.main() |