Index: third_party/gsutil/gslib/tests/test_chacl.py |
diff --git a/third_party/gsutil/gslib/tests/test_chacl.py b/third_party/gsutil/gslib/tests/test_chacl.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..c5081025f5e7f83f38f49f7f444b98887ead6639 |
--- /dev/null |
+++ b/third_party/gsutil/gslib/tests/test_chacl.py |
@@ -0,0 +1,240 @@ |
+# Copyright 2013 Google Inc. All Rights Reserved. |
+# |
+# Licensed under the Apache License, Version 2.0 (the "License"); |
+# you may not use this file except in compliance with the License. |
+# You may obtain a copy of the License at |
+# |
+# http://www.apache.org/licenses/LICENSE-2.0 |
+# |
+# Unless required by applicable law or agreed to in writing, software |
+# distributed under the License is distributed on an "AS IS" BASIS, |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
+# See the License for the specific language governing permissions and |
+# limitations under the License. |
+ |
+ |
+from gslib.command import _ThreadedLogger as ThreadedLogger |
+import gslib.commands.chacl as chacl |
+import gslib.tests.testcase as case |
+from gslib.tests.util import ObjectToURI as suri |
+from gslib.util import Retry |
+ |
+ |
+class ChaclIntegrationTest(case.GsUtilIntegrationTestCase): |
+ """Tests gslib.commands.chacl.""" |
+ logger = ThreadedLogger() |
+ |
+ def setUp(self): |
+ super(ChaclIntegrationTest, self).setUp() |
+ self.sample_uri = self.CreateBucket() |
+ self.logger = ThreadedLogger() |
+ |
+ def testAclChangeWithUserId(self): |
+ change = chacl.AclChange(self.USER_TEST_ID + ':r', |
+ scope_type=chacl.ChangeType.USER, |
+ logger=self.logger) |
+ acl = self.sample_uri.get_acl() |
+ change.Execute(self.sample_uri, acl) |
+ self._AssertHas(acl, 'READ', 'UserById', self.USER_TEST_ID) |
+ |
+ def testAclChangeWithGroupId(self): |
+ change = chacl.AclChange(self.GROUP_TEST_ID + ':r', |
+ scope_type=chacl.ChangeType.GROUP, |
+ logger=self.logger) |
+ acl = self.sample_uri.get_acl() |
+ change.Execute(self.sample_uri, acl) |
+ self._AssertHas(acl, 'READ', 'GroupById', self.GROUP_TEST_ID) |
+ |
+ def testAclChangeWithUserEmail(self): |
+ change = chacl.AclChange(self.USER_TEST_ADDRESS + ':r', |
+ scope_type=chacl.ChangeType.USER, |
+ logger=self.logger) |
+ acl = self.sample_uri.get_acl() |
+ change.Execute(self.sample_uri, acl) |
+ self._AssertHas(acl, 'READ', 'UserByEmail', self.USER_TEST_ADDRESS) |
+ |
+ def testAclChangeWithGroupEmail(self): |
+ change = chacl.AclChange(self.GROUP_TEST_ADDRESS + ':fc', |
+ scope_type=chacl.ChangeType.GROUP, |
+ logger=self.logger) |
+ acl = self.sample_uri.get_acl() |
+ change.Execute(self.sample_uri, acl) |
+ self._AssertHas(acl, 'FULL_CONTROL', 'GroupByEmail', |
+ self.GROUP_TEST_ADDRESS) |
+ |
+ def testAclChangeWithDomain(self): |
+ change = chacl.AclChange(self.DOMAIN_TEST + ':READ', |
+ scope_type=chacl.ChangeType.GROUP, |
+ logger=self.logger) |
+ acl = self.sample_uri.get_acl() |
+ change.Execute(self.sample_uri, acl) |
+ self._AssertHas(acl, 'READ', 'GroupByDomain', self.DOMAIN_TEST) |
+ |
+ def testAclChangeWithAllUsers(self): |
+ change = chacl.AclChange('AllUsers:WRITE', |
+ scope_type=chacl.ChangeType.GROUP, |
+ logger=self.logger) |
+ acl = self.sample_uri.get_acl() |
+ change.Execute(self.sample_uri, acl) |
+ self._AssertHas(acl, 'WRITE', 'AllUsers') |
+ |
+ def testAclChangeWithAllAuthUsers(self): |
+ change = chacl.AclChange('AllAuthenticatedUsers:READ', |
+ scope_type=chacl.ChangeType.GROUP, |
+ logger=self.logger) |
+ acl = self.sample_uri.get_acl() |
+ change.Execute(self.sample_uri, acl) |
+ self._AssertHas(acl, 'READ', 'AllAuthenticatedUsers') |
+ |
+ def testAclDelWithUser(self): |
+ add = chacl.AclChange(self.USER_TEST_ADDRESS + ':READ', |
+ scope_type=chacl.ChangeType.USER, |
+ logger=self.logger) |
+ acl = self.sample_uri.get_acl() |
+ add.Execute(self.sample_uri, acl) |
+ self._AssertHas(acl, 'READ', 'UserByEmail', self.USER_TEST_ADDRESS) |
+ |
+ remove = chacl.AclDel(self.USER_TEST_ADDRESS, |
+ logger=self.logger) |
+ remove.Execute(self.sample_uri, acl) |
+ self._AssertHasNo(acl, 'READ', 'UserByEmail', self.USER_TEST_ADDRESS) |
+ |
+ def testAclDelWithGroup(self): |
+ add = chacl.AclChange(self.USER_TEST_ADDRESS + ':READ', |
+ scope_type=chacl.ChangeType.GROUP, |
+ logger=self.logger) |
+ acl = self.sample_uri.get_acl() |
+ add.Execute(self.sample_uri, acl) |
+ self._AssertHas(acl, 'READ', 'GroupByEmail', self.USER_TEST_ADDRESS) |
+ |
+ remove = chacl.AclDel(self.USER_TEST_ADDRESS, |
+ logger=self.logger) |
+ remove.Execute(self.sample_uri, acl) |
+ self._AssertHasNo(acl, 'READ', 'GroupByEmail', self.GROUP_TEST_ADDRESS) |
+ |
+ # |
+ # Here are a whole lot of verbose asserts |
+ # |
+ |
+ def _AssertHas(self, current_acl, perm, scope, value=None): |
+ matches = list(self._YieldMatchingEntries(current_acl, perm, scope, value)) |
+ self.assertEqual(1, len(matches)) |
+ |
+ def _AssertHasNo(self, current_acl, perm, scope, value=None): |
+ matches = list(self._YieldMatchingEntries(current_acl, perm, scope, value)) |
+ self.assertEqual(0, len(matches)) |
+ |
+ def _YieldMatchingEntries(self, current_acl, perm, scope, value=None): |
+ """Generator that finds entries that match the change descriptor.""" |
+ for entry in current_acl.entries.entry_list: |
+ if entry.scope.type == scope: |
+ if scope in ['UserById', 'GroupById']: |
+ if value == entry.scope.id: |
+ yield entry |
+ elif scope in ['UserByEmail', 'GroupByEmail']: |
+ if value == entry.scope.email_address: |
+ yield entry |
+ elif scope == 'GroupByDomain': |
+ if value == entry.scope.domain: |
+ yield entry |
+ elif scope in ['AllUsers', 'AllAuthenticatedUsers']: |
+ yield entry |
+ else: |
+ raise CommandException('Found an unrecognized ACL ' |
+ 'entry type, aborting.') |
+ |
+ def _MakeScopeRegex(self, scope_type, email_address, perm): |
+ template_regex = ( |
+ r'<Scope type="{0}">\s*<EmailAddress>\s*{1}\s*</EmailAddress>\s*' |
+ r'</Scope>\s*<Permission>\s*{2}\s*</Permission>') |
+ return template_regex.format(scope_type, email_address, perm) |
+ |
+ def testBucketAclChange(self): |
+ test_regex = self._MakeScopeRegex( |
+ 'UserByEmail', self.USER_TEST_ADDRESS, 'FULL_CONTROL') |
+ xml = self.RunGsUtil( |
+ ['getacl', suri(self.sample_uri)], return_stdout=True) |
+ self.assertNotRegexpMatches(xml, test_regex) |
+ |
+ self.RunGsUtil( |
+ ['chacl', '-u', self.USER_TEST_ADDRESS+':fc', suri(self.sample_uri)]) |
+ xml = self.RunGsUtil( |
+ ['getacl', suri(self.sample_uri)], return_stdout=True) |
+ self.assertRegexpMatches(xml, test_regex) |
+ |
+ self.RunGsUtil( |
+ ['chacl', '-d', self.USER_TEST_ADDRESS, suri(self.sample_uri)]) |
+ xml = self.RunGsUtil( |
+ ['getacl', suri(self.sample_uri)], return_stdout=True) |
+ self.assertNotRegexpMatches(xml, test_regex) |
+ |
+ def testObjectAclChange(self): |
+ obj = self.CreateObject(bucket_uri=self.sample_uri, contents='something') |
+ test_regex = self._MakeScopeRegex( |
+ 'GroupByEmail', self.GROUP_TEST_ADDRESS, 'READ') |
+ xml = self.RunGsUtil(['getacl', suri(obj)], return_stdout=True) |
+ self.assertNotRegexpMatches(xml, test_regex) |
+ |
+ self.RunGsUtil(['chacl', '-g', self.GROUP_TEST_ADDRESS+':READ', suri(obj)]) |
+ xml = self.RunGsUtil(['getacl', suri(obj)], return_stdout=True) |
+ self.assertRegexpMatches(xml, test_regex) |
+ |
+ self.RunGsUtil(['chacl', '-d', self.GROUP_TEST_ADDRESS, suri(obj)]) |
+ xml = self.RunGsUtil(['getacl', suri(obj)], return_stdout=True) |
+ self.assertNotRegexpMatches(xml, test_regex) |
+ |
+ def testMultithreadedAclChange(self, count=10): |
+ objects = [] |
+ for i in range(count): |
+ objects.append(self.CreateObject( |
+ bucket_uri=self.sample_uri, |
+ contents='something {0}'.format(i))) |
+ |
+ test_regex = self._MakeScopeRegex( |
+ 'GroupByEmail', self.GROUP_TEST_ADDRESS, 'READ') |
+ xmls = [] |
+ for obj in objects: |
+ xmls.append(self.RunGsUtil(['getacl', suri(obj)], return_stdout=True)) |
+ for xml in xmls: |
+ self.assertNotRegexpMatches(xml, test_regex) |
+ |
+ uris = [suri(obj) for obj in objects] |
+ self.RunGsUtil(['-m', '-DD', 'chacl', '-g', |
+ self.GROUP_TEST_ADDRESS+':READ'] + uris) |
+ |
+ xmls = [] |
+ for obj in objects: |
+ xmls.append(self.RunGsUtil(['getacl', suri(obj)], return_stdout=True)) |
+ for xml in xmls: |
+ self.assertRegexpMatches(xml, test_regex) |
+ |
+ def testMultiVersionSupport(self): |
+ bucket = self.CreateVersionedBucket() |
+ object_name = self.MakeTempName('obj') |
+ obj = self.CreateObject( |
+ bucket_uri=bucket, object_name=object_name, contents='One thing') |
+ # Create another on the same URI, giving us a second version. |
+ self.CreateObject( |
+ bucket_uri=bucket, object_name=object_name, contents='Another thing') |
+ |
+ # Use @Retry as hedge against bucket listing eventual consistency. |
+ @Retry(AssertionError, tries=3, delay=1, backoff=1, logger=self.logger) |
+ def _getObjects(): |
+ stdout = self.RunGsUtil(['ls', '-a', suri(obj)], return_stdout=True) |
+ lines = stdout.strip().split('\n') |
+ self.assertEqual(len(lines), 2) |
+ return lines |
+ |
+ obj_v1, obj_v2 = _getObjects() |
+ |
+ test_regex = self._MakeScopeRegex( |
+ 'GroupByEmail', self.GROUP_TEST_ADDRESS, 'READ') |
+ xml = self.RunGsUtil(['getacl', obj_v1], return_stdout=True) |
+ self.assertNotRegexpMatches(xml, test_regex) |
+ |
+ self.RunGsUtil(['chacl', '-g', self.GROUP_TEST_ADDRESS+':READ', obj_v1]) |
+ xml = self.RunGsUtil(['getacl', obj_v1], return_stdout=True) |
+ self.assertRegexpMatches(xml, test_regex) |
+ |
+ xml = self.RunGsUtil(['getacl', obj_v2], return_stdout=True) |
+ self.assertNotRegexpMatches(xml, test_regex) |