OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/env python |
| 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 3 # Use of this source code is governed by a BSD-style license that can be |
| 4 # found in the LICENSE file. |
| 5 |
| 6 """Unit tests for upload_to_google_storage.py.""" |
| 7 |
| 8 import optparse |
| 9 import os |
| 10 import Queue |
| 11 import shutil |
| 12 import StringIO |
| 13 import sys |
| 14 import tempfile |
| 15 import threading |
| 16 import unittest |
| 17 |
| 18 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| 19 |
| 20 import upload_to_google_storage |
| 21 from download_from_google_storage_unittests import GsutilMock |
| 22 |
| 23 # ../third_party/gsutil/gsutil |
| 24 GSUTIL_DEFAULT_PATH = os.path.join( |
| 25 os.path.dirname(os.path.dirname(os.path.abspath(__file__))), |
| 26 'third_party', 'gsutil', 'gsutil') |
| 27 TEST_DIR = os.path.dirname(os.path.abspath(__file__)) |
| 28 |
| 29 |
| 30 class UploadTests(unittest.TestCase): |
| 31 def setUp(self): |
| 32 self.gsutil = GsutilMock(GSUTIL_DEFAULT_PATH) |
| 33 self.temp_dir = tempfile.mkdtemp(prefix='gstools_test') |
| 34 self.base_path = os.path.join(self.temp_dir, 'gstools') |
| 35 shutil.copytree(os.path.join(TEST_DIR, 'gstools'), self.base_path) |
| 36 self.base_url = 'gs://sometesturl' |
| 37 self.parser = optparse.OptionParser() |
| 38 self.ret_codes = Queue.Queue() |
| 39 self.stdout_queue = Queue.Queue() |
| 40 self.lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt') |
| 41 self.lorem_ipsum_sha1 = '7871c8e24da15bad8b0be2c36edc9dc77e37727f' |
| 42 |
| 43 def cleanUp(self): |
| 44 shutil.rmtree(self.temp_dir) |
| 45 sys.stdin = sys.__stdin__ |
| 46 |
| 47 def test_upload_single_file(self): |
| 48 filenames = [self.lorem_ipsum] |
| 49 output_filename = '%s.sha1' % self.lorem_ipsum |
| 50 code = upload_to_google_storage.upload_to_google_storage( |
| 51 filenames, self.base_url, self.gsutil, True, False, 1, False) |
| 52 self.assertEqual( |
| 53 self.gsutil.history, |
| 54 [('check_call', |
| 55 ('ls', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1))), |
| 56 ('check_call', |
| 57 ('cp', '-q', filenames[0], '%s/%s' % (self.base_url, |
| 58 self.lorem_ipsum_sha1)))]) |
| 59 self.assertTrue(os.path.exists(output_filename)) |
| 60 self.assertEqual( |
| 61 open(output_filename, 'rb').read(), |
| 62 '7871c8e24da15bad8b0be2c36edc9dc77e37727f') |
| 63 os.remove(output_filename) |
| 64 self.assertEqual(code, 0) |
| 65 |
| 66 def test_upload_single_file_remote_exists(self): |
| 67 filenames = [self.lorem_ipsum] |
| 68 output_filename = '%s.sha1' % self.lorem_ipsum |
| 69 etag_string = 'ETag: 634d7c1ed3545383837428f031840a1e' |
| 70 self.gsutil.add_expected(0, '', '') |
| 71 self.gsutil.add_expected(0, etag_string, '') |
| 72 code = upload_to_google_storage.upload_to_google_storage( |
| 73 filenames, self.base_url, self.gsutil, False, False, 1, False) |
| 74 self.assertEqual( |
| 75 self.gsutil.history, |
| 76 [('check_call', |
| 77 ('ls', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1))), |
| 78 ('check_call', |
| 79 ('ls', '-L', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1)))]) |
| 80 self.assertTrue(os.path.exists(output_filename)) |
| 81 self.assertEqual( |
| 82 open(output_filename, 'rb').read(), |
| 83 '7871c8e24da15bad8b0be2c36edc9dc77e37727f') |
| 84 os.remove(output_filename) |
| 85 self.assertEqual(code, 0) |
| 86 |
| 87 def test_upload_worker_errors(self): |
| 88 work_queue = Queue.Queue() |
| 89 work_queue.put((self.lorem_ipsum, self.lorem_ipsum_sha1)) |
| 90 work_queue.put((None, None)) |
| 91 self.gsutil.add_expected(1, '', '') # For the first ls call. |
| 92 self.gsutil.add_expected(20, '', 'Expected error message') |
| 93 # pylint: disable=W0212 |
| 94 upload_to_google_storage._upload_worker( |
| 95 0, |
| 96 work_queue, |
| 97 self.base_url, |
| 98 self.gsutil, |
| 99 threading.Lock(), |
| 100 False, |
| 101 False, |
| 102 self.stdout_queue, |
| 103 self.ret_codes) |
| 104 expected_ret_codes = [ |
| 105 (20, |
| 106 'Encountered error on uploading %s to %s/%s\nExpected error message' % |
| 107 (self.lorem_ipsum, self.base_url, self.lorem_ipsum_sha1))] |
| 108 self.assertEqual(list(self.ret_codes.queue), expected_ret_codes) |
| 109 |
| 110 def test_skip_hashing(self): |
| 111 filenames = [self.lorem_ipsum] |
| 112 output_filename = '%s.sha1' % self.lorem_ipsum |
| 113 fake_hash = '6871c8e24da15bad8b0be2c36edc9dc77e37727f' |
| 114 with open(output_filename, 'wb') as f: |
| 115 f.write(fake_hash) # Fake hash. |
| 116 code = upload_to_google_storage.upload_to_google_storage( |
| 117 filenames, self.base_url, self.gsutil, False, False, 1, True) |
| 118 self.assertEqual( |
| 119 self.gsutil.history, |
| 120 [('check_call', |
| 121 ('ls', '%s/%s' % (self.base_url, fake_hash))), |
| 122 ('check_call', |
| 123 ('ls', '-L', '%s/%s' % (self.base_url, fake_hash))), |
| 124 ('check_call', |
| 125 ('cp', '-q', filenames[0], '%s/%s' % (self.base_url, fake_hash)))]) |
| 126 self.assertEqual( |
| 127 open(output_filename, 'rb').read(), fake_hash) |
| 128 os.remove(output_filename) |
| 129 self.assertEqual(code, 0) |
| 130 |
| 131 def test_get_targets_no_args(self): |
| 132 try: |
| 133 upload_to_google_storage.get_targets([], self.parser, False) |
| 134 self.fail() |
| 135 except SystemExit, e: |
| 136 self.assertEqual(e.code, 2) |
| 137 |
| 138 def test_get_targets_passthrough(self): |
| 139 result = upload_to_google_storage.get_targets( |
| 140 ['a', 'b', 'c', 'd', 'e'], |
| 141 self.parser, |
| 142 False) |
| 143 self.assertEqual(result, ['a', 'b', 'c', 'd', 'e']) |
| 144 |
| 145 def test_get_targets_multiple_stdin(self): |
| 146 inputs = ['a', 'b', 'c', 'd', 'e'] |
| 147 sys.stdin = StringIO.StringIO(os.linesep.join(inputs)) |
| 148 result = upload_to_google_storage.get_targets( |
| 149 ['-'], |
| 150 self.parser, |
| 151 False) |
| 152 self.assertEqual(result, inputs) |
| 153 |
| 154 def test_get_targets_multiple_stdin_null(self): |
| 155 inputs = ['a', 'b', 'c', 'd', 'e'] |
| 156 sys.stdin = StringIO.StringIO('\0'.join(inputs)) |
| 157 result = upload_to_google_storage.get_targets( |
| 158 ['-'], |
| 159 self.parser, |
| 160 True) |
| 161 self.assertEqual(result, inputs) |
| 162 |
| 163 |
| 164 if __name__ == '__main__': |
| 165 unittest.main() |
OLD | NEW |