Chromium Code Reviews

Unified Diff: third_party/gsutil/gslib/tests/test_chacl.py

Issue 12317103: Added gsutil to depot tools (Closed) Base URL: https://chromium.googlesource.com/chromium/tools/depot_tools.git@master
Patch Set: added readme Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
View side-by-side diff with in-line comments
Index: third_party/gsutil/gslib/tests/test_chacl.py
diff --git a/third_party/gsutil/gslib/tests/test_chacl.py b/third_party/gsutil/gslib/tests/test_chacl.py
new file mode 100644
index 0000000000000000000000000000000000000000..c5081025f5e7f83f38f49f7f444b98887ead6639
--- /dev/null
+++ b/third_party/gsutil/gslib/tests/test_chacl.py
@@ -0,0 +1,240 @@
+# Copyright 2013 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from gslib.command import _ThreadedLogger as ThreadedLogger
+import gslib.commands.chacl as chacl
+import gslib.tests.testcase as case
+from gslib.tests.util import ObjectToURI as suri
+from gslib.util import Retry
+
+
+class ChaclIntegrationTest(case.GsUtilIntegrationTestCase):
+ """Tests gslib.commands.chacl."""
+ logger = ThreadedLogger()
+
+ def setUp(self):
+ super(ChaclIntegrationTest, self).setUp()
+ self.sample_uri = self.CreateBucket()
+ self.logger = ThreadedLogger()
+
+ def testAclChangeWithUserId(self):
+ change = chacl.AclChange(self.USER_TEST_ID + ':r',
+ scope_type=chacl.ChangeType.USER,
+ logger=self.logger)
+ acl = self.sample_uri.get_acl()
+ change.Execute(self.sample_uri, acl)
+ self._AssertHas(acl, 'READ', 'UserById', self.USER_TEST_ID)
+
+ def testAclChangeWithGroupId(self):
+ change = chacl.AclChange(self.GROUP_TEST_ID + ':r',
+ scope_type=chacl.ChangeType.GROUP,
+ logger=self.logger)
+ acl = self.sample_uri.get_acl()
+ change.Execute(self.sample_uri, acl)
+ self._AssertHas(acl, 'READ', 'GroupById', self.GROUP_TEST_ID)
+
+ def testAclChangeWithUserEmail(self):
+ change = chacl.AclChange(self.USER_TEST_ADDRESS + ':r',
+ scope_type=chacl.ChangeType.USER,
+ logger=self.logger)
+ acl = self.sample_uri.get_acl()
+ change.Execute(self.sample_uri, acl)
+ self._AssertHas(acl, 'READ', 'UserByEmail', self.USER_TEST_ADDRESS)
+
+ def testAclChangeWithGroupEmail(self):
+ change = chacl.AclChange(self.GROUP_TEST_ADDRESS + ':fc',
+ scope_type=chacl.ChangeType.GROUP,
+ logger=self.logger)
+ acl = self.sample_uri.get_acl()
+ change.Execute(self.sample_uri, acl)
+ self._AssertHas(acl, 'FULL_CONTROL', 'GroupByEmail',
+ self.GROUP_TEST_ADDRESS)
+
+ def testAclChangeWithDomain(self):
+ change = chacl.AclChange(self.DOMAIN_TEST + ':READ',
+ scope_type=chacl.ChangeType.GROUP,
+ logger=self.logger)
+ acl = self.sample_uri.get_acl()
+ change.Execute(self.sample_uri, acl)
+ self._AssertHas(acl, 'READ', 'GroupByDomain', self.DOMAIN_TEST)
+
+ def testAclChangeWithAllUsers(self):
+ change = chacl.AclChange('AllUsers:WRITE',
+ scope_type=chacl.ChangeType.GROUP,
+ logger=self.logger)
+ acl = self.sample_uri.get_acl()
+ change.Execute(self.sample_uri, acl)
+ self._AssertHas(acl, 'WRITE', 'AllUsers')
+
+ def testAclChangeWithAllAuthUsers(self):
+ change = chacl.AclChange('AllAuthenticatedUsers:READ',
+ scope_type=chacl.ChangeType.GROUP,
+ logger=self.logger)
+ acl = self.sample_uri.get_acl()
+ change.Execute(self.sample_uri, acl)
+ self._AssertHas(acl, 'READ', 'AllAuthenticatedUsers')
+
+ def testAclDelWithUser(self):
+ add = chacl.AclChange(self.USER_TEST_ADDRESS + ':READ',
+ scope_type=chacl.ChangeType.USER,
+ logger=self.logger)
+ acl = self.sample_uri.get_acl()
+ add.Execute(self.sample_uri, acl)
+ self._AssertHas(acl, 'READ', 'UserByEmail', self.USER_TEST_ADDRESS)
+
+ remove = chacl.AclDel(self.USER_TEST_ADDRESS,
+ logger=self.logger)
+ remove.Execute(self.sample_uri, acl)
+ self._AssertHasNo(acl, 'READ', 'UserByEmail', self.USER_TEST_ADDRESS)
+
+ def testAclDelWithGroup(self):
+ add = chacl.AclChange(self.USER_TEST_ADDRESS + ':READ',
+ scope_type=chacl.ChangeType.GROUP,
+ logger=self.logger)
+ acl = self.sample_uri.get_acl()
+ add.Execute(self.sample_uri, acl)
+ self._AssertHas(acl, 'READ', 'GroupByEmail', self.USER_TEST_ADDRESS)
+
+ remove = chacl.AclDel(self.USER_TEST_ADDRESS,
+ logger=self.logger)
+ remove.Execute(self.sample_uri, acl)
+ self._AssertHasNo(acl, 'READ', 'GroupByEmail', self.GROUP_TEST_ADDRESS)
+
+ #
+ # Here are a whole lot of verbose asserts
+ #
+
+ def _AssertHas(self, current_acl, perm, scope, value=None):
+ matches = list(self._YieldMatchingEntries(current_acl, perm, scope, value))
+ self.assertEqual(1, len(matches))
+
+ def _AssertHasNo(self, current_acl, perm, scope, value=None):
+ matches = list(self._YieldMatchingEntries(current_acl, perm, scope, value))
+ self.assertEqual(0, len(matches))
+
+ def _YieldMatchingEntries(self, current_acl, perm, scope, value=None):
+ """Generator that finds entries that match the change descriptor."""
+ for entry in current_acl.entries.entry_list:
+ if entry.scope.type == scope:
+ if scope in ['UserById', 'GroupById']:
+ if value == entry.scope.id:
+ yield entry
+ elif scope in ['UserByEmail', 'GroupByEmail']:
+ if value == entry.scope.email_address:
+ yield entry
+ elif scope == 'GroupByDomain':
+ if value == entry.scope.domain:
+ yield entry
+ elif scope in ['AllUsers', 'AllAuthenticatedUsers']:
+ yield entry
+ else:
+ raise CommandException('Found an unrecognized ACL '
+ 'entry type, aborting.')
+
+ def _MakeScopeRegex(self, scope_type, email_address, perm):
+ template_regex = (
+ r'<Scope type="{0}">\s*<EmailAddress>\s*{1}\s*</EmailAddress>\s*'
+ r'</Scope>\s*<Permission>\s*{2}\s*</Permission>')
+ return template_regex.format(scope_type, email_address, perm)
+
+ def testBucketAclChange(self):
+ test_regex = self._MakeScopeRegex(
+ 'UserByEmail', self.USER_TEST_ADDRESS, 'FULL_CONTROL')
+ xml = self.RunGsUtil(
+ ['getacl', suri(self.sample_uri)], return_stdout=True)
+ self.assertNotRegexpMatches(xml, test_regex)
+
+ self.RunGsUtil(
+ ['chacl', '-u', self.USER_TEST_ADDRESS+':fc', suri(self.sample_uri)])
+ xml = self.RunGsUtil(
+ ['getacl', suri(self.sample_uri)], return_stdout=True)
+ self.assertRegexpMatches(xml, test_regex)
+
+ self.RunGsUtil(
+ ['chacl', '-d', self.USER_TEST_ADDRESS, suri(self.sample_uri)])
+ xml = self.RunGsUtil(
+ ['getacl', suri(self.sample_uri)], return_stdout=True)
+ self.assertNotRegexpMatches(xml, test_regex)
+
+ def testObjectAclChange(self):
+ obj = self.CreateObject(bucket_uri=self.sample_uri, contents='something')
+ test_regex = self._MakeScopeRegex(
+ 'GroupByEmail', self.GROUP_TEST_ADDRESS, 'READ')
+ xml = self.RunGsUtil(['getacl', suri(obj)], return_stdout=True)
+ self.assertNotRegexpMatches(xml, test_regex)
+
+ self.RunGsUtil(['chacl', '-g', self.GROUP_TEST_ADDRESS+':READ', suri(obj)])
+ xml = self.RunGsUtil(['getacl', suri(obj)], return_stdout=True)
+ self.assertRegexpMatches(xml, test_regex)
+
+ self.RunGsUtil(['chacl', '-d', self.GROUP_TEST_ADDRESS, suri(obj)])
+ xml = self.RunGsUtil(['getacl', suri(obj)], return_stdout=True)
+ self.assertNotRegexpMatches(xml, test_regex)
+
+ def testMultithreadedAclChange(self, count=10):
+ objects = []
+ for i in range(count):
+ objects.append(self.CreateObject(
+ bucket_uri=self.sample_uri,
+ contents='something {0}'.format(i)))
+
+ test_regex = self._MakeScopeRegex(
+ 'GroupByEmail', self.GROUP_TEST_ADDRESS, 'READ')
+ xmls = []
+ for obj in objects:
+ xmls.append(self.RunGsUtil(['getacl', suri(obj)], return_stdout=True))
+ for xml in xmls:
+ self.assertNotRegexpMatches(xml, test_regex)
+
+ uris = [suri(obj) for obj in objects]
+ self.RunGsUtil(['-m', '-DD', 'chacl', '-g',
+ self.GROUP_TEST_ADDRESS+':READ'] + uris)
+
+ xmls = []
+ for obj in objects:
+ xmls.append(self.RunGsUtil(['getacl', suri(obj)], return_stdout=True))
+ for xml in xmls:
+ self.assertRegexpMatches(xml, test_regex)
+
+ def testMultiVersionSupport(self):
+ bucket = self.CreateVersionedBucket()
+ object_name = self.MakeTempName('obj')
+ obj = self.CreateObject(
+ bucket_uri=bucket, object_name=object_name, contents='One thing')
+ # Create another on the same URI, giving us a second version.
+ self.CreateObject(
+ bucket_uri=bucket, object_name=object_name, contents='Another thing')
+
+ # Use @Retry as hedge against bucket listing eventual consistency.
+ @Retry(AssertionError, tries=3, delay=1, backoff=1, logger=self.logger)
+ def _getObjects():
+ stdout = self.RunGsUtil(['ls', '-a', suri(obj)], return_stdout=True)
+ lines = stdout.strip().split('\n')
+ self.assertEqual(len(lines), 2)
+ return lines
+
+ obj_v1, obj_v2 = _getObjects()
+
+ test_regex = self._MakeScopeRegex(
+ 'GroupByEmail', self.GROUP_TEST_ADDRESS, 'READ')
+ xml = self.RunGsUtil(['getacl', obj_v1], return_stdout=True)
+ self.assertNotRegexpMatches(xml, test_regex)
+
+ self.RunGsUtil(['chacl', '-g', self.GROUP_TEST_ADDRESS+':READ', obj_v1])
+ xml = self.RunGsUtil(['getacl', obj_v1], return_stdout=True)
+ self.assertRegexpMatches(xml, test_regex)
+
+ xml = self.RunGsUtil(['getacl', obj_v2], return_stdout=True)
+ self.assertNotRegexpMatches(xml, test_regex)

Powered by Google App Engine