Index: third_party/gsutil/boto/tests/integration/s3/test_multidelete.py |
diff --git a/third_party/gsutil/boto/tests/integration/s3/test_multidelete.py b/third_party/gsutil/boto/tests/integration/s3/test_multidelete.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..b22581bbae3354df258fd927fd54c94c883001f5 |
--- /dev/null |
+++ b/third_party/gsutil/boto/tests/integration/s3/test_multidelete.py |
@@ -0,0 +1,181 @@ |
+# -*- coding: utf-8 -*- |
+ |
+# Copyright (c) 2011 Mitch Garnaat http://garnaat.org/ |
+# All rights reserved. |
+# |
+# Permission is hereby granted, free of charge, to any person obtaining a |
+# copy of this software and associated documentation files (the |
+# "Software"), to deal in the Software without restriction, including |
+# without limitation the rights to use, copy, modify, merge, publish, dis- |
+# tribute, sublicense, and/or sell copies of the Software, and to permit |
+# persons to whom the Software is furnished to do so, subject to the fol- |
+# lowing conditions: |
+# |
+# The above copyright notice and this permission notice shall be included |
+# in all copies or substantial portions of the Software. |
+# |
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS |
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- |
+# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT |
+# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, |
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
+# IN THE SOFTWARE. |
+ |
+""" |
+Some unit tests for the S3 MultiDelete |
+""" |
+ |
+import unittest |
+import time |
+from boto.s3.key import Key |
+from boto.s3.deletemarker import DeleteMarker |
+from boto.s3.prefix import Prefix |
+from boto.s3.connection import S3Connection |
+from boto.exception import S3ResponseError |
+ |
+class S3MultiDeleteTest(unittest.TestCase): |
+ s3 = True |
+ |
+ def setUp(self): |
+ self.conn = S3Connection() |
+ self.bucket_name = 'multidelete-%d' % int(time.time()) |
+ self.bucket = self.conn.create_bucket(self.bucket_name) |
+ |
+ def tearDown(self): |
+ for key in self.bucket: |
+ key.delete() |
+ self.bucket.delete() |
+ |
+ def test_delete_nothing(self): |
+ result = self.bucket.delete_keys([]) |
+ self.assertEqual(len(result.deleted), 0) |
+ self.assertEqual(len(result.errors), 0) |
+ |
+ def test_delete_illegal(self): |
+ result = self.bucket.delete_keys([{"dict":"notallowed"}]) |
+ self.assertEqual(len(result.deleted), 0) |
+ self.assertEqual(len(result.errors), 1) |
+ |
+ def test_delete_mix(self): |
+ result = self.bucket.delete_keys(["king", |
+ ("mice", None), |
+ Key(name="regular"), |
+ Key(), |
+ Prefix(name="folder/"), |
+ DeleteMarker(name="deleted"), |
+ {"bad":"type"}]) |
+ self.assertEqual(len(result.deleted), 4) |
+ self.assertEqual(len(result.errors), 3) |
+ |
+ def test_delete_quietly(self): |
+ result = self.bucket.delete_keys(["king"], quiet=True) |
+ self.assertEqual(len(result.deleted), 0) |
+ self.assertEqual(len(result.errors), 0) |
+ |
+ def test_delete_must_escape(self): |
+ result = self.bucket.delete_keys([Key(name=">_<;")]) |
+ self.assertEqual(len(result.deleted), 1) |
+ self.assertEqual(len(result.errors), 0) |
+ |
+ def test_delete_unknown_version(self): |
+ no_ver = Key(name="no") |
+ no_ver.version_id = "version" |
+ result = self.bucket.delete_keys([no_ver]) |
+ self.assertEqual(len(result.deleted), 0) |
+ self.assertEqual(len(result.errors), 1) |
+ |
+ def test_delete_kanji(self): |
+ result = self.bucket.delete_keys([u"漢字", Key(name=u"日本語")]) |
+ self.assertEqual(len(result.deleted), 2) |
+ self.assertEqual(len(result.errors), 0) |
+ |
+ def test_delete_empty_by_list(self): |
+ result = self.bucket.delete_keys(self.bucket.list()) |
+ self.assertEqual(len(result.deleted), 0) |
+ self.assertEqual(len(result.errors), 0) |
+ |
+ def test_delete_kanji_by_list(self): |
+ for key_name in [u"漢字", u"日本語", u"テスト"]: |
+ key = self.bucket.new_key(key_name) |
+ key.set_contents_from_string('this is a test') |
+ result = self.bucket.delete_keys(self.bucket.list()) |
+ self.assertEqual(len(result.deleted), 3) |
+ self.assertEqual(len(result.errors), 0) |
+ |
+ def test_delete_with_prefixes(self): |
+ for key_name in ["a", "a/b", "b"]: |
+ key = self.bucket.new_key(key_name) |
+ key.set_contents_from_string('this is a test') |
+ |
+ # First delete all "files": "a" and "b" |
+ result = self.bucket.delete_keys(self.bucket.list(delimiter="/")) |
+ self.assertEqual(len(result.deleted), 2) |
+ # Using delimiter will cause 1 common prefix to be listed |
+ # which will be skipped as an error. |
+ self.assertEqual(len(result.errors), 1) |
+ self.assertEqual(result.errors[0].key, "a/") |
+ |
+ # Next delete any remaining objects: "a/b" |
+ result = self.bucket.delete_keys(self.bucket.list()) |
+ self.assertEqual(len(result.deleted), 1) |
+ self.assertEqual(len(result.errors), 0) |
+ self.assertEqual(result.deleted[0].key, "a/b") |
+ |
+ def test_delete_too_many_versions(self): |
+ # configure versioning first |
+ self.bucket.configure_versioning(True) |
+ |
+ # Add 1000 initial versions as DMs by deleting them :-) |
+ # Adding 1000 objects is painful otherwise... |
+ key_names = ['key-%03d' % i for i in range(0, 1000)] |
+ result = self.bucket.delete_keys(key_names) |
+ self.assertEqual(len(result.deleted), 1000) |
+ self.assertEqual(len(result.errors), 0) |
+ |
+ # delete them again to create 1000 more delete markers |
+ result = self.bucket.delete_keys(key_names) |
+ self.assertEqual(len(result.deleted), 1000) |
+ self.assertEqual(len(result.errors), 0) |
+ |
+ # Sometimes takes AWS sometime to settle |
+ time.sleep(10) |
+ |
+ # delete all versions to delete 2000 objects. |
+ # this tests the 1000 limit. |
+ result = self.bucket.delete_keys(self.bucket.list_versions()) |
+ self.assertEqual(len(result.deleted), 2000) |
+ self.assertEqual(len(result.errors), 0) |
+ |
+ def test_1(self): |
+ nkeys = 100 |
+ |
+ # create a bunch of keynames |
+ key_names = ['key-%03d' % i for i in range(0, nkeys)] |
+ |
+ # create the corresponding keys |
+ for key_name in key_names: |
+ key = self.bucket.new_key(key_name) |
+ key.set_contents_from_string('this is a test') |
+ |
+ # now count keys in bucket |
+ n = 0 |
+ for key in self.bucket: |
+ n += 1 |
+ |
+ self.assertEqual(n, nkeys) |
+ |
+ # now delete them all |
+ result = self.bucket.delete_keys(key_names) |
+ |
+ self.assertEqual(len(result.deleted), nkeys) |
+ self.assertEqual(len(result.errors), 0) |
+ |
+ time.sleep(5) |
+ |
+ # now count keys in bucket |
+ n = 0 |
+ for key in self.bucket: |
+ n += 1 |
+ |
+ self.assertEqual(n, 0) |