| OLD | NEW |
| 1 # Copyright (C) 2011 Google Inc. All rights reserved. | 1 # Copyright (C) 2011 Google Inc. All rights reserved. |
| 2 # | 2 # |
| 3 # Redistribution and use in source and binary forms, with or without | 3 # Redistribution and use in source and binary forms, with or without |
| 4 # modification, are permitted provided that the following conditions are | 4 # modification, are permitted provided that the following conditions are |
| 5 # met: | 5 # met: |
| 6 # | 6 # |
| 7 # * Redistributions of source code must retain the above copyright | 7 # * Redistributions of source code must retain the above copyright |
| 8 # notice, this list of conditions and the following disclaimer. | 8 # notice, this list of conditions and the following disclaimer. |
| 9 # * Redistributions in binary form must reproduce the above | 9 # * Redistributions in binary form must reproduce the above |
| 10 # copyright notice, this list of conditions and the following disclaimer | 10 # copyright notice, this list of conditions and the following disclaimer |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 65 self._options = options | 65 self._options = options |
| 66 self._port = port | 66 self._port = port |
| 67 self._printer = printer | 67 self._printer = printer |
| 68 self._results_directory = results_directory | 68 self._results_directory = results_directory |
| 69 self._test_is_slow = test_is_slow_fn | 69 self._test_is_slow = test_is_slow_fn |
| 70 self._sharder = Sharder(self._port.split_test, self._options.max_locked_
shards) | 70 self._sharder = Sharder(self._port.split_test, self._options.max_locked_
shards) |
| 71 self._filesystem = self._port.host.filesystem | 71 self._filesystem = self._port.host.filesystem |
| 72 | 72 |
| 73 self._expectations = None | 73 self._expectations = None |
| 74 self._test_inputs = [] | 74 self._test_inputs = [] |
| 75 self._needs_http = None | |
| 76 self._needs_websockets = None | |
| 77 self._retrying = False | 75 self._retrying = False |
| 78 | 76 |
| 79 self._current_run_results = None | 77 self._current_run_results = None |
| 80 self._remaining_locked_shards = [] | |
| 81 self._has_http_lock = False | |
| 82 | 78 |
| 83 def run_tests(self, expectations, test_inputs, tests_to_skip, num_workers, n
eeds_http, needs_websockets, retrying): | 79 def run_tests(self, expectations, test_inputs, tests_to_skip, num_workers, r
etrying): |
| 84 self._expectations = expectations | 80 self._expectations = expectations |
| 85 self._test_inputs = test_inputs | 81 self._test_inputs = test_inputs |
| 86 self._needs_http = needs_http | |
| 87 self._needs_websockets = needs_websockets | |
| 88 self._retrying = retrying | 82 self._retrying = retrying |
| 89 | 83 |
| 90 # FIXME: rename all variables to test_run_results or some such ... | 84 # FIXME: rename all variables to test_run_results or some such ... |
| 91 run_results = TestRunResults(self._expectations, len(test_inputs) + len(
tests_to_skip)) | 85 run_results = TestRunResults(self._expectations, len(test_inputs) + len(
tests_to_skip)) |
| 92 self._current_run_results = run_results | 86 self._current_run_results = run_results |
| 93 self._remaining_locked_shards = [] | |
| 94 self._has_http_lock = False | |
| 95 self._printer.num_tests = len(test_inputs) | 87 self._printer.num_tests = len(test_inputs) |
| 96 self._printer.num_completed = 0 | 88 self._printer.num_completed = 0 |
| 97 | 89 |
| 98 if not retrying: | 90 if not retrying: |
| 99 self._printer.print_expected(run_results, self._expectations.get_tes
ts_with_result_type) | 91 self._printer.print_expected(run_results, self._expectations.get_tes
ts_with_result_type) |
| 100 | 92 |
| 101 for test_name in set(tests_to_skip): | 93 for test_name in set(tests_to_skip): |
| 102 result = test_results.TestResult(test_name) | 94 result = test_results.TestResult(test_name) |
| 103 result.type = test_expectations.SKIP | 95 result.type = test_expectations.SKIP |
| 104 run_results.add(result, expected=True, test_is_slow=self._test_is_sl
ow(test_name)) | 96 run_results.add(result, expected=True, test_is_slow=self._test_is_sl
ow(test_name)) |
| 105 | 97 |
| 106 self._printer.write_update('Sharding tests ...') | 98 self._printer.write_update('Sharding tests ...') |
| 107 locked_shards, unlocked_shards = self._sharder.shard_tests(test_inputs,
int(self._options.child_processes), self._options.fully_parallel) | 99 locked_shards, unlocked_shards = self._sharder.shard_tests(test_inputs,
int(self._options.child_processes), self._options.fully_parallel) |
| 108 | 100 |
| 109 # FIXME: We don't have a good way to coordinate the workers so that | 101 # We don't have a good way to coordinate the workers so that they don't |
| 110 # they don't try to run the shards that need a lock if we don't actually | 102 # try to run the shards that need a lock. The easiest solution is to |
| 111 # have the lock. The easiest solution at the moment is to grab the | 103 # run all of the locked shards first. |
| 112 # lock at the beginning of the run, and then run all of the locked | |
| 113 # shards first. This minimizes the time spent holding the lock, but | |
| 114 # means that we won't be running tests while we're waiting for the lock. | |
| 115 # If this becomes a problem in practice we'll need to change this. | |
| 116 | |
| 117 all_shards = locked_shards + unlocked_shards | 104 all_shards = locked_shards + unlocked_shards |
| 118 self._remaining_locked_shards = locked_shards | |
| 119 if self._port.requires_http_server() or locked_shards: | |
| 120 self.start_servers_with_lock(2 * min(num_workers, len(locked_shards)
)) | |
| 121 | |
| 122 num_workers = min(num_workers, len(all_shards)) | 105 num_workers = min(num_workers, len(all_shards)) |
| 123 self._printer.print_workers_and_shards(num_workers, len(all_shards), len
(locked_shards)) | 106 self._printer.print_workers_and_shards(num_workers, len(all_shards), len
(locked_shards)) |
| 124 | 107 |
| 125 if self._options.dry_run: | 108 if self._options.dry_run: |
| 126 return run_results | 109 return run_results |
| 127 | 110 |
| 128 self._printer.write_update('Starting %s ...' % grammar.pluralize('worker
', num_workers)) | 111 self._printer.write_update('Starting %s ...' % grammar.pluralize('worker
', num_workers)) |
| 129 | 112 |
| 130 try: | 113 try: |
| 131 with message_pool.get(self, self._worker_factory, num_workers, self.
_port.worker_startup_delay_secs(), self._port.host) as pool: | 114 with message_pool.get(self, self._worker_factory, num_workers, self.
_port.worker_startup_delay_secs(), self._port.host) as pool: |
| 132 pool.run(('test_list', shard.name, shard.test_inputs) for shard
in all_shards) | 115 pool.run(('test_list', shard.name, shard.test_inputs) for shard
in all_shards) |
| 133 except TestRunInterruptedException, e: | 116 except TestRunInterruptedException, e: |
| 134 _log.warning(e.reason) | 117 _log.warning(e.reason) |
| 135 run_results.interrupted = True | 118 run_results.interrupted = True |
| 136 except KeyboardInterrupt: | 119 except KeyboardInterrupt: |
| 137 self._printer.flush() | 120 self._printer.flush() |
| 138 self._printer.writeln('Interrupted, exiting ...') | 121 self._printer.writeln('Interrupted, exiting ...') |
| 139 raise | 122 raise |
| 140 except Exception, e: | 123 except Exception, e: |
| 141 _log.debug('%s("%s") raised, exiting' % (e.__class__.__name__, str(e
))) | 124 _log.debug('%s("%s") raised, exiting' % (e.__class__.__name__, str(e
))) |
| 142 raise | 125 raise |
| 143 finally: | |
| 144 self.stop_servers_with_lock() | |
| 145 | 126 |
| 146 return run_results | 127 return run_results |
| 147 | 128 |
| 148 def _worker_factory(self, worker_connection): | 129 def _worker_factory(self, worker_connection): |
| 149 results_directory = self._results_directory | 130 results_directory = self._results_directory |
| 150 if self._retrying: | 131 if self._retrying: |
| 151 self._filesystem.maybe_make_directory(self._filesystem.join(self._re
sults_directory, 'retries')) | 132 self._filesystem.maybe_make_directory(self._filesystem.join(self._re
sults_directory, 'retries')) |
| 152 results_directory = self._filesystem.join(self._results_directory, '
retries') | 133 results_directory = self._filesystem.join(self._results_directory, '
retries') |
| 153 return Worker(worker_connection, results_directory, self._options) | 134 return Worker(worker_connection, results_directory, self._options) |
| 154 | 135 |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 186 expected = self._expectations.matches_an_expected_result(result.test_nam
e, result.type, self._options.pixel_tests or result.reftest_type) | 167 expected = self._expectations.matches_an_expected_result(result.test_nam
e, result.type, self._options.pixel_tests or result.reftest_type) |
| 187 exp_str = self._expectations.get_expectations_string(result.test_name) | 168 exp_str = self._expectations.get_expectations_string(result.test_name) |
| 188 got_str = self._expectations.expectation_to_string(result.type) | 169 got_str = self._expectations.expectation_to_string(result.type) |
| 189 | 170 |
| 190 run_results.add(result, expected, self._test_is_slow(result.test_name)) | 171 run_results.add(result, expected, self._test_is_slow(result.test_name)) |
| 191 | 172 |
| 192 self._printer.print_finished_test(result, expected, exp_str, got_str) | 173 self._printer.print_finished_test(result, expected, exp_str, got_str) |
| 193 | 174 |
| 194 self._interrupt_if_at_failure_limits(run_results) | 175 self._interrupt_if_at_failure_limits(run_results) |
| 195 | 176 |
| 196 def start_servers_with_lock(self, number_of_servers): | |
| 197 self._printer.write_update('Acquiring http lock ...') | |
| 198 self._port.acquire_http_lock() | |
| 199 if self._needs_http: | |
| 200 self._printer.write_update('Starting HTTP server ...') | |
| 201 self._port.start_http_server(number_of_servers=number_of_servers) | |
| 202 if self._needs_websockets: | |
| 203 self._printer.write_update('Starting WebSocket server ...') | |
| 204 self._port.start_websocket_server() | |
| 205 self._has_http_lock = True | |
| 206 | |
| 207 def stop_servers_with_lock(self): | |
| 208 if self._has_http_lock: | |
| 209 if self._needs_http: | |
| 210 self._printer.write_update('Stopping HTTP server ...') | |
| 211 self._port.stop_http_server() | |
| 212 if self._needs_websockets: | |
| 213 self._printer.write_update('Stopping WebSocket server ...') | |
| 214 self._port.stop_websocket_server() | |
| 215 self._printer.write_update('Releasing server lock ...') | |
| 216 self._port.release_http_lock() | |
| 217 self._has_http_lock = False | |
| 218 | |
| 219 def handle(self, name, source, *args): | 177 def handle(self, name, source, *args): |
| 220 method = getattr(self, '_handle_' + name) | 178 method = getattr(self, '_handle_' + name) |
| 221 if method: | 179 if method: |
| 222 return method(source, *args) | 180 return method(source, *args) |
| 223 raise AssertionError('unknown message %s received from %s, args=%s' % (n
ame, source, repr(args))) | 181 raise AssertionError('unknown message %s received from %s, args=%s' % (n
ame, source, repr(args))) |
| 224 | 182 |
| 225 def _handle_started_test(self, worker_name, test_input, test_timeout_sec): | 183 def _handle_started_test(self, worker_name, test_input, test_timeout_sec): |
| 226 self._printer.print_started_test(test_input.test_name) | 184 self._printer.print_started_test(test_input.test_name) |
| 227 | 185 |
| 228 def _handle_finished_test_list(self, worker_name, list_name): | 186 def _handle_finished_test_list(self, worker_name, list_name): |
| 229 def find(name, test_lists): | 187 pass |
| 230 for i in range(len(test_lists)): | |
| 231 if test_lists[i].name == name: | |
| 232 return i | |
| 233 return -1 | |
| 234 | |
| 235 index = find(list_name, self._remaining_locked_shards) | |
| 236 if index >= 0: | |
| 237 self._remaining_locked_shards.pop(index) | |
| 238 if not self._remaining_locked_shards and not self._port.requires_htt
p_server(): | |
| 239 self.stop_servers_with_lock() | |
| 240 | 188 |
| 241 def _handle_finished_test(self, worker_name, result, log_messages=[]): | 189 def _handle_finished_test(self, worker_name, result, log_messages=[]): |
| 242 self._update_summary_with_result(self._current_run_results, result) | 190 self._update_summary_with_result(self._current_run_results, result) |
| 243 | 191 |
| 244 | 192 |
| 245 class Worker(object): | 193 class Worker(object): |
| 246 def __init__(self, caller, results_directory, options): | 194 def __init__(self, caller, results_directory, options): |
| 247 self._caller = caller | 195 self._caller = caller |
| 248 self._worker_number = caller.worker_number | 196 self._worker_number = caller.worker_number |
| 249 self._name = caller.name | 197 self._name = caller.name |
| (...skipping 325 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 575 def split_at(seq, index): | 523 def split_at(seq, index): |
| 576 return (seq[:index], seq[index:]) | 524 return (seq[:index], seq[index:]) |
| 577 | 525 |
| 578 num_old_per_new = divide_and_round_up(len(old_shards), max_new_shards) | 526 num_old_per_new = divide_and_round_up(len(old_shards), max_new_shards) |
| 579 new_shards = [] | 527 new_shards = [] |
| 580 remaining_shards = old_shards | 528 remaining_shards = old_shards |
| 581 while remaining_shards: | 529 while remaining_shards: |
| 582 some_shards, remaining_shards = split_at(remaining_shards, num_old_p
er_new) | 530 some_shards, remaining_shards = split_at(remaining_shards, num_old_p
er_new) |
| 583 new_shards.append(TestShard('%s_%d' % (shard_name_prefix, len(new_sh
ards) + 1), extract_and_flatten(some_shards))) | 531 new_shards.append(TestShard('%s_%d' % (shard_name_prefix, len(new_sh
ards) + 1), extract_and_flatten(some_shards))) |
| 584 return new_shards | 532 return new_shards |
| OLD | NEW |