Index: third_party/gsutil/gslib/tests/test_ls.py |
diff --git a/third_party/gsutil/gslib/tests/test_ls.py b/third_party/gsutil/gslib/tests/test_ls.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..f960cb204c06eaa8e3124fb45cbb3cb5b690c9c4 |
--- /dev/null |
+++ b/third_party/gsutil/gslib/tests/test_ls.py |
@@ -0,0 +1,91 @@ |
+# Copyright 2013 Google Inc. All Rights Reserved. |
+# |
+# Licensed under the Apache License, Version 2.0 (the "License"); |
+# you may not use this file except in compliance with the License. |
+# You may obtain a copy of the License at |
+# |
+# http://www.apache.org/licenses/LICENSE-2.0 |
+# |
+# Unless required by applicable law or agreed to in writing, software |
+# distributed under the License is distributed on an "AS IS" BASIS, |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
+# See the License for the specific language governing permissions and |
+# limitations under the License. |
+ |
+import gslib.tests.testcase as testcase |
+from gslib.util import Retry |
+from gslib.tests.util import ObjectToURI as suri |
+ |
+ |
+class TestLs(testcase.GsUtilIntegrationTestCase): |
+ """Integration tests for ls command.""" |
+ |
+ def test_blank_ls(self): |
+ self.RunGsUtil(['ls']) |
+ |
+ def test_empty_bucket(self): |
+ bucket_uri = self.CreateBucket() |
+ # Use @Retry as hedge against bucket listing eventual consistency. |
+ @Retry(AssertionError, tries=3, delay=1, backoff=1) |
+ def _Check1(): |
+ stdout = self.RunGsUtil(['ls', suri(bucket_uri)], return_stdout=True) |
+ self.assertEqual('', stdout) |
+ _Check1() |
+ |
+ def test_empty_bucket_with_b(self): |
+ bucket_uri = self.CreateBucket() |
+ # Use @Retry as hedge against bucket listing eventual consistency. |
+ @Retry(AssertionError, tries=3, delay=1, backoff=1) |
+ def _Check1(): |
+ stdout = self.RunGsUtil(['ls', '-b', suri(bucket_uri)], |
+ return_stdout=True) |
+ self.assertEqual('%s/\n' % suri(bucket_uri), stdout) |
+ _Check1() |
+ |
+ def test_with_one_object(self): |
+ bucket_uri = self.CreateBucket(test_objects=1) |
+ objuri = [suri(bucket_uri.clone_replace_name(key.name)) |
+ for key in bucket_uri.list_bucket()][0] |
+ # Use @Retry as hedge against bucket listing eventual consistency. |
+ @Retry(AssertionError, tries=3, delay=1, backoff=1) |
+ def _Check1(): |
+ stdout = self.RunGsUtil(['ls', suri(bucket_uri)], return_stdout=True) |
+ self.assertEqual('%s\n' % objuri, stdout) |
+ _Check1() |
+ |
+ def test_subdir(self): |
+ bucket_uri = self.CreateBucket(test_objects=1) |
+ k1_uri = bucket_uri.clone_replace_name('foo') |
+ k1_uri.set_contents_from_string('baz') |
+ k2_uri = bucket_uri.clone_replace_name('dir/foo') |
+ k2_uri.set_contents_from_string('bar') |
+ # Use @Retry as hedge against bucket listing eventual consistency. |
+ @Retry(AssertionError, tries=3, delay=1, backoff=1) |
+ def _Check1(): |
+ stdout = self.RunGsUtil(['ls', '%s/dir' % suri(bucket_uri)], |
+ return_stdout=True) |
+ self.assertEqual('%s\n' % suri(k2_uri), stdout) |
+ stdout = self.RunGsUtil(['ls', suri(k1_uri)], return_stdout=True) |
+ self.assertEqual('%s\n' % suri(k1_uri), stdout) |
+ _Check1() |
+ |
+ def test_versioning(self): |
+ bucket1_uri = self.CreateBucket(test_objects=1) |
+ bucket2_uri = self.CreateVersionedBucket(test_objects=1) |
+ bucket_list = list(bucket1_uri.list_bucket()) |
+ objuri = [bucket1_uri.clone_replace_key(key).versionless_uri |
+ for key in bucket_list][0] |
+ self.RunGsUtil(['cp', objuri, suri(bucket2_uri)]) |
+ self.RunGsUtil(['cp', objuri, suri(bucket2_uri)]) |
+ # Use @Retry as hedge against bucket listing eventual consistency. |
+ @Retry(AssertionError, tries=3, delay=1, backoff=1) |
+ def _Check1(): |
+ stdout = self.RunGsUtil(['ls', '-a', suri(bucket2_uri)], |
+ return_stdout=True) |
+ self.assertNumLines(stdout, 3) |
+ stdout = self.RunGsUtil(['ls', '-la', suri(bucket2_uri)], |
+ return_stdout=True) |
+ self.assertIn('%s#' % bucket2_uri.clone_replace_name(bucket_list[0].name), |
+ stdout) |
+ self.assertIn('meta_generation=', stdout) |
+ _Check1() |