Index: tests/trace_inputs_test.py |
diff --git a/tests/trace_inputs_test.py b/tests/trace_inputs_test.py |
index 70a026e1ec8737908559bec0bc8ab08bb313bcb6..8092c895b4bc69316d9ee9fd22610045cde63077 100755 |
--- a/tests/trace_inputs_test.py |
+++ b/tests/trace_inputs_test.py |
@@ -20,6 +20,9 @@ FILE_PATH = unicode(os.path.abspath(__file__)) |
import trace_inputs |
+# Access to a protected member _FOO of a client class |
+# pylint: disable=W0212 |
+ |
def join_norm(*args): |
"""Joins and normalizes path in a single step.""" |
@@ -43,7 +46,7 @@ class TraceInputs(unittest.TestCase): |
), |
) |
for actual, expected in test_cases: |
- self.assertEquals( |
+ self.assertEqual( |
expected, trace_inputs.strace_process_quoted_arguments(actual)) |
def test_process_escaped_arguments(self): |
@@ -53,28 +56,28 @@ class TraceInputs(unittest.TestCase): |
('\\"foo\\"\\0', ['"foo"']), |
) |
for actual, expected in test_cases: |
- self.assertEquals( |
+ self.assertEqual( |
expected, |
trace_inputs.Dtrace.Context.process_escaped_arguments(actual)) |
def test_variable_abs(self): |
value = trace_inputs.Results.File(None, u'/foo/bar', False, False) |
actual = value.replace_variables({'$FOO': '/foo'}) |
- self.assertEquals('$FOO/bar', actual.path) |
- self.assertEquals('$FOO/bar', actual.full_path) |
- self.assertEquals(True, actual.tainted) |
+ self.assertEqual('$FOO/bar', actual.path) |
+ self.assertEqual('$FOO/bar', actual.full_path) |
+ self.assertEqual(True, actual.tainted) |
def test_variable_rel(self): |
value = trace_inputs.Results.File(u'/usr', u'foo/bar', False, False) |
actual = value.replace_variables({'$FOO': 'foo'}) |
- self.assertEquals('$FOO/bar', actual.path) |
- self.assertEquals(os.path.join('/usr', '$FOO/bar'), actual.full_path) |
- self.assertEquals(True, actual.tainted) |
+ self.assertEqual('$FOO/bar', actual.path) |
+ self.assertEqual(os.path.join('/usr', '$FOO/bar'), actual.full_path) |
+ self.assertEqual(True, actual.tainted) |
def test_native_case_end_with_os_path_sep(self): |
# Make sure the trailing os.path.sep is kept. |
path = trace_inputs.get_native_path_case(ROOT_DIR) + os.path.sep |
- self.assertEquals(trace_inputs.get_native_path_case(path), path) |
+ self.assertEqual(trace_inputs.get_native_path_case(path), path) |
def test_native_case_non_existing(self): |
# Make sure it doesn't throw on non-existing files. |
@@ -82,7 +85,7 @@ class TraceInputs(unittest.TestCase): |
path = os.path.expanduser('~/' + non_existing) |
self.assertFalse(os.path.exists(path)) |
path = trace_inputs.get_native_path_case(ROOT_DIR) + os.path.sep |
- self.assertEquals(trace_inputs.get_native_path_case(path), path) |
+ self.assertEqual(trace_inputs.get_native_path_case(path), path) |
def test_strace_filename(self): |
filename = u'foo, bar, ~p#o,,ué^t%t.txt' |
@@ -136,8 +139,8 @@ class TraceInputs(unittest.TestCase): |
path = path[0].upper() + path[1:] |
# This test assumes the variable is in the native path case on disk, this |
# should be the case. Verify this assumption: |
- self.assertEquals(path, trace_inputs.get_native_path_case(path)) |
- self.assertEquals( |
+ self.assertEqual(path, trace_inputs.get_native_path_case(path)) |
+ self.assertEqual( |
trace_inputs.get_native_path_case(path.lower()), |
trace_inputs.get_native_path_case(path.upper())) |
@@ -154,70 +157,70 @@ class TraceInputs(unittest.TestCase): |
# Make sure non-existing element is not modified: |
self.assertTrue(lower.endswith(non_existing.lower())) |
self.assertTrue(upper.endswith(non_existing.upper())) |
- self.assertEquals(lower[:-len(non_existing)], upper[:-len(non_existing)]) |
+ self.assertEqual(lower[:-len(non_existing)], upper[:-len(non_existing)]) |
if sys.platform != 'win32': |
def test_symlink(self): |
# This test will fail if the checkout is in a symlink. |
actual = trace_inputs.split_at_symlink(None, ROOT_DIR) |
expected = (ROOT_DIR, None, None) |
- self.assertEquals(expected, actual) |
+ self.assertEqual(expected, actual) |
actual = trace_inputs.split_at_symlink( |
None, os.path.join(BASE_DIR, 'trace_inputs')) |
expected = ( |
os.path.join(BASE_DIR, 'trace_inputs'), None, None) |
- self.assertEquals(expected, actual) |
+ self.assertEqual(expected, actual) |
actual = trace_inputs.split_at_symlink( |
None, os.path.join(BASE_DIR, 'trace_inputs', 'files2')) |
expected = ( |
os.path.join(BASE_DIR, 'trace_inputs'), 'files2', '') |
- self.assertEquals(expected, actual) |
+ self.assertEqual(expected, actual) |
actual = trace_inputs.split_at_symlink( |
ROOT_DIR, os.path.join('tests', 'trace_inputs', 'files2')) |
expected = ( |
os.path.join('tests', 'trace_inputs'), 'files2', '') |
- self.assertEquals(expected, actual) |
+ self.assertEqual(expected, actual) |
actual = trace_inputs.split_at_symlink( |
ROOT_DIR, os.path.join('tests', 'trace_inputs', 'files2', 'bar')) |
expected = ( |
os.path.join('tests', 'trace_inputs'), 'files2', '/bar') |
- self.assertEquals(expected, actual) |
+ self.assertEqual(expected, actual) |
def test_native_case_symlink_right_case(self): |
actual = trace_inputs.get_native_path_case( |
os.path.join(BASE_DIR, 'trace_inputs')) |
- self.assertEquals('trace_inputs', os.path.basename(actual)) |
+ self.assertEqual('trace_inputs', os.path.basename(actual)) |
# Make sure the symlink is not resolved. |
actual = trace_inputs.get_native_path_case( |
os.path.join(BASE_DIR, 'trace_inputs', 'files2')) |
- self.assertEquals('files2', os.path.basename(actual)) |
+ self.assertEqual('files2', os.path.basename(actual)) |
if sys.platform == 'darwin': |
def test_native_case_symlink_wrong_case(self): |
base_dir = trace_inputs.get_native_path_case(BASE_DIR) |
trace_inputs_dir = os.path.join(base_dir, 'trace_inputs') |
actual = trace_inputs.get_native_path_case(trace_inputs_dir) |
- self.assertEquals(trace_inputs_dir, actual) |
+ self.assertEqual(trace_inputs_dir, actual) |
# Make sure the symlink is not resolved. |
data = os.path.join(trace_inputs_dir, 'Files2') |
actual = trace_inputs.get_native_path_case(data) |
- self.assertEquals( |
+ self.assertEqual( |
os.path.join(trace_inputs_dir, 'files2'), actual) |
data = os.path.join(trace_inputs_dir, 'Files2', '') |
actual = trace_inputs.get_native_path_case(data) |
- self.assertEquals( |
+ self.assertEqual( |
os.path.join(trace_inputs_dir, 'files2', ''), actual) |
data = os.path.join(trace_inputs_dir, 'Files2', 'Child1.py') |
actual = trace_inputs.get_native_path_case(data) |
# TODO(maruel): Should be child1.py. |
- self.assertEquals( |
+ self.assertEqual( |
os.path.join(trace_inputs_dir, 'files2', 'Child1.py'), actual) |
if sys.platform == 'win32': |
@@ -259,7 +262,14 @@ if sys.platform != 'win32': |
context = trace_inputs.Strace.Context(lambda _: False, initial_cwd) |
for line in lines: |
context.on_line(*line) |
- return context.to_results().flatten() |
+ done = any(p._done for p in context._process_lookup.itervalues()) |
+ return context.to_results().flatten(), done |
+ |
+ def assertContext(self, lines, initial_cwd, expected, expected_done): |
+ actual, actual_done = self._load_context(lines, initial_cwd) |
+ self.assertEqual(expected, actual) |
+ # If actual_done is True, this means the log was cut off abruptly. |
+ self.assertEqual(expected_done, actual_done) |
def _test_lines(self, lines, initial_cwd, files, command=None): |
filepath = join_norm(initial_cwd, '../out/unittests') |
@@ -277,7 +287,7 @@ if sys.platform != 'win32': |
if not files: |
expected['root']['command'] = None |
expected['root']['executable'] = None |
- self.assertEquals(expected, self._load_context(lines, initial_cwd)) |
+ self.assertContext(lines, initial_cwd, expected, False) |
def test_execve(self): |
lines = [ |
@@ -315,7 +325,7 @@ if sys.platform != 'win32': |
None, |
None, |
[]) |
- self.assertEquals(expected, e.args) |
+ self.assertEqual(expected, e.args) |
def test_chmod(self): |
lines = [ |
@@ -377,7 +387,7 @@ if sys.platform != 'win32': |
'pid': self._ROOT_PID, |
}, |
} |
- self.assertEquals(expected, self._load_context(lines, BASE_DIR)) |
+ self.assertContext(lines, BASE_DIR, expected, False) |
def test_clone_chdir(self): |
# Grand-child with relative directory. |
@@ -448,7 +458,7 @@ if sys.platform != 'win32': |
'pid': self._ROOT_PID, |
}, |
} |
- self.assertEquals(expected, self._load_context(lines, ROOT_DIR)) |
+ self.assertContext(lines, ROOT_DIR, expected, False) |
def test_faccess(self): |
lines = [ |
@@ -465,7 +475,7 @@ if sys.platform != 'win32': |
'pid': self._ROOT_PID, |
}, |
} |
- self.assertEquals(expected, self._load_context(lines, ROOT_DIR)) |
+ self.assertContext(lines, ROOT_DIR, expected, False) |
def test_futex_died(self): |
# That's a pretty bad fork, copy-pasted from a real log. |
@@ -483,7 +493,7 @@ if sys.platform != 'win32': |
'pid': self._ROOT_PID, |
}, |
} |
- self.assertEquals(expected, self._load_context(lines, ROOT_DIR)) |
+ self.assertContext(lines, ROOT_DIR, expected, True) |
def test_futex_missing_in_action(self): |
# That's how futex() calls roll. |
@@ -520,7 +530,7 @@ if sys.platform != 'win32': |
'pid': self._ROOT_PID, |
}, |
} |
- self.assertEquals(expected, self._load_context(lines, ROOT_DIR)) |
+ self.assertContext(lines, ROOT_DIR, expected, True) |
def test_futex_missing_in_partial_action(self): |
# That's how futex() calls roll even more. |
@@ -551,7 +561,7 @@ if sys.platform != 'win32': |
'pid': self._ROOT_PID, |
}, |
} |
- self.assertEquals(expected, self._load_context(lines, ROOT_DIR)) |
+ self.assertContext(lines, ROOT_DIR, expected, True) |
def test_futex_missing_in_partial_action_with_no_process(self): |
# That's how futex() calls roll even more (again). |
@@ -569,7 +579,7 @@ if sys.platform != 'win32': |
'pid': self._ROOT_PID, |
}, |
} |
- self.assertEquals(expected, self._load_context(lines, ROOT_DIR)) |
+ self.assertContext(lines, ROOT_DIR, expected, True) |
def test_open(self): |
lines = [ |
@@ -634,6 +644,25 @@ if sys.platform != 'win32': |
self._test_lines(lines, u'/home/foo_bar_user/src', files) |
+ def test_openat_died(self): |
+ lines = [ |
+ # It's fine as long as there is nothing after. |
+ ( self._ROOT_PID, |
+ 'openat(AT_FDCWD, "/tmp/.org.chromium.Chromium.NLRojh/Plugins", ' |
+ 'O_RDONLY|O_NONBLOCK|O_DIRECTORY|O_CLOEXEC'), |
+ ] |
+ expected = { |
+ 'root': { |
+ 'children': [], |
+ 'command': None, |
+ 'executable': None, |
+ 'files': [], |
+ 'initial_cwd': unicode(ROOT_DIR), |
+ 'pid': self._ROOT_PID, |
+ }, |
+ } |
+ self.assertContext(lines, ROOT_DIR, expected, True) |
+ |
def test_rmdir(self): |
lines = [ |
(self._ROOT_PID, 'rmdir("directory/to/delete") = 0'), |