OLD | NEW |
(Empty) | |
| 1 # Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. |
| 4 |
| 5 """This file contains post process filters for use with the |
| 6 RecipeTestApi.post_process method in GenTests. |
| 7 """ |
| 8 |
| 9 import re |
| 10 |
| 11 from collections import defaultdict, OrderedDict, namedtuple |
| 12 |
| 13 |
| 14 class Filter(object): |
| 15 """Filter is an implementation of a post_process callable which can remove |
| 16 unwanted data from a step OrderedDict.""" |
| 17 _reEntry = namedtuple('_reEntry', 'at_most at_least fields') |
| 18 |
| 19 def __init__(self, *steps): |
| 20 """Builds a new Filter object. It may be optionally prepopulated by |
| 21 specifying steps. |
| 22 |
| 23 Usage: |
| 24 f = Filter('step_a', 'step_b') |
| 25 yield TEST + api.post_process(f) |
| 26 |
| 27 f = f.include('other_step') |
| 28 yield TEST + api.post_process(f) |
| 29 |
| 30 yield TEST + api.post_process(Filter('step_a', 'step_b', 'other_step')) |
| 31 """ |
| 32 self.data = {name: () for name in steps} |
| 33 self.re_data = {} |
| 34 |
| 35 def __call__(self, check, step_odict): |
| 36 unused_includes = self.data.copy() |
| 37 re_data = self.re_data.copy() |
| 38 |
| 39 re_usage_count = defaultdict(int) |
| 40 |
| 41 to_ret = OrderedDict() |
| 42 for name, step in step_odict.iteritems(): |
| 43 field_set = unused_includes.pop(name, None) |
| 44 if field_set is None: |
| 45 for exp, (_, _, fset) in re_data.iteritems(): |
| 46 if exp.match(name): |
| 47 re_usage_count[exp] += 1 |
| 48 field_set = fset |
| 49 break |
| 50 if field_set is None: |
| 51 continue |
| 52 if len(field_set) == 0: |
| 53 to_ret[name] = step |
| 54 else: |
| 55 to_ret[name] = { |
| 56 k: v for k, v in step.iteritems() |
| 57 if k in field_set or k == 'name' |
| 58 } |
| 59 |
| 60 check(len(unused_includes) == 0) |
| 61 |
| 62 for regex, (at_least, at_most, _) in re_data.iteritems(): |
| 63 check(re_usage_count[regex] >= at_least) |
| 64 if at_most is not None: |
| 65 check(re_usage_count[regex] <= at_most) |
| 66 |
| 67 return to_ret |
| 68 |
| 69 def include(self, step_name, fields=()): |
| 70 """Include adds a step to the included steps set. |
| 71 |
| 72 Additionally, if any specified fields are provided, they will be the total |
| 73 set of fields in the filtered step. The 'name' field is always included. If |
| 74 fields is omitted, the entire step will be included. |
| 75 |
| 76 Args: |
| 77 step_name (str) - The name of the step to include |
| 78 fields (list(str)) - The field(s) to include. Omit to include all fields. |
| 79 |
| 80 Returns the new filter. |
| 81 """ |
| 82 if isinstance(fields, basestring): |
| 83 raise ValueError('Expected fields to be a non-string iterable') |
| 84 new_data = self.data.copy() |
| 85 new_data[step_name] = frozenset(fields) |
| 86 ret = Filter() |
| 87 ret.data = new_data |
| 88 ret.re_data = self.re_data |
| 89 return ret |
| 90 |
| 91 def include_re(self, step_name_re, fields=(), at_least=1, at_most=None): |
| 92 """This includes all steps which match the given regular expression. |
| 93 |
| 94 If a step matches both an include() directive as well as include_re(), the |
| 95 include() directive will take precedence. |
| 96 |
| 97 Args: |
| 98 step_name_re (str or regex) - the regular expression of step names to |
| 99 match. |
| 100 fields (list(str)) - the field(s) to include in the matched steps. Omit to |
| 101 include all fields. |
| 102 at_least (int) - the number of steps that this regular expression MUST |
| 103 match. |
| 104 at_most (int) - the maximum number of steps that this regular expression |
| 105 MUST NOT exceed. |
| 106 |
| 107 Returns the new filter. |
| 108 """ |
| 109 if isinstance(fields, basestring): |
| 110 raise ValueError('Expected fields to be a non-string iterable') |
| 111 new_re_data = self.re_data.copy() |
| 112 new_re_data[re.compile(step_name_re)] = Filter._reEntry( |
| 113 at_least, at_most, frozenset(fields)) |
| 114 |
| 115 ret = Filter() |
| 116 ret.data = self.data |
| 117 ret.re_data = new_re_data |
| 118 return ret |
| 119 |
| 120 |
| 121 def DoesNotRun(check, step_odict, *steps): |
| 122 """Asserts that the given steps don't run. |
| 123 |
| 124 Usage: |
| 125 yield TEST + api.post_process(DoesNotRun, 'step_a', 'step_b') |
| 126 |
| 127 """ |
| 128 banSet = set(steps) |
| 129 for step_name in step_odict: |
| 130 check(step_name not in banSet) |
| 131 |
| 132 |
| 133 def DoesNotRunRE(check, step_odict, *step_regexes): |
| 134 """Asserts that no steps matching any of the regexes have run. |
| 135 |
| 136 Args: |
| 137 step_regexes (str) - The step name regexes to ban. |
| 138 |
| 139 Usage: |
| 140 yield TEST + api.post_process(DoesNotRunRE, '.*with_patch.*', '.*compile.*') |
| 141 |
| 142 """ |
| 143 step_regexes = [re.compile(r) for r in step_regexes] |
| 144 for step_name in step_odict: |
| 145 for r in step_regexes: |
| 146 check(not r.match(step_name)) |
| 147 |
| 148 |
| 149 def MustRun(check, step_odict, *steps): |
| 150 """Asserts that steps with the given names are in the expectations. |
| 151 |
| 152 Args: |
| 153 steps (str) - The steps that must have run. |
| 154 |
| 155 Usage: |
| 156 yield TEST + api.post_process(MustRun, 'step_a', 'step_b') |
| 157 """ |
| 158 for step_name in steps: |
| 159 check(step_name in step_odict) |
| 160 |
| 161 |
| 162 def MustRunRE(check, step_odict, step_regex, at_least=1, at_most=None): |
| 163 """Assert that steps matching the given regex completely are in the |
| 164 exepectations. |
| 165 |
| 166 Args: |
| 167 step_regex (str, compiled regex) - The regular expression to match. |
| 168 at_least (int) - Match at least this many steps. Matching fewer than this |
| 169 is a CHECK failure. |
| 170 at_most (int) - Optional upper bound on the number of matches. Matching |
| 171 more than this is a CHECK failure. |
| 172 |
| 173 Usage: |
| 174 yield TEST + api.post_process(MustRunRE, r'.*with_patch.*', at_most=2) |
| 175 """ |
| 176 step_regex = re.compile(step_regex) |
| 177 matches = 0 |
| 178 for step_name in step_odict: |
| 179 if step_regex.match(step_name): |
| 180 matches += 1 |
| 181 check(matches >= at_least) |
| 182 if at_most is not None: |
| 183 check(matches <= at_most) |
| 184 |
| 185 |
| 186 def DropExpectation(_check, _step_odict): |
| 187 """Using this post-process hook will drop the expectations for this test |
| 188 completely. |
| 189 |
| 190 Usage: |
| 191 yield TEST + api.post_process(DropExpectation) |
| 192 |
| 193 """ |
| 194 return {} |
OLD | NEW |