OLD | NEW |
1 # Copyright (C) 2011 Google Inc. All rights reserved. | 1 # Copyright (C) 2011 Google Inc. All rights reserved. |
2 # | 2 # |
3 # Redistribution and use in source and binary forms, with or without | 3 # Redistribution and use in source and binary forms, with or without |
4 # modification, are permitted provided that the following conditions are | 4 # modification, are permitted provided that the following conditions are |
5 # met: | 5 # met: |
6 # | 6 # |
7 # * Redistributions of source code must retain the above copyright | 7 # * Redistributions of source code must retain the above copyright |
8 # notice, this list of conditions and the following disclaimer. | 8 # notice, this list of conditions and the following disclaimer. |
9 # * Redistributions in binary form must reproduce the above | 9 # * Redistributions in binary form must reproduce the above |
10 # copyright notice, this list of conditions and the following disclaimer | 10 # copyright notice, this list of conditions and the following disclaimer |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
65 self._options = options | 65 self._options = options |
66 self._port = port | 66 self._port = port |
67 self._printer = printer | 67 self._printer = printer |
68 self._results_directory = results_directory | 68 self._results_directory = results_directory |
69 self._test_is_slow = test_is_slow_fn | 69 self._test_is_slow = test_is_slow_fn |
70 self._sharder = Sharder(self._port.split_test, self._options.max_locked_
shards) | 70 self._sharder = Sharder(self._port.split_test, self._options.max_locked_
shards) |
71 self._filesystem = self._port.host.filesystem | 71 self._filesystem = self._port.host.filesystem |
72 | 72 |
73 self._expectations = None | 73 self._expectations = None |
74 self._test_inputs = [] | 74 self._test_inputs = [] |
75 self._needs_http = None | |
76 self._needs_websockets = None | |
77 self._retrying = False | 75 self._retrying = False |
78 | 76 |
79 self._current_run_results = None | 77 self._current_run_results = None |
80 self._remaining_locked_shards = [] | |
81 self._has_http_lock = False | |
82 | 78 |
83 def run_tests(self, expectations, test_inputs, tests_to_skip, num_workers, n
eeds_http, needs_websockets, retrying): | 79 def run_tests(self, expectations, test_inputs, tests_to_skip, num_workers, r
etrying): |
84 self._expectations = expectations | 80 self._expectations = expectations |
85 self._test_inputs = test_inputs | 81 self._test_inputs = test_inputs |
86 self._needs_http = needs_http | |
87 self._needs_websockets = needs_websockets | |
88 self._retrying = retrying | 82 self._retrying = retrying |
89 | 83 |
90 # FIXME: rename all variables to test_run_results or some such ... | 84 # FIXME: rename all variables to test_run_results or some such ... |
91 run_results = TestRunResults(self._expectations, len(test_inputs) + len(
tests_to_skip)) | 85 run_results = TestRunResults(self._expectations, len(test_inputs) + len(
tests_to_skip)) |
92 self._current_run_results = run_results | 86 self._current_run_results = run_results |
93 self._remaining_locked_shards = [] | |
94 self._has_http_lock = False | |
95 self._printer.num_tests = len(test_inputs) | 87 self._printer.num_tests = len(test_inputs) |
96 self._printer.num_completed = 0 | 88 self._printer.num_completed = 0 |
97 | 89 |
98 if not retrying: | 90 if not retrying: |
99 self._printer.print_expected(run_results, self._expectations.get_tes
ts_with_result_type) | 91 self._printer.print_expected(run_results, self._expectations.get_tes
ts_with_result_type) |
100 | 92 |
101 for test_name in set(tests_to_skip): | 93 for test_name in set(tests_to_skip): |
102 result = test_results.TestResult(test_name) | 94 result = test_results.TestResult(test_name) |
103 result.type = test_expectations.SKIP | 95 result.type = test_expectations.SKIP |
104 run_results.add(result, expected=True, test_is_slow=self._test_is_sl
ow(test_name)) | 96 run_results.add(result, expected=True, test_is_slow=self._test_is_sl
ow(test_name)) |
105 | 97 |
106 self._printer.write_update('Sharding tests ...') | 98 self._printer.write_update('Sharding tests ...') |
107 locked_shards, unlocked_shards = self._sharder.shard_tests(test_inputs,
int(self._options.child_processes), self._options.fully_parallel) | 99 locked_shards, unlocked_shards = self._sharder.shard_tests(test_inputs,
int(self._options.child_processes), self._options.fully_parallel) |
108 | 100 |
109 # FIXME: We don't have a good way to coordinate the workers so that | 101 # We don't have a good way to coordinate the workers so that they don't |
110 # they don't try to run the shards that need a lock if we don't actually | 102 # try to run the shards that need a lock. The easiest solution is to |
111 # have the lock. The easiest solution at the moment is to grab the | 103 # run all of the locked shards first. |
112 # lock at the beginning of the run, and then run all of the locked | |
113 # shards first. This minimizes the time spent holding the lock, but | |
114 # means that we won't be running tests while we're waiting for the lock. | |
115 # If this becomes a problem in practice we'll need to change this. | |
116 | |
117 all_shards = locked_shards + unlocked_shards | 104 all_shards = locked_shards + unlocked_shards |
118 self._remaining_locked_shards = locked_shards | |
119 if self._port.requires_http_server() or locked_shards: | |
120 self.start_servers_with_lock(2 * min(num_workers, len(locked_shards)
)) | |
121 | |
122 num_workers = min(num_workers, len(all_shards)) | 105 num_workers = min(num_workers, len(all_shards)) |
123 self._printer.print_workers_and_shards(num_workers, len(all_shards), len
(locked_shards)) | 106 self._printer.print_workers_and_shards(num_workers, len(all_shards), len
(locked_shards)) |
124 | 107 |
125 if self._options.dry_run: | 108 if self._options.dry_run: |
126 return run_results | 109 return run_results |
127 | 110 |
128 self._printer.write_update('Starting %s ...' % grammar.pluralize('worker
', num_workers)) | 111 self._printer.write_update('Starting %s ...' % grammar.pluralize('worker
', num_workers)) |
129 | 112 |
130 try: | 113 try: |
131 with message_pool.get(self, self._worker_factory, num_workers, self.
_port.worker_startup_delay_secs(), self._port.host) as pool: | 114 with message_pool.get(self, self._worker_factory, num_workers, self.
_port.worker_startup_delay_secs(), self._port.host) as pool: |
132 pool.run(('test_list', shard.name, shard.test_inputs) for shard
in all_shards) | 115 pool.run(('test_list', shard.name, shard.test_inputs) for shard
in all_shards) |
133 except TestRunInterruptedException, e: | 116 except TestRunInterruptedException, e: |
134 _log.warning(e.reason) | 117 _log.warning(e.reason) |
135 run_results.interrupted = True | 118 run_results.interrupted = True |
136 except KeyboardInterrupt: | 119 except KeyboardInterrupt: |
137 self._printer.flush() | 120 self._printer.flush() |
138 self._printer.writeln('Interrupted, exiting ...') | 121 self._printer.writeln('Interrupted, exiting ...') |
139 raise | 122 raise |
140 except Exception, e: | 123 except Exception, e: |
141 _log.debug('%s("%s") raised, exiting' % (e.__class__.__name__, str(e
))) | 124 _log.debug('%s("%s") raised, exiting' % (e.__class__.__name__, str(e
))) |
142 raise | 125 raise |
143 finally: | |
144 self.stop_servers_with_lock() | |
145 | 126 |
146 return run_results | 127 return run_results |
147 | 128 |
148 def _worker_factory(self, worker_connection): | 129 def _worker_factory(self, worker_connection): |
149 results_directory = self._results_directory | 130 results_directory = self._results_directory |
150 if self._retrying: | 131 if self._retrying: |
151 self._filesystem.maybe_make_directory(self._filesystem.join(self._re
sults_directory, 'retries')) | 132 self._filesystem.maybe_make_directory(self._filesystem.join(self._re
sults_directory, 'retries')) |
152 results_directory = self._filesystem.join(self._results_directory, '
retries') | 133 results_directory = self._filesystem.join(self._results_directory, '
retries') |
153 return Worker(worker_connection, results_directory, self._options) | 134 return Worker(worker_connection, results_directory, self._options) |
154 | 135 |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
186 expected = self._expectations.matches_an_expected_result(result.test_nam
e, result.type, self._options.pixel_tests or result.reftest_type) | 167 expected = self._expectations.matches_an_expected_result(result.test_nam
e, result.type, self._options.pixel_tests or result.reftest_type) |
187 exp_str = self._expectations.get_expectations_string(result.test_name) | 168 exp_str = self._expectations.get_expectations_string(result.test_name) |
188 got_str = self._expectations.expectation_to_string(result.type) | 169 got_str = self._expectations.expectation_to_string(result.type) |
189 | 170 |
190 run_results.add(result, expected, self._test_is_slow(result.test_name)) | 171 run_results.add(result, expected, self._test_is_slow(result.test_name)) |
191 | 172 |
192 self._printer.print_finished_test(result, expected, exp_str, got_str) | 173 self._printer.print_finished_test(result, expected, exp_str, got_str) |
193 | 174 |
194 self._interrupt_if_at_failure_limits(run_results) | 175 self._interrupt_if_at_failure_limits(run_results) |
195 | 176 |
196 def start_servers_with_lock(self, number_of_servers): | |
197 self._printer.write_update('Acquiring http lock ...') | |
198 self._port.acquire_http_lock() | |
199 if self._needs_http: | |
200 self._printer.write_update('Starting HTTP server ...') | |
201 self._port.start_http_server(number_of_servers=number_of_servers) | |
202 if self._needs_websockets: | |
203 self._printer.write_update('Starting WebSocket server ...') | |
204 self._port.start_websocket_server() | |
205 self._has_http_lock = True | |
206 | |
207 def stop_servers_with_lock(self): | |
208 if self._has_http_lock: | |
209 if self._needs_http: | |
210 self._printer.write_update('Stopping HTTP server ...') | |
211 self._port.stop_http_server() | |
212 if self._needs_websockets: | |
213 self._printer.write_update('Stopping WebSocket server ...') | |
214 self._port.stop_websocket_server() | |
215 self._printer.write_update('Releasing server lock ...') | |
216 self._port.release_http_lock() | |
217 self._has_http_lock = False | |
218 | |
219 def handle(self, name, source, *args): | 177 def handle(self, name, source, *args): |
220 method = getattr(self, '_handle_' + name) | 178 method = getattr(self, '_handle_' + name) |
221 if method: | 179 if method: |
222 return method(source, *args) | 180 return method(source, *args) |
223 raise AssertionError('unknown message %s received from %s, args=%s' % (n
ame, source, repr(args))) | 181 raise AssertionError('unknown message %s received from %s, args=%s' % (n
ame, source, repr(args))) |
224 | 182 |
225 def _handle_started_test(self, worker_name, test_input, test_timeout_sec): | 183 def _handle_started_test(self, worker_name, test_input, test_timeout_sec): |
226 self._printer.print_started_test(test_input.test_name) | 184 self._printer.print_started_test(test_input.test_name) |
227 | 185 |
228 def _handle_finished_test_list(self, worker_name, list_name): | 186 def _handle_finished_test_list(self, worker_name, list_name): |
229 def find(name, test_lists): | 187 pass |
230 for i in range(len(test_lists)): | |
231 if test_lists[i].name == name: | |
232 return i | |
233 return -1 | |
234 | |
235 index = find(list_name, self._remaining_locked_shards) | |
236 if index >= 0: | |
237 self._remaining_locked_shards.pop(index) | |
238 if not self._remaining_locked_shards and not self._port.requires_htt
p_server(): | |
239 self.stop_servers_with_lock() | |
240 | 188 |
241 def _handle_finished_test(self, worker_name, result, log_messages=[]): | 189 def _handle_finished_test(self, worker_name, result, log_messages=[]): |
242 self._update_summary_with_result(self._current_run_results, result) | 190 self._update_summary_with_result(self._current_run_results, result) |
243 | 191 |
244 | 192 |
245 class Worker(object): | 193 class Worker(object): |
246 def __init__(self, caller, results_directory, options): | 194 def __init__(self, caller, results_directory, options): |
247 self._caller = caller | 195 self._caller = caller |
248 self._worker_number = caller.worker_number | 196 self._worker_number = caller.worker_number |
249 self._name = caller.name | 197 self._name = caller.name |
(...skipping 325 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
575 def split_at(seq, index): | 523 def split_at(seq, index): |
576 return (seq[:index], seq[index:]) | 524 return (seq[:index], seq[index:]) |
577 | 525 |
578 num_old_per_new = divide_and_round_up(len(old_shards), max_new_shards) | 526 num_old_per_new = divide_and_round_up(len(old_shards), max_new_shards) |
579 new_shards = [] | 527 new_shards = [] |
580 remaining_shards = old_shards | 528 remaining_shards = old_shards |
581 while remaining_shards: | 529 while remaining_shards: |
582 some_shards, remaining_shards = split_at(remaining_shards, num_old_p
er_new) | 530 some_shards, remaining_shards = split_at(remaining_shards, num_old_p
er_new) |
583 new_shards.append(TestShard('%s_%d' % (shard_name_prefix, len(new_sh
ards) + 1), extract_and_flatten(some_shards))) | 531 new_shards.append(TestShard('%s_%d' % (shard_name_prefix, len(new_sh
ards) + 1), extract_and_flatten(some_shards))) |
584 return new_shards | 532 return new_shards |
OLD | NEW |