Index: third_party/gsutil/gslib/tests/test_setacl.py |
diff --git a/third_party/gsutil/gslib/tests/test_setacl.py b/third_party/gsutil/gslib/tests/test_setacl.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..37fbf484042d13c198230655844dd0e533904f9e |
--- /dev/null |
+++ b/third_party/gsutil/gslib/tests/test_setacl.py |
@@ -0,0 +1,156 @@ |
+# -*- coding: utf-8 -*- |
+# Copyright 2013 Google Inc. All Rights Reserved. |
+# |
+# Licensed under the Apache License, Version 2.0 (the "License"); |
+# you may not use this file except in compliance with the License. |
+# You may obtain a copy of the License at |
+# |
+# http://www.apache.org/licenses/LICENSE-2.0 |
+# |
+# Unless required by applicable law or agreed to in writing, software |
+# distributed under the License is distributed on an "AS IS" BASIS, |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
+# See the License for the specific language governing permissions and |
+# limitations under the License. |
+"""Integration tests for the setacl and setdefacl commands.""" |
+ |
+import gslib.tests.testcase as testcase |
+import re |
+from gslib.util import Retry |
+from gslib.tests.util import ObjectToURI as suri |
+ |
+PUBLIC_READ_ACL_TEXT = '<Scope type="AllUsers"/><Permission>READ</Permission>' |
+ |
+ |
+class TestSetAcl(testcase.GsUtilIntegrationTestCase): |
+ """Integration tests for setacl command.""" |
+ |
+ def test_set_invalid_acl_object(self): |
+ """Ensures that invalid XML content returns a MalformedACLError.""" |
+ obj_uri = suri(self.CreateObject(contents='foo')) |
+ inpath = self.CreateTempFile(contents='badXml') |
+ stderr = self.RunGsUtil(['setacl', inpath, obj_uri], return_stderr=True, |
+ expected_status=1) |
+ |
+ self.assertIn('MalformedACLError', stderr) |
+ |
+ def test_set_invalid_acl_bucket(self): |
+ """Ensures that invalid XML content returns a MalformedACLError.""" |
+ bucket_uri = suri(self.CreateBucket()) |
+ inpath = self.CreateTempFile(contents='badXml') |
+ stderr = self.RunGsUtil(['setacl', inpath, bucket_uri], return_stderr=True, |
+ expected_status=1) |
+ |
+ self.assertIn('MalformedACLError', stderr) |
+ |
+ def test_set_valid_acl_object(self): |
+ """Ensures that valid canned and XML ACLs work with get/set.""" |
+ obj_uri = suri(self.CreateObject(contents='foo')) |
+ acl_string = self.RunGsUtil(['getacl', obj_uri], return_stdout=True) |
+ inpath = self.CreateTempFile(contents=acl_string) |
+ self.RunGsUtil(['setacl', 'public-read', obj_uri]) |
+ acl_string2 = self.RunGsUtil(['getacl', obj_uri], return_stdout=True) |
+ self.RunGsUtil(['setacl', inpath, obj_uri]) |
+ acl_string3 = self.RunGsUtil(['getacl', obj_uri], return_stdout=True) |
+ |
+ self.assertNotEqual(acl_string, acl_string2) |
+ self.assertEqual(acl_string, acl_string3) |
+ |
+ def test_set_valid_permission_whitespace_object(self): |
+ """Ensures that whitespace is allowed in <Permission> elements.""" |
+ obj_uri = suri(self.CreateObject(contents='foo')) |
+ acl_string = self.RunGsUtil(['getacl', obj_uri], return_stdout=True) |
+ acl_string = re.sub(r'<Permission>', r'<Permission> \n', acl_string) |
+ acl_string = re.sub(r'</Permission>', r'\n </Permission>', acl_string) |
+ inpath = self.CreateTempFile(contents=acl_string) |
+ self.RunGsUtil(['setacl', inpath, obj_uri]) |
+ |
+ def test_set_valid_acl_bucket(self): |
+ """Ensures that valid canned and XML ACLs work with get/set.""" |
+ bucket_uri = suri(self.CreateBucket()) |
+ acl_string = self.RunGsUtil(['getacl', bucket_uri], return_stdout=True) |
+ inpath = self.CreateTempFile(contents=acl_string) |
+ self.RunGsUtil(['setacl', 'public-read', bucket_uri]) |
+ acl_string2 = self.RunGsUtil(['getacl', bucket_uri], return_stdout=True) |
+ self.RunGsUtil(['setacl', inpath, bucket_uri]) |
+ acl_string3 = self.RunGsUtil(['getacl', bucket_uri], return_stdout=True) |
+ |
+ self.assertNotEqual(acl_string, acl_string2) |
+ self.assertEqual(acl_string, acl_string3) |
+ |
+ def test_invalid_canned_acl_object(self): |
+ """Ensures that an invalid canned ACL returns a CommandException.""" |
+ obj_uri = suri(self.CreateObject(contents='foo')) |
+ stderr = self.RunGsUtil(['setacl', 'not-a-canned-acl', |
+ obj_uri], return_stderr=True, expected_status=1) |
+ self.assertIn('CommandException', stderr) |
+ self.assertIn('Invalid canned ACL', stderr) |
+ |
+ def test_set_valid_def_acl_bucket(self): |
+ """Ensures that valid default canned and XML ACLs works with get/set.""" |
+ bucket_uri = self.CreateBucket() |
+ |
+ # Default ACL is project private. |
+ obj_uri1 = suri(self.CreateObject(bucket_uri=bucket_uri, contents='foo')) |
+ acl_string = self.RunGsUtil(['getacl', obj_uri1], return_stdout=True) |
+ |
+ # Change it to authenticated-read. |
+ self.RunGsUtil(['setdefacl', 'authenticated-read', suri(bucket_uri)]) |
+ obj_uri2 = suri(self.CreateObject(bucket_uri=bucket_uri, contents='foo2')) |
+ acl_string2 = self.RunGsUtil(['getacl', obj_uri2], return_stdout=True) |
+ |
+ # Now change it back to the default via XML. |
+ inpath = self.CreateTempFile(contents=acl_string) |
+ self.RunGsUtil(['setdefacl', inpath, suri(bucket_uri)]) |
+ obj_uri3 = suri(self.CreateObject(bucket_uri=bucket_uri, contents='foo3')) |
+ acl_string3 = self.RunGsUtil(['getacl', obj_uri3], return_stdout=True) |
+ |
+ self.assertNotEqual(acl_string, acl_string2) |
+ self.assertIn('AllAuthenticatedUsers', acl_string2) |
+ self.assertEqual(acl_string, acl_string3) |
+ |
+ def test_setacl_version_specific_uri(self): |
+ bucket_uri = self.CreateVersionedBucket() |
+ # Create initial object version. |
+ uri = self.CreateObject(bucket_uri=bucket_uri, contents='data') |
+ # Create a second object version. |
+ inpath = self.CreateTempFile(contents='def') |
+ self.RunGsUtil(['cp', inpath, uri.uri]) |
+ |
+ # Find out the two object version IDs. |
+ # Use @Retry as hedge against bucket listing eventual consistency. |
+ @Retry(AssertionError, tries=3, delay=1, backoff=1) |
+ def _GetVersions(): |
+ stdout = self.RunGsUtil(['ls', '-a', uri.uri], return_stdout=True) |
+ lines = stdout.split('\n') |
+ # There should be 3 lines, counting final \n. |
+ self.assertEqual(len(lines), 3) |
+ return lines[0], lines[1] |
+ |
+ v0_uri_str, v1_uri_str = _GetVersions() |
+ |
+ # Check that neither version currently has public-read permission |
+ # (default ACL is project-private). |
+ orig_acls = [] |
+ for uri_str in (v0_uri_str, v1_uri_str): |
+ acl = self.RunGsUtil(['getacl', uri_str], return_stdout=True) |
+ self.assertNotIn(PUBLIC_READ_ACL_TEXT, self._strip_xml_whitespace(acl)) |
+ orig_acls.append(acl) |
+ |
+ # Set the ACL for the older version of the object to public-read. |
+ self.RunGsUtil(['setacl', 'public-read', v0_uri_str]) |
+ # Check that the older version's ACL is public-read, but newer version |
+ # is not. |
+ acl = self.RunGsUtil(['getacl', v0_uri_str], return_stdout=True) |
+ self.assertIn(PUBLIC_READ_ACL_TEXT, self._strip_xml_whitespace(acl)) |
+ acl = self.RunGsUtil(['getacl', v1_uri_str], return_stdout=True) |
+ self.assertNotIn(PUBLIC_READ_ACL_TEXT, self._strip_xml_whitespace(acl)) |
+ |
+ # Check that reading the ACL with the version-less URI returns the |
+ # original ACL (since the version-less URI means the current version). |
+ acl = self.RunGsUtil(['getacl', uri.uri], return_stdout=True) |
+ self.assertEqual(acl, orig_acls[0]) |
+ |
+ def _strip_xml_whitespace(self, xml): |
+ s = re.sub('>\s*', '>', xml.replace('\n', '')) |
+ return re.sub('\s*<', '<', s) |