Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(267)

Side by Side Diff: build/android/pylib/base/test_dispatcher.py

Issue 18770008: [Android] Redesigns the sharder to allow replicated vs distributed tests (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Re-adds -f short form to gtest_filter switch Created 7 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright (c) 2013 The Chromium Authors. All rights reserved. 1 # Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 """Implements test sharding logic.""" 5 """Dispatches tests, either sharding or replicating them.
6
7 To dispatch, performs the following steps:
8 * Create a test collection factory, using the given tests
9 - If sharding: test collection factory returns the same shared test collection
10 to all test runners
11 - If replicating: test collection factory returns a unique test collection to
12 each test runner, with the same set of tests in each.
13 * Get the list of devices to run on
14 * Create test runners
15 * Run each test runner in its own thread, pulling tests from the test collection
16 generated from the test collection factory until there are no tests left.
17 """
6 18
7 import logging 19 import logging
8 import threading 20 import threading
9 21
10 from pylib import android_commands 22 from pylib import android_commands
11 from pylib import constants 23 from pylib import constants
12 from pylib.utils import reraiser_thread 24 from pylib.utils import reraiser_thread
13 from pylib.utils import watchdog_timer 25 from pylib.utils import watchdog_timer
14 26
15 import base_test_result 27 import base_test_result
(...skipping 21 matching lines...) Expand all
37 return pre_increment 49 return pre_increment
38 50
39 51
40 class _Test(object): 52 class _Test(object):
41 """Holds a test with additional metadata.""" 53 """Holds a test with additional metadata."""
42 54
43 def __init__(self, test, tries=0): 55 def __init__(self, test, tries=0):
44 """Initializes the _Test object. 56 """Initializes the _Test object.
45 57
46 Args: 58 Args:
47 test: the test. 59 test: The test.
48 tries: number of tries so far. 60 tries: Number of tries so far.
49 """ 61 """
50 self.test = test 62 self.test = test
51 self.tries = tries 63 self.tries = tries
52 64
53 65
54 class _TestCollection(object): 66 class _TestCollection(object):
55 """A threadsafe collection of tests. 67 """A threadsafe collection of tests.
56 68
57 Args: 69 Args:
58 tests: list of tests to put in the collection. 70 tests: List of tests to put in the collection.
59 """ 71 """
60 72
61 def __init__(self, tests=[]): 73 def __init__(self, tests=[]):
62 self._lock = threading.Lock() 74 self._lock = threading.Lock()
63 self._tests = [] 75 self._tests = []
64 self._tests_in_progress = 0 76 self._tests_in_progress = 0
65 # Used to signal that an item is available or all items have been handled. 77 # Used to signal that an item is available or all items have been handled.
66 self._item_avaliable_or_all_done = threading.Event() 78 self._item_avaliable_or_all_done = threading.Event()
67 for t in tests: 79 for t in tests:
68 self.add(t) 80 self.add(t)
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
110 def __iter__(self): 122 def __iter__(self):
111 """Iterate through tests in the collection until all have been handled.""" 123 """Iterate through tests in the collection until all have been handled."""
112 while True: 124 while True:
113 r = self._pop() 125 r = self._pop()
114 if r is None: 126 if r is None:
115 break 127 break
116 yield r 128 yield r
117 129
118 130
119 def _RunTestsFromQueue(runner, test_collection, out_results, watcher, 131 def _RunTestsFromQueue(runner, test_collection, out_results, watcher,
120 num_retries): 132 num_retries, tag_results_with_device=False):
121 """Runs tests from the test_collection until empty using the given runner. 133 """Runs tests from the test_collection until empty using the given runner.
122 134
123 Adds TestRunResults objects to the out_results list and may add tests to the 135 Adds TestRunResults objects to the out_results list and may add tests to the
124 test_collection for retry. 136 test_collection for retry.
125 137
126 Args: 138 Args:
127 runner: A TestRunner object used to run the tests. 139 runner: A TestRunner object used to run the tests.
128 test_collection: A _TestCollection from which to get _Test objects to run. 140 test_collection: A _TestCollection from which to get _Test objects to run.
129 out_results: A list to add TestRunResults to. 141 out_results: A list to add TestRunResults to.
130 watcher: A watchdog_timer.WatchdogTimer object, used as a shared timeout. 142 watcher: A watchdog_timer.WatchdogTimer object, used as a shared timeout.
131 num_retries: Number of retries for a test. 143 num_retries: Number of retries for a test.
144 tag_results_with_device: If True, appends the name of the device on which
145 the test was run to the test name. Used when replicating to identify
146 which device ran each copy of the test, and to ensure each copy of the
147 test is recorded separately.
132 """ 148 """
149
150 def TagTestRunResults(test_run_results):
151 """Tags all results with the last 4 digits of the device id.
152
153 Used when replicating tests to distinguish the same tests run on different
154 devices. We use a set to store test results, so the hash (generated from
155 name and tag) must be unique to be considered different results.
156 """
157 new_test_run_results = base_test_result.TestRunResults()
158 for test_result in test_run_results.GetAll():
159 test_result.SetTag(runner.device[-4:])
160 new_test_run_results.AddResult(test_result)
161 return new_test_run_results
162
133 for test in test_collection: 163 for test in test_collection:
134 watcher.Reset() 164 watcher.Reset()
135 try: 165 try:
136 if not android_commands.IsDeviceAttached(runner.device): 166 if not android_commands.IsDeviceAttached(runner.device):
137 # Device is unresponsive, stop handling tests on this device. 167 # Device is unresponsive, stop handling tests on this device.
138 msg = 'Device %s is unresponsive.' % runner.device 168 msg = 'Device %s is unresponsive.' % runner.device
139 logging.warning(msg) 169 logging.warning(msg)
140 raise android_commands.errors.DeviceUnresponsiveError(msg) 170 raise android_commands.errors.DeviceUnresponsiveError(msg)
141 result, retry = runner.RunTest(test.test) 171 result, retry = runner.RunTest(test.test)
172 if tag_results_with_device:
173 result = TagTestRunResults(result)
142 test.tries += 1 174 test.tries += 1
143 if retry and test.tries <= num_retries: 175 if retry and test.tries <= num_retries:
144 # Retry non-passing results, only record passing results. 176 # Retry non-passing results, only record passing results.
145 pass_results = base_test_result.TestRunResults() 177 pass_results = base_test_result.TestRunResults()
146 pass_results.AddResults(result.GetPass()) 178 pass_results.AddResults(result.GetPass())
147 out_results.append(pass_results) 179 out_results.append(pass_results)
148 logging.warning('Will retry test, try #%s.' % test.tries) 180 logging.warning('Will retry test, try #%s.' % test.tries)
149 test_collection.add(_Test(test=retry, tries=test.tries)) 181 test_collection.add(_Test(test=retry, tries=test.tries))
150 else: 182 else:
151 # All tests passed or retry limit reached. Either way, record results. 183 # All tests passed or retry limit reached. Either way, record results.
152 out_results.append(result) 184 out_results.append(result)
153 except: 185 except:
154 # An unhandleable exception, ensure tests get run by another device and 186 # An unhandleable exception, ensure tests get run by another device and
155 # reraise this exception on the main thread. 187 # reraise this exception on the main thread.
156 test_collection.add(test) 188 test_collection.add(test)
157 raise 189 raise
158 finally: 190 finally:
159 # Retries count as separate tasks so always mark the popped test as done. 191 # Retries count as separate tasks so always mark the popped test as done.
160 test_collection.test_completed() 192 test_collection.test_completed()
161 193
162 194
163 def _SetUp(runner_factory, device, out_runners, threadsafe_counter): 195 def _SetUp(runner_factory, device, out_runners, threadsafe_counter):
164 """Creates a test runner for each device and calls SetUp() in parallel. 196 """Creates a test runner for each device and calls SetUp() in parallel.
165 197
166 Note: if a device is unresponsive the corresponding TestRunner will not be 198 Note: if a device is unresponsive the corresponding TestRunner will not be
167 added to out_runners. 199 added to out_runners.
168 200
169 Args: 201 Args:
170 runner_factory: callable that takes a device and index and returns a 202 runner_factory: Callable that takes a device and index and returns a
171 TestRunner object. 203 TestRunner object.
172 device: the device serial number to set up. 204 device: The device serial number to set up.
173 out_runners: list to add the successfully set up TestRunner object. 205 out_runners: List to add the successfully set up TestRunner object.
174 threadsafe_counter: a _ThreadSafeCounter object used to get shard indices. 206 threadsafe_counter: A _ThreadSafeCounter object used to get shard indices.
175 """ 207 """
176 try: 208 try:
177 index = threadsafe_counter.GetAndIncrement() 209 index = threadsafe_counter.GetAndIncrement()
178 logging.warning('Creating shard %s for device %s.', index, device) 210 logging.warning('Creating shard %s for device %s.', index, device)
179 runner = runner_factory(device, index) 211 runner = runner_factory(device, index)
180 runner.SetUp() 212 runner.SetUp()
181 out_runners.append(runner) 213 out_runners.append(runner)
182 except android_commands.errors.DeviceUnresponsiveError as e: 214 except android_commands.errors.DeviceUnresponsiveError as e:
183 logging.warning('Failed to create shard for %s: [%s]', device, e) 215 logging.warning('Failed to create shard for %s: [%s]', device, e)
184 216
185 217
186 def _RunAllTests(runners, tests, num_retries, timeout=None): 218 def _RunAllTests(runners, test_collection_factory, num_retries, timeout=None,
219 tag_results_with_device=False):
187 """Run all tests using the given TestRunners. 220 """Run all tests using the given TestRunners.
188 221
189 Args: 222 Args:
190 runners: a list of TestRunner objects. 223 runners: A list of TestRunner objects.
191 tests: a list of Tests to run using the given TestRunners. 224 test_collection_factory: A callable to generate a _TestCollection object for
192 num_retries: number of retries for a test. 225 each test runner.
193 timeout: watchdog timeout in seconds, defaults to the default timeout. 226 num_retries: Number of retries for a test.
227 timeout: Watchdog timeout in seconds.
228 tag_results_with_device: If True, appends the name of the device on which
229 the test was run to the test name. Used when replicating to identify
230 which device ran each copy of the test, and to ensure each copy of the
231 test is recorded separately.
194 232
195 Returns: 233 Returns:
196 A tuple of (TestRunResults object, exit code) 234 A tuple of (TestRunResults object, exit code)
197 """ 235 """
198 logging.warning('Running %s tests with %s test runners.' % 236 logging.warning('Running tests with %s test runners.' % (len(runners)))
199 (len(tests), len(runners)))
200 tests_collection = _TestCollection([_Test(t) for t in tests])
201 results = [] 237 results = []
202 exit_code = 0 238 exit_code = 0
203 watcher = watchdog_timer.WatchdogTimer(timeout) 239 watcher = watchdog_timer.WatchdogTimer(timeout)
240
204 workers = reraiser_thread.ReraiserThreadGroup( 241 workers = reraiser_thread.ReraiserThreadGroup(
205 [reraiser_thread.ReraiserThread( 242 [reraiser_thread.ReraiserThread(
206 _RunTestsFromQueue, 243 _RunTestsFromQueue,
207 [r, tests_collection, results, watcher, num_retries], 244 [r, test_collection_factory(), results, watcher, num_retries,
245 tag_results_with_device],
208 name=r.device[-4:]) 246 name=r.device[-4:])
209 for r in runners]) 247 for r in runners])
210 run_results = base_test_result.TestRunResults() 248 run_results = base_test_result.TestRunResults()
211 workers.StartAll() 249 workers.StartAll()
212 250
213 # Catch DeviceUnresponsiveErrors and set a warning exit code 251 # Catch DeviceUnresponsiveErrors and set a warning exit code
214 try: 252 try:
215 workers.JoinAll(watcher) 253 workers.JoinAll(watcher)
216 except android_commands.errors.DeviceUnresponsiveError as e: 254 except android_commands.errors.DeviceUnresponsiveError as e:
217 logging.error(e) 255 logging.error(e)
218 exit_code = constants.WARNING_EXIT_CODE 256 exit_code = constants.WARNING_EXIT_CODE
219 257
220 for r in results: 258 for r in results:
221 run_results.AddTestRunResults(r) 259 run_results.AddTestRunResults(r)
222 if not run_results.DidRunPass(): 260 if not run_results.DidRunPass():
223 exit_code = constants.ERROR_EXIT_CODE 261 exit_code = constants.ERROR_EXIT_CODE
224 return (run_results, exit_code) 262 return (run_results, exit_code)
225 263
226 264
227 def _CreateRunners(runner_factory, devices, timeout=None): 265 def _CreateRunners(runner_factory, devices, timeout=None):
228 """Creates a test runner for each device and calls SetUp() in parallel. 266 """Creates a test runner for each device and calls SetUp() in parallel.
229 267
230 Note: if a device is unresponsive the corresponding TestRunner will not be 268 Note: if a device is unresponsive the corresponding TestRunner will not be
231 included in the returned list. 269 included in the returned list.
232 270
233 Args: 271 Args:
234 runner_factory: callable that takes a device and index and returns a 272 runner_factory: Callable that takes a device and index and returns a
235 TestRunner object. 273 TestRunner object.
236 devices: list of device serial numbers as strings. 274 devices: List of device serial numbers as strings.
237 timeout: watchdog timeout in seconds, defaults to the default timeout. 275 timeout: Watchdog timeout in seconds, defaults to the default timeout.
238 276
239 Returns: 277 Returns:
240 A list of TestRunner objects. 278 A list of TestRunner objects.
241 """ 279 """
242 logging.warning('Creating %s test runners.' % len(devices)) 280 logging.warning('Creating %s test runners.' % len(devices))
243 runners = [] 281 runners = []
244 counter = _ThreadSafeCounter() 282 counter = _ThreadSafeCounter()
245 threads = reraiser_thread.ReraiserThreadGroup( 283 threads = reraiser_thread.ReraiserThreadGroup(
246 [reraiser_thread.ReraiserThread(_SetUp, 284 [reraiser_thread.ReraiserThread(_SetUp,
247 [runner_factory, d, runners, counter], 285 [runner_factory, d, runners, counter],
248 name=d[-4:]) 286 name=d[-4:])
249 for d in devices]) 287 for d in devices])
250 threads.StartAll() 288 threads.StartAll()
251 threads.JoinAll(watchdog_timer.WatchdogTimer(timeout)) 289 threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
252 return runners 290 return runners
253 291
254 292
255 def _TearDownRunners(runners, timeout=None): 293 def _TearDownRunners(runners, timeout=None):
256 """Calls TearDown() for each test runner in parallel. 294 """Calls TearDown() for each test runner in parallel.
257 295
258 Args: 296 Args:
259 runners: a list of TestRunner objects. 297 runners: A list of TestRunner objects.
260 timeout: watchdog timeout in seconds, defaults to the default timeout. 298 timeout: Watchdog timeout in seconds, defaults to the default timeout.
261 """ 299 """
262 threads = reraiser_thread.ReraiserThreadGroup( 300 threads = reraiser_thread.ReraiserThreadGroup(
263 [reraiser_thread.ReraiserThread(r.TearDown, name=r.device[-4:]) 301 [reraiser_thread.ReraiserThread(r.TearDown, name=r.device[-4:])
264 for r in runners]) 302 for r in runners])
265 threads.StartAll() 303 threads.StartAll()
266 threads.JoinAll(watchdog_timer.WatchdogTimer(timeout)) 304 threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
267 305
268 306
269 def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug', 307
270 test_timeout=DEFAULT_TIMEOUT, 308 def _GetAttachedDevices(wait_for_debugger=False, test_device=None):
271 setup_timeout=DEFAULT_TIMEOUT, 309 """Get all attached devices.
272 num_retries=2): 310
311 If we are using a debugger, limit to only one device.
312
313 Args:
314 wait_for_debugger: True if this run will use a debugger.
315 test_device: Name of a specific device to use.
316
317 Returns:
318 A list of attached devices.
319 """
320 attached_devices = []
321
322 attached_devices = android_commands.GetAttachedDevices()
323 if test_device:
324 assert test_device in attached_devices, (
325 'Did not find device %s among attached device. Attached devices: %s'
326 % (test_device, ', '.join(attached_devices)))
327 attached_devices = [test_device]
328
329 if len(attached_devices) > 1 and wait_for_debugger:
330 logging.warning('Debugger can not be sharded, using first available device')
331 attached_devices = attached_devices[:1]
332
333 return attached_devices
334
335
336 def RunTests(tests, runner_factory, wait_for_debugger, test_device,
337 shard=True,
338 build_type='Debug',
339 test_timeout=DEFAULT_TIMEOUT,
340 setup_timeout=DEFAULT_TIMEOUT,
341 num_retries=2):
273 """Run all tests on attached devices, retrying tests that don't pass. 342 """Run all tests on attached devices, retrying tests that don't pass.
274 343
275 Args: 344 Args:
276 runner_factory: callable that takes a device and index and returns a 345 tests: List of tests to run.
277 TestRunner object. 346 runner_factory: Callable that takes a device and index and returns a
278 devices: list of attached device serial numbers as strings. 347 TestRunner object.
279 tests: list of tests to run. 348 wait_for_debugger: True if this test is using a debugger.
280 build_type: either 'Debug' or 'Release'. 349 test_device: A specific device to run tests on, or None.
281 test_timeout: watchdog timeout in seconds for running tests, defaults to the 350 shard: True if we should shard, False if we should replicate tests.
282 default timeout. 351 - Sharding tests will distribute tests across all test runners through a
283 setup_timeout: watchdog timeout in seconds for creating and cleaning up 352 shared test collection.
284 test runners, defaults to the default timeout. 353 - Replicating tests will copy all tests to each test runner through a
285 num_retries: number of retries for a test. 354 unique test collection for each test runner.
355 build_type: Either 'Debug' or 'Release'.
356 test_timeout: Watchdog timeout in seconds for running tests.
357 setup_timeout: Watchdog timeout in seconds for creating and cleaning up
358 test runners.
359 num_retries: Number of retries for a test.
286 360
287 Returns: 361 Returns:
288 A tuple of (base_test_result.TestRunResults object, exit code). 362 A tuple of (base_test_result.TestRunResults object, exit code).
289 """ 363 """
290 if not tests: 364 if not tests:
291 logging.error('No tests to run.') 365 logging.error('No tests to run.')
292 return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE) 366 return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)
293 367
368 if shard:
369 # Generate a shared _TestCollection object for all test runners, so they
370 # draw from a common pool of tests.
371 shared_test_collection = _TestCollection([_Test(t) for t in tests])
372 test_collection_factory = lambda: shared_test_collection
373 tag_results_with_device = False
374 else:
375 # Generate a unique _TestCollection object for each test runner, but use
376 # the same set of tests.
377 test_collection_factory = lambda: _TestCollection([_Test(t) for t in tests])
378 tag_results_with_device = True
379
380 devices = _GetAttachedDevices(wait_for_debugger, test_device)
381
294 logging.info('Will run %d tests: %s', len(tests), str(tests)) 382 logging.info('Will run %d tests: %s', len(tests), str(tests))
383
295 runners = _CreateRunners(runner_factory, devices, setup_timeout) 384 runners = _CreateRunners(runner_factory, devices, setup_timeout)
296 try: 385 try:
297 return _RunAllTests(runners, tests, num_retries, test_timeout) 386 return _RunAllTests(runners, test_collection_factory,
387 num_retries, test_timeout, tag_results_with_device)
298 finally: 388 finally:
299 try: 389 try:
300 _TearDownRunners(runners, setup_timeout) 390 _TearDownRunners(runners, setup_timeout)
301 except android_commands.errors.DeviceUnresponsiveError as e: 391 except android_commands.errors.DeviceUnresponsiveError as e:
302 logging.warning('Device unresponsive during TearDown: [%s]', e) 392 logging.warning('Device unresponsive during TearDown: [%s]', e)
OLDNEW
« no previous file with comments | « build/android/pylib/base/shard_unittest.py ('k') | build/android/pylib/base/test_dispatcher_unittest.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698