Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(453)

Side by Side Diff: build/android/pylib/base/shard.py

Issue 18770008: [Android] Redesigns the sharder to allow replicated vs distributed tests (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Re-adds -f short form to gtest_filter switch Created 7 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 """Implements test sharding logic."""
6
7 import logging
8 import threading
9
10 from pylib import android_commands
11 from pylib import constants
12 from pylib.utils import reraiser_thread
13 from pylib.utils import watchdog_timer
14
15 import base_test_result
16
17
18 DEFAULT_TIMEOUT = 7 * 60 # seven minutes
19
20
21 class _ThreadSafeCounter(object):
22 """A threadsafe counter."""
23
24 def __init__(self):
25 self._lock = threading.Lock()
26 self._value = 0
27
28 def GetAndIncrement(self):
29 """Get the current value and increment it atomically.
30
31 Returns:
32 The value before incrementing.
33 """
34 with self._lock:
35 pre_increment = self._value
36 self._value += 1
37 return pre_increment
38
39
40 class _Test(object):
41 """Holds a test with additional metadata."""
42
43 def __init__(self, test, tries=0):
44 """Initializes the _Test object.
45
46 Args:
47 test: the test.
48 tries: number of tries so far.
49 """
50 self.test = test
51 self.tries = tries
52
53
54 class _TestCollection(object):
55 """A threadsafe collection of tests.
56
57 Args:
58 tests: list of tests to put in the collection.
59 """
60
61 def __init__(self, tests=[]):
62 self._lock = threading.Lock()
63 self._tests = []
64 self._tests_in_progress = 0
65 # Used to signal that an item is avaliable or all items have been handled.
66 self._item_avaliable_or_all_done = threading.Event()
67 for t in tests:
68 self.add(t)
69
70 def _pop(self):
71 """Pop a test from the collection.
72
73 Waits until a test is avaliable or all tests have been handled.
74
75 Returns:
76 A test or None if all tests have been handled.
77 """
78 while True:
79 # Wait for a test to be avaliable or all tests to have been handled.
80 self._item_avaliable_or_all_done.wait()
81 with self._lock:
82 # Check which of the two conditions triggered the signal.
83 if self._tests_in_progress == 0:
84 return None
85 try:
86 return self._tests.pop(0)
87 except IndexError:
88 # Another thread beat us to the avaliable test, wait again.
89 self._item_avaliable_or_all_done.clear()
90
91 def add(self, test):
92 """Add an test to the collection.
93
94 Args:
95 test: A test to add.
96 """
97 with self._lock:
98 self._tests.append(test)
99 self._item_avaliable_or_all_done.set()
100 self._tests_in_progress += 1
101
102 def test_completed(self):
103 """Indicate that a test has been fully handled."""
104 with self._lock:
105 self._tests_in_progress -= 1
106 if self._tests_in_progress == 0:
107 # All tests have been handled, signal all waiting threads.
108 self._item_avaliable_or_all_done.set()
109
110 def __iter__(self):
111 """Iterate through tests in the collection until all have been handled."""
112 while True:
113 r = self._pop()
114 if r is None:
115 break
116 yield r
117
118
119 def _RunTestsFromQueue(runner, test_collection, out_results, watcher,
120 num_retries):
121 """Runs tests from the test_collection until empty using the given runner.
122
123 Adds TestRunResults objects to the out_results list and may add tests to the
124 out_retry list.
125
126 Args:
127 runner: A TestRunner object used to run the tests.
128 test_collection: A _TestCollection from which to get _Test objects to run.
129 out_results: A list to add TestRunResults to.
130 watcher: A watchdog_timer.WatchdogTimer object, used as a shared timeout.
131 num_retries: Number of retries for a test.
132 """
133 for test in test_collection:
134 watcher.Reset()
135 try:
136 if not android_commands.IsDeviceAttached(runner.device):
137 # Device is unresponsive, stop handling tests on this device.
138 msg = 'Device %s is unresponsive.' % runner.device
139 logging.warning(msg)
140 raise android_commands.errors.DeviceUnresponsiveError(msg)
141 result, retry = runner.RunTest(test.test)
142 test.tries += 1
143 if retry and test.tries <= num_retries:
144 # Retry non-passing results, only record passing results.
145 pass_results = base_test_result.TestRunResults()
146 pass_results.AddResults(result.GetPass())
147 out_results.append(pass_results)
148 logging.warning('Will retry test, try #%s.' % test.tries)
149 test_collection.add(_Test(test=retry, tries=test.tries))
150 else:
151 # All tests passed or retry limit reached. Either way, record results.
152 out_results.append(result)
153 except:
154 # An unhandleable exception, ensure tests get run by another device and
155 # reraise this exception on the main thread.
156 test_collection.add(test)
157 raise
158 finally:
159 # Retries count as separate tasks so always mark the popped test as done.
160 test_collection.test_completed()
161
162
163 def _SetUp(runner_factory, device, out_runners, threadsafe_counter):
164 """Creates a test runner for each device and calls SetUp() in parallel.
165
166 Note: if a device is unresponsive the corresponding TestRunner will not be
167 added to out_runners.
168
169 Args:
170 runner_factory: callable that takes a device and index and returns a
171 TestRunner object.
172 device: the device serial number to set up.
173 out_runners: list to add the successfully set up TestRunner object.
174 threadsafe_counter: a _ThreadSafeCounter object used to get shard indices.
175 """
176 try:
177 index = threadsafe_counter.GetAndIncrement()
178 logging.warning('Creating shard %s for device %s.', index, device)
179 runner = runner_factory(device, index)
180 runner.SetUp()
181 out_runners.append(runner)
182 except android_commands.errors.DeviceUnresponsiveError as e:
183 logging.warning('Failed to create shard for %s: [%s]', device, e)
184
185
186 def _RunAllTests(runners, tests, num_retries, timeout=None):
187 """Run all tests using the given TestRunners.
188
189 Args:
190 runners: a list of TestRunner objects.
191 tests: a list of Tests to run using the given TestRunners.
192 num_retries: number of retries for a test.
193 timeout: watchdog timeout in seconds, defaults to the default timeout.
194
195 Returns:
196 A tuple of (TestRunResults object, exit code)
197 """
198 logging.warning('Running %s tests with %s test runners.' %
199 (len(tests), len(runners)))
200 tests_collection = _TestCollection([_Test(t) for t in tests])
201 results = []
202 exit_code = 0
203 watcher = watchdog_timer.WatchdogTimer(timeout)
204 workers = reraiser_thread.ReraiserThreadGroup(
205 [reraiser_thread.ReraiserThread(
206 _RunTestsFromQueue,
207 [r, tests_collection, results, watcher, num_retries],
208 name=r.device[-4:])
209 for r in runners])
210 run_results = base_test_result.TestRunResults()
211 workers.StartAll()
212
213 # Catch DeviceUnresponsiveErrors and set a warning exit code
214 try:
215 workers.JoinAll(watcher)
216 except android_commands.errors.DeviceUnresponsiveError as e:
217 logging.error(e)
218 exit_code = constants.WARNING_EXIT_CODE
219
220 for r in results:
221 run_results.AddTestRunResults(r)
222 if not run_results.DidRunPass():
223 exit_code = constants.ERROR_EXIT_CODE
224 return (run_results, exit_code)
225
226
227 def _CreateRunners(runner_factory, devices, timeout=None):
228 """Creates a test runner for each device and calls SetUp() in parallel.
229
230 Note: if a device is unresponsive the corresponding TestRunner will not be
231 included in the returned list.
232
233 Args:
234 runner_factory: callable that takes a device and index and returns a
235 TestRunner object.
236 devices: list of device serial numbers as strings.
237 timeout: watchdog timeout in seconds, defaults to the default timeout.
238
239 Returns:
240 A list of TestRunner objects.
241 """
242 logging.warning('Creating %s test runners.' % len(devices))
243 runners = []
244 counter = _ThreadSafeCounter()
245 threads = reraiser_thread.ReraiserThreadGroup(
246 [reraiser_thread.ReraiserThread(_SetUp,
247 [runner_factory, d, runners, counter],
248 name=d[-4:])
249 for d in devices])
250 threads.StartAll()
251 threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
252 return runners
253
254
255 def _TearDownRunners(runners, timeout=None):
256 """Calls TearDown() for each test runner in parallel.
257
258 Args:
259 runners: a list of TestRunner objects.
260 timeout: watchdog timeout in seconds, defaults to the default timeout.
261 """
262 threads = reraiser_thread.ReraiserThreadGroup(
263 [reraiser_thread.ReraiserThread(r.TearDown, name=r.device[-4:])
264 for r in runners])
265 threads.StartAll()
266 threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
267
268
269 def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug',
270 test_timeout=DEFAULT_TIMEOUT,
271 setup_timeout=DEFAULT_TIMEOUT,
272 num_retries=2):
273 """Run all tests on attached devices, retrying tests that don't pass.
274
275 Args:
276 runner_factory: callable that takes a device and index and returns a
277 TestRunner object.
278 devices: list of attached device serial numbers as strings.
279 tests: list of tests to run.
280 build_type: either 'Debug' or 'Release'.
281 test_timeout: watchdog timeout in seconds for running tests, defaults to the
282 default timeout.
283 setup_timeout: watchdog timeout in seconds for creating and cleaning up
284 test runners, defaults to the default timeout.
285 num_retries: number of retries for a test.
286
287 Returns:
288 A tuple of (base_test_result.TestRunResults object, exit code).
289 """
290 if not tests:
291 logging.error('No tests to run.')
292 return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)
293
294 logging.info('Will run %d tests: %s', len(tests), str(tests))
295 runners = _CreateRunners(runner_factory, devices, setup_timeout)
296 try:
297 return _RunAllTests(runners, tests, num_retries, test_timeout)
298 finally:
299 try:
300 _TearDownRunners(runners, setup_timeout)
301 except android_commands.errors.DeviceUnresponsiveError as e:
302 logging.warning('Device unresponsive during TearDown: [%s]', e)
OLDNEW
« no previous file with comments | « build/android/pylib/base/base_test_result.py ('k') | build/android/pylib/base/shard_unittest.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698