Chromium Code Reviews| Index: scripts/common/unittests/annotator_test.py |
| diff --git a/scripts/common/unittests/annotator_test.py b/scripts/common/unittests/annotator_test.py |
| new file mode 100755 |
| index 0000000000000000000000000000000000000000..735ea58574654b2614a862f45283eb83f32b8b60 |
| --- /dev/null |
| +++ b/scripts/common/unittests/annotator_test.py |
| @@ -0,0 +1,320 @@ |
| +#!/usr/bin/env python |
| +# Copyright (c) 2013 The Chromium Authors. All rights reserved. |
| +# Use of this source code is governed by a BSD-style license that can be |
| +# found in the LICENSE file. |
| + |
| +"""Unit tests for classes in annotator.py.""" |
| + |
| +import cStringIO |
| +import json |
| +import os |
| +import sys |
| +import tempfile |
| +import unittest |
| + |
| +import test_env # pylint: disable=W0611 |
| + |
| +from common import annotator |
| +from common import chromium_utils |
| + |
| + |
| +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) |
| + |
| + |
| +class FilterCapture(chromium_utils.RunCommandFilter): |
| + """Captures the text and places it into an array.""" |
| + def __init__(self): |
| + chromium_utils.RunCommandFilter.__init__(self) |
| + self.text = [] |
| + |
| + def FilterLine(self, line): |
| + self.text.append(line.rstrip()) |
| + |
| + def FilterDone(self, text): |
| + self.text.append(text) |
| + |
| + |
| +class TestAnnotationStreams(unittest.TestCase): |
| + def setUp(self): |
| + self.buf = cStringIO.StringIO() |
| + |
| + def _getLines(self): |
| + return self.buf.getvalue().rstrip().split('\n') |
| + |
| + def testBasicUsage(self): |
| + stream = annotator.StructuredAnnotationStream(stream=self.buf) |
| + with stream.step('one') as _: |
| + pass |
| + with stream.step('two') as _: |
| + pass |
| + |
| + result = [ |
| + '@@@SEED_STEP one@@@', |
| + '@@@STEP_CURSOR one@@@', |
| + '@@@STEP_STARTED@@@', |
| + '@@@STEP_CLOSED@@@', |
| + '@@@SEED_STEP two@@@', |
| + '@@@STEP_CURSOR two@@@', |
| + '@@@STEP_STARTED@@@', |
| + '@@@STEP_CLOSED@@@', |
| + ] |
| + |
| + self.assertEquals(result, self._getLines()) |
| + |
| + def testStepAnnotations(self): |
| + stream = annotator.StructuredAnnotationStream(stream=self.buf) |
| + with stream.step('one') as s: |
| + s.step_warnings() |
| + s.step_failure() |
| + s.step_exception() |
| + s.step_clear() |
| + s.step_summary_clear() |
| + s.step_text('hello') |
| + s.step_summary_text('hello!') |
| + s.step_log_line('mylog', 'test') |
| + s.step_log_end('mylog') |
| + s.step_log_line('myperflog', 'perf data') |
| + s.step_log_end_perf('myperflog', 'dashboardname') |
| + s.write_log_lines('full_log', ['line one', 'line two']) |
| + s.write_log_lines('full_perf_log', ['perf line one', 'perf line two'], |
| + perf='full_perf') |
|
iannucci
2013/03/02 01:49:14
Should there be a test that tries to write to logs
Mike Stip (use stip instead)
2013/03/02 02:13:45
The master already throws up an exception when thi
|
| + |
| + result = [ |
| + '@@@SEED_STEP one@@@', |
| + '@@@STEP_CURSOR one@@@', |
| + '@@@STEP_STARTED@@@', |
| + '@@@STEP_WARNINGS@@@', |
| + '@@@STEP_FAILURE@@@', |
| + '@@@STEP_EXCEPTION@@@', |
| + '@@@STEP_CLEAR@@@', |
| + '@@@STEP_SUMMARY_CLEAR@@@', |
| + '@@@STEP_TEXT@hello@@@', |
| + '@@@STEP_SUMMARY_TEXT@hello!@@@', |
| + '@@@STEP_LOG_LINE@mylog@test@@@', |
| + '@@@STEP_LOG_END@mylog@@@', |
| + '@@@STEP_LOG_LINE@myperflog@perf data@@@', |
| + '@@@STEP_LOG_END_PERF@myperflog@dashboardname@@@', |
| + '@@@STEP_LOG_LINE@full_log@line one@@@', |
| + '@@@STEP_LOG_LINE@full_log@line two@@@', |
| + '@@@STEP_LOG_END@full_log@@@', |
| + '@@@STEP_LOG_LINE@full_perf_log@perf line one@@@', |
| + '@@@STEP_LOG_LINE@full_perf_log@perf line two@@@', |
| + '@@@STEP_LOG_END_PERF@full_perf_log@full_perf@@@', |
| + '@@@STEP_CLOSED@@@', |
| + ] |
| + |
| + self.assertEquals(result, self._getLines()) |
| + |
| + def testSeedStep(self): |
| + steps = ['one', 'two'] |
| + stream = annotator.StructuredAnnotationStream(seed_steps=steps, |
| + stream=self.buf) |
| + with stream.step('one'): |
| + pass |
| + with stream.step('two'): |
| + pass |
| + |
| + result = [ |
| + '@@@SEED_STEP one@@@', |
| + '@@@SEED_STEP two@@@', |
| + '@@@STEP_CURSOR one@@@', |
| + '@@@STEP_STARTED@@@', |
| + '@@@STEP_CLOSED@@@', |
| + '@@@STEP_CURSOR two@@@', |
| + '@@@STEP_STARTED@@@', |
| + '@@@STEP_CLOSED@@@' |
| + ] |
| + |
| + self.assertEquals(result, self._getLines()) |
| + |
| + def testSeedStepSkip(self): |
| + steps = ['one', 'two', 'three'] |
| + stream = annotator.StructuredAnnotationStream(seed_steps=steps, |
| + stream=self.buf) |
| + with stream.step('one'): |
| + pass |
| + with stream.step('three'): |
| + pass |
| + with stream.step('two'): |
| + pass |
| + |
| + result = [ |
| + '@@@SEED_STEP one@@@', |
| + '@@@SEED_STEP two@@@', |
| + '@@@SEED_STEP three@@@', |
| + '@@@STEP_CURSOR one@@@', |
| + '@@@STEP_STARTED@@@', |
| + '@@@STEP_CLOSED@@@', |
| + '@@@STEP_CURSOR three@@@', |
| + '@@@STEP_STARTED@@@', |
| + '@@@STEP_CLOSED@@@', |
| + '@@@SEED_STEP two@@@', |
| + '@@@STEP_CURSOR two@@@', |
| + '@@@STEP_STARTED@@@', |
| + '@@@STEP_CLOSED@@@', |
| + ] |
| + |
| + self.assertEquals(result, self._getLines()) |
| + |
| + def testException(self): |
| + stream = annotator.StructuredAnnotationStream(stream=self.buf) |
| + |
| + with self.assertRaises(Exception): |
| + with stream.step('one'): |
| + raise Exception('oh no!') |
| + |
| + log_string = '@@@STEP_LOG_LINE@exception' |
| + exception = any(line.startswith(log_string) for line in self._getLines()) |
| + self.assertTrue(exception) |
| + |
| + def testNoNesting(self): |
| + stream = annotator.StructuredAnnotationStream(stream=self.buf) |
| + |
| + with self.assertRaises(Exception): |
| + with stream.step('one'): |
| + with stream.step('two'): |
| + pass |
| + |
| + def testProtectedStartStop(self): |
| + stream = annotator.StructuredAnnotationStream(stream=self.buf) |
| + |
| + with self.assertRaises(AttributeError): |
| + with stream.step('one') as s: |
| + s.step_started() |
| + with self.assertRaises(AttributeError): |
| + with stream.step('two') as s: |
| + s.step_closed() |
| + |
| + def testAdvanced(self): |
| + step = annotator.AdvancedAnnotationStep(stream=self.buf) |
| + stream = annotator.AdvancedAnnotationStream(stream=self.buf) |
| + stream.seed_step('one') |
| + stream.seed_step('two') |
| + stream.step_cursor('one') |
| + step.step_started() |
| + stream.step_cursor('two') |
| + step.step_started() |
| + stream.step_cursor('one') |
| + step.step_closed() |
| + stream.step_cursor('two') |
| + step.step_closed() |
| + |
| + result = [ |
| + '@@@SEED_STEP one@@@', |
| + '@@@SEED_STEP two@@@', |
| + '@@@STEP_CURSOR one@@@', |
| + '@@@STEP_STARTED@@@', |
| + '@@@STEP_CURSOR two@@@', |
| + '@@@STEP_STARTED@@@', |
| + '@@@STEP_CURSOR one@@@', |
| + '@@@STEP_CLOSED@@@', |
| + '@@@STEP_CURSOR two@@@', |
| + '@@@STEP_CLOSED@@@', |
| + ] |
| + |
| + self.assertEquals(result, self._getLines()) |
| + |
| + |
| +def _synthesizeCmd(args): |
| + basecmd = [sys.executable, '-c'] |
| + basecmd.extend(args) |
| + return basecmd |
| + |
| + |
| +class TestExecution(unittest.TestCase): |
| + def setUp(self): |
| + self.capture = FilterCapture() |
| + self.tempfd, self.tempfn = tempfile.mkstemp() |
| + self.temp = os.fdopen(self.tempfd, 'wb') |
| + self.script = os.path.join(SCRIPT_DIR, os.pardir, 'annotator.py') |
| + |
| + def tearDown(self): |
| + self.temp.close() |
| + if os.path.exists(self.tempfn): |
| + os.remove(self.tempfn) |
| + |
| + def _runAnnotator(self, cmdlist): |
| + json.dump(cmdlist, self.temp) |
| + self.temp.close() |
| + cmd = [sys.executable, self.script, self.tempfn] |
| + env = os.environ.copy() |
| + env['PYTHONPATH'] = os.pathsep.join(sys.path) |
| + return chromium_utils.RunCommand(cmd, filter_obj=self.capture, env=env, |
| + print_cmd=False) |
| + |
| + def testSimpleExecution(self): |
| + cmdlist = [{'name': 'one', 'cmd': _synthesizeCmd(['print \'hello!\''])}, |
| + {'name': 'two', 'cmd': _synthesizeCmd(['print \'yo!\''])}] |
| + |
| + ret = self._runAnnotator(cmdlist) |
| + |
| + self.assertEquals(ret, 0) |
| + |
| + result = [ |
| + '@@@SEED_STEP one@@@', |
| + '@@@SEED_STEP two@@@', |
| + '@@@STEP_CURSOR one@@@', |
| + '@@@STEP_STARTED@@@', |
| + '', |
| + '/usr/bin/python -c "print \'hello!\'"', |
| + 'hello!', |
| + '@@@STEP_CLOSED@@@', |
| + '@@@STEP_CURSOR two@@@', |
| + '@@@STEP_STARTED@@@', |
| + '', |
| + '/usr/bin/python -c "print \'yo!\'"', |
| + 'yo!', |
| + '@@@STEP_CLOSED@@@', |
| + ] |
| + self.assertEquals(result, self.capture.text) |
| + |
| + def testFailBuild(self): |
| + cmdlist = [{'name': 'one', 'cmd': _synthesizeCmd(['print \'hello!\''])}, |
| + {'name': 'two', 'cmd': _synthesizeCmd(['error'])}] |
| + |
| + ret = self._runAnnotator(cmdlist) |
| + |
| + self.assertIn('@@@STEP_FAILURE@@@', self.capture.text) |
| + self.assertEquals(ret, 1) |
| + |
| + def testStopBuild(self): |
| + cmdlist = [{'name': 'one', 'cmd': _synthesizeCmd(['error'])}, |
| + {'name': 'two', 'cmd': _synthesizeCmd(['print \'yo!\''])}] |
| + |
| + ret = self._runAnnotator(cmdlist) |
| + |
| + self.assertNotIn('@@@STEP_CURSOR two@@@', self.capture.text) |
| + self.assertEquals(ret, 1) |
| + |
| + def testException(self): |
| + cmdlist = [{'name': 'one', 'cmd': ['doesn\'t exist']}] |
| + |
| + ret = self._runAnnotator(cmdlist) |
| + |
| + self.assertIn('@@@STEP_EXCEPTION@@@', self.capture.text) |
| + self.assertEquals(ret, 1) |
| + |
| + def testAlwaysRun(self): |
| + cmdlist = [{'name': 'one', 'cmd': _synthesizeCmd(['error'])}, |
| + {'name': 'two', 'cmd': _synthesizeCmd(['print \'yo!\'']), |
| + 'always_run': True}] |
| + |
| + ret = self._runAnnotator(cmdlist) |
| + self.assertIn('@@@STEP_CURSOR two@@@', self.capture.text) |
| + self.assertIn('yo!', self.capture.text) |
| + self.assertEquals(ret, 1) |
| + |
| + def testAlwaysRunNoDupes(self): |
| + cmdlist = [{'name': 'one', 'cmd': _synthesizeCmd(['print \'yo!\'']), |
| + 'always_run': True}, |
| + {'name': 'two', 'cmd': _synthesizeCmd(['error'])}, |
| + {'name': 'three', 'cmd': _synthesizeCmd(['print \'hello!\'']), |
| + 'always_run': True}] |
| + |
| + ret = self._runAnnotator(cmdlist) |
| + self.assertEquals(self.capture.text.count('yo!'), 1) |
| + self.assertIn('hello!', self.capture.text) |
| + self.assertEquals(ret, 1) |
| + |
| +if __name__ == '__main__': |
| + unittest.main() |