OLD | NEW |
---|---|
1 # Copyright 2014 The Chromium Authors. All rights reserved. | 1 # Copyright 2014 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import Queue | 5 import Queue |
6 import atexit | |
6 import glob | 7 import glob |
7 import inspect | 8 import inspect |
8 import logging | 9 import logging |
9 import multiprocessing | 10 import multiprocessing |
10 import re | 11 import re |
11 import signal | 12 import signal |
12 import traceback | 13 import traceback |
13 | 14 |
14 from cStringIO import StringIO | 15 from cStringIO import StringIO |
15 | 16 |
16 from .type_definitions import ( | 17 from .type_definitions import ( |
17 Test, UnknownError, TestError, NoMatchingTestsError, MultiTest, | 18 Test, UnknownError, TestError, NoMatchingTestsError, MultiTest, |
18 Result, ResultStageAbort) | 19 Result, ResultStageAbort, Cleanup) |
19 | 20 |
20 | 21 |
21 class ResetableStringIO(object): | 22 class ResetableStringIO(object): |
22 def __init__(self): | 23 def __init__(self): |
23 self._stream = StringIO() | 24 self._stream = StringIO() |
24 | 25 |
25 def reset(self): | 26 def reset(self): |
26 self._stream = StringIO() | 27 self._stream = StringIO() |
27 | 28 |
28 def __getattr__(self, key): | 29 def __getattr__(self, key): |
29 return getattr(self._stream, key) | 30 return getattr(self._stream, key) |
30 | 31 |
31 | 32 |
32 def gen_loop_process(gen, test_queue, result_queue, opts, kill_switch, | 33 def gen_loop_process(gens, test_queue, result_queue, opts, kill_switch, |
33 cover_ctx): | 34 cover_ctx): |
34 """Generate `Test`'s from |gen|, and feed them into |test_queue|. | 35 """Generate `Test`'s from |gens|, and feed them into |test_queue|. |
35 | 36 |
36 Non-Test instances will be translated into `UnknownError` objects. | 37 Non-Test instances will be translated into `UnknownError` objects. |
37 | 38 |
38 On completion, feed |opts.jobs| None objects into |test_queue|. | 39 On completion, feed |opts.jobs| None objects into |test_queue|. |
39 | 40 |
40 @param gen: generator yielding Test() instances. | 41 @param gens: list of generators yielding Test() instances. |
41 @type test_queue: multiprocessing.Queue() | 42 @type test_queue: multiprocessing.Queue() |
42 @type result_queue: multiprocessing.Queue() | 43 @type result_queue: multiprocessing.Queue() |
43 @type opts: argparse.Namespace | 44 @type opts: argparse.Namespace |
44 @type kill_switch: multiprocessing.Event() | 45 @type kill_switch: multiprocessing.Event() |
45 @type cover_ctx: cover.CoverageContext().create_subprocess_context() | 46 @type cover_ctx: cover.CoverageContext().create_subprocess_context() |
46 """ | 47 """ |
47 # Implicitly append '*'' to globs that don't specify it. | 48 # Implicitly append '*'' to globs that don't specify it. |
48 globs = ['%s%s' % (g, '*' if '*' not in g else '') for g in opts.test_glob] | 49 globs = ['%s%s' % (g, '*' if '*' not in g else '') for g in opts.test_glob] |
49 | 50 |
50 matcher = re.compile( | 51 matcher = re.compile( |
51 '^%s$' % '|'.join('(?:%s)' % glob.fnmatch.translate(g) | 52 '^%s$' % '|'.join('(?:%s)' % glob.fnmatch.translate(g) |
52 for g in globs if g[0] != '-')) | 53 for g in globs if g[0] != '-')) |
53 if matcher.pattern == '^$': | 54 if matcher.pattern == '^$': |
54 matcher = re.compile('^.*$') | 55 matcher = re.compile('^.*$') |
55 | 56 |
56 neg_matcher = re.compile( | 57 neg_matcher = re.compile( |
57 '^%s$' % '|'.join('(?:%s)' % glob.fnmatch.translate(g[1:]) | 58 '^%s$' % '|'.join('(?:%s)' % glob.fnmatch.translate(g[1:]) |
58 for g in globs if g[0] == '-')) | 59 for g in globs if g[0] == '-')) |
59 | 60 |
61 SENTINEL = object() | |
62 | |
60 def generate_tests(): | 63 def generate_tests(): |
61 paths_seen = set() | 64 paths_seen = set() |
62 seen_tests = False | 65 seen_tests = False |
63 try: | 66 try: |
64 with cover_ctx: | 67 for gen in gens: |
65 gen_inst = gen() | 68 if hasattr(gen, '_covers'): |
69 # decorated with expect_tests.covers | |
70 gen_cover = gen._covers() # pylint: disable=W0212 | |
71 else: | |
72 gen_cover = [inspect.getabsfile(gen)] | |
73 gen_cover_ctx = cover_ctx(include=gen_cover) | |
66 | 74 |
67 while not kill_switch.is_set(): | 75 with gen_cover_ctx: |
68 with cover_ctx: | 76 gen_inst = gen() |
69 root_test = next(gen_inst) | |
70 | 77 |
71 if kill_switch.is_set(): | 78 while not kill_switch.is_set(): |
72 break | 79 with gen_cover_ctx: |
80 root_test = next(gen_inst, SENTINEL) | |
73 | 81 |
74 ok_tests = [] | 82 if root_test is SENTINEL: |
83 break | |
75 | 84 |
76 if isinstance(root_test, MultiTest): | 85 if kill_switch.is_set(): |
77 subtests = root_test.tests | 86 break |
78 else: | |
79 subtests = [root_test] | |
80 | 87 |
81 for subtest in subtests: | 88 ok_tests = [] |
82 if not isinstance(subtest, Test): | 89 |
83 result_queue.put_nowait( | 90 if isinstance(root_test, Cleanup): |
84 UnknownError('Got non-[Multi]Test isinstance from generator: %r' | 91 result_queue.put_nowait(root_test) |
85 % subtest)) | |
86 continue | 92 continue |
87 | 93 |
88 test_path = subtest.expect_path() | 94 if isinstance(root_test, MultiTest): |
89 if test_path is not None and test_path in paths_seen: | 95 subtests = root_test.tests |
90 result_queue.put_nowait( | |
91 TestError(subtest, 'Duplicate expectation path!')) | |
92 else: | 96 else: |
93 if test_path is not None: | 97 subtests = [root_test] |
94 paths_seen.add(test_path) | |
95 name = subtest.name | |
96 if not neg_matcher.match(name) and matcher.match(name): | |
97 ok_tests.append(subtest) | |
98 | 98 |
99 if ok_tests: | 99 for subtest in subtests: |
100 seen_tests = True | 100 if not isinstance(subtest, Test): |
101 yield root_test.restrict(ok_tests) | 101 result_queue.put_nowait( |
102 UnknownError( | |
103 'Got non-[Multi]Test isinstance from generator: %r' | |
104 % subtest)) | |
105 continue | |
106 | |
107 test_path = subtest.expect_path() | |
108 if test_path is not None and test_path in paths_seen: | |
109 result_queue.put_nowait( | |
110 TestError(subtest, 'Duplicate expectation path!')) | |
111 else: | |
112 if test_path is not None: | |
113 paths_seen.add(test_path) | |
114 name = subtest.name | |
115 if not neg_matcher.match(name) and matcher.match(name): | |
116 ok_tests.append(subtest) | |
117 | |
118 if ok_tests: | |
119 seen_tests = True | |
120 yield root_test.restrict(ok_tests) | |
102 | 121 |
103 if not seen_tests: | 122 if not seen_tests: |
104 result_queue.put_nowait(NoMatchingTestsError()) | 123 result_queue.put_nowait(NoMatchingTestsError()) |
105 except StopIteration: | |
106 pass | |
107 except KeyboardInterrupt: | 124 except KeyboardInterrupt: |
108 pass | 125 pass |
109 finally: | 126 finally: |
110 for _ in xrange(opts.jobs): | 127 for _ in xrange(opts.jobs): |
111 test_queue.put_nowait(None) | 128 test_queue.put_nowait(None) |
112 | 129 |
113 | 130 |
114 next_stage = (result_queue if opts.handler.SKIP_RUNLOOP else test_queue) | 131 next_stage = (result_queue if opts.handler.SKIP_RUNLOOP else test_queue) |
115 opts.handler.gen_stage_loop(opts, generate_tests(), next_stage.put_nowait, | 132 opts.handler.gen_stage_loop(opts, generate_tests(), next_stage.put_nowait, |
116 result_queue.put_nowait) | 133 result_queue.put_nowait) |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
170 result_queue.put_nowait( | 187 result_queue.put_nowait( |
171 TestError(test, traceback.format_exc(), | 188 TestError(test, traceback.format_exc(), |
172 logstream.getvalue().splitlines())) | 189 logstream.getvalue().splitlines())) |
173 except KeyboardInterrupt: | 190 except KeyboardInterrupt: |
174 pass | 191 pass |
175 | 192 |
176 opts.handler.run_stage_loop(opts, generate_tests_results(), | 193 opts.handler.run_stage_loop(opts, generate_tests_results(), |
177 result_queue.put_nowait) | 194 result_queue.put_nowait) |
178 | 195 |
179 | 196 |
180 def result_loop(test_gen, cover_ctx, opts): | 197 def result_loop(test_gens, cover_ctx, opts): |
181 kill_switch = multiprocessing.Event() | 198 kill_switch = multiprocessing.Event() |
182 def handle_killswitch(*_): | 199 def handle_killswitch(*_): |
183 kill_switch.set() | 200 kill_switch.set() |
184 # Reset the signal to DFL so that double ctrl-C kills us for sure. | 201 # Reset the signal to DFL so that double ctrl-C kills us for sure. |
185 signal.signal(signal.SIGINT, signal.SIG_DFL) | 202 signal.signal(signal.SIGINT, signal.SIG_DFL) |
186 signal.signal(signal.SIGTERM, signal.SIG_DFL) | 203 signal.signal(signal.SIGTERM, signal.SIG_DFL) |
187 signal.signal(signal.SIGINT, handle_killswitch) | 204 signal.signal(signal.SIGINT, handle_killswitch) |
188 signal.signal(signal.SIGTERM, handle_killswitch) | 205 signal.signal(signal.SIGTERM, handle_killswitch) |
189 | 206 |
190 test_queue = multiprocessing.Queue() | 207 test_queue = multiprocessing.Queue() |
191 result_queue = multiprocessing.Queue() | 208 result_queue = multiprocessing.Queue() |
192 | 209 |
193 gen_cover_ctx = cover_ctx | |
194 if cover_ctx.enabled: | |
195 if hasattr(test_gen, '_covers'): | |
196 # decorated with expect_tests.covers | |
197 gen_cover = test_gen._covers() # pylint: disable=W0212 | |
198 else: | |
199 gen_cover = [inspect.getabsfile(test_gen)] | |
200 gen_cover_ctx = cover_ctx(include=gen_cover) | |
201 | |
202 test_gen_args = ( | 210 test_gen_args = ( |
203 test_gen, test_queue, result_queue, opts, kill_switch, gen_cover_ctx) | 211 test_gens, test_queue, result_queue, opts, kill_switch, cover_ctx) |
204 | 212 |
205 procs = [] | 213 procs = [] |
206 if opts.handler.SKIP_RUNLOOP: | 214 if opts.handler.SKIP_RUNLOOP: |
207 gen_loop_process(*test_gen_args) | 215 gen_loop_process(*test_gen_args) |
208 else: | 216 else: |
209 procs = [multiprocessing.Process( | 217 procs = [multiprocessing.Process( |
210 target=gen_loop_process, args=test_gen_args)] | 218 target=gen_loop_process, args=test_gen_args)] |
211 | 219 |
212 procs += [ | 220 procs += [ |
213 multiprocessing.Process( | 221 multiprocessing.Process( |
214 target=run_loop_process, args=( | 222 target=run_loop_process, args=( |
215 test_queue, result_queue, opts, kill_switch, cover_ctx)) | 223 test_queue, result_queue, opts, kill_switch, cover_ctx)) |
216 for _ in xrange(opts.jobs) | 224 for _ in xrange(opts.jobs) |
217 ] | 225 ] |
218 | 226 |
219 for p in procs: | 227 for p in procs: |
220 p.daemon = True | 228 p.daemon = True |
221 p.start() | 229 p.start() |
222 | 230 |
223 error = False | 231 error = False |
224 try: | 232 try: |
225 def generate_objects(): | 233 def generate_objects(): |
226 while not kill_switch.is_set(): | 234 while not kill_switch.is_set(): |
227 while not kill_switch.is_set(): | 235 while not kill_switch.is_set(): |
228 try: | 236 try: |
229 yield result_queue.get(timeout=0.1) | 237 obj = result_queue.get(timeout=0.1) |
238 if isinstance(obj, Cleanup): | |
239 atexit.register(obj.func_call) | |
Vadim Sh.
2014/06/27 21:40:08
em... so func_call is crossing process boundaries?
iannucci
2014/06/28 16:22:17
yep. I'm using this to delete the temporary direct
Vadim Sh.
2014/06/30 17:15:46
I hope it's clearly marked in docstring that clean
| |
240 else: | |
241 yield obj | |
230 except Queue.Empty: | 242 except Queue.Empty: |
231 break | 243 break |
232 | 244 |
233 if not any(p.is_alive() for p in procs): | 245 if not any(p.is_alive() for p in procs): |
234 break | 246 break |
235 | 247 |
236 # Get everything still in the queue. Still need timeout, but since nothing | 248 # Get everything still in the queue. Still need timeout, but since nothing |
237 # is going to be adding stuff to the queue, use a very short timeout. | 249 # is going to be adding stuff to the queue, use a very short timeout. |
238 while not kill_switch.is_set(): | 250 while not kill_switch.is_set(): |
239 try: | 251 try: |
240 yield result_queue.get(timeout=0.00001) | 252 yield result_queue.get(timeout=0.00001) |
241 except Queue.Empty: | 253 except Queue.Empty: |
242 break | 254 break |
243 | 255 |
244 if kill_switch.is_set(): | 256 if kill_switch.is_set(): |
245 raise ResultStageAbort() | 257 raise ResultStageAbort() |
246 error = opts.handler.result_stage_loop(opts, generate_objects()) | 258 error = opts.handler.result_stage_loop(opts, generate_objects()) |
247 except ResultStageAbort: | 259 except ResultStageAbort: |
248 pass | 260 pass |
249 | 261 |
250 for p in procs: | 262 for p in procs: |
251 p.join() | 263 p.join() |
252 | 264 |
253 if not kill_switch.is_set() and not result_queue.empty(): | 265 if not kill_switch.is_set() and not result_queue.empty(): |
254 error = True | 266 error = True |
255 | 267 |
256 return error, kill_switch.is_set() | 268 return error, kill_switch.is_set() |
OLD | NEW |