Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(205)

Side by Side Diff: third_party/gsutil/boto/tests/unit/s3/test_uri.py

Issue 12317103: Added gsutil to depot tools (Closed) Base URL: https://chromium.googlesource.com/chromium/tools/depot_tools.git@master
Patch Set: added readme Created 7 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 #!/usr/bin/env python
2 # Copyright (c) 2013 Google, Inc.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import boto
17 import tempfile
18 import urllib
19 from boto.exception import InvalidUriError
20 from boto import storage_uri
21 from boto.s3.keyfile import KeyFile
22 from tests.integration.s3.mock_storage_service import MockBucket
23 from tests.integration.s3.mock_storage_service import MockBucketStorageUri
24 from tests.integration.s3.mock_storage_service import MockConnection
25 from tests.unit import unittest
26
27 """Unit tests for StorageUri interface."""
28
29 class UriTest(unittest.TestCase):
30
31 def test_provider_uri(self):
32 for prov in ('gs', 's3'):
33 uri_str = '%s://' % prov
34 uri = boto.storage_uri(uri_str, validate=False,
35 suppress_consec_slashes=False)
36 self.assertEqual(prov, uri.scheme)
37 self.assertEqual(uri_str, uri.uri)
38 self.assertFalse(hasattr(uri, 'versionless_uri'))
39 self.assertEqual('', uri.bucket_name)
40 self.assertEqual('', uri.object_name)
41 self.assertEqual(None, uri.version_id)
42 self.assertEqual(None, uri.generation)
43 self.assertEqual(uri.names_provider(), True)
44 self.assertEqual(uri.names_container(), True)
45 self.assertEqual(uri.names_bucket(), False)
46 self.assertEqual(uri.names_object(), False)
47 self.assertEqual(uri.names_directory(), False)
48 self.assertEqual(uri.names_file(), False)
49 self.assertEqual(uri.is_stream(), False)
50 self.assertEqual(uri.is_version_specific, False)
51
52 def test_bucket_uri_no_trailing_slash(self):
53 for prov in ('gs', 's3'):
54 uri_str = '%s://bucket' % prov
55 uri = boto.storage_uri(uri_str, validate=False,
56 suppress_consec_slashes=False)
57 self.assertEqual(prov, uri.scheme)
58 self.assertEqual('%s/' % uri_str, uri.uri)
59 self.assertFalse(hasattr(uri, 'versionless_uri'))
60 self.assertEqual('bucket', uri.bucket_name)
61 self.assertEqual('', uri.object_name)
62 self.assertEqual(None, uri.version_id)
63 self.assertEqual(None, uri.generation)
64 self.assertEqual(uri.names_provider(), False)
65 self.assertEqual(uri.names_container(), True)
66 self.assertEqual(uri.names_bucket(), True)
67 self.assertEqual(uri.names_object(), False)
68 self.assertEqual(uri.names_directory(), False)
69 self.assertEqual(uri.names_file(), False)
70 self.assertEqual(uri.is_stream(), False)
71 self.assertEqual(uri.is_version_specific, False)
72
73 def test_bucket_uri_with_trailing_slash(self):
74 for prov in ('gs', 's3'):
75 uri_str = '%s://bucket/' % prov
76 uri = boto.storage_uri(uri_str, validate=False,
77 suppress_consec_slashes=False)
78 self.assertEqual(prov, uri.scheme)
79 self.assertEqual(uri_str, uri.uri)
80 self.assertFalse(hasattr(uri, 'versionless_uri'))
81 self.assertEqual('bucket', uri.bucket_name)
82 self.assertEqual('', uri.object_name)
83 self.assertEqual(None, uri.version_id)
84 self.assertEqual(None, uri.generation)
85 self.assertEqual(uri.names_provider(), False)
86 self.assertEqual(uri.names_container(), True)
87 self.assertEqual(uri.names_bucket(), True)
88 self.assertEqual(uri.names_object(), False)
89 self.assertEqual(uri.names_directory(), False)
90 self.assertEqual(uri.names_file(), False)
91 self.assertEqual(uri.is_stream(), False)
92 self.assertEqual(uri.is_version_specific, False)
93
94 def test_non_versioned_object_uri(self):
95 for prov in ('gs', 's3'):
96 uri_str = '%s://bucket/obj/a/b' % prov
97 uri = boto.storage_uri(uri_str, validate=False,
98 suppress_consec_slashes=False)
99 self.assertEqual(prov, uri.scheme)
100 self.assertEqual(uri_str, uri.uri)
101 self.assertEqual(uri_str, uri.versionless_uri)
102 self.assertEqual('bucket', uri.bucket_name)
103 self.assertEqual('obj/a/b', uri.object_name)
104 self.assertEqual(None, uri.version_id)
105 self.assertEqual(None, uri.generation)
106 self.assertEqual(uri.names_provider(), False)
107 self.assertEqual(uri.names_container(), False)
108 self.assertEqual(uri.names_bucket(), False)
109 self.assertEqual(uri.names_object(), True)
110 self.assertEqual(uri.names_directory(), False)
111 self.assertEqual(uri.names_file(), False)
112 self.assertEqual(uri.is_stream(), False)
113 self.assertEqual(uri.is_version_specific, False)
114
115 def test_versioned_gs_object_uri(self):
116 uri_str = 'gs://bucket/obj/a/b#1359908801674000'
117 uri = boto.storage_uri(uri_str, validate=False,
118 suppress_consec_slashes=False)
119 self.assertEqual('gs', uri.scheme)
120 self.assertEqual(uri_str, uri.uri)
121 self.assertEqual('gs://bucket/obj/a/b', uri.versionless_uri)
122 self.assertEqual('bucket', uri.bucket_name)
123 self.assertEqual('obj/a/b', uri.object_name)
124 self.assertEqual(None, uri.version_id)
125 self.assertEqual(1359908801674000, uri.generation)
126 self.assertEqual(uri.names_provider(), False)
127 self.assertEqual(uri.names_container(), False)
128 self.assertEqual(uri.names_bucket(), False)
129 self.assertEqual(uri.names_object(), True)
130 self.assertEqual(uri.names_directory(), False)
131 self.assertEqual(uri.names_file(), False)
132 self.assertEqual(uri.is_stream(), False)
133 self.assertEqual(uri.is_version_specific, True)
134
135 def test_versioned_gs_object_uri_with_legacy_generation_value(self):
136 uri_str = 'gs://bucket/obj/a/b#1'
137 uri = boto.storage_uri(uri_str, validate=False,
138 suppress_consec_slashes=False)
139 self.assertEqual('gs', uri.scheme)
140 self.assertEqual(uri_str, uri.uri)
141 self.assertEqual('gs://bucket/obj/a/b', uri.versionless_uri)
142 self.assertEqual('bucket', uri.bucket_name)
143 self.assertEqual('obj/a/b', uri.object_name)
144 self.assertEqual(None, uri.version_id)
145 self.assertEqual(1, uri.generation)
146 self.assertEqual(uri.names_provider(), False)
147 self.assertEqual(uri.names_container(), False)
148 self.assertEqual(uri.names_bucket(), False)
149 self.assertEqual(uri.names_object(), True)
150 self.assertEqual(uri.names_directory(), False)
151 self.assertEqual(uri.names_file(), False)
152 self.assertEqual(uri.is_stream(), False)
153 self.assertEqual(uri.is_version_specific, True)
154
155 def test_roundtrip_versioned_gs_object_uri_parsed(self):
156 uri_str = 'gs://bucket/obj#1359908801674000'
157 uri = boto.storage_uri(uri_str, validate=False,
158 suppress_consec_slashes=False)
159 roundtrip_uri = boto.storage_uri(uri.uri, validate=False,
160 suppress_consec_slashes=False)
161 self.assertEqual(uri.uri, roundtrip_uri.uri)
162 self.assertEqual(uri.is_version_specific, True)
163
164 def test_versioned_s3_object_uri(self):
165 uri_str = 's3://bucket/obj/a/b#eMuM0J15HkJ9QHlktfNP5MfA.oYR2q6S'
166 uri = boto.storage_uri(uri_str, validate=False,
167 suppress_consec_slashes=False)
168 self.assertEqual('s3', uri.scheme)
169 self.assertEqual(uri_str, uri.uri)
170 self.assertEqual('s3://bucket/obj/a/b', uri.versionless_uri)
171 self.assertEqual('bucket', uri.bucket_name)
172 self.assertEqual('obj/a/b', uri.object_name)
173 self.assertEqual('eMuM0J15HkJ9QHlktfNP5MfA.oYR2q6S', uri.version_id)
174 self.assertEqual(None, uri.generation)
175 self.assertEqual(uri.names_provider(), False)
176 self.assertEqual(uri.names_container(), False)
177 self.assertEqual(uri.names_bucket(), False)
178 self.assertEqual(uri.names_object(), True)
179 self.assertEqual(uri.names_directory(), False)
180 self.assertEqual(uri.names_file(), False)
181 self.assertEqual(uri.is_stream(), False)
182 self.assertEqual(uri.is_version_specific, True)
183
184 def test_explicit_file_uri(self):
185 tmp_dir = tempfile.tempdir
186 uri_str = 'file://%s' % urllib.pathname2url(tmp_dir)
187 uri = boto.storage_uri(uri_str, validate=False,
188 suppress_consec_slashes=False)
189 self.assertEqual('file', uri.scheme)
190 self.assertEqual(uri_str, uri.uri)
191 self.assertFalse(hasattr(uri, 'versionless_uri'))
192 self.assertEqual('', uri.bucket_name)
193 self.assertEqual(tmp_dir, uri.object_name)
194 self.assertFalse(hasattr(uri, 'version_id'))
195 self.assertFalse(hasattr(uri, 'generation'))
196 self.assertFalse(hasattr(uri, 'is_version_specific'))
197 self.assertEqual(uri.names_provider(), False)
198 self.assertEqual(uri.names_bucket(), False)
199 # Don't check uri.names_container(), uri.names_directory(),
200 # uri.names_file(), or uri.names_object(), because for file URIs these
201 # functions look at the file system and apparently unit tests run
202 # chroot'd.
203 self.assertEqual(uri.is_stream(), False)
204
205 def test_implicit_file_uri(self):
206 tmp_dir = tempfile.tempdir
207 uri_str = '%s' % urllib.pathname2url(tmp_dir)
208 uri = boto.storage_uri(uri_str, validate=False,
209 suppress_consec_slashes=False)
210 self.assertEqual('file', uri.scheme)
211 self.assertEqual('file://%s' % tmp_dir, uri.uri)
212 self.assertFalse(hasattr(uri, 'versionless_uri'))
213 self.assertEqual('', uri.bucket_name)
214 self.assertEqual(tmp_dir, uri.object_name)
215 self.assertFalse(hasattr(uri, 'version_id'))
216 self.assertFalse(hasattr(uri, 'generation'))
217 self.assertFalse(hasattr(uri, 'is_version_specific'))
218 self.assertEqual(uri.names_provider(), False)
219 self.assertEqual(uri.names_bucket(), False)
220 # Don't check uri.names_container(), uri.names_directory(),
221 # uri.names_file(), or uri.names_object(), because for file URIs these
222 # functions look at the file system and apparently unit tests run
223 # chroot'd.
224 self.assertEqual(uri.is_stream(), False)
225
226 def test_gs_object_uri_contains_sharp_not_matching_version_syntax(self):
227 uri_str = 'gs://bucket/obj#13a990880167400'
228 uri = boto.storage_uri(uri_str, validate=False,
229 suppress_consec_slashes=False)
230 self.assertEqual('gs', uri.scheme)
231 self.assertEqual(uri_str, uri.uri)
232 self.assertEqual('gs://bucket/obj#13a990880167400',
233 uri.versionless_uri)
234 self.assertEqual('bucket', uri.bucket_name)
235 self.assertEqual('obj#13a990880167400', uri.object_name)
236 self.assertEqual(None, uri.version_id)
237 self.assertEqual(None, uri.generation)
238 self.assertEqual(uri.names_provider(), False)
239 self.assertEqual(uri.names_container(), False)
240 self.assertEqual(uri.names_bucket(), False)
241 self.assertEqual(uri.names_object(), True)
242 self.assertEqual(uri.names_directory(), False)
243 self.assertEqual(uri.names_file(), False)
244 self.assertEqual(uri.is_stream(), False)
245 self.assertEqual(uri.is_version_specific, False)
246
247 def test_invalid_scheme(self):
248 uri_str = 'mars://bucket/object'
249 try:
250 boto.storage_uri(uri_str, validate=False,
251 suppress_consec_slashes=False)
252 except InvalidUriError as e:
253 self.assertIn('Unrecognized scheme', e.message)
254
255
256 if __name__ == '__main__':
257 unittest.main()
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698