OLD | NEW |
| (Empty) |
1 # Copyright (c) 2013 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 """Implements test sharding logic.""" | |
6 | |
7 import logging | |
8 import threading | |
9 | |
10 from pylib import android_commands | |
11 from pylib import constants | |
12 from pylib.utils import reraiser_thread | |
13 from pylib.utils import watchdog_timer | |
14 | |
15 import base_test_result | |
16 | |
17 | |
# Default watchdog timeout, in seconds. ShardAndRunTests uses it as the default
# for both its test_timeout and setup_timeout parameters.
DEFAULT_TIMEOUT = 7 * 60  # seven minutes
19 | |
20 | |
class _ThreadSafeCounter(object):
  """An integer counter, starting at zero, that threads can share safely."""

  def __init__(self):
    self._value = 0
    self._lock = threading.Lock()

  def GetAndIncrement(self):
    """Atomically read the counter and advance it by one.

    Returns:
      The counter value as it was before this call incremented it.
    """
    self._lock.acquire()
    try:
      # Read and bump under the lock so no two callers see the same value.
      current = self._value
      self._value = current + 1
    finally:
      self._lock.release()
    return current
38 | |
39 | |
class _Test(object):
  """Pairs a test with bookkeeping about how often it has been attempted."""

  def __init__(self, test, tries=0):
    """Stores the test together with its attempt count.

    Args:
      test: the test to wrap.
      tries: how many times the test has been tried so far.
    """
    self.test, self.tries = test, tries
52 | |
53 | |
class _TestCollection(object):
  """A threadsafe collection of tests.

  Besides the tests waiting to be handed out, the collection counts every test
  that has been added but not yet reported as handled via test_completed().
  Iterating the collection yields tests until that count drops to zero.

  Note: the internal event is only set by add() and test_completed(), so
  iterating a collection to which nothing has ever been added blocks until a
  test is added.

  Args:
    tests: optional iterable of tests to put in the collection.
  """

  def __init__(self, tests=None):
    self._lock = threading.Lock()
    self._tests = []
    self._tests_in_progress = 0
    # Used to signal that an item is available or all items have been handled.
    self._item_available_or_all_done = threading.Event()
    # The default is None rather than a shared mutable [] literal.
    if tests is not None:
      for t in tests:
        self.add(t)

  def _pop(self):
    """Pop a test from the collection.

    Waits until a test is available or all tests have been handled.

    Returns:
      A test or None if all tests have been handled.
    """
    while True:
      # Wait for a test to be available or all tests to have been handled.
      self._item_available_or_all_done.wait()
      with self._lock:
        # Check which of the two conditions triggered the signal.
        if self._tests_in_progress == 0:
          return None
        try:
          return self._tests.pop(0)
        except IndexError:
          # Another thread beat us to the available test, wait again. The
          # event is cleared under the lock so a concurrent add() cannot be
          # missed.
          self._item_available_or_all_done.clear()

  def add(self, test):
    """Add a test to the collection.

    The test counts as in progress until test_completed() is called for it.

    Args:
      test: A test to add.
    """
    with self._lock:
      self._tests.append(test)
      self._item_available_or_all_done.set()
      self._tests_in_progress += 1

  def test_completed(self):
    """Indicate that a test has been fully handled."""
    with self._lock:
      self._tests_in_progress -= 1
      if self._tests_in_progress == 0:
        # All tests have been handled, signal all waiting threads.
        self._item_available_or_all_done.set()

  def __iter__(self):
    """Iterate through tests in the collection until all have been handled."""
    while True:
      r = self._pop()
      if r is None:
        break
      yield r
