Index: third_party/logilab/common/pytest.py |
diff --git a/third_party/logilab/common/pytest.py b/third_party/logilab/common/pytest.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..2d6ccf9520d59f2a46f3943a59b830b936570fdc |
--- /dev/null |
+++ b/third_party/logilab/common/pytest.py |
@@ -0,0 +1,1177 @@ |
+# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
+# |
+# This file is part of logilab-common. |
+# |
+# logilab-common is free software: you can redistribute it and/or modify it under |
+# the terms of the GNU Lesser General Public License as published by the Free |
+# Software Foundation, either version 2.1 of the License, or (at your option) any |
+# later version. |
+# |
+# logilab-common is distributed in the hope that it will be useful, but WITHOUT |
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
+# details. |
+# |
+# You should have received a copy of the GNU Lesser General Public License along |
+# with logilab-common. If not, see <http://www.gnu.org/licenses/>. |
+"""pytest is a tool that eases test running and debugging. |
+ |
+To be able to use pytest, you should either write tests using |
+the logilab.common.testlib's framework or the unittest module of the |
+Python's standard library. |
+ |
+You can customize pytest's behaviour by defining a ``pytestconf.py`` file |
+somewhere in your test directory. In this file, you can add options or |
+change the way tests are run. |
+ |
+To add command line options, you must define a ``update_parser`` function in |
+your ``pytestconf.py`` file. The function must accept a single parameter |
+that will be the OptionParser's instance to customize. |
+ |
+If you wish to customize the tester, you'll have to define a class named |
+``CustomPyTester``. This class should extend the default `PyTester` class |
+defined in the pytest module. Take a look at the `PyTester` and `DjangoTester` |
+classes for more information about what can be done. |
+ |
+For instance, if you wish to add a custom -l option to specify a loglevel, you |
+could define the following ``pytestconf.py`` file :: |
+ |
+ import logging |
+ from logilab.common.pytest import PyTester |
+ |
+ def update_parser(parser): |
+ parser.add_option('-l', '--loglevel', dest='loglevel', action='store', |
+ choices=('debug', 'info', 'warning', 'error', 'critical'), |
+ default='critical', help="the default log level possible choices are " |
+ "('debug', 'info', 'warning', 'error', 'critical')") |
+ return parser |
+ |
+ |
+ class CustomPyTester(PyTester): |
+ def __init__(self, cvg, options): |
+ super(CustomPyTester, self).__init__(cvg, options) |
+ loglevel = options.loglevel.upper() |
+ logger = logging.getLogger('erudi') |
+ logger.setLevel(logging.getLevelName(loglevel)) |
+ |
+ |
+In your TestCase class you can then get the value of a specific option with |
+the ``optval`` method:: |
+ |
+ class MyTestCase(TestCase): |
+ def test_foo(self): |
+ loglevel = self.optval('loglevel') |
+ # ... |
+ |
+ |
+You can also tag your tag your test for fine filtering |
+ |
+With those tag:: |
+ |
+ from logilab.common.testlib import tag, TestCase |
+ |
+ class Exemple(TestCase): |
+ |
+ @tag('rouge', 'carre') |
+ def toto(self): |
+ pass |
+ |
+ @tag('carre', 'vert') |
+ def tata(self): |
+ pass |
+ |
+ @tag('rouge') |
+ def titi(test): |
+ pass |
+ |
+you can filter the function with a simple python expression |
+ |
+ * ``toto`` and ``titi`` match ``rouge`` |
+ * ``toto``, ``tata`` and ``titi``, match ``rouge or carre`` |
+ * ``tata`` and ``titi`` match``rouge ^ carre`` |
+ * ``titi`` match ``rouge and not carre`` |
+""" |
+__docformat__ = "restructuredtext en" |
+ |
+PYTEST_DOC = """%prog [OPTIONS] [testfile [testpattern]] |
+ |
+examples: |
+ |
+pytest path/to/mytests.py |
+pytest path/to/mytests.py TheseTests |
+pytest path/to/mytests.py TheseTests.test_thisone |
+pytest path/to/mytests.py -m '(not long and database) or regr' |
+ |
+pytest one (will run both test_thisone and test_thatone) |
+pytest path/to/mytests.py -s not (will skip test_notthisone) |
+ |
+pytest --coverage test_foo.py |
+ (only if logilab.devtools is available) |
+""" |
+ |
+ENABLE_DBC = False |
+FILE_RESTART = ".pytest.restart" |
+ |
+import os, sys, re |
+import os.path as osp |
+from time import time, clock |
+import warnings |
+import types |
+ |
+from logilab.common.fileutils import abspath_listdir |
+from logilab.common import textutils |
+from logilab.common import testlib, STD_BLACKLIST |
+# use the same unittest module as testlib |
+from logilab.common.testlib import unittest, start_interactive_mode |
+from logilab.common.compat import any |
+import doctest |
+ |
+import unittest as unittest_legacy |
+if not getattr(unittest_legacy, "__package__", None): |
+ try: |
+ import unittest2.suite as unittest_suite |
+ except ImportError: |
+ sys.exit("You have to install python-unittest2 to use this module") |
+else: |
+ import unittest.suite as unittest_suite |
+ |
+try: |
+ import django |
+ from logilab.common.modutils import modpath_from_file, load_module_from_modpath |
+ DJANGO_FOUND = True |
+except ImportError: |
+ DJANGO_FOUND = False |
+ |
+CONF_FILE = 'pytestconf.py' |
+ |
+## coverage hacks, do not read this, do not read this, do not read this |
+ |
+# hey, but this is an aspect, right ?!!! |
+class TraceController(object): |
+ nesting = 0 |
+ |
+ def pause_tracing(cls): |
+ if not cls.nesting: |
+ cls.tracefunc = staticmethod(getattr(sys, '__settrace__', sys.settrace)) |
+ cls.oldtracer = getattr(sys, '__tracer__', None) |
+ sys.__notrace__ = True |
+ cls.tracefunc(None) |
+ cls.nesting += 1 |
+ pause_tracing = classmethod(pause_tracing) |
+ |
+ def resume_tracing(cls): |
+ cls.nesting -= 1 |
+ assert cls.nesting >= 0 |
+ if not cls.nesting: |
+ cls.tracefunc(cls.oldtracer) |
+ delattr(sys, '__notrace__') |
+ resume_tracing = classmethod(resume_tracing) |
+ |
+ |
+pause_tracing = TraceController.pause_tracing |
+resume_tracing = TraceController.resume_tracing |
+ |
+ |
+def nocoverage(func): |
+ if hasattr(func, 'uncovered'): |
+ return func |
+ func.uncovered = True |
+ def not_covered(*args, **kwargs): |
+ pause_tracing() |
+ try: |
+ return func(*args, **kwargs) |
+ finally: |
+ resume_tracing() |
+ not_covered.uncovered = True |
+ return not_covered |
+ |
+ |
+## end of coverage hacks |
+ |
+ |
+TESTFILE_RE = re.compile("^((unit)?test.*|smoketest)\.py$") |
+def this_is_a_testfile(filename): |
+ """returns True if `filename` seems to be a test file""" |
+ return TESTFILE_RE.match(osp.basename(filename)) |
+ |
+TESTDIR_RE = re.compile("^(unit)?tests?$") |
+def this_is_a_testdir(dirpath): |
+ """returns True if `filename` seems to be a test directory""" |
+ return TESTDIR_RE.match(osp.basename(dirpath)) |
+ |
+ |
+def load_pytest_conf(path, parser): |
+ """loads a ``pytestconf.py`` file and update default parser |
+ and / or tester. |
+ """ |
+ namespace = {} |
+ execfile(path, namespace) |
+ if 'update_parser' in namespace: |
+ namespace['update_parser'](parser) |
+ return namespace.get('CustomPyTester', PyTester) |
+ |
+ |
+def project_root(parser, projdir=os.getcwd()): |
+ """try to find project's root and add it to sys.path""" |
+ previousdir = curdir = osp.abspath(projdir) |
+ testercls = PyTester |
+ conf_file_path = osp.join(curdir, CONF_FILE) |
+ if osp.isfile(conf_file_path): |
+ testercls = load_pytest_conf(conf_file_path, parser) |
+ while this_is_a_testdir(curdir) or \ |
+ osp.isfile(osp.join(curdir, '__init__.py')): |
+ newdir = osp.normpath(osp.join(curdir, os.pardir)) |
+ if newdir == curdir: |
+ break |
+ previousdir = curdir |
+ curdir = newdir |
+ conf_file_path = osp.join(curdir, CONF_FILE) |
+ if osp.isfile(conf_file_path): |
+ testercls = load_pytest_conf(conf_file_path, parser) |
+ return previousdir, testercls |
+ |
+ |
+class GlobalTestReport(object): |
+ """this class holds global test statistics""" |
+ def __init__(self): |
+ self.ran = 0 |
+ self.skipped = 0 |
+ self.failures = 0 |
+ self.errors = 0 |
+ self.ttime = 0 |
+ self.ctime = 0 |
+ self.modulescount = 0 |
+ self.errmodules = [] |
+ |
+ def feed(self, filename, testresult, ttime, ctime): |
+ """integrates new test information into internal statistics""" |
+ ran = testresult.testsRun |
+ self.ran += ran |
+ self.skipped += len(getattr(testresult, 'skipped', ())) |
+ self.failures += len(testresult.failures) |
+ self.errors += len(testresult.errors) |
+ self.ttime += ttime |
+ self.ctime += ctime |
+ self.modulescount += 1 |
+ if not testresult.wasSuccessful(): |
+ problems = len(testresult.failures) + len(testresult.errors) |
+ self.errmodules.append((filename[:-3], problems, ran)) |
+ |
+ def failed_to_test_module(self, filename): |
+ """called when the test module could not be imported by unittest |
+ """ |
+ self.errors += 1 |
+ self.modulescount += 1 |
+ self.ran += 1 |
+ self.errmodules.append((filename[:-3], 1, 1)) |
+ |
+ def skip_module(self, filename): |
+ self.modulescount += 1 |
+ self.ran += 1 |
+ self.errmodules.append((filename[:-3], 0, 0)) |
+ |
+ def __str__(self): |
+ """this is just presentation stuff""" |
+ line1 = ['Ran %s test cases in %.2fs (%.2fs CPU)' |
+ % (self.ran, self.ttime, self.ctime)] |
+ if self.errors: |
+ line1.append('%s errors' % self.errors) |
+ if self.failures: |
+ line1.append('%s failures' % self.failures) |
+ if self.skipped: |
+ line1.append('%s skipped' % self.skipped) |
+ modulesok = self.modulescount - len(self.errmodules) |
+ if self.errors or self.failures: |
+ line2 = '%s modules OK (%s failed)' % (modulesok, |
+ len(self.errmodules)) |
+ descr = ', '.join(['%s [%s/%s]' % info for info in self.errmodules]) |
+ line3 = '\nfailures: %s' % descr |
+ elif modulesok: |
+ line2 = 'All %s modules OK' % modulesok |
+ line3 = '' |
+ else: |
+ return '' |
+ return '%s\n%s%s' % (', '.join(line1), line2, line3) |
+ |
+ |
+ |
+def remove_local_modules_from_sys(testdir): |
+ """remove all modules from cache that come from `testdir` |
+ |
+ This is used to avoid strange side-effects when using the |
+ testall() mode of pytest. |
+ For instance, if we run pytest on this tree:: |
+ |
+ A/test/test_utils.py |
+ B/test/test_utils.py |
+ |
+ we **have** to clean sys.modules to make sure the correct test_utils |
+ module is ran in B |
+ """ |
+ for modname, mod in sys.modules.items(): |
+ if mod is None: |
+ continue |
+ if not hasattr(mod, '__file__'): |
+ # this is the case of some built-in modules like sys, imp, marshal |
+ continue |
+ modfile = mod.__file__ |
+ # if modfile is not an absolute path, it was probably loaded locally |
+ # during the tests |
+ if not osp.isabs(modfile) or modfile.startswith(testdir): |
+ del sys.modules[modname] |
+ |
+ |
+ |
+class PyTester(object): |
+ """encapsulates testrun logic""" |
+ |
+ def __init__(self, cvg, options): |
+ self.report = GlobalTestReport() |
+ self.cvg = cvg |
+ self.options = options |
+ self.firstwrite = True |
+ self._errcode = None |
+ |
+ def show_report(self): |
+ """prints the report and returns appropriate exitcode""" |
+ # everything has been ran, print report |
+ print "*" * 79 |
+ print self.report |
+ |
+ def get_errcode(self): |
+ # errcode set explicitly |
+ if self._errcode is not None: |
+ return self._errcode |
+ return self.report.failures + self.report.errors |
+ |
+ def set_errcode(self, errcode): |
+ self._errcode = errcode |
+ errcode = property(get_errcode, set_errcode) |
+ |
+ def testall(self, exitfirst=False): |
+ """walks through current working directory, finds something |
+ which can be considered as a testdir and runs every test there |
+ """ |
+ here = os.getcwd() |
+ for dirname, dirs, _ in os.walk(here): |
+ for skipped in STD_BLACKLIST: |
+ if skipped in dirs: |
+ dirs.remove(skipped) |
+ basename = osp.basename(dirname) |
+ if this_is_a_testdir(basename): |
+ print "going into", dirname |
+ # we found a testdir, let's explore it ! |
+ if not self.testonedir(dirname, exitfirst): |
+ break |
+ dirs[:] = [] |
+ if self.report.ran == 0: |
+ print "no test dir found testing here:", here |
+ # if no test was found during the visit, consider |
+ # the local directory as a test directory even if |
+ # it doesn't have a traditional test directory name |
+ self.testonedir(here) |
+ |
+ def testonedir(self, testdir, exitfirst=False): |
+ """finds each testfile in the `testdir` and runs it |
+ |
+ return true when all tests has been executed, false if exitfirst and |
+ some test has failed. |
+ """ |
+ for filename in abspath_listdir(testdir): |
+ if this_is_a_testfile(filename): |
+ if self.options.exitfirst and not self.options.restart: |
+ # overwrite restart file |
+ try: |
+ restartfile = open(FILE_RESTART, "w") |
+ restartfile.close() |
+ except Exception, e: |
+ print >> sys.__stderr__, "Error while overwriting \ |
+succeeded test file :", osp.join(os.getcwd(), FILE_RESTART) |
+ raise e |
+ # run test and collect information |
+ prog = self.testfile(filename, batchmode=True) |
+ if exitfirst and (prog is None or not prog.result.wasSuccessful()): |
+ return False |
+ self.firstwrite = True |
+ # clean local modules |
+ remove_local_modules_from_sys(testdir) |
+ return True |
+ |
+ def testfile(self, filename, batchmode=False): |
+ """runs every test in `filename` |
+ |
+ :param filename: an absolute path pointing to a unittest file |
+ """ |
+ here = os.getcwd() |
+ dirname = osp.dirname(filename) |
+ if dirname: |
+ os.chdir(dirname) |
+ # overwrite restart file if it has not been done already |
+ if self.options.exitfirst and not self.options.restart and self.firstwrite: |
+ try: |
+ restartfile = open(FILE_RESTART, "w") |
+ restartfile.close() |
+ except Exception, e: |
+ print >> sys.__stderr__, "Error while overwriting \ |
+succeeded test file :", osp.join(os.getcwd(), FILE_RESTART) |
+ raise e |
+ modname = osp.basename(filename)[:-3] |
+ try: |
+ print >> sys.stderr, (' %s ' % osp.basename(filename)).center(70, '=') |
+ except TypeError: # < py 2.4 bw compat |
+ print >> sys.stderr, (' %s ' % osp.basename(filename)).center(70) |
+ try: |
+ tstart, cstart = time(), clock() |
+ try: |
+ testprog = SkipAwareTestProgram(modname, batchmode=batchmode, cvg=self.cvg, |
+ options=self.options, outstream=sys.stderr) |
+ except KeyboardInterrupt: |
+ raise |
+ except SystemExit, exc: |
+ self.errcode = exc.code |
+ raise |
+ except testlib.SkipTest: |
+ print "Module skipped:", filename |
+ self.report.skip_module(filename) |
+ return None |
+ except Exception: |
+ self.report.failed_to_test_module(filename) |
+ print >> sys.stderr, 'unhandled exception occurred while testing', modname |
+ import traceback |
+ traceback.print_exc(file=sys.stderr) |
+ return None |
+ |
+ tend, cend = time(), clock() |
+ ttime, ctime = (tend - tstart), (cend - cstart) |
+ self.report.feed(filename, testprog.result, ttime, ctime) |
+ return testprog |
+ finally: |
+ if dirname: |
+ os.chdir(here) |
+ |
+ |
+ |
+class DjangoTester(PyTester): |
+ |
+ def load_django_settings(self, dirname): |
+ """try to find project's setting and load it""" |
+ curdir = osp.abspath(dirname) |
+ previousdir = curdir |
+ while not osp.isfile(osp.join(curdir, 'settings.py')) and \ |
+ osp.isfile(osp.join(curdir, '__init__.py')): |
+ newdir = osp.normpath(osp.join(curdir, os.pardir)) |
+ if newdir == curdir: |
+ raise AssertionError('could not find settings.py') |
+ previousdir = curdir |
+ curdir = newdir |
+ # late django initialization |
+ settings = load_module_from_modpath(modpath_from_file(osp.join(curdir, 'settings.py'))) |
+ from django.core.management import setup_environ |
+ setup_environ(settings) |
+ settings.DEBUG = False |
+ self.settings = settings |
+ # add settings dir to pythonpath since it's the project's root |
+ if curdir not in sys.path: |
+ sys.path.insert(1, curdir) |
+ |
+ def before_testfile(self): |
+ # Those imports must be done **after** setup_environ was called |
+ from django.test.utils import setup_test_environment |
+ from django.test.utils import create_test_db |
+ setup_test_environment() |
+ create_test_db(verbosity=0) |
+ self.dbname = self.settings.TEST_DATABASE_NAME |
+ |
+ def after_testfile(self): |
+ # Those imports must be done **after** setup_environ was called |
+ from django.test.utils import teardown_test_environment |
+ from django.test.utils import destroy_test_db |
+ teardown_test_environment() |
+ print 'destroying', self.dbname |
+ destroy_test_db(self.dbname, verbosity=0) |
+ |
+ def testall(self, exitfirst=False): |
+ """walks through current working directory, finds something |
+ which can be considered as a testdir and runs every test there |
+ """ |
+ for dirname, dirs, files in os.walk(os.getcwd()): |
+ for skipped in ('CVS', '.svn', '.hg'): |
+ if skipped in dirs: |
+ dirs.remove(skipped) |
+ if 'tests.py' in files: |
+ if not self.testonedir(dirname, exitfirst): |
+ break |
+ dirs[:] = [] |
+ else: |
+ basename = osp.basename(dirname) |
+ if basename in ('test', 'tests'): |
+ print "going into", dirname |
+ # we found a testdir, let's explore it ! |
+ if not self.testonedir(dirname, exitfirst): |
+ break |
+ dirs[:] = [] |
+ |
+ def testonedir(self, testdir, exitfirst=False): |
+ """finds each testfile in the `testdir` and runs it |
+ |
+ return true when all tests has been executed, false if exitfirst and |
+ some test has failed. |
+ """ |
+ # special django behaviour : if tests are splitted in several files, |
+ # remove the main tests.py file and tests each test file separately |
+ testfiles = [fpath for fpath in abspath_listdir(testdir) |
+ if this_is_a_testfile(fpath)] |
+ if len(testfiles) > 1: |
+ try: |
+ testfiles.remove(osp.join(testdir, 'tests.py')) |
+ except ValueError: |
+ pass |
+ for filename in testfiles: |
+ # run test and collect information |
+ prog = self.testfile(filename, batchmode=True) |
+ if exitfirst and (prog is None or not prog.result.wasSuccessful()): |
+ return False |
+ # clean local modules |
+ remove_local_modules_from_sys(testdir) |
+ return True |
+ |
+ def testfile(self, filename, batchmode=False): |
+ """runs every test in `filename` |
+ |
+ :param filename: an absolute path pointing to a unittest file |
+ """ |
+ here = os.getcwd() |
+ dirname = osp.dirname(filename) |
+ if dirname: |
+ os.chdir(dirname) |
+ self.load_django_settings(dirname) |
+ modname = osp.basename(filename)[:-3] |
+ print >>sys.stderr, (' %s ' % osp.basename(filename)).center(70, '=') |
+ try: |
+ try: |
+ tstart, cstart = time(), clock() |
+ self.before_testfile() |
+ testprog = SkipAwareTestProgram(modname, batchmode=batchmode, cvg=self.cvg) |
+ tend, cend = time(), clock() |
+ ttime, ctime = (tend - tstart), (cend - cstart) |
+ self.report.feed(filename, testprog.result, ttime, ctime) |
+ return testprog |
+ except SystemExit: |
+ raise |
+ except Exception, exc: |
+ import traceback |
+ traceback.print_exc() |
+ self.report.failed_to_test_module(filename) |
+ print 'unhandled exception occurred while testing', modname |
+ print 'error: %s' % exc |
+ return None |
+ finally: |
+ self.after_testfile() |
+ if dirname: |
+ os.chdir(here) |
+ |
+ |
+def make_parser(): |
+ """creates the OptionParser instance |
+ """ |
+ from optparse import OptionParser |
+ parser = OptionParser(usage=PYTEST_DOC) |
+ |
+ parser.newargs = [] |
+ def rebuild_cmdline(option, opt, value, parser): |
+ """carry the option to unittest_main""" |
+ parser.newargs.append(opt) |
+ |
+ def rebuild_and_store(option, opt, value, parser): |
+ """carry the option to unittest_main and store |
+ the value on current parser |
+ """ |
+ parser.newargs.append(opt) |
+ setattr(parser.values, option.dest, True) |
+ |
+ def capture_and_rebuild(option, opt, value, parser): |
+ warnings.simplefilter('ignore', DeprecationWarning) |
+ rebuild_cmdline(option, opt, value, parser) |
+ |
+ # pytest options |
+ parser.add_option('-t', dest='testdir', default=None, |
+ help="directory where the tests will be found") |
+ parser.add_option('-d', dest='dbc', default=False, |
+ action="store_true", help="enable design-by-contract") |
+ # unittest_main options provided and passed through pytest |
+ parser.add_option('-v', '--verbose', callback=rebuild_cmdline, |
+ action="callback", help="Verbose output") |
+ parser.add_option('-i', '--pdb', callback=rebuild_and_store, |
+ dest="pdb", action="callback", |
+ help="Enable test failure inspection (conflicts with --coverage)") |
+ parser.add_option('-x', '--exitfirst', callback=rebuild_and_store, |
+ dest="exitfirst", default=False, |
+ action="callback", help="Exit on first failure " |
+ "(only make sense when pytest run one test file)") |
+ parser.add_option('-R', '--restart', callback=rebuild_and_store, |
+ dest="restart", default=False, |
+ action="callback", |
+ help="Restart tests from where it failed (implies exitfirst) " |
+ "(only make sense if tests previously ran with exitfirst only)") |
+ parser.add_option('--color', callback=rebuild_cmdline, |
+ action="callback", |
+ help="colorize tracebacks") |
+ parser.add_option('-s', '--skip', |
+ # XXX: I wish I could use the callback action but it |
+ # doesn't seem to be able to get the value |
+ # associated to the option |
+ action="store", dest="skipped", default=None, |
+ help="test names matching this name will be skipped " |
+ "to skip several patterns, use commas") |
+ parser.add_option('-q', '--quiet', callback=rebuild_cmdline, |
+ action="callback", help="Minimal output") |
+ parser.add_option('-P', '--profile', default=None, dest='profile', |
+ help="Profile execution and store data in the given file") |
+ parser.add_option('-m', '--match', default=None, dest='tags_pattern', |
+ help="only execute test whose tag match the current pattern") |
+ |
+ try: |
+ from logilab.devtools.lib.coverage import Coverage |
+ parser.add_option('--coverage', dest="coverage", default=False, |
+ action="store_true", |
+ help="run tests with pycoverage (conflicts with --pdb)") |
+ except ImportError: |
+ pass |
+ |
+ if DJANGO_FOUND: |
+ parser.add_option('-J', '--django', dest='django', default=False, |
+ action="store_true", |
+ help='use pytest for django test cases') |
+ return parser |
+ |
+ |
+def parseargs(parser): |
+ """Parse the command line and return (options processed), (options to pass to |
+ unittest_main()), (explicitfile or None). |
+ """ |
+ # parse the command line |
+ options, args = parser.parse_args() |
+ if options.pdb and getattr(options, 'coverage', False): |
+ parser.error("'pdb' and 'coverage' options are exclusive") |
+ filenames = [arg for arg in args if arg.endswith('.py')] |
+ if filenames: |
+ if len(filenames) > 1: |
+ parser.error("only one filename is acceptable") |
+ explicitfile = filenames[0] |
+ args.remove(explicitfile) |
+ else: |
+ explicitfile = None |
+ # someone wants DBC |
+ testlib.ENABLE_DBC = options.dbc |
+ newargs = parser.newargs |
+ if options.skipped: |
+ newargs.extend(['--skip', options.skipped]) |
+ # restart implies exitfirst |
+ if options.restart: |
+ options.exitfirst = True |
+ # append additional args to the new sys.argv and let unittest_main |
+ # do the rest |
+ newargs += args |
+ return options, explicitfile |
+ |
+ |
+ |
+def run(): |
+ parser = make_parser() |
+ rootdir, testercls = project_root(parser) |
+ options, explicitfile = parseargs(parser) |
+ # mock a new command line |
+ sys.argv[1:] = parser.newargs |
+ covermode = getattr(options, 'coverage', None) |
+ cvg = None |
+ if not '' in sys.path: |
+ sys.path.insert(0, '') |
+ if covermode: |
+ # control_import_coverage(rootdir) |
+ from logilab.devtools.lib.coverage import Coverage |
+ cvg = Coverage([rootdir]) |
+ cvg.erase() |
+ cvg.start() |
+ if DJANGO_FOUND and options.django: |
+ tester = DjangoTester(cvg, options) |
+ else: |
+ tester = testercls(cvg, options) |
+ if explicitfile: |
+ cmd, args = tester.testfile, (explicitfile,) |
+ elif options.testdir: |
+ cmd, args = tester.testonedir, (options.testdir, options.exitfirst) |
+ else: |
+ cmd, args = tester.testall, (options.exitfirst,) |
+ try: |
+ try: |
+ if options.profile: |
+ import hotshot |
+ prof = hotshot.Profile(options.profile) |
+ prof.runcall(cmd, *args) |
+ prof.close() |
+ print 'profile data saved in', options.profile |
+ else: |
+ cmd(*args) |
+ except SystemExit: |
+ raise |
+ except: |
+ import traceback |
+ traceback.print_exc() |
+ finally: |
+ if covermode: |
+ cvg.stop() |
+ cvg.save() |
+ tester.show_report() |
+ if covermode: |
+ print 'coverage information stored, use it with pycoverage -ra' |
+ sys.exit(tester.errcode) |
+ |
+class SkipAwareTestProgram(unittest.TestProgram): |
+ # XXX: don't try to stay close to unittest.py, use optparse |
+ USAGE = """\ |
+Usage: %(progName)s [options] [test] [...] |
+ |
+Options: |
+ -h, --help Show this message |
+ -v, --verbose Verbose output |
+ -i, --pdb Enable test failure inspection |
+ -x, --exitfirst Exit on first failure |
+ -s, --skip skip test matching this pattern (no regexp for now) |
+ -q, --quiet Minimal output |
+ --color colorize tracebacks |
+ |
+ -m, --match Run only test whose tag match this pattern |
+ |
+ -P, --profile FILE: Run the tests using cProfile and saving results |
+ in FILE |
+ |
+Examples: |
+ %(progName)s - run default set of tests |
+ %(progName)s MyTestSuite - run suite 'MyTestSuite' |
+ %(progName)s MyTestCase.testSomething - run MyTestCase.testSomething |
+ %(progName)s MyTestCase - run all 'test*' test methods |
+ in MyTestCase |
+""" |
+ def __init__(self, module='__main__', defaultTest=None, batchmode=False, |
+ cvg=None, options=None, outstream=sys.stderr): |
+ self.batchmode = batchmode |
+ self.cvg = cvg |
+ self.options = options |
+ self.outstream = outstream |
+ super(SkipAwareTestProgram, self).__init__( |
+ module=module, defaultTest=defaultTest, |
+ testLoader=NonStrictTestLoader()) |
+ |
+ def parseArgs(self, argv): |
+ self.pdbmode = False |
+ self.exitfirst = False |
+ self.skipped_patterns = [] |
+ self.test_pattern = None |
+ self.tags_pattern = None |
+ self.colorize = False |
+ self.profile_name = None |
+ import getopt |
+ try: |
+ options, args = getopt.getopt(argv[1:], 'hHvixrqcp:s:m:P:', |
+ ['help', 'verbose', 'quiet', 'pdb', |
+ 'exitfirst', 'restart', |
+ 'skip=', 'color', 'match=', 'profile=']) |
+ for opt, value in options: |
+ if opt in ('-h', '-H', '--help'): |
+ self.usageExit() |
+ if opt in ('-i', '--pdb'): |
+ self.pdbmode = True |
+ if opt in ('-x', '--exitfirst'): |
+ self.exitfirst = True |
+ if opt in ('-r', '--restart'): |
+ self.restart = True |
+ self.exitfirst = True |
+ if opt in ('-q', '--quiet'): |
+ self.verbosity = 0 |
+ if opt in ('-v', '--verbose'): |
+ self.verbosity = 2 |
+ if opt in ('-s', '--skip'): |
+ self.skipped_patterns = [pat.strip() for pat in |
+ value.split(', ')] |
+ if opt == '--color': |
+ self.colorize = True |
+ if opt in ('-m', '--match'): |
+ #self.tags_pattern = value |
+ self.options["tag_pattern"] = value |
+ if opt in ('-P', '--profile'): |
+ self.profile_name = value |
+ self.testLoader.skipped_patterns = self.skipped_patterns |
+ if len(args) == 0 and self.defaultTest is None: |
+ suitefunc = getattr(self.module, 'suite', None) |
+ if isinstance(suitefunc, (types.FunctionType, |
+ types.MethodType)): |
+ self.test = self.module.suite() |
+ else: |
+ self.test = self.testLoader.loadTestsFromModule(self.module) |
+ return |
+ if len(args) > 0: |
+ self.test_pattern = args[0] |
+ self.testNames = args |
+ else: |
+ self.testNames = (self.defaultTest, ) |
+ self.createTests() |
+ except getopt.error, msg: |
+ self.usageExit(msg) |
+ |
+ def runTests(self): |
+ if self.profile_name: |
+ import cProfile |
+ cProfile.runctx('self._runTests()', globals(), locals(), self.profile_name ) |
+ else: |
+ return self._runTests() |
+ |
+ def _runTests(self): |
+ self.testRunner = SkipAwareTextTestRunner(verbosity=self.verbosity, |
+ stream=self.outstream, |
+ exitfirst=self.exitfirst, |
+ pdbmode=self.pdbmode, |
+ cvg=self.cvg, |
+ test_pattern=self.test_pattern, |
+ skipped_patterns=self.skipped_patterns, |
+ colorize=self.colorize, |
+ batchmode=self.batchmode, |
+ options=self.options) |
+ |
+ def removeSucceededTests(obj, succTests): |
+ """ Recursive function that removes succTests from |
+ a TestSuite or TestCase |
+ """ |
+ if isinstance(obj, unittest.TestSuite): |
+ removeSucceededTests(obj._tests, succTests) |
+ if isinstance(obj, list): |
+ for el in obj[:]: |
+ if isinstance(el, unittest.TestSuite): |
+ removeSucceededTests(el, succTests) |
+ elif isinstance(el, unittest.TestCase): |
+ descr = '.'.join((el.__class__.__module__, |
+ el.__class__.__name__, |
+ el._testMethodName)) |
+ if descr in succTests: |
+ obj.remove(el) |
+ # take care, self.options may be None |
+ if getattr(self.options, 'restart', False): |
+ # retrieve succeeded tests from FILE_RESTART |
+ try: |
+ restartfile = open(FILE_RESTART, 'r') |
+ try: |
+ succeededtests = list(elem.rstrip('\n\r') for elem in |
+ restartfile.readlines()) |
+ removeSucceededTests(self.test, succeededtests) |
+ finally: |
+ restartfile.close() |
+ except Exception, ex: |
+ raise Exception("Error while reading succeeded tests into %s: %s" |
+ % (osp.join(os.getcwd(), FILE_RESTART), ex)) |
+ |
+ result = self.testRunner.run(self.test) |
+ # help garbage collection: we want TestSuite, which hold refs to every |
+ # executed TestCase, to be gc'ed |
+ del self.test |
+ if getattr(result, "debuggers", None) and \ |
+ getattr(self, "pdbmode", None): |
+ start_interactive_mode(result) |
+ if not getattr(self, "batchmode", None): |
+ sys.exit(not result.wasSuccessful()) |
+ self.result = result |
+ |
+ |
+class SkipAwareTextTestRunner(unittest.TextTestRunner): |
+ |
+ def __init__(self, stream=sys.stderr, verbosity=1, |
+ exitfirst=False, pdbmode=False, cvg=None, test_pattern=None, |
+ skipped_patterns=(), colorize=False, batchmode=False, |
+ options=None): |
+ super(SkipAwareTextTestRunner, self).__init__(stream=stream, |
+ verbosity=verbosity) |
+ self.exitfirst = exitfirst |
+ self.pdbmode = pdbmode |
+ self.cvg = cvg |
+ self.test_pattern = test_pattern |
+ self.skipped_patterns = skipped_patterns |
+ self.colorize = colorize |
+ self.batchmode = batchmode |
+ self.options = options |
+ |
+ def _this_is_skipped(self, testedname): |
+ return any([(pat in testedname) for pat in self.skipped_patterns]) |
+ |
+ def _runcondition(self, test, skipgenerator=True): |
+ if isinstance(test, testlib.InnerTest): |
+ testname = test.name |
+ else: |
+ if isinstance(test, testlib.TestCase): |
+ meth = test._get_test_method() |
+ func = meth.im_func |
+ testname = '%s.%s' % (meth.im_class.__name__, func.__name__) |
+ elif isinstance(test, types.FunctionType): |
+ func = test |
+ testname = func.__name__ |
+ elif isinstance(test, types.MethodType): |
+ func = test.im_func |
+ testname = '%s.%s' % (test.im_class.__name__, func.__name__) |
+ else: |
+ return True # Not sure when this happens |
+ if testlib.is_generator(test) and skipgenerator: |
+ return self.does_match_tags(test) # Let inner tests decide at run time |
+ if self._this_is_skipped(testname): |
+ return False # this was explicitly skipped |
+ if self.test_pattern is not None: |
+ try: |
+ classpattern, testpattern = self.test_pattern.split('.') |
+ klass, name = testname.split('.') |
+ if classpattern not in klass or testpattern not in name: |
+ return False |
+ except ValueError: |
+ if self.test_pattern not in testname: |
+ return False |
+ |
+ return self.does_match_tags(test) |
+ |
+ def does_match_tags(self, test): |
+ if self.options is not None: |
+ tags_pattern = getattr(self.options, 'tags_pattern', None) |
+ if tags_pattern is not None: |
+ tags = getattr(test, 'tags', testlib.Tags()) |
+ if tags.inherit and isinstance(test, types.MethodType): |
+ tags = tags | getattr(test.im_class, 'tags', testlib.Tags()) |
+ return tags.match(tags_pattern) |
+ return True # no pattern |
+ |
+ def _makeResult(self): |
+ return testlib.SkipAwareTestResult(self.stream, self.descriptions, |
+ self.verbosity, self.exitfirst, |
+ self.pdbmode, self.cvg, self.colorize) |
+ |
+ def run(self, test): |
+ "Run the given test case or test suite." |
+ result = self._makeResult() |
+ startTime = time() |
+ test(result, runcondition=self._runcondition, options=self.options) |
+ stopTime = time() |
+ timeTaken = stopTime - startTime |
+ result.printErrors() |
+ if not self.batchmode: |
+ self.stream.writeln(result.separator2) |
+ run = result.testsRun |
+ self.stream.writeln("Ran %d test%s in %.3fs" % |
+ (run, run != 1 and "s" or "", timeTaken)) |
+ self.stream.writeln() |
+ if not result.wasSuccessful(): |
+ if self.colorize: |
+ self.stream.write(textutils.colorize_ansi("FAILED", color='red')) |
+ else: |
+ self.stream.write("FAILED") |
+ else: |
+ if self.colorize: |
+ self.stream.write(textutils.colorize_ansi("OK", color='green')) |
+ else: |
+ self.stream.write("OK") |
+ failed, errored, skipped = map(len, (result.failures, |
+ result.errors, |
+ result.skipped)) |
+ |
+ det_results = [] |
+ for name, value in (("failures", result.failures), |
+ ("errors",result.errors), |
+ ("skipped", result.skipped)): |
+ if value: |
+ det_results.append("%s=%i" % (name, len(value))) |
+ if det_results: |
+ self.stream.write(" (") |
+ self.stream.write(', '.join(det_results)) |
+ self.stream.write(")") |
+ self.stream.writeln("") |
+ return result |
+ |
+class NonStrictTestLoader(unittest.TestLoader): |
+ """ |
+ Overrides default testloader to be able to omit classname when |
+ specifying tests to run on command line. |
+ |
+ For example, if the file test_foo.py contains :: |
+ |
+ class FooTC(TestCase): |
+ def test_foo1(self): # ... |
+ def test_foo2(self): # ... |
+ def test_bar1(self): # ... |
+ |
+ class BarTC(TestCase): |
+ def test_bar2(self): # ... |
+ |
+ 'python test_foo.py' will run the 3 tests in FooTC |
+ 'python test_foo.py FooTC' will run the 3 tests in FooTC |
+ 'python test_foo.py test_foo' will run test_foo1 and test_foo2 |
+ 'python test_foo.py test_foo1' will run test_foo1 |
+ 'python test_foo.py test_bar' will run FooTC.test_bar1 and BarTC.test_bar2 |
+ """ |
+ |
+ def __init__(self): |
+ self.skipped_patterns = () |
+ |
+ # some magic here to accept empty list by extending |
+ # and to provide callable capability |
+ def loadTestsFromNames(self, names, module=None): |
+ suites = [] |
+ for name in names: |
+ suites.extend(self.loadTestsFromName(name, module)) |
+ return self.suiteClass(suites) |
+ |
+ def _collect_tests(self, module): |
+ tests = {} |
+ for obj in vars(module).values(): |
+ if (issubclass(type(obj), (types.ClassType, type)) and |
+ issubclass(obj, unittest.TestCase)): |
+ classname = obj.__name__ |
+ if classname[0] == '_' or self._this_is_skipped(classname): |
+ continue |
+ methodnames = [] |
+ # obj is a TestCase class |
+ for attrname in dir(obj): |
+ if attrname.startswith(self.testMethodPrefix): |
+ attr = getattr(obj, attrname) |
+ if callable(attr): |
+ methodnames.append(attrname) |
+ # keep track of class (obj) for convenience |
+ tests[classname] = (obj, methodnames) |
+ return tests |
+ |
+ def loadTestsFromSuite(self, module, suitename): |
+ try: |
+ suite = getattr(module, suitename)() |
+ except AttributeError: |
+ return [] |
+ assert hasattr(suite, '_tests'), \ |
+ "%s.%s is not a valid TestSuite" % (module.__name__, suitename) |
+ # python2.3 does not implement __iter__ on suites, we need to return |
+ # _tests explicitly |
+ return suite._tests |
+ |
+ def loadTestsFromName(self, name, module=None): |
+ parts = name.split('.') |
+ if module is None or len(parts) > 2: |
+ # let the base class do its job here |
+ return [super(NonStrictTestLoader, self).loadTestsFromName(name)] |
+ tests = self._collect_tests(module) |
+ collected = [] |
+ if len(parts) == 1: |
+ pattern = parts[0] |
+ if callable(getattr(module, pattern, None) |
+ ) and pattern not in tests: |
+ # consider it as a suite |
+ return self.loadTestsFromSuite(module, pattern) |
+ if pattern in tests: |
+ # case python unittest_foo.py MyTestTC |
+ klass, methodnames = tests[pattern] |
+ for methodname in methodnames: |
+ collected = [klass(methodname) |
+ for methodname in methodnames] |
+ else: |
+ # case python unittest_foo.py something |
+ for klass, methodnames in tests.values(): |
+ # skip methodname if matched by skipped_patterns |
+ for skip_pattern in self.skipped_patterns: |
+ methodnames = [methodname |
+ for methodname in methodnames |
+ if skip_pattern not in methodname] |
+ collected += [klass(methodname) |
+ for methodname in methodnames |
+ if pattern in methodname] |
+ elif len(parts) == 2: |
+ # case "MyClass.test_1" |
+ classname, pattern = parts |
+ klass, methodnames = tests.get(classname, (None, [])) |
+ for methodname in methodnames: |
+ collected = [klass(methodname) for methodname in methodnames |
+ if pattern in methodname] |
+ return collected |
+ |
+ def _this_is_skipped(self, testedname): |
+ return any([(pat in testedname) for pat in self.skipped_patterns]) |
+ |
+ def getTestCaseNames(self, testCaseClass): |
+ """Return a sorted sequence of method names found within testCaseClass |
+ """ |
+ is_skipped = self._this_is_skipped |
+ classname = testCaseClass.__name__ |
+ if classname[0] == '_' or is_skipped(classname): |
+ return [] |
+ testnames = super(NonStrictTestLoader, self).getTestCaseNames( |
+ testCaseClass) |
+ return [testname for testname in testnames if not is_skipped(testname)] |
+ |
+def _ts_run(self, result, runcondition=None, options=None): |
+ self._wrapped_run(result,runcondition=runcondition, options=options) |
+ self._tearDownPreviousClass(None, result) |
+ self._handleModuleTearDown(result) |
+ return result |
+ |
+def _ts_wrapped_run(self, result, debug=False, runcondition=None, options=None): |
+ for test in self: |
+ if result.shouldStop: |
+ break |
+ if unittest_suite._isnotsuite(test): |
+ self._tearDownPreviousClass(test, result) |
+ self._handleModuleFixture(test, result) |
+ self._handleClassSetUp(test, result) |
+ result._previousTestClass = test.__class__ |
+ if (getattr(test.__class__, '_classSetupFailed', False) or |
+ getattr(result, '_moduleSetUpFailed', False)): |
+ continue |
+ |
+ if hasattr(test, '_wrapped_run'): |
+ try: |
+ test._wrapped_run(result, debug, runcondition=runcondition, options=options) |
+ except TypeError: |
+ test._wrapped_run(result, debug) |
+ elif not debug: |
+ try: |
+ test(result, runcondition, options) |
+ except TypeError: |
+ test(result) |
+ else: |
+ test.debug() |
+ |
+ |
+def enable_dbc(*args): |
+ """ |
+ Without arguments, return True if contracts can be enabled and should be |
+ enabled (see option -d), return False otherwise. |
+ |
+ With arguments, return False if contracts can't or shouldn't be enabled, |
+ otherwise weave ContractAspect with items passed as arguments. |
+ """ |
+ if not ENABLE_DBC: |
+ return False |
+ try: |
+ from logilab.aspects.weaver import weaver |
+ from logilab.aspects.lib.contracts import ContractAspect |
+ except ImportError: |
+ sys.stderr.write( |
+ 'Warning: logilab.aspects is not available. Contracts disabled.') |
+ return False |
+ for arg in args: |
+ weaver.weave_module(arg, ContractAspect) |
+ return True |
+ |
+ |
+# monkeypatch unittest and doctest (ouch !) |
+unittest._TextTestResult = testlib.SkipAwareTestResult |
+unittest.TextTestRunner = SkipAwareTextTestRunner |
+unittest.TestLoader = NonStrictTestLoader |
+unittest.TestProgram = SkipAwareTestProgram |
+ |
+if sys.version_info >= (2, 4): |
+ doctest.DocTestCase.__bases__ = (testlib.TestCase,) |
+ # XXX check python2.6 compatibility |
+ #doctest.DocTestCase._cleanups = [] |
+ #doctest.DocTestCase._out = [] |
+else: |
+ unittest.FunctionTestCase.__bases__ = (testlib.TestCase,) |
+unittest.TestSuite.run = _ts_run |
+unittest.TestSuite._wrapped_run = _ts_wrapped_run |