Chromium Code Reviews| Index: tests/gstools_unittest.py |
| diff --git a/tests/gstools_unittest.py b/tests/gstools_unittest.py |
| new file mode 100755 |
| index 0000000000000000000000000000000000000000..0a9b4e0cb0b4e8439523949199f2f216a21c04c8 |
| --- /dev/null |
| +++ b/tests/gstools_unittest.py |
| @@ -0,0 +1,357 @@ |
| +#!/usr/bin/env python |
| +# Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| +# Use of this source code is governed by a BSD-style license that can be |
| +# found in the LICENSE file. |
| + |
| +"""Unit tests for gstools.py and download_to/upload_from_google_storage.py.""" |
| + |
| +import os |
| +import sys |
| +import unittest |
| +import threading |
| +import StringIO |
| +import Queue |
| +import optparse |
| + |
| +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| + |
| +import gstools |
| +import upload_to_google_storage |
| +import download_from_google_storage |
| + |
| +# ../third_party/gsutil/gsutil |
| +GSUTIL_DEFAULT_PATH = os.path.join( |
| + os.path.dirname(os.path.dirname(os.path.abspath(__file__))), |
| + 'third_party', 'gsutil', 'gsutil') |
| + |
|
M-A Ruel
2013/02/27 21:52:07
2 lines
Ryan Tseng
2013/02/27 23:34:41
Done.
|
| +class GsutilMock(object): |
| + def __init__(self, path, boto_path=None, timeout=None): |
| + self.path = path |
| + self.timeout = timeout |
| + self.boto_path = boto_path |
| + self.expected = [] |
| + self.history = [] |
| + self.lock = threading.Lock() |
| + |
| + def add_expected(self, return_code, out, err): |
| + self.expected.append((return_code, out, err)) |
| + |
| + def append_history(self, method, args): |
| + with self.lock: |
| + self.history.append((method, args)) |
| + |
| + def call(self, *args): |
| + self.append_history('call', args) |
| + if self.expected: |
| + return self.expected.pop(0)[0] |
| + else: |
| + return 0 |
| + |
| + def check_call(self, *args): |
| + self.append_history('check_call', args) |
| + if self.expected: |
| + return self.expected.pop(0) |
| + else: |
| + return (0, '', '') |
| + |
| + def clone(self): |
| + return self |
| + |
| + |
| +class GstoolsUnitTests(unittest.TestCase): |
| + def setUp(self): |
| + self.base_path = os.path.join( |
| + os.path.dirname(os.path.abspath(__file__)), 'gstools') |
| + |
| + def test_gsutil(self): |
| + gsutil = gstools.Gsutil(GSUTIL_DEFAULT_PATH) |
| + self.assertEquals(gsutil.path, GSUTIL_DEFAULT_PATH) |
| + code, _, err = gsutil.check_call() |
| + self.assertEquals(code, 0) |
| + self.assertEquals(err, '') |
| + |
| + def test_gsutil_version(self): |
| + gsutil = gstools.Gsutil(GSUTIL_DEFAULT_PATH) |
| + _, _, err = gsutil.check_call('version') |
| + err_lines = err.splitlines() |
| + self.assertEquals(err_lines[0], 'gsutil version 3.25') |
| + self.assertEquals( |
| + err_lines[1], |
| + 'checksum 6783bc26b80c538a136ba5392d44b0e3 (OK)') |
| + |
| + def test_get_sha1(self): |
| + lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt') |
| + self.assertEquals( |
| + gstools.GetSHA1(lorem_ipsum), |
| + '7871c8e24da15bad8b0be2c36edc9dc77e37727f') |
| + |
| + def test_get_md5(self): |
| + lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt') |
| + lock = threading.Lock() |
| + self.assertEquals( |
| + gstools.GetMD5(lorem_ipsum, lock), |
| + '634d7c1ed3545383837428f031840a1e') |
| + |
| + def test_get_md5_cached_read(self): |
| + lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt') |
| + lock = threading.Lock() |
| + # Use a fake 'stale' MD5 sum. Expected behavior is to return stale sum. |
| + self.assertEquals( |
| + gstools.GetMD5Cached(lorem_ipsum, lock), |
| + '734d7c1ed3545383837428f031840a1e') |
| + |
| + def test_get_md5_cached_write(self): |
| + lorem_ipsum2 = os.path.join(self.base_path, 'lorem_ipsum2.txt') |
| + lorem_ipsum2_md5 = os.path.join(self.base_path, 'lorem_ipsum2.txt.md5') |
| + if os.path.exists(lorem_ipsum2_md5): |
| + os.remove(lorem_ipsum2_md5) |
| + lock = threading.Lock() |
| + # Use a fake 'stale' MD5 sum. Expected behavior is to return stale sum. |
| + self.assertEquals( |
| + gstools.GetMD5Cached(lorem_ipsum2, lock), |
| + '4c02d1eb455a0f22c575265d17b84b6d') |
| + self.assertTrue(os.path.exists(lorem_ipsum2_md5)) |
| + self.assertEquals( |
| + open(lorem_ipsum2_md5, 'rb').read(), |
| + '4c02d1eb455a0f22c575265d17b84b6d') |
| + os.remove(lorem_ipsum2_md5) # Clean up. |
| + self.assertFalse(os.path.exists(lorem_ipsum2_md5)) |
| + |
| + |
| +class UploadTests(unittest.TestCase): |
| + def setUp(self): |
| + self.gsutil = GsutilMock(GSUTIL_DEFAULT_PATH) |
| + self.base_path = os.path.join( |
| + os.path.dirname(os.path.abspath(__file__)), 'gstools') |
| + self.base_url = 'gs://sometesturl' |
| + self.parser = optparse.OptionParser() |
| + self.options = self.parser.parse_args()[0] |
| + self.options.num_threads = 1 |
| + self.options.force = False |
| + self.options.use_md5 = False |
| + self.options.use_null_terminator = False |
| + self.lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt') |
| + self.lorem_ipsum_sha1 = '7871c8e24da15bad8b0be2c36edc9dc77e37727f' |
| + |
| + def test_upload_single_file(self): |
| + filenames = [self.lorem_ipsum] |
| + output_filename = '%s.sha1' % self.lorem_ipsum |
| + if os.path.exists(output_filename): |
| + os.remove(output_filename) |
| + self.options.force = True |
| + upload_to_google_storage.upload_to_google_storage( |
| + filenames, |
| + self.base_url, |
| + self.gsutil, |
| + self.options) |
| + self.assertEquals( |
| + self.gsutil.history, |
| + [('check_call', |
| + ('ls', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1))), |
| + ('call', |
| + ('cp', '-q', filenames[0], '%s/%s' % (self.base_url, |
| + self.lorem_ipsum_sha1)))]) |
| + self.assertTrue(os.path.exists(output_filename)) |
| + self.assertEquals( |
| + open(output_filename, 'rb').read(), |
| + '7871c8e24da15bad8b0be2c36edc9dc77e37727f') |
| + os.remove(output_filename) |
| + |
| + def test_upload_single_file_remote_exists(self): |
| + filenames = [self.lorem_ipsum] |
| + output_filename = '%s.sha1' % self.lorem_ipsum |
| + etag_string = 'ETag: 634d7c1ed3545383837428f031840a1e' |
| + if os.path.exists(output_filename): |
| + os.remove(output_filename) |
| + self.gsutil.add_expected(0, '', '') |
| + self.gsutil.add_expected(0, etag_string, '') |
| + upload_to_google_storage.upload_to_google_storage( |
| + filenames, |
| + self.base_url, |
| + self.gsutil, |
| + self.options) |
| + self.assertEquals( |
| + self.gsutil.history, |
| + [('check_call', |
| + ('ls', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1))), |
| + ('check_call', |
| + ('ls', '-L', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1)))]) |
| + self.assertTrue(os.path.exists(output_filename)) |
| + self.assertEquals( |
| + open(output_filename, 'rb').read(), |
| + '7871c8e24da15bad8b0be2c36edc9dc77e37727f') |
| + os.remove(output_filename) |
| + |
| + def test_skip_hashing(self): |
| + filenames = [self.lorem_ipsum] |
| + output_filename = '%s.sha1' % self.lorem_ipsum |
| + fake_hash = '6871c8e24da15bad8b0be2c36edc9dc77e37727f' |
| + with open(output_filename, 'wb') as f: |
| + f.write(fake_hash) # Fake hash. |
| + self.options.skip_hashing = True |
| + upload_to_google_storage.upload_to_google_storage( |
| + filenames, |
| + self.base_url, |
| + self.gsutil, |
| + self.options) |
| + self.assertEquals( |
| + self.gsutil.history, |
| + [('check_call', |
| + ('ls', '%s/%s' % (self.base_url, fake_hash))), |
| + ('check_call', |
| + ('ls', '-L', '%s/%s' % (self.base_url, fake_hash))), |
| + ('call', |
| + ('cp', '-q', filenames[0], '%s/%s' % (self.base_url, fake_hash)))]) |
| + self.assertEquals( |
| + open(output_filename, 'rb').read(), fake_hash) |
| + os.remove(output_filename) |
| + |
| + def test_get_targets_no_args(self): |
| + try: |
| + upload_to_google_storage.get_targets(self.options, [], self.parser) |
| + except SystemExit, e: |
| + self.assertEquals(type(e), type(SystemExit())) |
| + self.assertEquals(e.code, 2) |
| + except Exception, e: |
| + self.fail('unexpected exception: %s' % e) |
| + else: |
| + self.fail('SystemExit exception expected') |
| + |
| + def test_get_targets_passthrough(self): |
| + result = upload_to_google_storage.get_targets( |
| + self.options, |
| + ['a', 'b', 'c', 'd', 'e'], |
| + self.parser) |
| + self.assertEquals(result, ['a', 'b', 'c', 'd', 'e']) |
| + |
| + def test_get_targets_multiple_stdin(self): |
| + inputs = ['a', 'b', 'c', 'd', 'e'] |
| + sys.stdin = StringIO.StringIO(os.linesep.join(inputs)) |
| + result = upload_to_google_storage.get_targets( |
| + self.options, |
| + ['-'], |
| + self.parser) |
| + self.assertEquals(result, inputs) |
| + |
| + def test_get_targets_multiple_stdin_null(self): |
| + inputs = ['a', 'b', 'c', 'd', 'e'] |
| + sys.stdin = StringIO.StringIO('\0'.join(inputs)) |
| + self.options.use_null_terminator = True |
| + result = upload_to_google_storage.get_targets( |
| + self.options, |
| + ['-'], |
| + self.parser) |
| + self.assertEquals(result, inputs) |
| + |
| + |
| +class DownloadTests(unittest.TestCase): |
| + def setUp(self): |
| + self.gsutil = GsutilMock(GSUTIL_DEFAULT_PATH) |
| + self.base_path = os.path.join( |
| + os.path.dirname(os.path.abspath(__file__)), |
| + 'gstools', |
| + 'download_test_data') |
| + self.base_url = 'gs://sometesturl' |
| + self.parser = optparse.OptionParser() |
| + self.options = self.parser.parse_args()[0] |
| + self.options.recursive = False |
| + self.options.force = False |
| + self.options.directory = False |
| + self.queue = Queue.Queue() |
| + self.lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt') |
| + self.lorem_ipsum_sha1 = '7871c8e24da15bad8b0be2c36edc9dc77e37727f' |
| + self.maxDiff = None |
| + |
| + def test_enumerate_files_non_recursive(self): |
| + self.options.directory = True |
| + queue_size = download_from_google_storage.enumerate_work_queue( |
| + self.base_path, self.queue, self.options) |
| + result = list(self.queue.queue) |
| + self.assertEquals( |
| + result, |
| + [('e6c4fbd4fe7607f3e6ebf68b2ea4ef694da7b4fe', |
| + os.path.join(self.base_path, 'rootfolder_text.txt')), |
| + ('7871c8e24da15bad8b0be2c36edc9dc77e37727f', |
| + os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt'))]) |
| + self.assertEquals(queue_size, 2) |
| + |
| + def test_enumerate_files_recursive(self): |
| + self.options.directory = True |
| + self.options.recursive = True |
| + queue_size = download_from_google_storage.enumerate_work_queue( |
| + self.base_path, self.queue, self.options) |
| + result = list(self.queue.queue) |
| + self.assertEquals( |
| + result, |
| + [('e6c4fbd4fe7607f3e6ebf68b2ea4ef694da7b4fe', |
| + os.path.join(self.base_path, 'rootfolder_text.txt')), |
| + ('7871c8e24da15bad8b0be2c36edc9dc77e37727f', |
| + os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt')), |
| + ('b5415aa0b64006a95c0c409182e628881d6d6463', |
| + os.path.join(self.base_path, 'subfolder', 'subfolder_text.txt'))]) |
| + self.assertEquals(queue_size, 3) |
| + |
| + def test_download_worker_single_file(self): |
| + sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f' |
| + input_filename = '%s/%s' % (self.base_url, sha1_hash) |
| + output_filename = os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt') |
| + self.queue.put((sha1_hash, output_filename)) |
| + self.queue.put((None, None)) |
| + stdout_queue = Queue.Queue() |
| + # pylint: disable=W0212 |
| + download_from_google_storage._downloader_worker_thread( |
| + 0, self.queue, self.options, self.base_url, self.gsutil, stdout_queue) |
| + expected_calls = [ |
| + ('check_call', |
| + ('ls', input_filename)), |
| + ('call', |
| + ('cp', '-q', input_filename, output_filename))] |
| + expected_output = [ |
| + 'Downloading %s to %s...' % (input_filename, output_filename), |
| + 'Thread 0 is done'] |
| + self.assertEquals(list(stdout_queue.queue), expected_output) |
| + self.assertEquals(self.gsutil.history, expected_calls) |
| + |
| + def test_download_worker_skips_file(self): |
| + sha1_hash = 'e6c4fbd4fe7607f3e6ebf68b2ea4ef694da7b4fe' |
| + output_filename = os.path.join(self.base_path, 'rootfolder_text.txt') |
| + self.queue.put((sha1_hash, output_filename)) |
| + self.queue.put((None, None)) |
| + stdout_queue = Queue.Queue() |
| + # pylint: disable=W0212 |
| + download_from_google_storage._downloader_worker_thread( |
| + 0, self.queue, self.options, self.base_url, self.gsutil, stdout_queue) |
| + expected_output = [ |
| + 'File %s exists and SHA1 sum (%s) matches. Skipping.' % |
| + (output_filename, sha1_hash), |
| + 'Thread 0 is done' |
| + ] |
| + self.assertEquals(list(stdout_queue.queue), expected_output) |
| + self.assertEquals(self.gsutil.history, []) |
| + |
| + def test_download_worker_skips_not_found_file(self): |
| + sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f' |
| + input_filename = '%s/%s' % (self.base_url, sha1_hash) |
| + output_filename = os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt') |
| + self.queue.put((sha1_hash, output_filename)) |
| + self.queue.put((None, None)) |
| + stdout_queue = Queue.Queue() |
| + self.gsutil.add_expected(1, '', '') # Return error when 'ls' is called. |
| + # pylint: disable=W0212 |
| + download_from_google_storage._downloader_worker_thread( |
| + 0, self.queue, self.options, self.base_url, self.gsutil, stdout_queue) |
| + expected_output = [ |
| + 'File %s for %s does not exist, skipping.' % ( |
| + input_filename, output_filename), |
| + 'Thread 0 is done' |
| + ] |
| + expected_calls = [ |
| + ('check_call', |
| + ('ls', input_filename)) |
| + ] |
| + self.assertEquals(list(stdout_queue.queue), expected_output) |
| + self.assertEquals(self.gsutil.history, expected_calls) |
| + |
| + |
| +if __name__ == '__main__': |
| + unittest.main() |