| Index: third_party/gsutil/gslib/tests/test_chacl.py
|
| diff --git a/third_party/gsutil/gslib/tests/test_chacl.py b/third_party/gsutil/gslib/tests/test_chacl.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..c5081025f5e7f83f38f49f7f444b98887ead6639
|
| --- /dev/null
|
| +++ b/third_party/gsutil/gslib/tests/test_chacl.py
|
| @@ -0,0 +1,240 @@
|
| +# Copyright 2013 Google Inc. All Rights Reserved.
|
| +#
|
| +# Licensed under the Apache License, Version 2.0 (the "License");
|
| +# you may not use this file except in compliance with the License.
|
| +# You may obtain a copy of the License at
|
| +#
|
| +# http://www.apache.org/licenses/LICENSE-2.0
|
| +#
|
| +# Unless required by applicable law or agreed to in writing, software
|
| +# distributed under the License is distributed on an "AS IS" BASIS,
|
| +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| +# See the License for the specific language governing permissions and
|
| +# limitations under the License.
|
| +
|
| +
|
| +from gslib.command import _ThreadedLogger as ThreadedLogger
|
| +import gslib.commands.chacl as chacl
|
| +import gslib.tests.testcase as case
|
| +from gslib.tests.util import ObjectToURI as suri
|
| +from gslib.util import Retry
|
| +
|
| +
|
| +class ChaclIntegrationTest(case.GsUtilIntegrationTestCase):
|
| + """Tests gslib.commands.chacl."""
|
| + logger = ThreadedLogger()
|
| +
|
| + def setUp(self):
|
| + super(ChaclIntegrationTest, self).setUp()
|
| + self.sample_uri = self.CreateBucket()
|
| + self.logger = ThreadedLogger()
|
| +
|
| + def testAclChangeWithUserId(self):
|
| + change = chacl.AclChange(self.USER_TEST_ID + ':r',
|
| + scope_type=chacl.ChangeType.USER,
|
| + logger=self.logger)
|
| + acl = self.sample_uri.get_acl()
|
| + change.Execute(self.sample_uri, acl)
|
| + self._AssertHas(acl, 'READ', 'UserById', self.USER_TEST_ID)
|
| +
|
| + def testAclChangeWithGroupId(self):
|
| + change = chacl.AclChange(self.GROUP_TEST_ID + ':r',
|
| + scope_type=chacl.ChangeType.GROUP,
|
| + logger=self.logger)
|
| + acl = self.sample_uri.get_acl()
|
| + change.Execute(self.sample_uri, acl)
|
| + self._AssertHas(acl, 'READ', 'GroupById', self.GROUP_TEST_ID)
|
| +
|
| + def testAclChangeWithUserEmail(self):
|
| + change = chacl.AclChange(self.USER_TEST_ADDRESS + ':r',
|
| + scope_type=chacl.ChangeType.USER,
|
| + logger=self.logger)
|
| + acl = self.sample_uri.get_acl()
|
| + change.Execute(self.sample_uri, acl)
|
| + self._AssertHas(acl, 'READ', 'UserByEmail', self.USER_TEST_ADDRESS)
|
| +
|
| + def testAclChangeWithGroupEmail(self):
|
| + change = chacl.AclChange(self.GROUP_TEST_ADDRESS + ':fc',
|
| + scope_type=chacl.ChangeType.GROUP,
|
| + logger=self.logger)
|
| + acl = self.sample_uri.get_acl()
|
| + change.Execute(self.sample_uri, acl)
|
| + self._AssertHas(acl, 'FULL_CONTROL', 'GroupByEmail',
|
| + self.GROUP_TEST_ADDRESS)
|
| +
|
| + def testAclChangeWithDomain(self):
|
| + change = chacl.AclChange(self.DOMAIN_TEST + ':READ',
|
| + scope_type=chacl.ChangeType.GROUP,
|
| + logger=self.logger)
|
| + acl = self.sample_uri.get_acl()
|
| + change.Execute(self.sample_uri, acl)
|
| + self._AssertHas(acl, 'READ', 'GroupByDomain', self.DOMAIN_TEST)
|
| +
|
| + def testAclChangeWithAllUsers(self):
|
| + change = chacl.AclChange('AllUsers:WRITE',
|
| + scope_type=chacl.ChangeType.GROUP,
|
| + logger=self.logger)
|
| + acl = self.sample_uri.get_acl()
|
| + change.Execute(self.sample_uri, acl)
|
| + self._AssertHas(acl, 'WRITE', 'AllUsers')
|
| +
|
| + def testAclChangeWithAllAuthUsers(self):
|
| + change = chacl.AclChange('AllAuthenticatedUsers:READ',
|
| + scope_type=chacl.ChangeType.GROUP,
|
| + logger=self.logger)
|
| + acl = self.sample_uri.get_acl()
|
| + change.Execute(self.sample_uri, acl)
|
| + self._AssertHas(acl, 'READ', 'AllAuthenticatedUsers')
|
| +
|
| + def testAclDelWithUser(self):
|
| + add = chacl.AclChange(self.USER_TEST_ADDRESS + ':READ',
|
| + scope_type=chacl.ChangeType.USER,
|
| + logger=self.logger)
|
| + acl = self.sample_uri.get_acl()
|
| + add.Execute(self.sample_uri, acl)
|
| + self._AssertHas(acl, 'READ', 'UserByEmail', self.USER_TEST_ADDRESS)
|
| +
|
| + remove = chacl.AclDel(self.USER_TEST_ADDRESS,
|
| + logger=self.logger)
|
| + remove.Execute(self.sample_uri, acl)
|
| + self._AssertHasNo(acl, 'READ', 'UserByEmail', self.USER_TEST_ADDRESS)
|
| +
|
| + def testAclDelWithGroup(self):
|
| + add = chacl.AclChange(self.USER_TEST_ADDRESS + ':READ',
|
| + scope_type=chacl.ChangeType.GROUP,
|
| + logger=self.logger)
|
| + acl = self.sample_uri.get_acl()
|
| + add.Execute(self.sample_uri, acl)
|
| + self._AssertHas(acl, 'READ', 'GroupByEmail', self.USER_TEST_ADDRESS)
|
| +
|
| + remove = chacl.AclDel(self.USER_TEST_ADDRESS,
|
| + logger=self.logger)
|
| + remove.Execute(self.sample_uri, acl)
|
| + self._AssertHasNo(acl, 'READ', 'GroupByEmail', self.GROUP_TEST_ADDRESS)
|
| +
|
| + #
|
| + # Here are a whole lot of verbose asserts
|
| + #
|
| +
|
| + def _AssertHas(self, current_acl, perm, scope, value=None):
|
| + matches = list(self._YieldMatchingEntries(current_acl, perm, scope, value))
|
| + self.assertEqual(1, len(matches))
|
| +
|
| + def _AssertHasNo(self, current_acl, perm, scope, value=None):
|
| + matches = list(self._YieldMatchingEntries(current_acl, perm, scope, value))
|
| + self.assertEqual(0, len(matches))
|
| +
|
| + def _YieldMatchingEntries(self, current_acl, perm, scope, value=None):
|
| + """Generator that finds entries that match the change descriptor."""
|
| + for entry in current_acl.entries.entry_list:
|
| + if entry.scope.type == scope:
|
| + if scope in ['UserById', 'GroupById']:
|
| + if value == entry.scope.id:
|
| + yield entry
|
| + elif scope in ['UserByEmail', 'GroupByEmail']:
|
| + if value == entry.scope.email_address:
|
| + yield entry
|
| + elif scope == 'GroupByDomain':
|
| + if value == entry.scope.domain:
|
| + yield entry
|
| + elif scope in ['AllUsers', 'AllAuthenticatedUsers']:
|
| + yield entry
|
| + else:
|
| + raise CommandException('Found an unrecognized ACL '
|
| + 'entry type, aborting.')
|
| +
|
| + def _MakeScopeRegex(self, scope_type, email_address, perm):
|
| + template_regex = (
|
| + r'<Scope type="{0}">\s*<EmailAddress>\s*{1}\s*</EmailAddress>\s*'
|
| + r'</Scope>\s*<Permission>\s*{2}\s*</Permission>')
|
| + return template_regex.format(scope_type, email_address, perm)
|
| +
|
| + def testBucketAclChange(self):
|
| + test_regex = self._MakeScopeRegex(
|
| + 'UserByEmail', self.USER_TEST_ADDRESS, 'FULL_CONTROL')
|
| + xml = self.RunGsUtil(
|
| + ['getacl', suri(self.sample_uri)], return_stdout=True)
|
| + self.assertNotRegexpMatches(xml, test_regex)
|
| +
|
| + self.RunGsUtil(
|
| + ['chacl', '-u', self.USER_TEST_ADDRESS+':fc', suri(self.sample_uri)])
|
| + xml = self.RunGsUtil(
|
| + ['getacl', suri(self.sample_uri)], return_stdout=True)
|
| + self.assertRegexpMatches(xml, test_regex)
|
| +
|
| + self.RunGsUtil(
|
| + ['chacl', '-d', self.USER_TEST_ADDRESS, suri(self.sample_uri)])
|
| + xml = self.RunGsUtil(
|
| + ['getacl', suri(self.sample_uri)], return_stdout=True)
|
| + self.assertNotRegexpMatches(xml, test_regex)
|
| +
|
| + def testObjectAclChange(self):
|
| + obj = self.CreateObject(bucket_uri=self.sample_uri, contents='something')
|
| + test_regex = self._MakeScopeRegex(
|
| + 'GroupByEmail', self.GROUP_TEST_ADDRESS, 'READ')
|
| + xml = self.RunGsUtil(['getacl', suri(obj)], return_stdout=True)
|
| + self.assertNotRegexpMatches(xml, test_regex)
|
| +
|
| + self.RunGsUtil(['chacl', '-g', self.GROUP_TEST_ADDRESS+':READ', suri(obj)])
|
| + xml = self.RunGsUtil(['getacl', suri(obj)], return_stdout=True)
|
| + self.assertRegexpMatches(xml, test_regex)
|
| +
|
| + self.RunGsUtil(['chacl', '-d', self.GROUP_TEST_ADDRESS, suri(obj)])
|
| + xml = self.RunGsUtil(['getacl', suri(obj)], return_stdout=True)
|
| + self.assertNotRegexpMatches(xml, test_regex)
|
| +
|
| + def testMultithreadedAclChange(self, count=10):
|
| + objects = []
|
| + for i in range(count):
|
| + objects.append(self.CreateObject(
|
| + bucket_uri=self.sample_uri,
|
| + contents='something {0}'.format(i)))
|
| +
|
| + test_regex = self._MakeScopeRegex(
|
| + 'GroupByEmail', self.GROUP_TEST_ADDRESS, 'READ')
|
| + xmls = []
|
| + for obj in objects:
|
| + xmls.append(self.RunGsUtil(['getacl', suri(obj)], return_stdout=True))
|
| + for xml in xmls:
|
| + self.assertNotRegexpMatches(xml, test_regex)
|
| +
|
| + uris = [suri(obj) for obj in objects]
|
| + self.RunGsUtil(['-m', '-DD', 'chacl', '-g',
|
| + self.GROUP_TEST_ADDRESS+':READ'] + uris)
|
| +
|
| + xmls = []
|
| + for obj in objects:
|
| + xmls.append(self.RunGsUtil(['getacl', suri(obj)], return_stdout=True))
|
| + for xml in xmls:
|
| + self.assertRegexpMatches(xml, test_regex)
|
| +
|
| + def testMultiVersionSupport(self):
|
| + bucket = self.CreateVersionedBucket()
|
| + object_name = self.MakeTempName('obj')
|
| + obj = self.CreateObject(
|
| + bucket_uri=bucket, object_name=object_name, contents='One thing')
|
| + # Create another on the same URI, giving us a second version.
|
| + self.CreateObject(
|
| + bucket_uri=bucket, object_name=object_name, contents='Another thing')
|
| +
|
| + # Use @Retry as hedge against bucket listing eventual consistency.
|
| + @Retry(AssertionError, tries=3, delay=1, backoff=1, logger=self.logger)
|
| + def _getObjects():
|
| + stdout = self.RunGsUtil(['ls', '-a', suri(obj)], return_stdout=True)
|
| + lines = stdout.strip().split('\n')
|
| + self.assertEqual(len(lines), 2)
|
| + return lines
|
| +
|
| + obj_v1, obj_v2 = _getObjects()
|
| +
|
| + test_regex = self._MakeScopeRegex(
|
| + 'GroupByEmail', self.GROUP_TEST_ADDRESS, 'READ')
|
| + xml = self.RunGsUtil(['getacl', obj_v1], return_stdout=True)
|
| + self.assertNotRegexpMatches(xml, test_regex)
|
| +
|
| + self.RunGsUtil(['chacl', '-g', self.GROUP_TEST_ADDRESS+':READ', obj_v1])
|
| + xml = self.RunGsUtil(['getacl', obj_v1], return_stdout=True)
|
| + self.assertRegexpMatches(xml, test_regex)
|
| +
|
| + xml = self.RunGsUtil(['getacl', obj_v2], return_stdout=True)
|
| + self.assertNotRegexpMatches(xml, test_regex)
|
|
|