Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(32)

Side by Side Diff: third_party/gsutil/boto/tests/integration/s3/test_versioning.py

Issue 12317103: Added gsutil to depot tools (Closed) Base URL: https://chromium.googlesource.com/chromium/tools/depot_tools.git@master
Patch Set: added readme Created 7 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright (c) 2010 Mitch Garnaat http://garnaat.org/
2 # Copyright (c) 2010, Eucalyptus Systems, Inc.
3 # All rights reserved.
4 #
5 # Permission is hereby granted, free of charge, to any person obtaining a
6 # copy of this software and associated documentation files (the
7 # "Software"), to deal in the Software without restriction, including
8 # without limitation the rights to use, copy, modify, merge, publish, dis-
9 # tribute, sublicense, and/or sell copies of the Software, and to permit
10 # persons to whom the Software is furnished to do so, subject to the fol-
11 # lowing conditions:
12 #
13 # The above copyright notice and this permission notice shall be included
14 # in all copies or substantial portions of the Software.
15 #
16 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
17 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
18 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
19 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
20 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22 # IN THE SOFTWARE.
23
24 """
25 Some unit tests for the S3 Versioning.
26 """
27
28 import unittest
29 import time
30 from boto.s3.connection import S3Connection
31 from boto.exception import S3ResponseError
32 from boto.s3.deletemarker import DeleteMarker
33
34 class S3VersionTest (unittest.TestCase):
35
36 def setUp(self):
37 self.conn = S3Connection()
38 self.bucket_name = 'version-%d' % int(time.time())
39 self.bucket = self.conn.create_bucket(self.bucket_name)
40
41 def tearDown(self):
42 for k in self.bucket.list_versions():
43 self.bucket.delete_key(k.name, version_id=k.version_id)
44 self.bucket.delete()
45
46 def test_1_versions(self):
47 # check versioning off
48 d = self.bucket.get_versioning_status()
49 self.assertFalse('Versioning' in d)
50
51 # enable versioning
52 self.bucket.configure_versioning(versioning=True)
53 d = self.bucket.get_versioning_status()
54 self.assertEqual('Enabled', d['Versioning'])
55
56 # create a new key in the versioned bucket
57 k = self.bucket.new_key("foobar")
58 s1 = 'This is v1'
59 k.set_contents_from_string(s1)
60
61 # remember the version id of this object
62 v1 = k.version_id
63
64 # now get the contents from s3
65 o1 = k.get_contents_as_string()
66
67 # check to make sure content read from k is identical to original
68 self.assertEqual(s1, o1)
69
70 # now overwrite that same key with new data
71 s2 = 'This is v2'
72 k.set_contents_from_string(s2)
73 v2 = k.version_id
74
75 # now retrieve latest contents as a string and compare
76 k2 = self.bucket.new_key("foobar")
77 o2 = k2.get_contents_as_string()
78 self.assertEqual(s2, o2)
79
80 # next retrieve explicit versions and compare
81 o1 = k.get_contents_as_string(version_id=v1)
82 o2 = k.get_contents_as_string(version_id=v2)
83 self.assertEqual(s1, o1)
84 self.assertEqual(s2, o2)
85
86 # Now list all versions and compare to what we have
87 rs = self.bucket.get_all_versions()
88 self.assertEqual(v2, rs[0].version_id)
89 self.assertEqual(v1, rs[1].version_id)
90
91 # Now do a regular list command and make sure only the new key shows up
92 rs = self.bucket.get_all_keys()
93 self.assertEqual(1, len(rs))
94
95 # Now do regular delete
96 self.bucket.delete_key('foobar')
97
98 # Now list versions and make sure old versions are there
99 # plus the DeleteMarker which is latest.
100 rs = self.bucket.get_all_versions()
101 self.assertEqual(3, len(rs))
102 self.assertTrue(isinstance(rs[0], DeleteMarker))
103
104 # Now delete v1 of the key
105 self.bucket.delete_key('foobar', version_id=v1)
106
107 # Now list versions again and make sure v1 is not there
108 rs = self.bucket.get_all_versions()
109 versions = [k.version_id for k in rs]
110 self.assertTrue(v1 not in versions)
111 self.assertTrue(v2 in versions)
112
113 # Now suspend Versioning on the bucket
114 self.bucket.configure_versioning(False)
115 # Allow time for the change to fully propagate.
116 time.sleep(3)
117 d = self.bucket.get_versioning_status()
118 self.assertEqual('Suspended', d['Versioning'])
119
120 def test_latest_version(self):
121 self.bucket.configure_versioning(versioning=True)
122
123 # add v1 of an object
124 key_name = "key"
125 kv1 = self.bucket.new_key(key_name)
126 kv1.set_contents_from_string("v1")
127
128 # read list which should contain latest v1
129 listed_kv1 = iter(self.bucket.get_all_versions()).next()
130 self.assertEqual(listed_kv1.name, key_name)
131 self.assertEqual(listed_kv1.version_id, kv1.version_id)
132 self.assertEqual(listed_kv1.is_latest, True)
133
134 # add v2 of the object
135 kv2 = self.bucket.new_key(key_name)
136 kv2.set_contents_from_string("v2")
137
138 # read 2 versions, confirm v2 is latest
139 i = iter(self.bucket.get_all_versions())
140 listed_kv2 = i.next()
141 listed_kv1 = i.next()
142 self.assertEqual(listed_kv2.version_id, kv2.version_id)
143 self.assertEqual(listed_kv1.version_id, kv1.version_id)
144 self.assertEqual(listed_kv2.is_latest, True)
145 self.assertEqual(listed_kv1.is_latest, False)
146
147 # delete key, which creates a delete marker as latest
148 self.bucket.delete_key(key_name)
149 i = iter(self.bucket.get_all_versions())
150 listed_kv3 = i.next()
151 listed_kv2 = i.next()
152 listed_kv1 = i.next()
153 self.assertNotEqual(listed_kv3.version_id, None)
154 self.assertEqual(listed_kv2.version_id, kv2.version_id)
155 self.assertEqual(listed_kv1.version_id, kv1.version_id)
156 self.assertEqual(listed_kv3.is_latest, True)
157 self.assertEqual(listed_kv2.is_latest, False)
158 self.assertEqual(listed_kv1.is_latest, False)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698