OLD | NEW |
---|---|
(Empty) | |
1 # Copyright 2014 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 import Queue | |
6 import glob | |
7 import multiprocessing | |
8 import re | |
9 import signal | |
10 | |
11 from .types import Test, UnknownError, TestError, Result, ResultStageAbort | |
12 | |
13 | |
14 def GenLoopProcess(gen, test_queue, result_queue, opts, kill_switch, cover_ctx): | |
15 """Generate `Test`'s from |gen|, and feed them into |test_queue|. | |
16 | |
17 Non-Test instances will be translated into `UnknownError` objects. | |
18 | |
19 On completion, feed |opts.jobs| None objects into |test_queue|. | |
20 | |
21 @param gen: generator yielding Test() instances. | |
22 @type test_queue: multiprocessing.Queue() | |
23 @type result_queue: multiprocessing.Queue() | |
24 @type opts: argparse.Namespace | |
25 @type kill_switch: multiprocessing.Event() | |
26 @type match_globs: [str] | |
27 @type cover_ctx: cover.CoverageContext().create_subprocess_context() | |
28 @type handler: types.Handler | |
29 """ | |
30 matcher = re.compile( | |
31 '^%s$' % '|'.join('(?:%s)' % glob.fnmatch.translate(g) | |
32 for g in opts.test_glob if g[0] != '-')) | |
33 if matcher.pattern == '^$': | |
34 matcher = re.compile('^.*$') | |
35 | |
36 neg_matcher = re.compile( | |
37 '^%s$' % '|'.join('(?:%s)' % glob.fnmatch.translate(g[1:]) | |
38 for g in opts.test_glob if g[0] == '-')) | |
39 | |
40 def generate_tests(): | |
41 try: | |
42 for test in gen(): | |
43 if kill_switch.is_set(): | |
44 break | |
45 | |
46 if not isinstance(test, Test): | |
47 result_queue.put_nowait( | |
48 UnknownError( | |
49 'Got non-Test isinstance from generator: %r' % test)) | |
50 continue | |
51 | |
52 if not neg_matcher.match(test.name) and matcher.match(test.name): | |
53 yield test | |
54 except KeyboardInterrupt: | |
55 pass | |
56 finally: | |
57 for _ in xrange(opts.jobs): | |
58 test_queue.put_nowait(None) | |
59 | |
60 | |
61 next_stage = (result_queue if opts.handler.SKIP_RUNLOOP else test_queue) | |
62 with cover_ctx: | |
63 opts.handler.gen_stage_loop(opts, generate_tests(), next_stage.put_nowait, | |
64 result_queue.put_nowait) | |
65 | |
66 | |
67 def RunLoopProcess(test_queue, result_queue, opts, kill_switch, cover_ctx): | |
68 """Consume `Test` instances from |test_queue|, run them, and yield the results | |
69 into opts.run_stage_loop(). | |
70 | |
71 Generates coverage data as a side-effect. | |
72 @type test_queue: multiprocessing.Queue() | |
73 @type result_queue: multiprocessing.Queue() | |
74 @type opts: argparse.Namespace | |
75 @type kill_switch: multiprocessing.Event() | |
76 @type cover_ctx: cover.CoverageContext().create_subprocess_context() | |
77 """ | |
78 def generate_tests_results(): | |
79 try: | |
80 while not kill_switch.is_set(): | |
81 try: | |
82 test = test_queue.get(timeout=0.1) | |
83 if test is None: | |
84 break | |
85 except Queue.Empty: | |
86 continue | |
87 | |
88 try: | |
89 result = test.run() | |
90 if not isinstance(result, Result): | |
91 result_queue.put_nowait( | |
92 TestError(test, 'Got non-Result instance from test: %r' | |
93 % result)) | |
94 continue | |
95 | |
96 yield test, result | |
97 except Exception as e: | |
98 # TODO(iannucci): include stacktrace | |
99 result_queue.put_nowait(TestError(test, str(e))) | |
100 except KeyboardInterrupt: | |
101 pass | |
102 | |
103 with cover_ctx: | |
104 opts.handler.run_stage_loop(opts, generate_tests_results(), | |
105 result_queue.put_nowait) | |
106 | |
107 | |
108 def ResultLoop(test_gen, cover_ctx, opts): | |
109 kill_switch = multiprocessing.Event() | |
110 def handle_killswitch(*_): | |
111 kill_switch.set() | |
112 signal.signal(signal.SIGINT, signal.SIG_DFL) | |
Vadim Sh.
2014/04/02 22:37:59
why setting to DFL? To die on double Ctrl+C?
iannucci
2014/04/03 00:03:49
Yeah, added comment.
| |
113 signal.signal(signal.SIGTERM, signal.SIG_DFL) | |
114 signal.signal(signal.SIGINT, handle_killswitch) | |
115 signal.signal(signal.SIGTERM, handle_killswitch) | |
116 | |
117 test_queue = multiprocessing.Queue() | |
118 result_queue = multiprocessing.Queue() | |
119 | |
120 test_gen_args = ( | |
121 test_gen, test_queue, result_queue, opts, kill_switch, cover_ctx) | |
122 | |
123 procs = [] | |
124 if opts.handler.SKIP_RUNLOOP: | |
125 GenLoopProcess(*test_gen_args) | |
126 class DeadProc(object): | |
127 """It's alive exactly once.""" | |
Vadim Sh.
2014/04/02 22:37:59
ugh...
iannucci
2014/04/03 00:03:49
Otherwise the generate_objects loop was racy :(. T
| |
128 alive = True | |
129 def is_alive(self): | |
130 ret = self.alive | |
131 self.alive = False | |
132 return ret | |
133 # This is so we can write generate_objects() in a non-racy way | |
134 procs = [DeadProc()] | |
135 else: | |
136 procs = [multiprocessing.Process( | |
137 target=GenLoopProcess, args=test_gen_args)] | |
138 | |
139 procs += [ | |
140 multiprocessing.Process( | |
141 target=RunLoopProcess, args=( | |
142 test_queue, result_queue, opts, kill_switch, cover_ctx)) | |
143 for _ in xrange(opts.jobs) | |
144 ] | |
145 | |
146 for p in procs: | |
147 p.daemon = True | |
148 p.start() | |
149 | |
150 error = False | |
151 try: | |
152 def generate_objects(): | |
153 while not kill_switch.is_set(): | |
154 if not any(p.is_alive() for p in procs): | |
155 break | |
156 | |
157 while not kill_switch.is_set(): | |
158 try: | |
159 yield result_queue.get(timeout=0.1) | |
160 except Queue.Empty: | |
161 break | |
162 if kill_switch.is_set(): | |
163 raise ResultStageAbort() | |
164 error = opts.handler.result_stage_loop(opts, generate_objects()) | |
165 except ResultStageAbort: | |
166 pass | |
167 | |
168 if not kill_switch.is_set() and not result_queue.empty(): | |
169 error = True | |
170 | |
171 return error, kill_switch.is_set() | |
OLD | NEW |