OLD | NEW |
(Empty) | |
| 1 # -*- coding: utf-8 -*- |
| 2 # copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
| 3 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
| 4 # |
| 5 # This file is part of logilab-common. |
| 6 # |
| 7 # logilab-common is free software: you can redistribute it and/or modify it unde
r |
| 8 # the terms of the GNU Lesser General Public License as published by the Free |
| 9 # Software Foundation, either version 2.1 of the License, or (at your option) an
y |
| 10 # later version. |
| 11 # |
| 12 # logilab-common is distributed in the hope that it will be useful, but WITHOUT |
| 13 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| 14 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
| 15 # details. |
| 16 # |
| 17 # You should have received a copy of the GNU Lesser General Public License along |
| 18 # with logilab-common. If not, see <http://www.gnu.org/licenses/>. |
| 19 """Run tests. |
| 20 |
| 21 This will find all modules whose name match a given prefix in the test |
| 22 directory, and run them. Various command line options provide |
| 23 additional facilities. |
| 24 |
| 25 Command line options: |
| 26 |
| 27 -v verbose -- run tests in verbose mode with output to stdout |
| 28 -q quiet -- don't print anything except if a test fails |
| 29 -t testdir -- directory where the tests will be found |
| 30 -x exclude -- add a test to exclude |
| 31 -p profile -- profiled execution |
| 32 -d dbc -- enable design-by-contract |
| 33 -m match -- only run test matching the tag pattern which follow |
| 34 |
| 35 If no non-option arguments are present, prefixes used are 'test', |
| 36 'regrtest', 'smoketest' and 'unittest'. |
| 37 |
| 38 """ |
| 39 __docformat__ = "restructuredtext en" |
| 40 # modified copy of some functions from test/regrtest.py from PyXml |
| 41 # disable camel case warning |
| 42 # pylint: disable=C0103 |
| 43 |
| 44 import sys |
| 45 import os, os.path as osp |
| 46 import re |
| 47 import traceback |
| 48 import inspect |
| 49 import difflib |
| 50 import tempfile |
| 51 import math |
| 52 import warnings |
| 53 from shutil import rmtree |
| 54 from operator import itemgetter |
| 55 from ConfigParser import ConfigParser |
| 56 from logilab.common.deprecation import deprecated |
| 57 from itertools import dropwhile |
| 58 |
| 59 import unittest as unittest_legacy |
| 60 if not getattr(unittest_legacy, "__package__", None): |
| 61 try: |
| 62 import unittest2 as unittest |
| 63 from unittest2 import SkipTest |
| 64 except ImportError: |
| 65 sys.exit("You have to install python-unittest2 to use this module") |
| 66 else: |
| 67 import unittest |
| 68 from unittest import SkipTest |
| 69 |
| 70 try: |
| 71 from functools import wraps |
| 72 except ImportError: |
| 73 def wraps(wrapped): |
| 74 def proxy(callable): |
| 75 callable.__name__ = wrapped.__name__ |
| 76 return callable |
| 77 return proxy |
| 78 try: |
| 79 from test import test_support |
| 80 except ImportError: |
| 81 # not always available |
| 82 class TestSupport: |
| 83 def unload(self, test): |
| 84 pass |
| 85 test_support = TestSupport() |
| 86 |
| 87 # pylint: disable=W0622 |
| 88 from logilab.common.compat import any, InheritableSet, callable |
| 89 # pylint: enable=W0622 |
| 90 from logilab.common.debugger import Debugger, colorize_source |
| 91 from logilab.common.decorators import cached, classproperty |
| 92 from logilab.common import textutils |
| 93 |
| 94 |
| 95 __all__ = ['main', 'unittest_main', 'find_tests', 'run_test', 'spawn'] |
| 96 |
| 97 DEFAULT_PREFIXES = ('test', 'regrtest', 'smoketest', 'unittest', |
| 98 'func', 'validation') |
| 99 |
| 100 |
| 101 if sys.version_info >= (2, 6): |
| 102 # FIXME : this does not work as expected / breaks tests on testlib |
| 103 # however testlib does not work on py3k for many reasons ... |
| 104 from inspect import CO_GENERATOR |
| 105 else: |
| 106 from compiler.consts import CO_GENERATOR |
| 107 |
| 108 if sys.version_info >= (3, 0): |
| 109 def is_generator(function): |
| 110 flags = function.__code__.co_flags |
| 111 return flags & CO_GENERATOR |
| 112 |
| 113 else: |
| 114 def is_generator(function): |
| 115 flags = function.func_code.co_flags |
| 116 return flags & CO_GENERATOR |
| 117 |
| 118 # used by unittest to count the number of relevant levels in the traceback |
| 119 __unittest = 1 |
| 120 |
| 121 |
| 122 def with_tempdir(callable): |
| 123 """A decorator ensuring no temporary file left when the function return |
| 124 Work only for temporary file create with the tempfile module""" |
| 125 @wraps(callable) |
| 126 def proxy(*args, **kargs): |
| 127 |
| 128 old_tmpdir = tempfile.gettempdir() |
| 129 new_tmpdir = tempfile.mkdtemp(prefix="temp-lgc-") |
| 130 tempfile.tempdir = new_tmpdir |
| 131 try: |
| 132 return callable(*args, **kargs) |
| 133 finally: |
| 134 try: |
| 135 rmtree(new_tmpdir, ignore_errors=True) |
| 136 finally: |
| 137 tempfile.tempdir = old_tmpdir |
| 138 return proxy |
| 139 |
| 140 def in_tempdir(callable): |
| 141 """A decorator moving the enclosed function inside the tempfile.tempfdir |
| 142 """ |
| 143 @wraps(callable) |
| 144 def proxy(*args, **kargs): |
| 145 |
| 146 old_cwd = os.getcwd() |
| 147 os.chdir(tempfile.tempdir) |
| 148 try: |
| 149 return callable(*args, **kargs) |
| 150 finally: |
| 151 os.chdir(old_cwd) |
| 152 return proxy |
| 153 |
| 154 def within_tempdir(callable): |
| 155 """A decorator run the enclosed function inside a tmpdir removed after execu
tion |
| 156 """ |
| 157 proxy = with_tempdir(in_tempdir(callable)) |
| 158 proxy.__name__ = callable.__name__ |
| 159 return proxy |
| 160 |
| 161 def find_tests(testdir, |
| 162 prefixes=DEFAULT_PREFIXES, suffix=".py", |
| 163 excludes=(), |
| 164 remove_suffix=True): |
| 165 """ |
| 166 Return a list of all applicable test modules. |
| 167 """ |
| 168 tests = [] |
| 169 for name in os.listdir(testdir): |
| 170 if not suffix or name.endswith(suffix): |
| 171 for prefix in prefixes: |
| 172 if name.startswith(prefix): |
| 173 if remove_suffix and name.endswith(suffix): |
| 174 name = name[:-len(suffix)] |
| 175 if name not in excludes: |
| 176 tests.append(name) |
| 177 tests.sort() |
| 178 return tests |
| 179 |
| 180 |
| 181 ## PostMortem Debug facilities ##### |
| 182 def start_interactive_mode(result): |
| 183 """starts an interactive shell so that the user can inspect errors |
| 184 """ |
| 185 debuggers = result.debuggers |
| 186 descrs = result.error_descrs + result.fail_descrs |
| 187 if len(debuggers) == 1: |
| 188 # don't ask for test name if there's only one failure |
| 189 debuggers[0].start() |
| 190 else: |
| 191 while True: |
| 192 testindex = 0 |
| 193 print "Choose a test to debug:" |
| 194 # order debuggers in the same way than errors were printed |
| 195 print "\n".join(['\t%s : %s' % (i, descr) for i, (_, descr) |
| 196 in enumerate(descrs)]) |
| 197 print "Type 'exit' (or ^D) to quit" |
| 198 print |
| 199 try: |
| 200 todebug = raw_input('Enter a test name: ') |
| 201 if todebug.strip().lower() == 'exit': |
| 202 print |
| 203 break |
| 204 else: |
| 205 try: |
| 206 testindex = int(todebug) |
| 207 debugger = debuggers[descrs[testindex][0]] |
| 208 except (ValueError, IndexError): |
| 209 print "ERROR: invalid test number %r" % (todebug, ) |
| 210 else: |
| 211 debugger.start() |
| 212 except (EOFError, KeyboardInterrupt): |
| 213 print |
| 214 break |
| 215 |
| 216 |
| 217 # test utils ################################################################## |
| 218 |
| 219 class SkipAwareTestResult(unittest._TextTestResult): |
| 220 |
| 221 def __init__(self, stream, descriptions, verbosity, |
| 222 exitfirst=False, pdbmode=False, cvg=None, colorize=False): |
| 223 super(SkipAwareTestResult, self).__init__(stream, |
| 224 descriptions, verbosity) |
| 225 self.skipped = [] |
| 226 self.debuggers = [] |
| 227 self.fail_descrs = [] |
| 228 self.error_descrs = [] |
| 229 self.exitfirst = exitfirst |
| 230 self.pdbmode = pdbmode |
| 231 self.cvg = cvg |
| 232 self.colorize = colorize |
| 233 self.pdbclass = Debugger |
| 234 self.verbose = verbosity > 1 |
| 235 |
| 236 def descrs_for(self, flavour): |
| 237 return getattr(self, '%s_descrs' % flavour.lower()) |
| 238 |
| 239 def _create_pdb(self, test_descr, flavour): |
| 240 self.descrs_for(flavour).append( (len(self.debuggers), test_descr) ) |
| 241 if self.pdbmode: |
| 242 self.debuggers.append(self.pdbclass(sys.exc_info()[2])) |
| 243 |
| 244 def _iter_valid_frames(self, frames): |
| 245 """only consider non-testlib frames when formatting traceback""" |
| 246 lgc_testlib = osp.abspath(__file__) |
| 247 std_testlib = osp.abspath(unittest.__file__) |
| 248 invalid = lambda fi: osp.abspath(fi[1]) in (lgc_testlib, std_testlib) |
| 249 for frameinfo in dropwhile(invalid, frames): |
| 250 yield frameinfo |
| 251 |
| 252 def _exc_info_to_string(self, err, test): |
| 253 """Converts a sys.exc_info()-style tuple of values into a string. |
| 254 |
| 255 This method is overridden here because we want to colorize |
| 256 lines if --color is passed, and display local variables if |
| 257 --verbose is passed |
| 258 """ |
| 259 exctype, exc, tb = err |
| 260 output = ['Traceback (most recent call last)'] |
| 261 frames = inspect.getinnerframes(tb) |
| 262 colorize = self.colorize |
| 263 frames = enumerate(self._iter_valid_frames(frames)) |
| 264 for index, (frame, filename, lineno, funcname, ctx, ctxindex) in frames: |
| 265 filename = osp.abspath(filename) |
| 266 if ctx is None: # pyc files or C extensions for instance |
| 267 source = '<no source available>' |
| 268 else: |
| 269 source = ''.join(ctx) |
| 270 if colorize: |
| 271 filename = textutils.colorize_ansi(filename, 'magenta') |
| 272 source = colorize_source(source) |
| 273 output.append(' File "%s", line %s, in %s' % (filename, lineno, fun
cname)) |
| 274 output.append(' %s' % source.strip()) |
| 275 if self.verbose: |
| 276 output.append('%r == %r' % (dir(frame), test.__module__)) |
| 277 output.append('') |
| 278 output.append(' ' + ' local variables '.center(66, '-')) |
| 279 for varname, value in sorted(frame.f_locals.items()): |
| 280 output.append(' %s: %r' % (varname, value)) |
| 281 if varname == 'self': # special handy processing for self |
| 282 for varname, value in sorted(vars(value).items()): |
| 283 output.append(' self.%s: %r' % (varname, value)
) |
| 284 output.append(' ' + '-' * 66) |
| 285 output.append('') |
| 286 output.append(''.join(traceback.format_exception_only(exctype, exc))) |
| 287 return '\n'.join(output) |
| 288 |
| 289 def addError(self, test, err): |
| 290 """err -> (exc_type, exc, tcbk)""" |
| 291 exc_type, exc, _ = err |
| 292 if isinstance(exc, SkipTest): |
| 293 assert exc_type == SkipTest |
| 294 self.addSkip(test, exc) |
| 295 else: |
| 296 if self.exitfirst: |
| 297 self.shouldStop = True |
| 298 descr = self.getDescription(test) |
| 299 super(SkipAwareTestResult, self).addError(test, err) |
| 300 self._create_pdb(descr, 'error') |
| 301 |
| 302 def addFailure(self, test, err): |
| 303 if self.exitfirst: |
| 304 self.shouldStop = True |
| 305 descr = self.getDescription(test) |
| 306 super(SkipAwareTestResult, self).addFailure(test, err) |
| 307 self._create_pdb(descr, 'fail') |
| 308 |
| 309 def addSkip(self, test, reason): |
| 310 self.skipped.append((test, reason)) |
| 311 if self.showAll: |
| 312 self.stream.writeln("SKIPPED") |
| 313 elif self.dots: |
| 314 self.stream.write('S') |
| 315 |
| 316 def printErrors(self): |
| 317 super(SkipAwareTestResult, self).printErrors() |
| 318 self.printSkippedList() |
| 319 |
| 320 def printSkippedList(self): |
| 321 # format (test, err) compatible with unittest2 |
| 322 for test, err in self.skipped: |
| 323 descr = self.getDescription(test) |
| 324 self.stream.writeln(self.separator1) |
| 325 self.stream.writeln("%s: %s" % ('SKIPPED', descr)) |
| 326 self.stream.writeln("\t%s" % err) |
| 327 |
| 328 def printErrorList(self, flavour, errors): |
| 329 for (_, descr), (test, err) in zip(self.descrs_for(flavour), errors): |
| 330 self.stream.writeln(self.separator1) |
| 331 self.stream.writeln("%s: %s" % (flavour, descr)) |
| 332 self.stream.writeln(self.separator2) |
| 333 self.stream.writeln(err) |
| 334 self.stream.writeln('no stdout'.center(len(self.separator2))) |
| 335 self.stream.writeln('no stderr'.center(len(self.separator2))) |
| 336 |
| 337 # Add deprecation warnings about new api used by module level fixtures in unitte
st2 |
| 338 # http://www.voidspace.org.uk/python/articles/unittest2.shtml#setupmodule-and-te
ardownmodule |
| 339 class _DebugResult(object): # simplify import statement among unittest flavors.. |
| 340 "Used by the TestSuite to hold previous class when running in debug." |
| 341 _previousTestClass = None |
| 342 _moduleSetUpFailed = False |
| 343 shouldStop = False |
| 344 |
| 345 from logilab.common.decorators import monkeypatch |
| 346 @monkeypatch(unittest.TestSuite) |
| 347 def _handleModuleTearDown(self, result): |
| 348 previousModule = self._get_previous_module(result) |
| 349 if previousModule is None: |
| 350 return |
| 351 if result._moduleSetUpFailed: |
| 352 return |
| 353 try: |
| 354 module = sys.modules[previousModule] |
| 355 except KeyError: |
| 356 return |
| 357 # add testlib specific deprecation warning and switch to new api |
| 358 if hasattr(module, 'teardown_module'): |
| 359 warnings.warn('Please rename teardown_module() to tearDownModule() inste
ad.', |
| 360 DeprecationWarning) |
| 361 setattr(module, 'tearDownModule', module.teardown_module) |
| 362 # end of monkey-patching |
| 363 tearDownModule = getattr(module, 'tearDownModule', None) |
| 364 if tearDownModule is not None: |
| 365 try: |
| 366 tearDownModule() |
| 367 except Exception, e: |
| 368 if isinstance(result, _DebugResult): |
| 369 raise |
| 370 errorName = 'tearDownModule (%s)' % previousModule |
| 371 self._addClassOrModuleLevelException(result, e, errorName) |
| 372 |
| 373 @monkeypatch(unittest.TestSuite) |
| 374 def _handleModuleFixture(self, test, result): |
| 375 previousModule = self._get_previous_module(result) |
| 376 currentModule = test.__class__.__module__ |
| 377 if currentModule == previousModule: |
| 378 return |
| 379 self._handleModuleTearDown(result) |
| 380 result._moduleSetUpFailed = False |
| 381 try: |
| 382 module = sys.modules[currentModule] |
| 383 except KeyError: |
| 384 return |
| 385 # add testlib specific deprecation warning and switch to new api |
| 386 if hasattr(module, 'setup_module'): |
| 387 warnings.warn('Please rename setup_module() to setUpModule() instead.', |
| 388 DeprecationWarning) |
| 389 setattr(module, 'setUpModule', module.setup_module) |
| 390 # end of monkey-patching |
| 391 setUpModule = getattr(module, 'setUpModule', None) |
| 392 if setUpModule is not None: |
| 393 try: |
| 394 setUpModule() |
| 395 except Exception, e: |
| 396 if isinstance(result, _DebugResult): |
| 397 raise |
| 398 result._moduleSetUpFailed = True |
| 399 errorName = 'setUpModule (%s)' % currentModule |
| 400 self._addClassOrModuleLevelException(result, e, errorName) |
| 401 |
| 402 # backward compatibility: TestSuite might be imported from lgc.testlib |
| 403 TestSuite = unittest.TestSuite |
| 404 |
| 405 class keywords(dict): |
| 406 """Keyword args (**kwargs) support for generative tests.""" |
| 407 |
| 408 class starargs(tuple): |
| 409 """Variable arguments (*args) for generative tests.""" |
| 410 def __new__(cls, *args): |
| 411 return tuple.__new__(cls, args) |
| 412 |
| 413 unittest_main = unittest.main |
| 414 |
| 415 |
| 416 class InnerTestSkipped(SkipTest): |
| 417 """raised when a test is skipped""" |
| 418 pass |
| 419 |
| 420 def parse_generative_args(params): |
| 421 args = [] |
| 422 varargs = () |
| 423 kwargs = {} |
| 424 flags = 0 # 2 <=> starargs, 4 <=> kwargs |
| 425 for param in params: |
| 426 if isinstance(param, starargs): |
| 427 varargs = param |
| 428 if flags: |
| 429 raise TypeError('found starargs after keywords !') |
| 430 flags |= 2 |
| 431 args += list(varargs) |
| 432 elif isinstance(param, keywords): |
| 433 kwargs = param |
| 434 if flags & 4: |
| 435 raise TypeError('got multiple keywords parameters') |
| 436 flags |= 4 |
| 437 elif flags & 2 or flags & 4: |
| 438 raise TypeError('found parameters after kwargs or args') |
| 439 else: |
| 440 args.append(param) |
| 441 |
| 442 return args, kwargs |
| 443 |
| 444 |
| 445 class InnerTest(tuple): |
| 446 def __new__(cls, name, *data): |
| 447 instance = tuple.__new__(cls, data) |
| 448 instance.name = name |
| 449 return instance |
| 450 |
| 451 class Tags(InheritableSet): # 2.4 compat |
| 452 """A set of tag able validate an expression""" |
| 453 |
| 454 def __init__(self, *tags, **kwargs): |
| 455 self.inherit = kwargs.pop('inherit', True) |
| 456 if kwargs: |
| 457 raise TypeError("%s are an invalid keyword argument for this function
" % kwargs.keys()) |
| 458 |
| 459 if len(tags) == 1 and not isinstance(tags[0], basestring): |
| 460 tags = tags[0] |
| 461 super(Tags, self).__init__(tags, **kwargs) |
| 462 |
| 463 def __getitem__(self, key): |
| 464 return key in self |
| 465 |
| 466 def match(self, exp): |
| 467 return eval(exp, {}, self) |
| 468 |
| 469 |
| 470 # duplicate definition from unittest2 of the _deprecate decorator |
| 471 def _deprecate(original_func): |
| 472 def deprecated_func(*args, **kwargs): |
| 473 warnings.warn( |
| 474 ('Please use %s instead.' % original_func.__name__), |
| 475 DeprecationWarning, 2) |
| 476 return original_func(*args, **kwargs) |
| 477 return deprecated_func |
| 478 |
| 479 class TestCase(unittest.TestCase): |
| 480 """A unittest.TestCase extension with some additional methods.""" |
| 481 maxDiff = None |
| 482 pdbclass = Debugger |
| 483 tags = Tags() |
| 484 |
| 485 def __init__(self, methodName='runTest'): |
| 486 super(TestCase, self).__init__(methodName) |
| 487 # internal API changed in python2.4 and needed by DocTestCase |
| 488 if sys.version_info >= (2, 4): |
| 489 self.__exc_info = sys.exc_info |
| 490 self.__testMethodName = self._testMethodName |
| 491 else: |
| 492 # let's give easier access to _testMethodName to every subclasses |
| 493 if hasattr(self, "__testMethodName"): |
| 494 self._testMethodName = self.__testMethodName |
| 495 self._current_test_descr = None |
| 496 self._options_ = None |
| 497 |
| 498 @classproperty |
| 499 @cached |
| 500 def datadir(cls): # pylint: disable=E0213 |
| 501 """helper attribute holding the standard test's data directory |
| 502 |
| 503 NOTE: this is a logilab's standard |
| 504 """ |
| 505 mod = __import__(cls.__module__) |
| 506 return osp.join(osp.dirname(osp.abspath(mod.__file__)), 'data') |
| 507 # cache it (use a class method to cache on class since TestCase is |
| 508 # instantiated for each test run) |
| 509 |
| 510 @classmethod |
| 511 def datapath(cls, *fname): |
| 512 """joins the object's datadir and `fname`""" |
| 513 return osp.join(cls.datadir, *fname) |
| 514 |
| 515 def set_description(self, descr): |
| 516 """sets the current test's description. |
| 517 This can be useful for generative tests because it allows to specify |
| 518 a description per yield |
| 519 """ |
| 520 self._current_test_descr = descr |
| 521 |
| 522 # override default's unittest.py feature |
| 523 def shortDescription(self): |
| 524 """override default unittest shortDescription to handle correctly |
| 525 generative tests |
| 526 """ |
| 527 if self._current_test_descr is not None: |
| 528 return self._current_test_descr |
| 529 return super(TestCase, self).shortDescription() |
| 530 |
| 531 def quiet_run(self, result, func, *args, **kwargs): |
| 532 try: |
| 533 func(*args, **kwargs) |
| 534 except (KeyboardInterrupt, SystemExit): |
| 535 raise |
| 536 except: |
| 537 result.addError(self, self.__exc_info()) |
| 538 return False |
| 539 return True |
| 540 |
| 541 def _get_test_method(self): |
| 542 """return the test method""" |
| 543 return getattr(self, self._testMethodName) |
| 544 |
| 545 def optval(self, option, default=None): |
| 546 """return the option value or default if the option is not define""" |
| 547 return getattr(self._options_, option, default) |
| 548 |
| 549 def __call__(self, result=None, runcondition=None, options=None): |
| 550 """rewrite TestCase.__call__ to support generative tests |
| 551 This is mostly a copy/paste from unittest.py (i.e same |
| 552 variable names, same logic, except for the generative tests part) |
| 553 """ |
| 554 from logilab.common.pytest import FILE_RESTART |
| 555 if result is None: |
| 556 result = self.defaultTestResult() |
| 557 result.pdbclass = self.pdbclass |
| 558 self._options_ = options |
| 559 # if result.cvg: |
| 560 # result.cvg.start() |
| 561 testMethod = self._get_test_method() |
| 562 if runcondition and not runcondition(testMethod): |
| 563 return # test is skipped |
| 564 result.startTest(self) |
| 565 try: |
| 566 if not self.quiet_run(result, self.setUp): |
| 567 return |
| 568 generative = is_generator(testMethod.im_func) |
| 569 # generative tests |
| 570 if generative: |
| 571 self._proceed_generative(result, testMethod, |
| 572 runcondition) |
| 573 else: |
| 574 status = self._proceed(result, testMethod) |
| 575 success = (status == 0) |
| 576 if not self.quiet_run(result, self.tearDown): |
| 577 return |
| 578 if not generative and success: |
| 579 if hasattr(options, "exitfirst") and options.exitfirst: |
| 580 # add this test to restart file |
| 581 try: |
| 582 restartfile = open(FILE_RESTART, 'a') |
| 583 try: |
| 584 descr = '.'.join((self.__class__.__module__, |
| 585 self.__class__.__name__, |
| 586 self._testMethodName)) |
| 587 restartfile.write(descr+os.linesep) |
| 588 finally: |
| 589 restartfile.close() |
| 590 except Exception, ex: |
| 591 print >> sys.__stderr__, "Error while saving \ |
| 592 succeeded test into", osp.join(os.getcwd(), FILE_RESTART) |
| 593 raise ex |
| 594 result.addSuccess(self) |
| 595 finally: |
| 596 # if result.cvg: |
| 597 # result.cvg.stop() |
| 598 result.stopTest(self) |
| 599 |
| 600 def _proceed_generative(self, result, testfunc, runcondition=None): |
| 601 # cancel startTest()'s increment |
| 602 result.testsRun -= 1 |
| 603 success = True |
| 604 try: |
| 605 for params in testfunc(): |
| 606 if runcondition and not runcondition(testfunc, |
| 607 skipgenerator=False): |
| 608 if not (isinstance(params, InnerTest) |
| 609 and runcondition(params)): |
| 610 continue |
| 611 if not isinstance(params, (tuple, list)): |
| 612 params = (params, ) |
| 613 func = params[0] |
| 614 args, kwargs = parse_generative_args(params[1:]) |
| 615 # increment test counter manually |
| 616 result.testsRun += 1 |
| 617 status = self._proceed(result, func, args, kwargs) |
| 618 if status == 0: |
| 619 result.addSuccess(self) |
| 620 success = True |
| 621 else: |
| 622 success = False |
| 623 # XXX Don't stop anymore if an error occured |
| 624 #if status == 2: |
| 625 # result.shouldStop = True |
| 626 if result.shouldStop: # either on error or on exitfirst + error |
| 627 break |
| 628 except: |
| 629 # if an error occurs between two yield |
| 630 result.addError(self, self.__exc_info()) |
| 631 success = False |
| 632 return success |
| 633 |
| 634 def _proceed(self, result, testfunc, args=(), kwargs=None): |
| 635 """proceed the actual test |
| 636 returns 0 on success, 1 on failure, 2 on error |
| 637 |
| 638 Note: addSuccess can't be called here because we have to wait |
| 639 for tearDown to be successfully executed to declare the test as |
| 640 successful |
| 641 """ |
| 642 kwargs = kwargs or {} |
| 643 try: |
| 644 testfunc(*args, **kwargs) |
| 645 except self.failureException: |
| 646 result.addFailure(self, self.__exc_info()) |
| 647 return 1 |
| 648 except KeyboardInterrupt: |
| 649 raise |
| 650 except InnerTestSkipped, e: |
| 651 result.addSkip(self, e) |
| 652 return 1 |
| 653 except SkipTest, e: |
| 654 result.addSkip(self, e) |
| 655 return 0 |
| 656 except: |
| 657 result.addError(self, self.__exc_info()) |
| 658 return 2 |
| 659 return 0 |
| 660 |
| 661 def defaultTestResult(self): |
| 662 """return a new instance of the defaultTestResult""" |
| 663 return SkipAwareTestResult() |
| 664 |
| 665 skip = _deprecate(unittest.TestCase.skipTest) |
| 666 assertEquals = _deprecate(unittest.TestCase.assertEqual) |
| 667 assertNotEquals = _deprecate(unittest.TestCase.assertNotEqual) |
| 668 assertAlmostEquals = _deprecate(unittest.TestCase.assertAlmostEqual) |
| 669 assertNotAlmostEquals = _deprecate(unittest.TestCase.assertNotAlmostEqual) |
| 670 |
| 671 def innerSkip(self, msg=None): |
| 672 """mark a generative test as skipped for the <msg> reason""" |
| 673 msg = msg or 'test was skipped' |
| 674 raise InnerTestSkipped(msg) |
| 675 |
| 676 @deprecated('Please use assertDictEqual instead.') |
| 677 def assertDictEquals(self, dict1, dict2, msg=None, context=None): |
| 678 """compares two dicts |
| 679 |
| 680 If the two dict differ, the first difference is shown in the error |
| 681 message |
| 682 :param dict1: a Python Dictionary |
| 683 :param dict2: a Python Dictionary |
| 684 :param msg: custom message (String) in case of failure |
| 685 """ |
| 686 dict1 = dict(dict1) |
| 687 msgs = [] |
| 688 for key, value in dict2.items(): |
| 689 try: |
| 690 if dict1[key] != value: |
| 691 msgs.append('%r != %r for key %r' % (dict1[key], value, |
| 692 key)) |
| 693 del dict1[key] |
| 694 except KeyError: |
| 695 msgs.append('missing %r key' % key) |
| 696 if dict1: |
| 697 msgs.append('dict2 is lacking %r' % dict1) |
| 698 if msg: |
| 699 self.failureException(msg) |
| 700 elif msgs: |
| 701 if context is not None: |
| 702 base = '%s\n' % context |
| 703 else: |
| 704 base = '' |
| 705 self.fail(base + '\n'.join(msgs)) |
| 706 |
| 707 @deprecated('Please use assertItemsEqual instead.') |
| 708 def assertUnorderedIterableEquals(self, got, expected, msg=None): |
| 709 """compares two iterable and shows difference between both |
| 710 |
| 711 :param got: the unordered Iterable that we found |
| 712 :param expected: the expected unordered Iterable |
| 713 :param msg: custom message (String) in case of failure |
| 714 """ |
| 715 got, expected = list(got), list(expected) |
| 716 self.assertSetEqual(set(got), set(expected), msg) |
| 717 if len(got) != len(expected): |
| 718 if msg is None: |
| 719 msg = ['Iterable have the same elements but not the same number'
, |
| 720 '\t<element>\t<expected>i\t<got>'] |
| 721 got_count = {} |
| 722 expected_count = {} |
| 723 for element in got: |
| 724 got_count[element] = got_count.get(element, 0) + 1 |
| 725 for element in expected: |
| 726 expected_count[element] = expected_count.get(element, 0) + 1 |
| 727 # we know that got_count.key() == expected_count.key() |
| 728 # because of assertSetEqual |
| 729 for element, count in got_count.iteritems(): |
| 730 other_count = expected_count[element] |
| 731 if other_count != count: |
| 732 msg.append('\t%s\t%s\t%s' % (element, other_count, count
)) |
| 733 |
| 734 self.fail(msg) |
| 735 |
| 736 assertUnorderedIterableEqual = assertUnorderedIterableEquals |
| 737 assertUnordIterEquals = assertUnordIterEqual = assertUnorderedIterableEqual |
| 738 |
| 739 @deprecated('Please use assertSetEqual instead.') |
| 740 def assertSetEquals(self,got,expected, msg=None): |
| 741 """compares two sets and shows difference between both |
| 742 |
| 743 Don't use it for iterables other than sets. |
| 744 |
| 745 :param got: the Set that we found |
| 746 :param expected: the second Set to be compared to the first one |
| 747 :param msg: custom message (String) in case of failure |
| 748 """ |
| 749 |
| 750 if not(isinstance(got, set) and isinstance(expected, set)): |
| 751 warnings.warn("the assertSetEquals function if now intended for set
only."\ |
| 752 "use assertUnorderedIterableEquals instead.", |
| 753 DeprecationWarning, 2) |
| 754 return self.assertUnorderedIterableEquals(got, expected, msg) |
| 755 |
| 756 items={} |
| 757 items['missing'] = expected - got |
| 758 items['unexpected'] = got - expected |
| 759 if any(items.itervalues()): |
| 760 if msg is None: |
| 761 msg = '\n'.join('%s:\n\t%s' % (key, "\n\t".join(str(value) for v
alue in values)) |
| 762 for key, values in items.iteritems() if values) |
| 763 self.fail(msg) |
| 764 |
| 765 @deprecated('Please use assertListEqual instead.') |
| 766 def assertListEquals(self, list_1, list_2, msg=None): |
| 767 """compares two lists |
| 768 |
| 769 If the two list differ, the first difference is shown in the error |
| 770 message |
| 771 |
| 772 :param list_1: a Python List |
| 773 :param list_2: a second Python List |
| 774 :param msg: custom message (String) in case of failure |
| 775 """ |
| 776 _l1 = list_1[:] |
| 777 for i, value in enumerate(list_2): |
| 778 try: |
| 779 if _l1[0] != value: |
| 780 from pprint import pprint |
| 781 pprint(list_1) |
| 782 pprint(list_2) |
| 783 self.fail('%r != %r for index %d' % (_l1[0], value, i)) |
| 784 del _l1[0] |
| 785 except IndexError: |
| 786 if msg is None: |
| 787 msg = 'list_1 has only %d elements, not %s '\ |
| 788 '(at least %r missing)'% (i, len(list_2), value) |
| 789 self.fail(msg) |
| 790 if _l1: |
| 791 if msg is None: |
| 792 msg = 'list_2 is lacking %r' % _l1 |
| 793 self.fail(msg) |
| 794 |
| 795 @deprecated('Non-standard. Please use assertMultiLineEqual instead.') |
| 796 def assertLinesEquals(self, string1, string2, msg=None, striplines=False): |
| 797 """compare two strings and assert that the text lines of the strings |
| 798 are equal. |
| 799 |
| 800 :param string1: a String |
| 801 :param string2: a String |
| 802 :param msg: custom message (String) in case of failure |
| 803 :param striplines: Boolean to trigger line stripping before comparing |
| 804 """ |
| 805 lines1 = string1.splitlines() |
| 806 lines2 = string2.splitlines() |
| 807 if striplines: |
| 808 lines1 = [l.strip() for l in lines1] |
| 809 lines2 = [l.strip() for l in lines2] |
| 810 self.assertListEqual(lines1, lines2, msg) |
| 811 assertLineEqual = assertLinesEquals |
| 812 |
| 813 @deprecated('Non-standard: please copy test method to your TestCase class') |
| 814 def assertXMLWellFormed(self, stream, msg=None, context=2): |
| 815 """asserts the XML stream is well-formed (no DTD conformance check) |
| 816 |
| 817 :param context: number of context lines in standard message |
| 818 (show all data if negative). |
| 819 Only available with element tree |
| 820 """ |
| 821 try: |
| 822 from xml.etree.ElementTree import parse |
| 823 self._assertETXMLWellFormed(stream, parse, msg) |
| 824 except ImportError: |
| 825 from xml.sax import make_parser, SAXParseException |
| 826 parser = make_parser() |
| 827 try: |
| 828 parser.parse(stream) |
| 829 except SAXParseException, ex: |
| 830 if msg is None: |
| 831 stream.seek(0) |
| 832 for _ in xrange(ex.getLineNumber()): |
| 833 line = stream.readline() |
| 834 pointer = ('' * (ex.getLineNumber() - 1)) + '^' |
| 835 msg = 'XML stream not well formed: %s\n%s%s' % (ex, line, po
inter) |
| 836 self.fail(msg) |
| 837 |
| 838 @deprecated('Non-standard: please copy test method to your TestCase class') |
| 839 def assertXMLStringWellFormed(self, xml_string, msg=None, context=2): |
| 840 """asserts the XML string is well-formed (no DTD conformance check) |
| 841 |
| 842 :param context: number of context lines in standard message |
| 843 (show all data if negative). |
| 844 Only available with element tree |
| 845 """ |
| 846 try: |
| 847 from xml.etree.ElementTree import fromstring |
| 848 except ImportError: |
| 849 from elementtree.ElementTree import fromstring |
| 850 self._assertETXMLWellFormed(xml_string, fromstring, msg) |
| 851 |
| 852 def _assertETXMLWellFormed(self, data, parse, msg=None, context=2): |
| 853 """internal function used by /assertXML(String)?WellFormed/ functions |
| 854 |
| 855 :param data: xml_data |
| 856 :param parse: appropriate parser function for this data |
| 857 :param msg: error message |
| 858 :param context: number of context lines in standard message |
| 859 (show all data if negative). |
| 860 Only available with element tree |
| 861 """ |
| 862 from xml.parsers.expat import ExpatError |
| 863 try: |
| 864 from xml.etree.ElementTree import ParseError |
| 865 except ImportError: |
| 866 # compatibility for <python2.7 |
| 867 ParseError = ExpatError |
| 868 try: |
| 869 parse(data) |
| 870 except (ExpatError, ParseError), ex: |
| 871 if msg is None: |
| 872 if hasattr(data, 'readlines'): #file like object |
| 873 data.seek(0) |
| 874 lines = data.readlines() |
| 875 else: |
| 876 lines = data.splitlines(True) |
| 877 nb_lines = len(lines) |
| 878 context_lines = [] |
| 879 |
| 880 # catch when ParseError doesn't set valid lineno |
| 881 if ex.lineno is not None: |
| 882 if context < 0: |
| 883 start = 1 |
| 884 end = nb_lines |
| 885 else: |
| 886 start = max(ex.lineno-context, 1) |
| 887 end = min(ex.lineno+context, nb_lines) |
| 888 line_number_length = len('%i' % end) |
| 889 line_pattern = " %%%ii: %%s" % line_number_length |
| 890 |
| 891 for line_no in xrange(start, ex.lineno): |
| 892 context_lines.append(line_pattern % (line_no, lines[line
_no-1])) |
| 893 context_lines.append(line_pattern % (ex.lineno, lines[ex.lin
eno-1])) |
| 894 context_lines.append('%s^\n' % (' ' * (1 + line_number_lengt
h + 2 +ex.offset))) |
| 895 for line_no in xrange(ex.lineno+1, end+1): |
| 896 context_lines.append(line_pattern % (line_no, lines[line
_no-1])) |
| 897 |
| 898 rich_context = ''.join(context_lines) |
| 899 msg = 'XML stream not well formed: %s\n%s' % (ex, rich_context) |
| 900 self.fail(msg) |
| 901 |
| 902 @deprecated('Non-standard: please copy test method to your TestCase class') |
| 903 def assertXMLEqualsTuple(self, element, tup): |
| 904 """compare an ElementTree Element to a tuple formatted as follow: |
| 905 (tagname, [attrib[, children[, text[, tail]]]])""" |
| 906 # check tag |
| 907 self.assertTextEquals(element.tag, tup[0]) |
| 908 # check attrib |
| 909 if len(element.attrib) or len(tup)>1: |
| 910 if len(tup)<=1: |
| 911 self.fail( "tuple %s has no attributes (%s expected)"%(tup, |
| 912 dict(element.attrib))) |
| 913 self.assertDictEqual(element.attrib, tup[1]) |
| 914 # check children |
| 915 if len(element) or len(tup)>2: |
| 916 if len(tup)<=2: |
| 917 self.fail( "tuple %s has no children (%i expected)"%(tup, |
| 918 len(element))) |
| 919 if len(element) != len(tup[2]): |
| 920 self.fail( "tuple %s has %i children%s (%i expected)"%(tup, |
| 921 len(tup[2]), |
| 922 ('', 's')[len(tup[2])>1], len(element))) |
| 923 for index in xrange(len(tup[2])): |
| 924 self.assertXMLEqualsTuple(element[index], tup[2][index]) |
| 925 #check text |
| 926 if element.text or len(tup)>3: |
| 927 if len(tup)<=3: |
| 928 self.fail( "tuple %s has no text value (%r expected)"%(tup, |
| 929 element.text)) |
| 930 self.assertTextEquals(element.text, tup[3]) |
| 931 #check tail |
| 932 if element.tail or len(tup)>4: |
| 933 if len(tup)<=4: |
| 934 self.fail( "tuple %s has no tail value (%r expected)"%(tup, |
| 935 element.tail)) |
| 936 self.assertTextEquals(element.tail, tup[4]) |
| 937 |
| 938 def _difftext(self, lines1, lines2, junk=None, msg_prefix='Texts differ'): |
| 939 junk = junk or (' ', '\t') |
| 940 # result is a generator |
| 941 result = difflib.ndiff(lines1, lines2, charjunk=lambda x: x in junk) |
| 942 read = [] |
| 943 for line in result: |
| 944 read.append(line) |
| 945 # lines that don't start with a ' ' are diff ones |
| 946 if not line.startswith(' '): |
| 947 self.fail('\n'.join(['%s\n'%msg_prefix]+read + list(result))) |
| 948 |
| 949 @deprecated('Non-standard. Please use assertMultiLineEqual instead.') |
| 950 def assertTextEquals(self, text1, text2, junk=None, |
| 951 msg_prefix='Text differ', striplines=False): |
| 952 """compare two multiline strings (using difflib and splitlines()) |
| 953 |
| 954 :param text1: a Python BaseString |
| 955 :param text2: a second Python Basestring |
| 956 :param junk: List of Caracters |
| 957 :param msg_prefix: String (message prefix) |
| 958 :param striplines: Boolean to trigger line stripping before comparing |
| 959 """ |
| 960 msg = [] |
| 961 if not isinstance(text1, basestring): |
| 962 msg.append('text1 is not a string (%s)'%(type(text1))) |
| 963 if not isinstance(text2, basestring): |
| 964 msg.append('text2 is not a string (%s)'%(type(text2))) |
| 965 if msg: |
| 966 self.fail('\n'.join(msg)) |
| 967 lines1 = text1.strip().splitlines(True) |
| 968 lines2 = text2.strip().splitlines(True) |
| 969 if striplines: |
| 970 lines1 = [line.strip() for line in lines1] |
| 971 lines2 = [line.strip() for line in lines2] |
| 972 self._difftext(lines1, lines2, junk, msg_prefix) |
| 973 assertTextEqual = assertTextEquals |
| 974 |
| 975 @deprecated('Non-standard: please copy test method to your TestCase class') |
| 976 def assertStreamEquals(self, stream1, stream2, junk=None, |
| 977 msg_prefix='Stream differ'): |
| 978 """compare two streams (using difflib and readlines())""" |
| 979 # if stream2 is stream2, readlines() on stream1 will also read lines |
| 980 # in stream2, so they'll appear different, although they're not |
| 981 if stream1 is stream2: |
| 982 return |
| 983 # make sure we compare from the beginning of the stream |
| 984 stream1.seek(0) |
| 985 stream2.seek(0) |
| 986 # compare |
| 987 self._difftext(stream1.readlines(), stream2.readlines(), junk, |
| 988 msg_prefix) |
| 989 |
| 990 assertStreamEqual = assertStreamEquals |
| 991 |
| 992 @deprecated('Non-standard: please copy test method to your TestCase class') |
| 993 def assertFileEquals(self, fname1, fname2, junk=(' ', '\t')): |
| 994 """compares two files using difflib""" |
| 995 self.assertStreamEqual(open(fname1), open(fname2), junk, |
| 996 msg_prefix='Files differs\n-:%s\n+:%s\n'%(fname1, fname2)) |
| 997 |
| 998 assertFileEqual = assertFileEquals |
| 999 |
| 1000 @deprecated('Non-standard: please copy test method to your TestCase class') |
| 1001 def assertDirEquals(self, path_a, path_b): |
| 1002 """compares two files using difflib""" |
| 1003 assert osp.exists(path_a), "%s doesn't exists" % path_a |
| 1004 assert osp.exists(path_b), "%s doesn't exists" % path_b |
| 1005 |
| 1006 all_a = [ (ipath[len(path_a):].lstrip('/'), idirs, ifiles) |
| 1007 for ipath, idirs, ifiles in os.walk(path_a)] |
| 1008 all_a.sort(key=itemgetter(0)) |
| 1009 |
| 1010 all_b = [ (ipath[len(path_b):].lstrip('/'), idirs, ifiles) |
| 1011 for ipath, idirs, ifiles in os.walk(path_b)] |
| 1012 all_b.sort(key=itemgetter(0)) |
| 1013 |
| 1014 iter_a, iter_b = iter(all_a), iter(all_b) |
| 1015 partial_iter = True |
| 1016 ipath_a, idirs_a, ifiles_a = data_a = None, None, None |
| 1017 while True: |
| 1018 try: |
| 1019 ipath_a, idirs_a, ifiles_a = datas_a = iter_a.next() |
| 1020 partial_iter = False |
| 1021 ipath_b, idirs_b, ifiles_b = datas_b = iter_b.next() |
| 1022 partial_iter = True |
| 1023 |
| 1024 |
| 1025 self.assert_(ipath_a == ipath_b, |
| 1026 "unexpected %s in %s while looking %s from %s" % |
| 1027 (ipath_a, path_a, ipath_b, path_b)) |
| 1028 |
| 1029 |
| 1030 errors = {} |
| 1031 sdirs_a = set(idirs_a) |
| 1032 sdirs_b = set(idirs_b) |
| 1033 errors["unexpected directories"] = sdirs_a - sdirs_b |
| 1034 errors["missing directories"] = sdirs_b - sdirs_a |
| 1035 |
| 1036 sfiles_a = set(ifiles_a) |
| 1037 sfiles_b = set(ifiles_b) |
| 1038 errors["unexpected files"] = sfiles_a - sfiles_b |
| 1039 errors["missing files"] = sfiles_b - sfiles_a |
| 1040 |
| 1041 |
| 1042 msgs = [ "%s: %s"% (name, items) |
| 1043 for name, items in errors.iteritems() if items] |
| 1044 |
| 1045 if msgs: |
| 1046 msgs.insert(0, "%s and %s differ :" % ( |
| 1047 osp.join(path_a, ipath_a), |
| 1048 osp.join(path_b, ipath_b), |
| 1049 )) |
| 1050 self.fail("\n".join(msgs)) |
| 1051 |
| 1052 for files in (ifiles_a, ifiles_b): |
| 1053 files.sort() |
| 1054 |
| 1055 for index, path in enumerate(ifiles_a): |
| 1056 self.assertFileEquals(osp.join(path_a, ipath_a, path), |
| 1057 osp.join(path_b, ipath_b, ifiles_b[index])) |
| 1058 |
| 1059 except StopIteration: |
| 1060 break |
| 1061 |
| 1062 assertDirEqual = assertDirEquals |
| 1063 |
| 1064 def assertIsInstance(self, obj, klass, msg=None, strict=False): |
| 1065 """check if an object is an instance of a class |
| 1066 |
| 1067 :param obj: the Python Object to be checked |
| 1068 :param klass: the target class |
| 1069 :param msg: a String for a custom message |
| 1070 :param strict: if True, check that the class of <obj> is <klass>; |
| 1071 else check with 'isinstance' |
| 1072 """ |
| 1073 if strict: |
| 1074 warnings.warn('[API] Non-standard. Strict parameter has vanished', |
| 1075 DeprecationWarning, stacklevel=2) |
| 1076 if msg is None: |
| 1077 if strict: |
| 1078 msg = '%r is not of class %s but of %s' |
| 1079 else: |
| 1080 msg = '%r is not an instance of %s but of %s' |
| 1081 msg = msg % (obj, klass, type(obj)) |
| 1082 if strict: |
| 1083 self.assert_(obj.__class__ is klass, msg) |
| 1084 else: |
| 1085 self.assert_(isinstance(obj, klass), msg) |
| 1086 |
| 1087 @deprecated('Please use assertIsNone instead.') |
| 1088 def assertNone(self, obj, msg=None): |
| 1089 """assert obj is None |
| 1090 |
| 1091 :param obj: Python Object to be tested |
| 1092 """ |
| 1093 if msg is None: |
| 1094 msg = "reference to %r when None expected"%(obj,) |
| 1095 self.assert_( obj is None, msg ) |
| 1096 |
| 1097 @deprecated('Please use assertIsNotNone instead.') |
| 1098 def assertNotNone(self, obj, msg=None): |
| 1099 """assert obj is not None""" |
| 1100 if msg is None: |
| 1101 msg = "unexpected reference to None" |
| 1102 self.assert_( obj is not None, msg ) |
| 1103 |
| 1104 @deprecated('Non-standard. Please use assertAlmostEqual instead.') |
| 1105 def assertFloatAlmostEquals(self, obj, other, prec=1e-5, |
| 1106 relative=False, msg=None): |
| 1107 """compares if two floats have a distance smaller than expected |
| 1108 precision. |
| 1109 |
| 1110 :param obj: a Float |
| 1111 :param other: another Float to be comparted to <obj> |
| 1112 :param prec: a Float describing the precision |
| 1113 :param relative: boolean switching to relative/absolute precision |
| 1114 :param msg: a String for a custom message |
| 1115 """ |
| 1116 if msg is None: |
| 1117 msg = "%r != %r" % (obj, other) |
| 1118 if relative: |
| 1119 prec = prec*math.fabs(obj) |
| 1120 self.assert_(math.fabs(obj - other) < prec, msg) |
| 1121 |
| 1122 def failUnlessRaises(self, excClass, callableObj=None, *args, **kwargs): |
| 1123 """override default failUnlessRaises method to return the raised |
| 1124 exception instance. |
| 1125 |
| 1126 Fail unless an exception of class excClass is thrown |
| 1127 by callableObj when invoked with arguments args and keyword |
| 1128 arguments kwargs. If a different type of exception is |
| 1129 thrown, it will not be caught, and the test case will be |
| 1130 deemed to have suffered an error, exactly as for an |
| 1131 unexpected exception. |
| 1132 |
| 1133 CAUTION! There are subtle differences between Logilab and unittest2 |
| 1134 - exc is not returned in standard version |
| 1135 - context capabilities in standard version |
| 1136 - try/except/else construction (minor) |
| 1137 |
| 1138 :param excClass: the Exception to be raised |
| 1139 :param callableObj: a callable Object which should raise <excClass> |
| 1140 :param args: a List of arguments for <callableObj> |
| 1141 :param kwargs: a List of keyword arguments for <callableObj> |
| 1142 """ |
| 1143 # XXX cube vcslib : test_branches_from_app |
| 1144 if callableObj is None: |
| 1145 _assert = super(TestCase, self).assertRaises |
| 1146 return _assert(excClass, callableObj, *args, **kwargs) |
| 1147 try: |
| 1148 callableObj(*args, **kwargs) |
| 1149 except excClass, exc: |
| 1150 class ProxyException: |
| 1151 def __init__(self, obj): |
| 1152 self._obj = obj |
| 1153 def __getattr__(self, attr): |
| 1154 warn_msg = ("This exception was retrieved with the old testl
ib way " |
| 1155 "`exc = self.assertRaises(Exc, callable)`, pleas
e use " |
| 1156 "the context manager instead'") |
| 1157 warnings.warn(warn_msg, DeprecationWarning, 2) |
| 1158 return self._obj.__getattribute__(attr) |
| 1159 return ProxyException(exc) |
| 1160 else: |
| 1161 if hasattr(excClass, '__name__'): |
| 1162 excName = excClass.__name__ |
| 1163 else: |
| 1164 excName = str(excClass) |
| 1165 raise self.failureException("%s not raised" % excName) |
| 1166 |
| 1167 assertRaises = failUnlessRaises |
| 1168 |
| 1169 |
| 1170 import doctest |
| 1171 |
| 1172 class SkippedSuite(unittest.TestSuite): |
| 1173 def test(self): |
| 1174 """just there to trigger test execution""" |
| 1175 self.skipped_test('doctest module has no DocTestSuite class') |
| 1176 |
| 1177 |
| 1178 class DocTestFinder(doctest.DocTestFinder): |
| 1179 |
| 1180 def __init__(self, *args, **kwargs): |
| 1181 self.skipped = kwargs.pop('skipped', ()) |
| 1182 doctest.DocTestFinder.__init__(self, *args, **kwargs) |
| 1183 |
| 1184 def _get_test(self, obj, name, module, globs, source_lines): |
| 1185 """override default _get_test method to be able to skip tests |
| 1186 according to skipped attribute's value |
| 1187 |
| 1188 Note: Python (<=2.4) use a _name_filter which could be used for that |
| 1189 purpose but it's no longer available in 2.5 |
| 1190 Python 2.5 seems to have a [SKIP] flag |
| 1191 """ |
| 1192 if getattr(obj, '__name__', '') in self.skipped: |
| 1193 return None |
| 1194 return doctest.DocTestFinder._get_test(self, obj, name, module, |
| 1195 globs, source_lines) |
| 1196 |
| 1197 |
| 1198 class DocTest(TestCase): |
| 1199 """trigger module doctest |
| 1200 I don't know how to make unittest.main consider the DocTestSuite instance |
| 1201 without this hack |
| 1202 """ |
| 1203 skipped = () |
| 1204 def __call__(self, result=None, runcondition=None, options=None):\ |
| 1205 # pylint: disable=W0613 |
| 1206 try: |
| 1207 finder = DocTestFinder(skipped=self.skipped) |
| 1208 if sys.version_info >= (2, 4): |
| 1209 suite = doctest.DocTestSuite(self.module, test_finder=finder) |
| 1210 if sys.version_info >= (2, 5): |
| 1211 # XXX iirk |
| 1212 doctest.DocTestCase._TestCase__exc_info = sys.exc_info |
| 1213 else: |
| 1214 suite = doctest.DocTestSuite(self.module) |
| 1215 except AttributeError: |
| 1216 suite = SkippedSuite() |
| 1217 return suite.run(result) |
| 1218 run = __call__ |
| 1219 |
| 1220 def test(self): |
| 1221 """just there to trigger test execution""" |
| 1222 |
| 1223 MAILBOX = None |
| 1224 |
| 1225 class MockSMTP: |
| 1226 """fake smtplib.SMTP""" |
| 1227 |
| 1228 def __init__(self, host, port): |
| 1229 self.host = host |
| 1230 self.port = port |
| 1231 global MAILBOX |
| 1232 self.reveived = MAILBOX = [] |
| 1233 |
| 1234 def set_debuglevel(self, debuglevel): |
| 1235 """ignore debug level""" |
| 1236 |
| 1237 def sendmail(self, fromaddr, toaddres, body): |
| 1238 """push sent mail in the mailbox""" |
| 1239 self.reveived.append((fromaddr, toaddres, body)) |
| 1240 |
| 1241 def quit(self): |
| 1242 """ignore quit""" |
| 1243 |
| 1244 |
| 1245 class MockConfigParser(ConfigParser): |
| 1246 """fake ConfigParser.ConfigParser""" |
| 1247 |
| 1248 def __init__(self, options): |
| 1249 ConfigParser.__init__(self) |
| 1250 for section, pairs in options.iteritems(): |
| 1251 self.add_section(section) |
| 1252 for key, value in pairs.iteritems(): |
| 1253 self.set(section, key, value) |
| 1254 def write(self, _): |
| 1255 raise NotImplementedError() |
| 1256 |
| 1257 |
| 1258 class MockConnection: |
| 1259 """fake DB-API 2.0 connexion AND cursor (i.e. cursor() return self)""" |
| 1260 |
| 1261 def __init__(self, results): |
| 1262 self.received = [] |
| 1263 self.states = [] |
| 1264 self.results = results |
| 1265 |
| 1266 def cursor(self): |
| 1267 """Mock cursor method""" |
| 1268 return self |
| 1269 def execute(self, query, args=None): |
| 1270 """Mock execute method""" |
| 1271 self.received.append( (query, args) ) |
| 1272 def fetchone(self): |
| 1273 """Mock fetchone method""" |
| 1274 return self.results[0] |
| 1275 def fetchall(self): |
| 1276 """Mock fetchall method""" |
| 1277 return self.results |
| 1278 def commit(self): |
| 1279 """Mock commiy method""" |
| 1280 self.states.append( ('commit', len(self.received)) ) |
| 1281 def rollback(self): |
| 1282 """Mock rollback method""" |
| 1283 self.states.append( ('rollback', len(self.received)) ) |
| 1284 def close(self): |
| 1285 """Mock close method""" |
| 1286 pass |
| 1287 |
| 1288 |
| 1289 def mock_object(**params): |
| 1290 """creates an object using params to set attributes |
| 1291 >>> option = mock_object(verbose=False, index=range(5)) |
| 1292 >>> option.verbose |
| 1293 False |
| 1294 >>> option.index |
| 1295 [0, 1, 2, 3, 4] |
| 1296 """ |
| 1297 return type('Mock', (), params)() |
| 1298 |
| 1299 |
| 1300 def create_files(paths, chroot): |
| 1301 """Creates directories and files found in <path>. |
| 1302 |
| 1303 :param paths: list of relative paths to files or directories |
| 1304 :param chroot: the root directory in which paths will be created |
| 1305 |
| 1306 >>> from os.path import isdir, isfile |
| 1307 >>> isdir('/tmp/a') |
| 1308 False |
| 1309 >>> create_files(['a/b/foo.py', 'a/b/c/', 'a/b/c/d/e.py'], '/tmp') |
| 1310 >>> isdir('/tmp/a') |
| 1311 True |
| 1312 >>> isdir('/tmp/a/b/c') |
| 1313 True |
| 1314 >>> isfile('/tmp/a/b/c/d/e.py') |
| 1315 True |
| 1316 >>> isfile('/tmp/a/b/foo.py') |
| 1317 True |
| 1318 """ |
| 1319 dirs, files = set(), set() |
| 1320 for path in paths: |
| 1321 path = osp.join(chroot, path) |
| 1322 filename = osp.basename(path) |
| 1323 # path is a directory path |
| 1324 if filename == '': |
| 1325 dirs.add(path) |
| 1326 # path is a filename path |
| 1327 else: |
| 1328 dirs.add(osp.dirname(path)) |
| 1329 files.add(path) |
| 1330 for dirpath in dirs: |
| 1331 if not osp.isdir(dirpath): |
| 1332 os.makedirs(dirpath) |
| 1333 for filepath in files: |
| 1334 open(filepath, 'w').close() |
| 1335 |
| 1336 |
| 1337 class AttrObject: # XXX cf mock_object |
| 1338 def __init__(self, **kwargs): |
| 1339 self.__dict__.update(kwargs) |
| 1340 |
| 1341 def tag(*args, **kwargs): |
| 1342 """descriptor adding tag to a function""" |
| 1343 def desc(func): |
| 1344 assert not hasattr(func, 'tags') |
| 1345 func.tags = Tags(*args, **kwargs) |
| 1346 return func |
| 1347 return desc |
| 1348 |
| 1349 def require_version(version): |
| 1350 """ Compare version of python interpreter to the given one. Skip the test |
| 1351 if older. |
| 1352 """ |
| 1353 def check_require_version(f): |
| 1354 version_elements = version.split('.') |
| 1355 try: |
| 1356 compare = tuple([int(v) for v in version_elements]) |
| 1357 except ValueError: |
| 1358 raise ValueError('%s is not a correct version : should be X.Y[.Z].'
% version) |
| 1359 current = sys.version_info[:3] |
| 1360 if current < compare: |
| 1361 def new_f(self, *args, **kwargs): |
| 1362 self.skipTest('Need at least %s version of python. Current versi
on is %s.' % (version, '.'.join([str(element) for element in current]))) |
| 1363 new_f.__name__ = f.__name__ |
| 1364 return new_f |
| 1365 else: |
| 1366 return f |
| 1367 return check_require_version |
| 1368 |
| 1369 def require_module(module): |
| 1370 """ Check if the given module is loaded. Skip the test if not. |
| 1371 """ |
| 1372 def check_require_module(f): |
| 1373 try: |
| 1374 __import__(module) |
| 1375 return f |
| 1376 except ImportError: |
| 1377 def new_f(self, *args, **kwargs): |
| 1378 self.skipTest('%s can not be imported.' % module) |
| 1379 new_f.__name__ = f.__name__ |
| 1380 return new_f |
| 1381 return check_require_module |
| 1382 |
OLD | NEW |