Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(165)

Unified Diff: third_party/gsutil/boto/tests/unit/provider/test_provider.py

Issue 12317103: Added gsutil to depot tools (Closed) Base URL: https://chromium.googlesource.com/chromium/tools/depot_tools.git@master
Patch Set: added readme Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/gsutil/boto/tests/unit/provider/test_provider.py
diff --git a/third_party/gsutil/boto/tests/unit/provider/test_provider.py b/third_party/gsutil/boto/tests/unit/provider/test_provider.py
new file mode 100644
index 0000000000000000000000000000000000000000..edece9798f841b129d24f3c889799fbd4de94217
--- /dev/null
+++ b/third_party/gsutil/boto/tests/unit/provider/test_provider.py
@@ -0,0 +1,176 @@
+#!/usr/bin/env python
+from datetime import datetime, timedelta
+
+from tests.unit import unittest
+import mock
+
+from boto import provider
+
+
+class TestProvider(unittest.TestCase):
+ def setUp(self):
+ self.environ = {}
+ self.config = {}
+
+ self.metadata_patch = mock.patch('boto.utils.get_instance_metadata')
+ self.config_patch = mock.patch('boto.provider.config.get',
+ self.get_config)
+ self.has_config_patch = mock.patch('boto.provider.config.has_option',
+ self.has_config)
+ self.environ_patch = mock.patch('os.environ', self.environ)
+
+ self.get_instance_metadata = self.metadata_patch.start()
+ self.config_patch.start()
+ self.has_config_patch.start()
+ self.environ_patch.start()
+
+
+ def tearDown(self):
+ self.metadata_patch.stop()
+ self.config_patch.stop()
+ self.has_config_patch.stop()
+ self.environ_patch.stop()
+
+ def has_config(self, section_name, key):
+ try:
+ self.config[section_name][key]
+ return True
+ except KeyError:
+ return False
+
+ def get_config(self, section_name, key):
+ try:
+ return self.config[section_name][key]
+ except KeyError:
+ return None
+
+ def test_passed_in_values_are_used(self):
+ p = provider.Provider('aws', 'access_key', 'secret_key', 'security_token')
+ self.assertEqual(p.access_key, 'access_key')
+ self.assertEqual(p.secret_key, 'secret_key')
+ self.assertEqual(p.security_token, 'security_token')
+
+ def test_environment_variables_are_used(self):
+ self.environ['AWS_ACCESS_KEY_ID'] = 'env_access_key'
+ self.environ['AWS_SECRET_ACCESS_KEY'] = 'env_secret_key'
+ p = provider.Provider('aws')
+ self.assertEqual(p.access_key, 'env_access_key')
+ self.assertEqual(p.secret_key, 'env_secret_key')
+ self.assertIsNone(p.security_token)
+
+ def test_config_values_are_used(self):
+ self.config = {
+ 'Credentials': {
+ 'aws_access_key_id': 'cfg_access_key',
+ 'aws_secret_access_key': 'cfg_secret_key',
+ }
+ }
+ p = provider.Provider('aws')
+ self.assertEqual(p.access_key, 'cfg_access_key')
+ self.assertEqual(p.secret_key, 'cfg_secret_key')
+ self.assertIsNone(p.security_token)
+
+ def test_keyring_is_used(self):
+ self.config = {
+ 'Credentials': {
+ 'aws_access_key_id': 'cfg_access_key',
+ 'keyring': 'test',
+ }
+ }
+ import sys
+ try:
+ import keyring
+ imported = True
+ except ImportError:
+ sys.modules['keyring'] = keyring = type(mock)('keyring', '')
+ imported = False
+
+ try:
+ with mock.patch('keyring.get_password', create=True):
+ keyring.get_password.side_effect = (
+ lambda kr, login: kr+login+'pw')
+ p = provider.Provider('aws')
+ self.assertEqual(p.access_key, 'cfg_access_key')
+ self.assertEqual(p.secret_key, 'testcfg_access_keypw')
+ self.assertIsNone(p.security_token)
+ finally:
+ if not imported:
+ del sys.modules['keyring']
+
+ def test_env_vars_beat_config_values(self):
+ self.environ['AWS_ACCESS_KEY_ID'] = 'env_access_key'
+ self.environ['AWS_SECRET_ACCESS_KEY'] = 'env_secret_key'
+ self.config = {
+ 'Credentials': {
+ 'aws_access_key_id': 'cfg_access_key',
+ 'aws_secret_access_key': 'cfg_secret_key',
+ }
+ }
+ p = provider.Provider('aws')
+ self.assertEqual(p.access_key, 'env_access_key')
+ self.assertEqual(p.secret_key, 'env_secret_key')
+ self.assertIsNone(p.security_token)
+
+ def test_metadata_server_credentials(self):
+ instance_config = {
+ 'iam': {
+ 'security-credentials': {
+ 'allowall': {u'AccessKeyId': u'iam_access_key',
+ u'Code': u'Success',
+ u'Expiration': u'2012-09-01T03:57:34Z',
+ u'LastUpdated': u'2012-08-31T21:43:40Z',
+ u'SecretAccessKey': u'iam_secret_key',
+ u'Token': u'iam_token',
+ u'Type': u'AWS-HMAC'}
+ }
+ }
+ }
+ self.get_instance_metadata.return_value = instance_config
+ p = provider.Provider('aws')
+ self.assertEqual(p.access_key, 'iam_access_key')
+ self.assertEqual(p.secret_key, 'iam_secret_key')
+ self.assertEqual(p.security_token, 'iam_token')
+
+ def test_refresh_credentials(self):
+ now = datetime.now()
+ first_expiration = (now + timedelta(seconds=10)).strftime(
+ "%Y-%m-%dT%H:%M:%SZ")
+ credentials = {
+ u'AccessKeyId': u'first_access_key',
+ u'Code': u'Success',
+ u'Expiration': first_expiration,
+ u'LastUpdated': u'2012-08-31T21:43:40Z',
+ u'SecretAccessKey': u'first_secret_key',
+ u'Token': u'first_token',
+ u'Type': u'AWS-HMAC'
+ }
+ instance_config = {
+ 'iam': {
+ 'security-credentials': {
+ 'allowall': credentials
+ }
+ }
+ }
+ self.get_instance_metadata.return_value = instance_config
+ p = provider.Provider('aws')
+ self.assertEqual(p.access_key, 'first_access_key')
+ self.assertEqual(p.secret_key, 'first_secret_key')
+ self.assertEqual(p.security_token, 'first_token')
+ self.assertIsNotNone(p._credential_expiry_time)
+
+ # Now set the expiration to something in the past.
+ expired = now - timedelta(seconds=20)
+ p._credential_expiry_time = expired
+ credentials['AccessKeyId'] = 'second_access_key'
+ credentials['SecretAccessKey'] = 'second_secret_key'
+ credentials['Token'] = 'second_token'
+ self.get_instance_metadata.return_value = instance_config
+
+ # Now upon attribute access, the credentials should be updated.
+ self.assertEqual(p.access_key, 'second_access_key')
+ self.assertEqual(p.secret_key, 'second_secret_key')
+ self.assertEqual(p.security_token, 'second_token')
+
+
+if __name__ == '__main__':
+ unittest.main()

Powered by Google App Engine
This is Rietveld 408576698