117 | |
118 | |
def _RunTestsFromQueue(runner, test_collection, out_results, watcher,
                       num_retries):
  """Runs tests from the test_collection until empty using the given runner.

  Adds TestRunResults objects to the out_results list and may add tests that
  need to be retried back to test_collection.

  Args:
    runner: A TestRunner object used to run the tests.
    test_collection: A _TestCollection from which to get _Test objects to run.
    out_results: A list to add TestRunResults to.
    watcher: A watchdog_timer.WatchdogTimer object, used as a shared timeout.
    num_retries: Number of retries for a test.

  Raises:
    android_commands.errors.DeviceUnresponsiveError: if the runner's device is
      not attached when a test is about to be run.
    Any exception raised while handling a test is re-raised after the test has
    been put back into test_collection.
  """
  for test in test_collection:
    # Reset the shared watchdog each time a test is picked up.
    watcher.Reset()
    try:
      if not android_commands.IsDeviceAttached(runner.device):
        # Device is unresponsive, stop handling tests on this device.
        msg = 'Device %s is unresponsive.' % runner.device
        logging.warning(msg)
        raise android_commands.errors.DeviceUnresponsiveError(msg)
      result, retry = runner.RunTest(test.test)
      test.tries += 1
      if retry and test.tries <= num_retries:
        # Retry non-passing results, only record passing results.
        pass_results = base_test_result.TestRunResults()
        pass_results.AddResults(result.GetPass())
        out_results.append(pass_results)
        logging.warning('Will retry test, try #%s.', test.tries)
        test_collection.add(_Test(test=retry, tries=test.tries))
      else:
        # All tests passed or retry limit reached. Either way, record results.
        out_results.append(result)
    except:
      # Deliberately catches everything: whatever went wrong, put the test
      # back so it gets run by another device, then reraise so the exception
      # surfaces on the main thread.
      test_collection.add(test)
      raise
    finally:
      # Retries count as separate tasks so always mark the popped test as done.
      test_collection.test_completed()
161 | |
162 | |
def _SetUp(runner_factory, device, out_runners, threadsafe_counter):
  """Creates the test runner for a single device and calls SetUp() on it.

  Note: if the device is unresponsive the TestRunner will not be added to
  out_runners; the failure is only logged.

  Args:
    runner_factory: callable that takes a device and index and returns a
      TestRunner object.
    device: the device serial number to set up.
    out_runners: list to which the successfully set up TestRunner is appended.
    threadsafe_counter: a _ThreadSafeCounter object used to get shard indices.
  """
  try:
    shard_index = threadsafe_counter.GetAndIncrement()
    logging.warning('Creating shard %s for device %s.', shard_index, device)
    new_runner = runner_factory(device, shard_index)
    new_runner.SetUp()
  except android_commands.errors.DeviceUnresponsiveError as err:
    logging.warning('Failed to create shard for %s: [%s]', device, err)
  else:
    # Only runners whose SetUp() succeeded are handed back to the caller.
    out_runners.append(new_runner)
184 | |
185 | |
def _RunAllTests(runners, tests, num_retries, timeout=None):
  """Runs every test on a pool of worker threads, one thread per runner.

  Args:
    runners: a list of TestRunner objects.
    tests: a list of Tests to run using the given TestRunners.
    num_retries: number of retries for a test.
    timeout: watchdog timeout in seconds; forwarded unchanged to
      watchdog_timer.WatchdogTimer.

  Returns:
    A tuple of (TestRunResults object, exit code)
  """
  logging.warning('Running %s tests with %s test runners.' %
                  (len(tests), len(runners)))
  collection = _TestCollection([_Test(t) for t in tests])
  shard_results = []
  exit_code = 0
  watcher = watchdog_timer.WatchdogTimer(timeout)

  # All workers share the collection, the result list and the watchdog.
  worker_threads = []
  for runner in runners:
    worker_threads.append(reraiser_thread.ReraiserThread(
        _RunTestsFromQueue,
        [runner, collection, shard_results, watcher, num_retries],
        name=runner.device[-4:]))
  workers = reraiser_thread.ReraiserThreadGroup(worker_threads)
  run_results = base_test_result.TestRunResults()
  workers.StartAll()

  # An unresponsive device only downgrades the exit code to a warning here.
  try:
    workers.JoinAll(watcher)
  except android_commands.errors.DeviceUnresponsiveError as e:
    logging.error(e)
    exit_code = constants.WARNING_EXIT_CODE

  # Merge the per-test results collected by the workers.
  for partial in shard_results:
    run_results.AddTestRunResults(partial)
  # A non-passing run takes precedence over the warning exit code.
  if not run_results.DidRunPass():
    exit_code = constants.ERROR_EXIT_CODE
  return (run_results, exit_code)
