Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(11)

Side by Side Diff: tests/gstools_unittest.py

Issue 12042069: Scripts to download files from google storage based on sha1 sums (Closed) Base URL: https://chromium.googlesource.com/chromium/tools/depot_tools.git@master
Patch Set: Test fix Created 7 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 #!/usr/bin/env python
2 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
5
6 """Unit tests for gstools.py and download_to/upload_from_google_storage.py."""
7
8 import os
9 import sys
10 import unittest
11 import threading
12 import StringIO
13 import Queue
14 import optparse
15
16 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
17
18 import gstools
19 import upload_to_google_storage
20 import download_from_google_storage
21
22 # ../third_party/gsutil/gsutil
23 GSUTIL_DEFAULT_PATH = os.path.join(
24 os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
25 'third_party', 'gsutil', 'gsutil')
26
27
28 class GsutilMock(object):
29 def __init__(self, path, boto_path=None, timeout=None):
30 self.path = path
31 self.timeout = timeout
32 self.boto_path = boto_path
33 self.expected = []
34 self.history = []
35 self.lock = threading.Lock()
36
37 def add_expected(self, return_code, out, err):
38 self.expected.append((return_code, out, err))
39
40 def append_history(self, method, args):
41 with self.lock:
42 self.history.append((method, args))
43
44 def call(self, *args):
45 self.append_history('call', args)
46 if self.expected:
47 return self.expected.pop(0)[0]
48 else:
49 return 0
50
51 def check_call(self, *args):
52 self.append_history('check_call', args)
53 if self.expected:
54 return self.expected.pop(0)
55 else:
56 return (0, '', '')
57
58 def clone(self):
59 return self
60
61
62 class GstoolsUnitTests(unittest.TestCase):
63 def setUp(self):
64 self.base_path = os.path.join(
65 os.path.dirname(os.path.abspath(__file__)), 'gstools')
66
67 def test_gsutil(self):
68 gsutil = gstools.Gsutil(GSUTIL_DEFAULT_PATH)
69 self.assertEquals(gsutil.path, GSUTIL_DEFAULT_PATH)
70 code, _, err = gsutil.check_call()
71 self.assertEquals(code, 0)
72 self.assertEquals(err, '')
73
74 def test_gsutil_version(self):
75 gsutil = gstools.Gsutil(GSUTIL_DEFAULT_PATH)
76 _, _, err = gsutil.check_call('version')
77 err_lines = err.splitlines()
78 self.assertEquals(err_lines[0], 'gsutil version 3.25')
79 self.assertEquals(
80 err_lines[1],
81 'checksum 6ceb2cabffe2492167043c76fb067fd4 (OK)')
82
83 def test_get_sha1(self):
84 lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt')
85 self.assertEquals(
86 gstools.GetSHA1(lorem_ipsum),
87 '7871c8e24da15bad8b0be2c36edc9dc77e37727f')
88
89 def test_get_md5(self):
90 lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt')
91 lock = threading.Lock()
92 self.assertEquals(
93 gstools.GetMD5(lorem_ipsum, lock),
94 '634d7c1ed3545383837428f031840a1e')
95
96 def test_get_md5_cached_read(self):
97 lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt')
98 lock = threading.Lock()
99 # Use a fake 'stale' MD5 sum. Expected behavior is to return stale sum.
100 self.assertEquals(
101 gstools.GetMD5Cached(lorem_ipsum, lock),
102 '734d7c1ed3545383837428f031840a1e')
103
104 def test_get_md5_cached_write(self):
105 lorem_ipsum2 = os.path.join(self.base_path, 'lorem_ipsum2.txt')
106 lorem_ipsum2_md5 = os.path.join(self.base_path, 'lorem_ipsum2.txt.md5')
107 if os.path.exists(lorem_ipsum2_md5):
108 os.remove(lorem_ipsum2_md5)
109 lock = threading.Lock()
110 # Use a fake 'stale' MD5 sum. Expected behavior is to return stale sum.
111 self.assertEquals(
112 gstools.GetMD5Cached(lorem_ipsum2, lock),
113 '4c02d1eb455a0f22c575265d17b84b6d')
114 self.assertTrue(os.path.exists(lorem_ipsum2_md5))
115 self.assertEquals(
116 open(lorem_ipsum2_md5, 'rb').read(),
117 '4c02d1eb455a0f22c575265d17b84b6d')
118 os.remove(lorem_ipsum2_md5) # Clean up.
119 self.assertFalse(os.path.exists(lorem_ipsum2_md5))
120
121
122 class UploadTests(unittest.TestCase):
123 def setUp(self):
124 self.gsutil = GsutilMock(GSUTIL_DEFAULT_PATH)
125 self.base_path = os.path.join(
126 os.path.dirname(os.path.abspath(__file__)), 'gstools')
127 self.base_url = 'gs://sometesturl'
128 self.parser = optparse.OptionParser()
129 self.options = self.parser.parse_args()[0]
130 self.options.num_threads = 1
131 self.options.force = False
132 self.options.use_md5 = False
133 self.options.use_null_terminator = False
134 self.lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt')
135 self.lorem_ipsum_sha1 = '7871c8e24da15bad8b0be2c36edc9dc77e37727f'
136
137 def test_upload_single_file(self):
138 filenames = [self.lorem_ipsum]
139 output_filename = '%s.sha1' % self.lorem_ipsum
140 if os.path.exists(output_filename):
141 os.remove(output_filename)
142 self.options.force = True
143 upload_to_google_storage.upload_to_google_storage(
144 filenames,
145 self.base_url,
146 self.gsutil,
147 self.options)
148 self.assertEquals(
149 self.gsutil.history,
150 [('check_call',
151 ('ls', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1))),
152 ('call',
153 ('cp', '-q', filenames[0], '%s/%s' % (self.base_url,
154 self.lorem_ipsum_sha1)))])
155 self.assertTrue(os.path.exists(output_filename))
156 self.assertEquals(
157 open(output_filename, 'rb').read(),
158 '7871c8e24da15bad8b0be2c36edc9dc77e37727f')
159 os.remove(output_filename)
160
161 def test_upload_single_file_remote_exists(self):
162 filenames = [self.lorem_ipsum]
163 output_filename = '%s.sha1' % self.lorem_ipsum
164 etag_string = 'ETag: 634d7c1ed3545383837428f031840a1e'
165 if os.path.exists(output_filename):
166 os.remove(output_filename)
167 self.gsutil.add_expected(0, '', '')
168 self.gsutil.add_expected(0, etag_string, '')
169 upload_to_google_storage.upload_to_google_storage(
170 filenames,
171 self.base_url,
172 self.gsutil,
173 self.options)
174 self.assertEquals(
175 self.gsutil.history,
176 [('check_call',
177 ('ls', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1))),
178 ('check_call',
179 ('ls', '-L', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1)))])
180 self.assertTrue(os.path.exists(output_filename))
181 self.assertEquals(
182 open(output_filename, 'rb').read(),
183 '7871c8e24da15bad8b0be2c36edc9dc77e37727f')
184 os.remove(output_filename)
185
186 def test_skip_hashing(self):
187 filenames = [self.lorem_ipsum]
188 output_filename = '%s.sha1' % self.lorem_ipsum
189 fake_hash = '6871c8e24da15bad8b0be2c36edc9dc77e37727f'
190 with open(output_filename, 'wb') as f:
191 f.write(fake_hash) # Fake hash.
192 self.options.skip_hashing = True
193 upload_to_google_storage.upload_to_google_storage(
194 filenames,
195 self.base_url,
196 self.gsutil,
197 self.options)
198 self.assertEquals(
199 self.gsutil.history,
200 [('check_call',
201 ('ls', '%s/%s' % (self.base_url, fake_hash))),
202 ('check_call',
203 ('ls', '-L', '%s/%s' % (self.base_url, fake_hash))),
204 ('call',
205 ('cp', '-q', filenames[0], '%s/%s' % (self.base_url, fake_hash)))])
206 self.assertEquals(
207 open(output_filename, 'rb').read(), fake_hash)
208 os.remove(output_filename)
209
210 def test_get_targets_no_args(self):
211 try:
212 upload_to_google_storage.get_targets(self.options, [], self.parser)
213 except SystemExit, e:
214 self.assertEquals(type(e), type(SystemExit()))
215 self.assertEquals(e.code, 2)
216 except Exception, e:
217 self.fail('unexpected exception: %s' % e)
218 else:
219 self.fail('SystemExit exception expected')
220
221 def test_get_targets_passthrough(self):
222 result = upload_to_google_storage.get_targets(
223 self.options,
224 ['a', 'b', 'c', 'd', 'e'],
225 self.parser)
226 self.assertEquals(result, ['a', 'b', 'c', 'd', 'e'])
227
228 def test_get_targets_multiple_stdin(self):
229 inputs = ['a', 'b', 'c', 'd', 'e']
230 sys.stdin = StringIO.StringIO(os.linesep.join(inputs))
231 result = upload_to_google_storage.get_targets(
232 self.options,
233 ['-'],
234 self.parser)
235 self.assertEquals(result, inputs)
236
237 def test_get_targets_multiple_stdin_null(self):
238 inputs = ['a', 'b', 'c', 'd', 'e']
239 sys.stdin = StringIO.StringIO('\0'.join(inputs))
240 self.options.use_null_terminator = True
241 result = upload_to_google_storage.get_targets(
242 self.options,
243 ['-'],
244 self.parser)
245 self.assertEquals(result, inputs)
246
247
248 class DownloadTests(unittest.TestCase):
249 def setUp(self):
250 self.gsutil = GsutilMock(GSUTIL_DEFAULT_PATH)
251 self.base_path = os.path.join(
252 os.path.dirname(os.path.abspath(__file__)),
253 'gstools',
254 'download_test_data')
255 self.base_url = 'gs://sometesturl'
256 self.parser = optparse.OptionParser()
257 self.options = self.parser.parse_args()[0]
258 self.options.recursive = False
259 self.options.force = False
260 self.options.directory = False
261 self.options.num_threads = 1
262 self.queue = Queue.Queue()
263 self.lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt')
264 self.lorem_ipsum_sha1 = '7871c8e24da15bad8b0be2c36edc9dc77e37727f'
265 self.maxDiff = None
266
267 def test_enumerate_files_non_recursive(self):
268 self.options.directory = True
269 queue_size = download_from_google_storage.enumerate_work_queue(
270 self.base_path, self.queue, self.options)
271 result = list(self.queue.queue)
272 expected_queue = [
273 ('e6c4fbd4fe7607f3e6ebf68b2ea4ef694da7b4fe',
274 os.path.join(self.base_path, 'rootfolder_text.txt')),
275 ('7871c8e24da15bad8b0be2c36edc9dc77e37727f',
276 os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt'))]
277 for item in result:
278 self.assertTrue(item in expected_queue)
279 self.assertEquals(queue_size, 2)
280
281 def test_enumerate_files_recursive(self):
282 self.options.directory = True
283 self.options.recursive = True
284 queue_size = download_from_google_storage.enumerate_work_queue(
285 self.base_path, self.queue, self.options)
286 expected_queue = [
287 ('e6c4fbd4fe7607f3e6ebf68b2ea4ef694da7b4fe',
288 os.path.join(self.base_path, 'rootfolder_text.txt')),
289 ('7871c8e24da15bad8b0be2c36edc9dc77e37727f',
290 os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt')),
291 ('b5415aa0b64006a95c0c409182e628881d6d6463',
292 os.path.join(self.base_path, 'subfolder', 'subfolder_text.txt'))]
293 result = list(self.queue.queue)
294 for item in result:
295 self.assertTrue(item in expected_queue)
296 self.assertEquals(queue_size, 3)
297
298 def test_download_worker_single_file(self):
299 sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f'
300 input_filename = '%s/%s' % (self.base_url, sha1_hash)
301 output_filename = os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt')
302 self.queue.put((sha1_hash, output_filename))
303 self.queue.put((None, None))
304 stdout_queue = Queue.Queue()
305 # pylint: disable=W0212
306 download_from_google_storage._downloader_worker_thread(
307 0, self.queue, self.options, self.base_url, self.gsutil, stdout_queue)
308 expected_calls = [
309 ('check_call',
310 ('ls', input_filename)),
311 ('call',
312 ('cp', '-q', input_filename, output_filename))]
313 expected_output = [
314 'Downloading %s to %s...' % (input_filename, output_filename),
315 'Thread 0 is done']
316 self.assertEquals(list(stdout_queue.queue), expected_output)
317 self.assertEquals(self.gsutil.history, expected_calls)
318
319 def test_download_worker_skips_file(self):
320 sha1_hash = 'e6c4fbd4fe7607f3e6ebf68b2ea4ef694da7b4fe'
321 output_filename = os.path.join(self.base_path, 'rootfolder_text.txt')
322 self.queue.put((sha1_hash, output_filename))
323 self.queue.put((None, None))
324 stdout_queue = Queue.Queue()
325 # pylint: disable=W0212
326 download_from_google_storage._downloader_worker_thread(
327 0, self.queue, self.options, self.base_url, self.gsutil, stdout_queue)
328 expected_output = [
329 'File %s exists and SHA1 sum (%s) matches. Skipping.' %
330 (output_filename, sha1_hash),
331 'Thread 0 is done'
332 ]
333 self.assertEquals(list(stdout_queue.queue), expected_output)
334 self.assertEquals(self.gsutil.history, [])
335
336 def test_download_worker_skips_not_found_file(self):
337 sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f'
338 input_filename = '%s/%s' % (self.base_url, sha1_hash)
339 output_filename = os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt')
340 self.queue.put((sha1_hash, output_filename))
341 self.queue.put((None, None))
342 stdout_queue = Queue.Queue()
343 self.gsutil.add_expected(1, '', '') # Return error when 'ls' is called.
344 # pylint: disable=W0212
345 download_from_google_storage._downloader_worker_thread(
346 0, self.queue, self.options, self.base_url, self.gsutil, stdout_queue)
347 expected_output = [
348 'File %s for %s does not exist, skipping.' % (
349 input_filename, output_filename),
350 'Thread 0 is done'
351 ]
352 expected_calls = [
353 ('check_call',
354 ('ls', input_filename))
355 ]
356 self.assertEquals(list(stdout_queue.queue), expected_output)
357 self.assertEquals(self.gsutil.history, expected_calls)
358
359 def test_download_directory_no_recursive_non_force(self):
360 self.options.directory = True
361 sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f'
362 input_filename = '%s/%s' % (self.base_url, sha1_hash)
363 output_filename = os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt')
364 download_from_google_storage.download_from_google_storage(
365 self.base_path, self.base_url, self.gsutil, self.options)
366 expected_calls = [
367 ('check_call',
368 ('ls', input_filename)),
369 ('call',
370 ('cp', '-q', input_filename, output_filename))]
371 self.assertEquals(self.gsutil.history, expected_calls)
372
373
374 if __name__ == '__main__':
375 unittest.main()
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698