| OLD | NEW |
| 1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
| 2 # Copyright (c) 2013 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2013 The Chromium Authors. All rights reserved. |
| 3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
| 4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
| 5 | 5 |
| 6 """Unit tests for classes in annotator.py.""" | 6 """Unit tests for classes in annotator.py.""" |
| 7 | 7 |
| 8 import cStringIO | 8 import cStringIO |
| 9 import json | 9 import json |
| 10 import types | 10 import types |
| 11 import os | 11 import os |
| 12 import sys | 12 import sys |
| 13 import tempfile | 13 import tempfile |
| 14 import unittest | 14 import unittest |
| 15 | 15 |
| 16 import test_env # pylint: disable=W0611 | 16 import test_env # pylint: disable=W0611 |
| 17 | 17 |
| 18 from common import annotator | 18 from common import annotator |
| 19 from common import chromium_utils | |
| 20 | 19 |
| 21 | 20 |
| 22 SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) | 21 SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) |
| 23 | 22 |
| 24 | 23 |
| 25 class TestAnnotationStreams(unittest.TestCase): | 24 class TestAnnotationStreams(unittest.TestCase): |
| 26 def setUp(self): | 25 def setUp(self): |
| 27 self.buf = cStringIO.StringIO() | 26 self.buf = cStringIO.StringIO() |
| 28 | 27 |
| 29 def _getLines(self): | 28 def _getLines(self): |
| (...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 104 with stream.step('one') as s: | 103 with stream.step('one') as s: |
| 105 with self.assertRaisesRegexp(TypeError, r'1 argument \(2 given\)'): | 104 with self.assertRaisesRegexp(TypeError, r'1 argument \(2 given\)'): |
| 106 s.step_warnings('bar') | 105 s.step_warnings('bar') |
| 107 with self.assertRaisesRegexp(TypeError, r'2 arguments \(3 given\)'): | 106 with self.assertRaisesRegexp(TypeError, r'2 arguments \(3 given\)'): |
| 108 s.step_summary_text('hello!', 'bar') | 107 s.step_summary_text('hello!', 'bar') |
| 109 with self.assertRaisesRegexp(TypeError, r'3 arguments \(1 given\)'): | 108 with self.assertRaisesRegexp(TypeError, r'3 arguments \(1 given\)'): |
| 110 s.step_log_line() | 109 s.step_log_line() |
| 111 | 110 |
| 112 def testStepAnnotationsDocstring(self): | 111 def testStepAnnotationsDocstring(self): |
| 113 self.assertEqual( | 112 self.assertEqual( |
| 114 annotator.AdvancedAnnotationStep.step_link.__doc__, | 113 annotator.StructuredAnnotationStep.step_link.__doc__, |
| 115 'Emits an annotation for STEP_LINK.' | 114 'Emits an annotation for STEP_LINK.' |
| 116 ) | 115 ) |
| 117 | 116 |
| 118 | 117 |
| 119 def testException(self): | 118 def testException(self): |
| 120 stream = annotator.StructuredAnnotationStream(stream=self.buf) | 119 stream = annotator.StructuredAnnotationStream(stream=self.buf) |
| 121 | 120 |
| 122 def dummy_func(): | 121 def dummy_func(): |
| 123 with stream.step('one'): | 122 with stream.step('one'): |
| 124 raise Exception('oh no!') | 123 raise Exception('oh no!') |
| (...skipping 13 matching lines...) Expand all Loading... |
| 138 self.assertRaises(Exception, dummy_func) | 137 self.assertRaises(Exception, dummy_func) |
| 139 | 138 |
| 140 def testDupLogs(self): | 139 def testDupLogs(self): |
| 141 stream = annotator.StructuredAnnotationStream(stream=self.buf) | 140 stream = annotator.StructuredAnnotationStream(stream=self.buf) |
| 142 | 141 |
| 143 with stream.step('one') as s: | 142 with stream.step('one') as s: |
| 144 lines = ['one', 'two'] | 143 lines = ['one', 'two'] |
| 145 s.write_log_lines('mylog', lines) | 144 s.write_log_lines('mylog', lines) |
| 146 self.assertRaises(ValueError, s.write_log_lines, 'mylog', lines) | 145 self.assertRaises(ValueError, s.write_log_lines, 'mylog', lines) |
| 147 | 146 |
| 148 def testAdvanced(self): | 147 def testStructured(self): |
| 149 step = annotator.AdvancedAnnotationStep(stream=self.buf, flush_before=None) | 148 stream = annotator.StructuredAnnotationStream( |
| 150 stream = annotator.AdvancedAnnotationStream(stream=self.buf, | 149 stream=self.buf, flush_before=None) |
| 151 flush_before=None) | 150 step = annotator.StructuredAnnotationStep( |
| 151 annotation_stream=stream, stream=self.buf, flush_before=None) |
| 152 stream.step_cursor('one') | 152 stream.step_cursor('one') |
| 153 step.step_started() | 153 step.step_started() |
| 154 stream.step_cursor('two') | 154 stream.step_cursor('two') |
| 155 step.step_started() | 155 step.step_started() |
| 156 stream.step_cursor('one') | 156 stream.step_cursor('one') |
| 157 step.step_closed() | 157 step.step_closed() |
| 158 stream.step_cursor('two') | 158 stream.step_cursor('two') |
| 159 step.step_closed() | 159 step.step_closed() |
| 160 | 160 |
| 161 result = [ | 161 result = [ |
| 162 '@@@STEP_CURSOR one@@@', | 162 '@@@STEP_CURSOR one@@@', |
| 163 '@@@STEP_STARTED@@@', | 163 '@@@STEP_STARTED@@@', |
| 164 '@@@STEP_CURSOR two@@@', | 164 '@@@STEP_CURSOR two@@@', |
| 165 '@@@STEP_STARTED@@@', | 165 '@@@STEP_STARTED@@@', |
| 166 '@@@STEP_CURSOR one@@@', | 166 '@@@STEP_CURSOR one@@@', |
| 167 '@@@STEP_CLOSED@@@', | 167 '@@@STEP_CLOSED@@@', |
| 168 '@@@STEP_CURSOR two@@@', | 168 '@@@STEP_CURSOR two@@@', |
| 169 '@@@STEP_CLOSED@@@', | 169 '@@@STEP_CLOSED@@@', |
| 170 ] | 170 ] |
| 171 | 171 |
| 172 self.assertEquals(result, self._getLines()) | 172 self.assertEquals(result, self._getLines()) |
| 173 | 173 |
| 174 | 174 |
| 175 def _synthesizeCmd(args): | |
| 176 basecmd = [sys.executable, '-c'] | |
| 177 basecmd.extend(args) | |
| 178 return basecmd | |
| 179 | |
| 180 | |
| 181 class TestExecution(unittest.TestCase): | |
| 182 def setUp(self): | |
| 183 self.capture = chromium_utils.FilterCapture() | |
| 184 self.tempfd, self.tempfn = tempfile.mkstemp() | |
| 185 self.temp = os.fdopen(self.tempfd, 'wb') | |
| 186 self.script = os.path.join(SCRIPT_DIR, os.pardir, 'annotator.py') | |
| 187 | |
| 188 def tearDown(self): | |
| 189 self.temp.close() | |
| 190 if os.path.exists(self.tempfn): | |
| 191 os.remove(self.tempfn) | |
| 192 | |
| 193 def _runAnnotator(self, cmdlist, env=None): | |
| 194 json.dump(cmdlist, self.temp) | |
| 195 self.temp.close() | |
| 196 cmd = [sys.executable, self.script, self.tempfn] | |
| 197 cmd_env = os.environ.copy() | |
| 198 cmd_env['PYTHONPATH'] = os.pathsep.join(sys.path) | |
| 199 if env: | |
| 200 cmd_env.update(env) | |
| 201 return chromium_utils.RunCommand(cmd, filter_obj=self.capture, env=cmd_env, | |
| 202 print_cmd=False) | |
| 203 | |
| 204 def testSimpleExecution(self): | |
| 205 cmdlist = [{'name': 'one', 'cmd': _synthesizeCmd(['print \'hello!\''])}, | |
| 206 {'name': 'two', 'cmd': _synthesizeCmd(['print \'yo!\''])}] | |
| 207 | |
| 208 ret = self._runAnnotator(cmdlist) | |
| 209 | |
| 210 self.assertEquals(ret, 0) | |
| 211 | |
| 212 step_one_header = [ | |
| 213 '@@@STEP_CURSOR one@@@', | |
| 214 '', | |
| 215 '@@@STEP_STARTED@@@', | |
| 216 '', | |
| 217 sys.executable + " -c print 'hello!'", | |
| 218 'in dir %s:' % os.getcwd(), | |
| 219 ' allow_subannotations: False', | |
| 220 ' cmd: [' + repr(sys.executable) + ', \'-c\', "print \'hello!\'"]', | |
| 221 ' name: one', | |
| 222 'full environment:', | |
| 223 ] | |
| 224 step_one_result = [ | |
| 225 'hello!', | |
| 226 '', | |
| 227 '@@@STEP_CURSOR one@@@', | |
| 228 '', | |
| 229 '@@@STEP_CLOSED@@@', | |
| 230 ] | |
| 231 step_two_header = [ | |
| 232 '@@@STEP_CURSOR two@@@', | |
| 233 '', | |
| 234 '@@@STEP_STARTED@@@', | |
| 235 '', | |
| 236 sys.executable + " -c print 'yo!'", | |
| 237 'in dir %s:' % os.getcwd(), | |
| 238 ' allow_subannotations: False', | |
| 239 ' cmd: [' + repr(sys.executable) + ', \'-c\', "print \'yo!\'"]', | |
| 240 ' name: two', | |
| 241 'full environment:', | |
| 242 ] | |
| 243 step_two_result = [ | |
| 244 'yo!', | |
| 245 '', | |
| 246 '@@@STEP_CURSOR two@@@', | |
| 247 '', | |
| 248 '@@@STEP_CLOSED@@@', | |
| 249 ] | |
| 250 | |
| 251 def has_sublist(whole, part): | |
| 252 n = len(part) | |
| 253 return any((part == whole[i:i+n]) for i in xrange(len(whole) - n+1)) | |
| 254 | |
| 255 self.assertTrue(has_sublist(self.capture.text, step_one_header)) | |
| 256 self.assertTrue(has_sublist(self.capture.text, step_one_result)) | |
| 257 self.assertTrue(has_sublist(self.capture.text, step_two_header)) | |
| 258 self.assertTrue(has_sublist(self.capture.text, step_two_result)) | |
| 259 | |
| 260 def testFailBuild(self): | |
| 261 cmdlist = [{'name': 'one', 'cmd': _synthesizeCmd(['print \'hello!\''])}, | |
| 262 {'name': 'two', 'cmd': _synthesizeCmd(['error'])}] | |
| 263 | |
| 264 ret = self._runAnnotator(cmdlist) | |
| 265 | |
| 266 self.assertTrue('@@@STEP_FAILURE@@@' in self.capture.text) | |
| 267 self.assertEquals(ret, 1) | |
| 268 | |
| 269 def testStopBuild(self): | |
| 270 cmdlist = [{'name': 'one', 'cmd': _synthesizeCmd(['error'])}, | |
| 271 {'name': 'two', 'cmd': _synthesizeCmd(['print \'yo!\''])}] | |
| 272 | |
| 273 ret = self._runAnnotator(cmdlist) | |
| 274 | |
| 275 self.assertTrue('@@@STEP_CURSOR two@@@' not in self.capture.text) | |
| 276 self.assertEquals(ret, 1) | |
| 277 | |
| 278 def testException(self): | |
| 279 cmdlist = [{'name': 'one', 'cmd': ['doesn\'t exist']}] | |
| 280 | |
| 281 ret = self._runAnnotator(cmdlist) | |
| 282 | |
| 283 self.assertTrue('@@@STEP_EXCEPTION@@@' in self.capture.text) | |
| 284 self.assertEquals(ret, 1) | |
| 285 | |
| 286 def testCwd(self): | |
| 287 tmpdir = os.path.realpath(tempfile.mkdtemp()) | |
| 288 try: | |
| 289 cmdlist = [{'name': 'one', | |
| 290 'cmd': _synthesizeCmd(['import os; print os.getcwd()']), | |
| 291 'cwd': tmpdir | |
| 292 },] | |
| 293 ret = self._runAnnotator(cmdlist) | |
| 294 finally: | |
| 295 os.rmdir(tmpdir) | |
| 296 self.assertTrue(tmpdir in self.capture.text) | |
| 297 self.assertEquals(ret, 0) | |
| 298 | |
| 299 def testStepEnvKeep(self): | |
| 300 cmdlist = [{'name': 'one', | |
| 301 'cmd': _synthesizeCmd([ | |
| 302 'import os; print os.environ[\'SOME_ENV\']' | |
| 303 ]), | |
| 304 'env': {'SOME_OTHER_ENV': '123'} | |
| 305 },] | |
| 306 ret = self._runAnnotator(cmdlist, env={'SOME_ENV': 'blah-blah'}) | |
| 307 self.assertTrue('blah-blah' in self.capture.text) | |
| 308 self.assertEquals(ret, 0) | |
| 309 | |
| 310 def testStepEnvAdd(self): | |
| 311 cmdlist = [{'name': 'one', | |
| 312 'cmd': _synthesizeCmd([ | |
| 313 'import os; print os.environ[\'SOME_ENV\']' | |
| 314 ]), | |
| 315 'env': {'SOME_ENV': 'blah-blah'} | |
| 316 },] | |
| 317 ret = self._runAnnotator(cmdlist) | |
| 318 self.assertTrue('blah-blah' in self.capture.text) | |
| 319 self.assertEquals(ret, 0) | |
| 320 | |
| 321 def testStepEnvReplace(self): | |
| 322 cmdlist = [{'name': 'one', | |
| 323 'cmd': _synthesizeCmd([ | |
| 324 'import os; print os.environ[\'SOME_ENV\']' | |
| 325 ]), | |
| 326 'env': {'SOME_ENV': 'two'} | |
| 327 },] | |
| 328 ret = self._runAnnotator(cmdlist, env={'SOME_ENV': 'one'}) | |
| 329 self.assertTrue('two' in self.capture.text) | |
| 330 self.assertEquals(ret, 0) | |
| 331 | |
| 332 def testStepEnvRemove(self): | |
| 333 cmdlist = [{'name': 'one', | |
| 334 'cmd': _synthesizeCmd([ | |
| 335 'import os\n' | |
| 336 'print \'SOME_ENV is set:\', \'SOME_ENV\' in os.environ' | |
| 337 ]), | |
| 338 'env': {'SOME_ENV': None} | |
| 339 },] | |
| 340 ret = self._runAnnotator(cmdlist, env={'SOME_ENV': 'one'}) | |
| 341 self.assertTrue('SOME_ENV is set: False' in self.capture.text) | |
| 342 self.assertEquals(ret, 0) | |
| 343 | |
| 344 def testIgnoreAnnotations(self): | |
| 345 cmdlist = [{'name': 'one', | |
| 346 'cmd': _synthesizeCmd(['print \'@@@SEED_STEP@blah@@@\'']), | |
| 347 'ignore_annotations': True | |
| 348 },] | |
| 349 ret = self._runAnnotator(cmdlist) | |
| 350 self.assertFalse('@@@SEED_STEP@blah@@@' in self.capture.text) | |
| 351 self.assertEquals(ret, 0) | |
| 352 | |
| 353 | |
| 354 class TestMatchAnnotation(unittest.TestCase): | 175 class TestMatchAnnotation(unittest.TestCase): |
| 355 class Callback(object): | 176 class Callback(object): |
| 356 def __init__(self): | 177 def __init__(self): |
| 357 self.called = [] | 178 self.called = [] |
| 358 | 179 |
| 359 def STEP_WARNINGS(self): | 180 def STEP_WARNINGS(self): |
| 360 self.called.append(('STEP_WARNINGS', [])) | 181 self.called.append(('STEP_WARNINGS', [])) |
| 361 | 182 |
| 362 def STEP_LOG_LINE(self, log_name, line): | 183 def STEP_LOG_LINE(self, log_name, line): |
| 363 self.called.append(('STEP_LOG_LINE', [log_name, line])) | 184 self.called.append(('STEP_LOG_LINE', [log_name, line])) |
| (...skipping 116 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 480 continue | 301 continue |
| 481 implemented.add(name) | 302 implemented.add(name) |
| 482 self.assertIsInstance(fn, types.FunctionType) | 303 self.assertIsInstance(fn, types.FunctionType) |
| 483 expected_num_args = annotator.ALL_ANNOTATIONS[name] | 304 expected_num_args = annotator.ALL_ANNOTATIONS[name] |
| 484 self.assertEqual(expected_num_args, fn.func_code.co_argcount - 1) | 305 self.assertEqual(expected_num_args, fn.func_code.co_argcount - 1) |
| 485 self.assertSetEqual(required, implemented) | 306 self.assertSetEqual(required, implemented) |
| 486 | 307 |
| 487 | 308 |
| 488 if __name__ == '__main__': | 309 if __name__ == '__main__': |
| 489 unittest.main() | 310 unittest.main() |
| OLD | NEW |