225 | |
226 | |
def _CreateRunners(runner_factory, devices, timeout=None):
  """Builds one test runner per device, running _SetUp on parallel threads.

  Note: if a device is unresponsive the corresponding TestRunner will not be
  included in the returned list.

  Args:
    runner_factory: callable that takes a device and index and returns a
      TestRunner object.
    devices: list of device serial numbers as strings.
    timeout: watchdog timeout in seconds; forwarded unchanged to
      watchdog_timer.WatchdogTimer.

  Returns:
    A list of TestRunner objects.
  """
  logging.warning('Creating %s test runners.' % len(devices))
  runners = []
  counter = _ThreadSafeCounter()

  # One setup thread per device; each appends its runner to the shared list.
  setup_threads = []
  for device in devices:
    setup_threads.append(reraiser_thread.ReraiserThread(
        _SetUp,
        [runner_factory, device, runners, counter],
        name=device[-4:]))
  group = reraiser_thread.ReraiserThreadGroup(setup_threads)
  group.StartAll()
  group.JoinAll(watchdog_timer.WatchdogTimer(timeout))
  return runners
253 | |
254 | |
def _TearDownRunners(runners, timeout=None):
  """Invokes TearDown() on every test runner, one thread per runner.

  Args:
    runners: a list of TestRunner objects.
    timeout: watchdog timeout in seconds; forwarded unchanged to
      watchdog_timer.WatchdogTimer.
  """
  teardown_threads = []
  for runner in runners:
    teardown_threads.append(
        reraiser_thread.ReraiserThread(runner.TearDown,
                                       name=runner.device[-4:]))
  group = reraiser_thread.ReraiserThreadGroup(teardown_threads)
  group.StartAll()
  group.JoinAll(watchdog_timer.WatchdogTimer(timeout))
267 | |
268 | |
def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug',
                     test_timeout=DEFAULT_TIMEOUT,
                     setup_timeout=DEFAULT_TIMEOUT,
                     num_retries=2):
  """Run all tests on attached devices, retrying tests that don't pass.

  Args:
    runner_factory: callable that takes a device and index and returns a
      TestRunner object.
    devices: list of attached device serial numbers as strings.
    tests: list of tests to run.
    build_type: either 'Debug' or 'Release'. Not used by this function.
    test_timeout: watchdog timeout in seconds for running tests, defaults to the
      default timeout.
    setup_timeout: watchdog timeout in seconds for creating and cleaning up
      test runners, defaults to the default timeout.
    num_retries: number of retries for a test.

  Returns:
    A tuple of (base_test_result.TestRunResults object, exit code). An empty
    result with the error exit code is returned when there are no tests to
    run or when no test runner could be created.
  """
  if not tests:
    logging.error('No tests to run.')
    return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)

  logging.info('Will run %d tests: %s', len(tests), tests)
  runners = _CreateRunners(runner_factory, devices, setup_timeout)
  if not runners:
    # Without this guard the run would proceed with zero workers, execute
    # nothing, and could be reported as a success.
    logging.error('No test runners could be created; no tests were run.')
    return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)
  try:
    return _RunAllTests(runners, tests, num_retries, test_timeout)
  finally:
    # Always tear the runners down; an unresponsive device at this point is
    # logged but must not mask the result (or exception) of the test run.
    try:
      _TearDownRunners(runners, setup_timeout)
    except android_commands.errors.DeviceUnresponsiveError as e:
      logging.warning('Device unresponsive during TearDown: [%s]', e)
OLD | NEW |