Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(777)

Unified Diff: scripts/common/unittests/annotator_test.py

Issue 12386016: Initial implementation of annotator library. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Add unit tests, refactor step classes for protection. Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: scripts/common/unittests/annotator_test.py
diff --git a/scripts/common/unittests/annotator_test.py b/scripts/common/unittests/annotator_test.py
new file mode 100755
index 0000000000000000000000000000000000000000..735ea58574654b2614a862f45283eb83f32b8b60
--- /dev/null
+++ b/scripts/common/unittests/annotator_test.py
@@ -0,0 +1,320 @@
+#!/usr/bin/env python
+# Copyright (c) 2013 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Unit tests for classes in annotator.py."""
+
+import cStringIO
+import json
+import os
+import sys
+import tempfile
+import unittest
+
+import test_env # pylint: disable=W0611
+
+from common import annotator
+from common import chromium_utils
+
+
+SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
+
+
+class FilterCapture(chromium_utils.RunCommandFilter):
+ """Captures the text and places it into an array."""
+ def __init__(self):
+ chromium_utils.RunCommandFilter.__init__(self)
+ self.text = []
+
+ def FilterLine(self, line):
+ self.text.append(line.rstrip())
+
+ def FilterDone(self, text):
+ self.text.append(text)
+
+
+class TestAnnotationStreams(unittest.TestCase):
+ def setUp(self):
+ self.buf = cStringIO.StringIO()
+
+ def _getLines(self):
+ return self.buf.getvalue().rstrip().split('\n')
+
+ def testBasicUsage(self):
+ stream = annotator.StructuredAnnotationStream(stream=self.buf)
+ with stream.step('one') as _:
+ pass
+ with stream.step('two') as _:
+ pass
+
+ result = [
+ '@@@SEED_STEP one@@@',
+ '@@@STEP_CURSOR one@@@',
+ '@@@STEP_STARTED@@@',
+ '@@@STEP_CLOSED@@@',
+ '@@@SEED_STEP two@@@',
+ '@@@STEP_CURSOR two@@@',
+ '@@@STEP_STARTED@@@',
+ '@@@STEP_CLOSED@@@',
+ ]
+
+ self.assertEquals(result, self._getLines())
+
+ def testStepAnnotations(self):
+ stream = annotator.StructuredAnnotationStream(stream=self.buf)
+ with stream.step('one') as s:
+ s.step_warnings()
+ s.step_failure()
+ s.step_exception()
+ s.step_clear()
+ s.step_summary_clear()
+ s.step_text('hello')
+ s.step_summary_text('hello!')
+ s.step_log_line('mylog', 'test')
+ s.step_log_end('mylog')
+ s.step_log_line('myperflog', 'perf data')
+ s.step_log_end_perf('myperflog', 'dashboardname')
+ s.write_log_lines('full_log', ['line one', 'line two'])
+ s.write_log_lines('full_perf_log', ['perf line one', 'perf line two'],
+ perf='full_perf')
iannucci 2013/03/02 01:49:14 Should there be a test that tries to write to logs
Mike Stip (use stip instead) 2013/03/02 02:13:45 The master already throws up an exception when thi
+
+ result = [
+ '@@@SEED_STEP one@@@',
+ '@@@STEP_CURSOR one@@@',
+ '@@@STEP_STARTED@@@',
+ '@@@STEP_WARNINGS@@@',
+ '@@@STEP_FAILURE@@@',
+ '@@@STEP_EXCEPTION@@@',
+ '@@@STEP_CLEAR@@@',
+ '@@@STEP_SUMMARY_CLEAR@@@',
+ '@@@STEP_TEXT@hello@@@',
+ '@@@STEP_SUMMARY_TEXT@hello!@@@',
+ '@@@STEP_LOG_LINE@mylog@test@@@',
+ '@@@STEP_LOG_END@mylog@@@',
+ '@@@STEP_LOG_LINE@myperflog@perf data@@@',
+ '@@@STEP_LOG_END_PERF@myperflog@dashboardname@@@',
+ '@@@STEP_LOG_LINE@full_log@line one@@@',
+ '@@@STEP_LOG_LINE@full_log@line two@@@',
+ '@@@STEP_LOG_END@full_log@@@',
+ '@@@STEP_LOG_LINE@full_perf_log@perf line one@@@',
+ '@@@STEP_LOG_LINE@full_perf_log@perf line two@@@',
+ '@@@STEP_LOG_END_PERF@full_perf_log@full_perf@@@',
+ '@@@STEP_CLOSED@@@',
+ ]
+
+ self.assertEquals(result, self._getLines())
+
+ def testSeedStep(self):
+ steps = ['one', 'two']
+ stream = annotator.StructuredAnnotationStream(seed_steps=steps,
+ stream=self.buf)
+ with stream.step('one'):
+ pass
+ with stream.step('two'):
+ pass
+
+ result = [
+ '@@@SEED_STEP one@@@',
+ '@@@SEED_STEP two@@@',
+ '@@@STEP_CURSOR one@@@',
+ '@@@STEP_STARTED@@@',
+ '@@@STEP_CLOSED@@@',
+ '@@@STEP_CURSOR two@@@',
+ '@@@STEP_STARTED@@@',
+ '@@@STEP_CLOSED@@@'
+ ]
+
+ self.assertEquals(result, self._getLines())
+
+ def testSeedStepSkip(self):
+ steps = ['one', 'two', 'three']
+ stream = annotator.StructuredAnnotationStream(seed_steps=steps,
+ stream=self.buf)
+ with stream.step('one'):
+ pass
+ with stream.step('three'):
+ pass
+ with stream.step('two'):
+ pass
+
+ result = [
+ '@@@SEED_STEP one@@@',
+ '@@@SEED_STEP two@@@',
+ '@@@SEED_STEP three@@@',
+ '@@@STEP_CURSOR one@@@',
+ '@@@STEP_STARTED@@@',
+ '@@@STEP_CLOSED@@@',
+ '@@@STEP_CURSOR three@@@',
+ '@@@STEP_STARTED@@@',
+ '@@@STEP_CLOSED@@@',
+ '@@@SEED_STEP two@@@',
+ '@@@STEP_CURSOR two@@@',
+ '@@@STEP_STARTED@@@',
+ '@@@STEP_CLOSED@@@',
+ ]
+
+ self.assertEquals(result, self._getLines())
+
+ def testException(self):
+ stream = annotator.StructuredAnnotationStream(stream=self.buf)
+
+ with self.assertRaises(Exception):
+ with stream.step('one'):
+ raise Exception('oh no!')
+
+ log_string = '@@@STEP_LOG_LINE@exception'
+ exception = any(line.startswith(log_string) for line in self._getLines())
+ self.assertTrue(exception)
+
+ def testNoNesting(self):
+ stream = annotator.StructuredAnnotationStream(stream=self.buf)
+
+ with self.assertRaises(Exception):
+ with stream.step('one'):
+ with stream.step('two'):
+ pass
+
+ def testProtectedStartStop(self):
+ stream = annotator.StructuredAnnotationStream(stream=self.buf)
+
+ with self.assertRaises(AttributeError):
+ with stream.step('one') as s:
+ s.step_started()
+ with self.assertRaises(AttributeError):
+ with stream.step('two') as s:
+ s.step_closed()
+
+ def testAdvanced(self):
+ step = annotator.AdvancedAnnotationStep(stream=self.buf)
+ stream = annotator.AdvancedAnnotationStream(stream=self.buf)
+ stream.seed_step('one')
+ stream.seed_step('two')
+ stream.step_cursor('one')
+ step.step_started()
+ stream.step_cursor('two')
+ step.step_started()
+ stream.step_cursor('one')
+ step.step_closed()
+ stream.step_cursor('two')
+ step.step_closed()
+
+ result = [
+ '@@@SEED_STEP one@@@',
+ '@@@SEED_STEP two@@@',
+ '@@@STEP_CURSOR one@@@',
+ '@@@STEP_STARTED@@@',
+ '@@@STEP_CURSOR two@@@',
+ '@@@STEP_STARTED@@@',
+ '@@@STEP_CURSOR one@@@',
+ '@@@STEP_CLOSED@@@',
+ '@@@STEP_CURSOR two@@@',
+ '@@@STEP_CLOSED@@@',
+ ]
+
+ self.assertEquals(result, self._getLines())
+
+
+def _synthesizeCmd(args):
+ basecmd = [sys.executable, '-c']
+ basecmd.extend(args)
+ return basecmd
+
+
+class TestExecution(unittest.TestCase):
+ def setUp(self):
+ self.capture = FilterCapture()
+ self.tempfd, self.tempfn = tempfile.mkstemp()
+ self.temp = os.fdopen(self.tempfd, 'wb')
+ self.script = os.path.join(SCRIPT_DIR, os.pardir, 'annotator.py')
+
+ def tearDown(self):
+ self.temp.close()
+ if os.path.exists(self.tempfn):
+ os.remove(self.tempfn)
+
+ def _runAnnotator(self, cmdlist):
+ json.dump(cmdlist, self.temp)
+ self.temp.close()
+ cmd = [sys.executable, self.script, self.tempfn]
+ env = os.environ.copy()
+ env['PYTHONPATH'] = os.pathsep.join(sys.path)
+ return chromium_utils.RunCommand(cmd, filter_obj=self.capture, env=env,
+ print_cmd=False)
+
+ def testSimpleExecution(self):
+ cmdlist = [{'name': 'one', 'cmd': _synthesizeCmd(['print \'hello!\''])},
+ {'name': 'two', 'cmd': _synthesizeCmd(['print \'yo!\''])}]
+
+ ret = self._runAnnotator(cmdlist)
+
+ self.assertEquals(ret, 0)
+
+ result = [
+ '@@@SEED_STEP one@@@',
+ '@@@SEED_STEP two@@@',
+ '@@@STEP_CURSOR one@@@',
+ '@@@STEP_STARTED@@@',
+ '',
+ '/usr/bin/python -c "print \'hello!\'"',
+ 'hello!',
+ '@@@STEP_CLOSED@@@',
+ '@@@STEP_CURSOR two@@@',
+ '@@@STEP_STARTED@@@',
+ '',
+ '/usr/bin/python -c "print \'yo!\'"',
+ 'yo!',
+ '@@@STEP_CLOSED@@@',
+ ]
+ self.assertEquals(result, self.capture.text)
+
+ def testFailBuild(self):
+ cmdlist = [{'name': 'one', 'cmd': _synthesizeCmd(['print \'hello!\''])},
+ {'name': 'two', 'cmd': _synthesizeCmd(['error'])}]
+
+ ret = self._runAnnotator(cmdlist)
+
+ self.assertIn('@@@STEP_FAILURE@@@', self.capture.text)
+ self.assertEquals(ret, 1)
+
+ def testStopBuild(self):
+ cmdlist = [{'name': 'one', 'cmd': _synthesizeCmd(['error'])},
+ {'name': 'two', 'cmd': _synthesizeCmd(['print \'yo!\''])}]
+
+ ret = self._runAnnotator(cmdlist)
+
+ self.assertNotIn('@@@STEP_CURSOR two@@@', self.capture.text)
+ self.assertEquals(ret, 1)
+
+ def testException(self):
+ cmdlist = [{'name': 'one', 'cmd': ['doesn\'t exist']}]
+
+ ret = self._runAnnotator(cmdlist)
+
+ self.assertIn('@@@STEP_EXCEPTION@@@', self.capture.text)
+ self.assertEquals(ret, 1)
+
+ def testAlwaysRun(self):
+ cmdlist = [{'name': 'one', 'cmd': _synthesizeCmd(['error'])},
+ {'name': 'two', 'cmd': _synthesizeCmd(['print \'yo!\'']),
+ 'always_run': True}]
+
+ ret = self._runAnnotator(cmdlist)
+ self.assertIn('@@@STEP_CURSOR two@@@', self.capture.text)
+ self.assertIn('yo!', self.capture.text)
+ self.assertEquals(ret, 1)
+
+ def testAlwaysRunNoDupes(self):
+ cmdlist = [{'name': 'one', 'cmd': _synthesizeCmd(['print \'yo!\'']),
+ 'always_run': True},
+ {'name': 'two', 'cmd': _synthesizeCmd(['error'])},
+ {'name': 'three', 'cmd': _synthesizeCmd(['print \'hello!\'']),
+ 'always_run': True}]
+
+ ret = self._runAnnotator(cmdlist)
+ self.assertEquals(self.capture.text.count('yo!'), 1)
+ self.assertIn('hello!', self.capture.text)
+ self.assertEquals(ret, 1)
+
+if __name__ == '__main__':
+ unittest.main()

Powered by Google App Engine
This is Rietveld 408576698