OLD | NEW |
(Empty) | |
| 1 # Copyright 2013 Google Inc. All Rights Reserved. |
| 2 # |
| 3 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 # you may not use this file except in compliance with the License. |
| 5 # You may obtain a copy of the License at |
| 6 # |
| 7 # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 # |
| 9 # Unless required by applicable law or agreed to in writing, software |
| 10 # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 # See the License for the specific language governing permissions and |
| 13 # limitations under the License. |
| 14 |
| 15 |
| 16 from gslib.command import _ThreadedLogger as ThreadedLogger |
| 17 import gslib.commands.chacl as chacl |
| 18 import gslib.tests.testcase as case |
| 19 from gslib.tests.util import ObjectToURI as suri |
| 20 from gslib.util import Retry |
| 21 |
| 22 |
| 23 class ChaclIntegrationTest(case.GsUtilIntegrationTestCase): |
| 24 """Tests gslib.commands.chacl.""" |
| 25 logger = ThreadedLogger() |
| 26 |
| 27 def setUp(self): |
| 28 super(ChaclIntegrationTest, self).setUp() |
| 29 self.sample_uri = self.CreateBucket() |
| 30 self.logger = ThreadedLogger() |
| 31 |
| 32 def testAclChangeWithUserId(self): |
| 33 change = chacl.AclChange(self.USER_TEST_ID + ':r', |
| 34 scope_type=chacl.ChangeType.USER, |
| 35 logger=self.logger) |
| 36 acl = self.sample_uri.get_acl() |
| 37 change.Execute(self.sample_uri, acl) |
| 38 self._AssertHas(acl, 'READ', 'UserById', self.USER_TEST_ID) |
| 39 |
| 40 def testAclChangeWithGroupId(self): |
| 41 change = chacl.AclChange(self.GROUP_TEST_ID + ':r', |
| 42 scope_type=chacl.ChangeType.GROUP, |
| 43 logger=self.logger) |
| 44 acl = self.sample_uri.get_acl() |
| 45 change.Execute(self.sample_uri, acl) |
| 46 self._AssertHas(acl, 'READ', 'GroupById', self.GROUP_TEST_ID) |
| 47 |
| 48 def testAclChangeWithUserEmail(self): |
| 49 change = chacl.AclChange(self.USER_TEST_ADDRESS + ':r', |
| 50 scope_type=chacl.ChangeType.USER, |
| 51 logger=self.logger) |
| 52 acl = self.sample_uri.get_acl() |
| 53 change.Execute(self.sample_uri, acl) |
| 54 self._AssertHas(acl, 'READ', 'UserByEmail', self.USER_TEST_ADDRESS) |
| 55 |
| 56 def testAclChangeWithGroupEmail(self): |
| 57 change = chacl.AclChange(self.GROUP_TEST_ADDRESS + ':fc', |
| 58 scope_type=chacl.ChangeType.GROUP, |
| 59 logger=self.logger) |
| 60 acl = self.sample_uri.get_acl() |
| 61 change.Execute(self.sample_uri, acl) |
| 62 self._AssertHas(acl, 'FULL_CONTROL', 'GroupByEmail', |
| 63 self.GROUP_TEST_ADDRESS) |
| 64 |
| 65 def testAclChangeWithDomain(self): |
| 66 change = chacl.AclChange(self.DOMAIN_TEST + ':READ', |
| 67 scope_type=chacl.ChangeType.GROUP, |
| 68 logger=self.logger) |
| 69 acl = self.sample_uri.get_acl() |
| 70 change.Execute(self.sample_uri, acl) |
| 71 self._AssertHas(acl, 'READ', 'GroupByDomain', self.DOMAIN_TEST) |
| 72 |
| 73 def testAclChangeWithAllUsers(self): |
| 74 change = chacl.AclChange('AllUsers:WRITE', |
| 75 scope_type=chacl.ChangeType.GROUP, |
| 76 logger=self.logger) |
| 77 acl = self.sample_uri.get_acl() |
| 78 change.Execute(self.sample_uri, acl) |
| 79 self._AssertHas(acl, 'WRITE', 'AllUsers') |
| 80 |
| 81 def testAclChangeWithAllAuthUsers(self): |
| 82 change = chacl.AclChange('AllAuthenticatedUsers:READ', |
| 83 scope_type=chacl.ChangeType.GROUP, |
| 84 logger=self.logger) |
| 85 acl = self.sample_uri.get_acl() |
| 86 change.Execute(self.sample_uri, acl) |
| 87 self._AssertHas(acl, 'READ', 'AllAuthenticatedUsers') |
| 88 |
| 89 def testAclDelWithUser(self): |
| 90 add = chacl.AclChange(self.USER_TEST_ADDRESS + ':READ', |
| 91 scope_type=chacl.ChangeType.USER, |
| 92 logger=self.logger) |
| 93 acl = self.sample_uri.get_acl() |
| 94 add.Execute(self.sample_uri, acl) |
| 95 self._AssertHas(acl, 'READ', 'UserByEmail', self.USER_TEST_ADDRESS) |
| 96 |
| 97 remove = chacl.AclDel(self.USER_TEST_ADDRESS, |
| 98 logger=self.logger) |
| 99 remove.Execute(self.sample_uri, acl) |
| 100 self._AssertHasNo(acl, 'READ', 'UserByEmail', self.USER_TEST_ADDRESS) |
| 101 |
| 102 def testAclDelWithGroup(self): |
| 103 add = chacl.AclChange(self.USER_TEST_ADDRESS + ':READ', |
| 104 scope_type=chacl.ChangeType.GROUP, |
| 105 logger=self.logger) |
| 106 acl = self.sample_uri.get_acl() |
| 107 add.Execute(self.sample_uri, acl) |
| 108 self._AssertHas(acl, 'READ', 'GroupByEmail', self.USER_TEST_ADDRESS) |
| 109 |
| 110 remove = chacl.AclDel(self.USER_TEST_ADDRESS, |
| 111 logger=self.logger) |
| 112 remove.Execute(self.sample_uri, acl) |
| 113 self._AssertHasNo(acl, 'READ', 'GroupByEmail', self.GROUP_TEST_ADDRESS) |
| 114 |
| 115 # |
| 116 # Here are a whole lot of verbose asserts |
| 117 # |
| 118 |
| 119 def _AssertHas(self, current_acl, perm, scope, value=None): |
| 120 matches = list(self._YieldMatchingEntries(current_acl, perm, scope, value)) |
| 121 self.assertEqual(1, len(matches)) |
| 122 |
| 123 def _AssertHasNo(self, current_acl, perm, scope, value=None): |
| 124 matches = list(self._YieldMatchingEntries(current_acl, perm, scope, value)) |
| 125 self.assertEqual(0, len(matches)) |
| 126 |
| 127 def _YieldMatchingEntries(self, current_acl, perm, scope, value=None): |
| 128 """Generator that finds entries that match the change descriptor.""" |
| 129 for entry in current_acl.entries.entry_list: |
| 130 if entry.scope.type == scope: |
| 131 if scope in ['UserById', 'GroupById']: |
| 132 if value == entry.scope.id: |
| 133 yield entry |
| 134 elif scope in ['UserByEmail', 'GroupByEmail']: |
| 135 if value == entry.scope.email_address: |
| 136 yield entry |
| 137 elif scope == 'GroupByDomain': |
| 138 if value == entry.scope.domain: |
| 139 yield entry |
| 140 elif scope in ['AllUsers', 'AllAuthenticatedUsers']: |
| 141 yield entry |
| 142 else: |
| 143 raise CommandException('Found an unrecognized ACL ' |
| 144 'entry type, aborting.') |
| 145 |
| 146 def _MakeScopeRegex(self, scope_type, email_address, perm): |
| 147 template_regex = ( |
| 148 r'<Scope type="{0}">\s*<EmailAddress>\s*{1}\s*</EmailAddress>\s*' |
| 149 r'</Scope>\s*<Permission>\s*{2}\s*</Permission>') |
| 150 return template_regex.format(scope_type, email_address, perm) |
| 151 |
| 152 def testBucketAclChange(self): |
| 153 test_regex = self._MakeScopeRegex( |
| 154 'UserByEmail', self.USER_TEST_ADDRESS, 'FULL_CONTROL') |
| 155 xml = self.RunGsUtil( |
| 156 ['getacl', suri(self.sample_uri)], return_stdout=True) |
| 157 self.assertNotRegexpMatches(xml, test_regex) |
| 158 |
| 159 self.RunGsUtil( |
| 160 ['chacl', '-u', self.USER_TEST_ADDRESS+':fc', suri(self.sample_uri)]) |
| 161 xml = self.RunGsUtil( |
| 162 ['getacl', suri(self.sample_uri)], return_stdout=True) |
| 163 self.assertRegexpMatches(xml, test_regex) |
| 164 |
| 165 self.RunGsUtil( |
| 166 ['chacl', '-d', self.USER_TEST_ADDRESS, suri(self.sample_uri)]) |
| 167 xml = self.RunGsUtil( |
| 168 ['getacl', suri(self.sample_uri)], return_stdout=True) |
| 169 self.assertNotRegexpMatches(xml, test_regex) |
| 170 |
| 171 def testObjectAclChange(self): |
| 172 obj = self.CreateObject(bucket_uri=self.sample_uri, contents='something') |
| 173 test_regex = self._MakeScopeRegex( |
| 174 'GroupByEmail', self.GROUP_TEST_ADDRESS, 'READ') |
| 175 xml = self.RunGsUtil(['getacl', suri(obj)], return_stdout=True) |
| 176 self.assertNotRegexpMatches(xml, test_regex) |
| 177 |
| 178 self.RunGsUtil(['chacl', '-g', self.GROUP_TEST_ADDRESS+':READ', suri(obj)]) |
| 179 xml = self.RunGsUtil(['getacl', suri(obj)], return_stdout=True) |
| 180 self.assertRegexpMatches(xml, test_regex) |
| 181 |
| 182 self.RunGsUtil(['chacl', '-d', self.GROUP_TEST_ADDRESS, suri(obj)]) |
| 183 xml = self.RunGsUtil(['getacl', suri(obj)], return_stdout=True) |
| 184 self.assertNotRegexpMatches(xml, test_regex) |
| 185 |
| 186 def testMultithreadedAclChange(self, count=10): |
| 187 objects = [] |
| 188 for i in range(count): |
| 189 objects.append(self.CreateObject( |
| 190 bucket_uri=self.sample_uri, |
| 191 contents='something {0}'.format(i))) |
| 192 |
| 193 test_regex = self._MakeScopeRegex( |
| 194 'GroupByEmail', self.GROUP_TEST_ADDRESS, 'READ') |
| 195 xmls = [] |
| 196 for obj in objects: |
| 197 xmls.append(self.RunGsUtil(['getacl', suri(obj)], return_stdout=True)) |
| 198 for xml in xmls: |
| 199 self.assertNotRegexpMatches(xml, test_regex) |
| 200 |
| 201 uris = [suri(obj) for obj in objects] |
| 202 self.RunGsUtil(['-m', '-DD', 'chacl', '-g', |
| 203 self.GROUP_TEST_ADDRESS+':READ'] + uris) |
| 204 |
| 205 xmls = [] |
| 206 for obj in objects: |
| 207 xmls.append(self.RunGsUtil(['getacl', suri(obj)], return_stdout=True)) |
| 208 for xml in xmls: |
| 209 self.assertRegexpMatches(xml, test_regex) |
| 210 |
| 211 def testMultiVersionSupport(self): |
| 212 bucket = self.CreateVersionedBucket() |
| 213 object_name = self.MakeTempName('obj') |
| 214 obj = self.CreateObject( |
| 215 bucket_uri=bucket, object_name=object_name, contents='One thing') |
| 216 # Create another on the same URI, giving us a second version. |
| 217 self.CreateObject( |
| 218 bucket_uri=bucket, object_name=object_name, contents='Another thing') |
| 219 |
| 220 # Use @Retry as hedge against bucket listing eventual consistency. |
| 221 @Retry(AssertionError, tries=3, delay=1, backoff=1, logger=self.logger) |
| 222 def _getObjects(): |
| 223 stdout = self.RunGsUtil(['ls', '-a', suri(obj)], return_stdout=True) |
| 224 lines = stdout.strip().split('\n') |
| 225 self.assertEqual(len(lines), 2) |
| 226 return lines |
| 227 |
| 228 obj_v1, obj_v2 = _getObjects() |
| 229 |
| 230 test_regex = self._MakeScopeRegex( |
| 231 'GroupByEmail', self.GROUP_TEST_ADDRESS, 'READ') |
| 232 xml = self.RunGsUtil(['getacl', obj_v1], return_stdout=True) |
| 233 self.assertNotRegexpMatches(xml, test_regex) |
| 234 |
| 235 self.RunGsUtil(['chacl', '-g', self.GROUP_TEST_ADDRESS+':READ', obj_v1]) |
| 236 xml = self.RunGsUtil(['getacl', obj_v1], return_stdout=True) |
| 237 self.assertRegexpMatches(xml, test_regex) |
| 238 |
| 239 xml = self.RunGsUtil(['getacl', obj_v2], return_stdout=True) |
| 240 self.assertNotRegexpMatches(xml, test_regex) |
OLD | NEW |