Chromium Code Reviews| Index: tests/git_common_test.py |
| diff --git a/tests/git_common_test.py b/tests/git_common_test.py |
| new file mode 100755 |
| index 0000000000000000000000000000000000000000..c5f3e416b4ac7fa87f7032713404e6da46718cc7 |
| --- /dev/null |
| +++ b/tests/git_common_test.py |
| @@ -0,0 +1,280 @@ |
| +#!/usr/bin/env python |
| +# Copyright (c) 2013 The Chromium Authors. All rights reserved. |
| +# Use of this source code is governed by a BSD-style license that can be |
| +# found in the LICENSE file. |
| + |
| +"""Unit tests for git_common.py""" |
| + |
| +import binascii |
| +import collections |
| +import os |
| +import signal |
| +import sys |
| +import tempfile |
| +import time |
| +import unittest |
| + |
| +DEPOT_TOOLS_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
| +sys.path.insert(0, DEPOT_TOOLS_ROOT) |
| + |
| +from testing_support import git_test_utils |
|
M-A Ruel
2013/11/17 19:54:58
sort
iannucci
2013/11/18 05:38:56
Dangit! Why don't we have pylint do this....
|
| +from testing_support import coverage_utils |
| + |
| + |
| +class GitCommonTestBase(unittest.TestCase): |
| + @classmethod |
| + def setUpClass(cls): |
| + super(GitCommonTestBase, cls).setUpClass() |
| + import git_common |
| + cls.gc = git_common |
| + |
| + |
| +class Support(GitCommonTestBase): |
| + def _testMemoizeOneBody(self, threadsafe): |
| + calls = collections.defaultdict(int) |
| + def double_if_even(val): |
| + calls[val] += 1 |
| + return val * 2 if val % 2 == 0 else None |
| + # Use this explicitly as a wrapper fn instead of a decorator. Otherwise |
| + # pylint crashes (!!) |
| + double_if_even = self.gc.memoize_one(threadsafe=threadsafe)(double_if_even) |
| + |
| + self.assertEqual(4, double_if_even(2)) |
| + self.assertEqual(4, double_if_even(2)) |
| + self.assertEqual(None, double_if_even(1)) |
| + self.assertEqual(None, double_if_even(1)) |
| + self.assertDictEqual({1: 2, 2: 1}, calls) |
| + |
| + double_if_even.set(10, 20) |
| + self.assertEqual(20, double_if_even(10)) |
| + self.assertDictEqual({1: 2, 2: 1}, calls) |
| + |
| + double_if_even.clear() |
| + self.assertEqual(4, double_if_even(2)) |
| + self.assertEqual(4, double_if_even(2)) |
| + self.assertEqual(None, double_if_even(1)) |
| + self.assertEqual(None, double_if_even(1)) |
| + self.assertEqual(20, double_if_even(10)) |
| + self.assertDictEqual({1: 4, 2: 2, 10: 1}, calls) |
| + |
| + def testMemoizeOne(self): |
| + self._testMemoizeOneBody(threadsafe=False) |
| + |
| + def testMemoizeOneThreadsafe(self): |
| + self._testMemoizeOneBody(threadsafe=True) |
| + |
| + |
| +def slow_square(i): |
| + """Helper for ScopedPoolTest. |
| + |
| + Must be global because non top-level functions aren't pickleable. |
| + """ |
| + time.sleep(0.2) |
| + return i ** 2 |
| + |
| + |
| +class ScopedPoolTest(GitCommonTestBase): |
| + if sys.platform.startswith('win'): |
| + CTRL_C = signal.CTRL_C_EVENT |
| + else: |
| + CTRL_C = signal.SIGINT |
|
M-A Ruel
2013/11/17 19:54:58
CTRL_C = signal.CTRL_C_EVENT if sys.platform == 'w
iannucci
2013/11/18 05:38:56
Yeah, I know... it's weird. Done.
|
| + |
| + def testThreads(self): |
| + result = [] |
| + with self.gc.ScopedPool(kind='threads') as pool: |
| + for i in pool.imap(slow_square, xrange(10)): |
|
M-A Ruel
2013/11/17 19:54:58
result = list(pool.imap(slow_square, xrange(10)))
iannucci
2013/11/18 05:38:56
Lol, done.
|
| + result.append(i) |
| + self.assertEqual([0, 1, 4, 9, 16, 25, 36, 49, 64, 81], result) |
| + |
| + def testThreadsCtrlC(self): |
| + result = [] |
| + with self.assertRaises(KeyboardInterrupt): |
| + with self.gc.ScopedPool(kind='threads') as pool: |
| + # Make sure this pool is interrupted in mid-swing |
| + for i in pool.imap(slow_square, xrange(1000000)): |
|
M-A Ruel
2013/11/17 19:54:58
One thing I prefer is to use a list of threading.E
iannucci
2013/11/18 05:38:56
Tried it. Surprise! Python signal-handling is brok
|
| + if i > 32: |
| + os.kill(os.getpid(), self.CTRL_C) |
| + result.append(i) |
| + self.assertEqual([0, 1, 4, 9, 16, 25], result) |
| + |
| + def testProcs(self): |
| + result = [] |
| + with self.gc.ScopedPool() as pool: |
| + for i in pool.imap(slow_square, xrange(10)): |
|
M-A Ruel
2013/11/17 19:54:58
same
iannucci
2013/11/18 05:38:56
Done.
|
| + result.append(i) |
| + self.assertEqual([0, 1, 4, 9, 16, 25, 36, 49, 64, 81], result) |
| + |
| + def testProcsCtrlC(self): |
| + result = [] |
| + with self.assertRaises(KeyboardInterrupt): |
| + with self.gc.ScopedPool() as pool: |
| + # Make sure this pool is interrupted in mid-swing |
| + for i in pool.imap(slow_square, xrange(1000000)): |
| + if i > 32: |
| + os.kill(os.getpid(), self.CTRL_C) |
| + result.append(i) |
| + self.assertEqual([0, 1, 4, 9, 16, 25], result) |
| + |
| + |
| +class ProgressPrinterTest(GitCommonTestBase): |
| + class FakeStream(object): |
| + def __init__(self): |
| + self.data = set() |
| + self.count = 0 |
| + |
| + def write(self, line): |
| + self.data.add(line) |
| + |
| + def flush(self): |
| + self.count += 1 |
| + |
| + @unittest.expectedFailure |
| + def testBasic(self): |
| + """This test is probably racy, but I don't have a better alternative.""" |
| + fmt = '%(count)d/10' |
| + stream = self.FakeStream() |
| + |
| + pp = self.gc.ProgressPrinter(fmt, enabled=True, stream=stream, period=0.01) |
| + with pp as inc: |
| + for _ in xrange(10): |
| + time.sleep(0.02) |
| + inc() |
| + |
| + filtered = set(x.strip() for x in stream.data) |
| + rslt = set(fmt % {'count': i} for i in xrange(11)) |
| + self.assertSetEqual(filtered, rslt) |
| + self.assertGreaterEqual(stream.count, 10) |
| + |
| + |
| +class GitReadOnlyFunctionsTest(git_test_utils.GitRepoReadOnlyTestBase, |
| + GitCommonTestBase): |
| + REPO = """ |
| + A B C D |
| + B E D |
| + """ |
| + |
| + COMMIT_A = { |
| + 'some/files/file1': {'data': 'file1'}, |
| + 'some/files/file2': {'data': 'file2'}, |
| + 'some/files/file3': {'data': 'file3'}, |
| + 'some/other/file': {'data': 'otherfile'}, |
| + } |
| + |
| + COMMIT_C = { |
| + 'some/files/file2': { |
| + 'mode': 0755, |
| + 'data': 'file2 - vanilla'}, |
| + } |
| + |
| + COMMIT_E = { |
| + 'some/files/file2': {'data': 'file2 - merged'}, |
| + } |
| + |
| + COMMIT_D = { |
| + 'some/files/file2': {'data': 'file2 - vanilla\nfile2 - merged'}, |
| + } |
| + |
| + def testHashes(self): |
| + ret = self.repo.run( |
| + self.gc.hashes, *[ |
| + 'master', |
| + 'master~3', |
| + self.repo['E']+'~', |
| + self.repo['D']+'^2', |
| + 'tag_C^{}', |
| + ] |
| + ) |
| + self.assertEqual([ |
| + self.repo['D'], |
| + self.repo['A'], |
| + self.repo['B'], |
| + self.repo['E'], |
| + self.repo['C'], |
| + ], ret) |
| + |
| + def testParseCommitrefs(self): |
| + ret = self.repo.run( |
| + self.gc.parse_commitrefs, *[ |
| + 'master', |
| + 'master~3', |
| + self.repo['E']+'~', |
| + self.repo['D']+'^2', |
| + 'tag_C^{}', |
| + ] |
| + ) |
| + self.assertEqual(ret, map(binascii.unhexlify, [ |
| + self.repo['D'], |
| + self.repo['A'], |
| + self.repo['B'], |
| + self.repo['E'], |
| + self.repo['C'], |
| + ])) |
| + |
| + with self.assertRaisesRegexp(Exception, r"one of \('master', 'bananas'\)"): |
| + self.repo.run(self.gc.parse_commitrefs, 'master', 'bananas') |
| + |
| + def testTree(self): |
| + tree = self.repo.run(self.gc.tree, 'master:some/files') |
| + file1 = self.COMMIT_A['some/files/file1']['data'] |
| + file2 = self.COMMIT_D['some/files/file2']['data'] |
| + file3 = self.COMMIT_A['some/files/file3']['data'] |
| + self.assertEquals(tree['file1'], |
| + ('100644', 'blob', git_test_utils.git_hash_data(file1))) |
| + self.assertEquals(tree['file2'], |
| + ('100755', 'blob', git_test_utils.git_hash_data(file2))) |
| + self.assertEquals(tree['file3'], |
| + ('100644', 'blob', git_test_utils.git_hash_data(file3))) |
| + |
| + tree = self.repo.run(self.gc.tree, 'master:some') |
| + self.assertEquals(len(tree), 2) |
| + # Don't check the tree hash because we're lazy :) |
| + self.assertEquals(tree['files'][:2], ('040000', 'tree')) |
| + |
| + tree = self.repo.run(self.gc.tree, 'master:wat') |
| + self.assertEqual(tree, None) |
| + |
| + def testTreeRecursive(self): |
| + tree = self.repo.run(self.gc.tree, 'master:some', recurse=True) |
| + file1 = self.COMMIT_A['some/files/file1']['data'] |
| + file2 = self.COMMIT_D['some/files/file2']['data'] |
| + file3 = self.COMMIT_A['some/files/file3']['data'] |
| + other = self.COMMIT_A['some/other/file']['data'] |
| + self.assertEquals(tree['files/file1'], |
| + ('100644', 'blob', git_test_utils.git_hash_data(file1))) |
| + self.assertEquals(tree['files/file2'], |
| + ('100755', 'blob', git_test_utils.git_hash_data(file2))) |
| + self.assertEquals(tree['files/file3'], |
| + ('100644', 'blob', git_test_utils.git_hash_data(file3))) |
| + self.assertEquals(tree['other/file'], |
| + ('100644', 'blob', git_test_utils.git_hash_data(other))) |
| + |
| + |
| +class GitMutableFunctionsTest(git_test_utils.GitRepoReadWriteTestBase, |
| + GitCommonTestBase): |
| + REPO = '' |
| + |
| + def _intern_data(self, data): |
| + with tempfile.TemporaryFile() as f: |
| + f.write(data) |
| + f.seek(0) |
| + return self.repo.run(self.gc.intern_f, f) |
| + |
| + def testInternF(self): |
| + data = 'CoolBobcatsBro' |
| + data_hash = self._intern_data(data) |
| + self.assertEquals(git_test_utils.git_hash_data(data), data_hash) |
| + self.assertEquals(data, self.repo.git('cat-file', 'blob', data_hash).stdout) |
| + |
| + def testMkTree(self): |
| + tree = {} |
| + for i in 1, 2, 3: |
| + name = 'file%d' % i |
| + tree[name] = ('100644', 'blob', self._intern_data(name)) |
| + tree_hash = self.repo.run(self.gc.mktree, tree) |
| + self.assertEquals('37b61866d6e061c4ba478e7eb525be7b5752737d', tree_hash) |
| + |
| + |
| +if __name__ == '__main__': |
| + sys.exit(coverage_utils.covered_main( |
| + os.path.join(DEPOT_TOOLS_ROOT, 'git_common.py') |
| + )) |