Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1)

Side by Side Diff: sharding_supervisor.py

Issue 9978016: Modify the sharding supervisor to support sharding only a sub-shard of a test suite. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src/tools/sharding_supervisor/
Patch Set: Created 8 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. 2 # Copyright (c) 2011 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be 3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file. 4 # found in the LICENSE file.
5 5
6 """Shards a given test suite and runs the shards in parallel. 6 """Shards a given test suite and runs the shards in parallel.
7 7
8 ShardingSupervisor is called to process the command line options and creates 8 ShardingSupervisor is called to process the command line options and creates
9 the specified number of worker threads. These threads then run each shard of 9 the specified number of worker threads. These threads then run each shard of
10 the test in a separate process and report on the results. When all the shards 10 the test in a separate process and report on the results. When all the shards
(...skipping 24 matching lines...) Expand all
35 except ImportError: 35 except ImportError:
36 # Unable to find depot_tools, so just use standard subprocess 36 # Unable to find depot_tools, so just use standard subprocess
37 import subprocess 37 import subprocess
38 38
39 SS_USAGE = "python %prog [options] path/to/test [gtest_args]" 39 SS_USAGE = "python %prog [options] path/to/test [gtest_args]"
40 SS_DEFAULT_NUM_CORES = 4 40 SS_DEFAULT_NUM_CORES = 4
41 SS_DEFAULT_SHARDS_PER_CORE = 5 # num_shards = cores * SHARDS_PER_CORE 41 SS_DEFAULT_SHARDS_PER_CORE = 5 # num_shards = cores * SHARDS_PER_CORE
42 SS_DEFAULT_RUNS_PER_CORE = 1 # num_workers = cores * RUNS_PER_CORE 42 SS_DEFAULT_RUNS_PER_CORE = 1 # num_workers = cores * RUNS_PER_CORE
43 SS_DEFAULT_RETRY_PERCENT = 5 # --retry-failed ignored if more than 5% fail 43 SS_DEFAULT_RETRY_PERCENT = 5 # --retry-failed ignored if more than 5% fail
44 SS_DEFAULT_TIMEOUT = 530 # Slightly less than buildbot's default 600 seconds 44 SS_DEFAULT_TIMEOUT = 530 # Slightly less than buildbot's default 600 seconds
45 SS_DEFAULT_SUITE_TOTAL_SHARDS = 1 # run the whole suite.
46 SS_DEFAULT_SUITE_SHARD_INDEX = 0 # run the first shard.
45 47
46 48
47 def DetectNumCores(): 49 def DetectNumCores():
48 """Detects the number of cores on the machine. 50 """Detects the number of cores on the machine.
49 51
50 Returns: 52 Returns:
51 The number of cores on the machine or DEFAULT_NUM_CORES if it could not 53 The number of cores on the machine or DEFAULT_NUM_CORES if it could not
52 be found. 54 be found.
53 """ 55 """
54 try: 56 try:
(...skipping 115 matching lines...) Expand 10 before | Expand all | Expand 10 after
170 self.supervisor.LogOutputLine(index, line) 172 self.supervisor.LogOutputLine(index, line)
171 chars.close() 173 chars.close()
172 chars = cStringIO.StringIO() 174 chars = cStringIO.StringIO()
173 if self.current_test: 175 if self.current_test:
174 self.ReportFailure("INCOMPLETE", index, self.current_test) 176 self.ReportFailure("INCOMPLETE", index, self.current_test)
175 self.supervisor.ShardIndexCompleted(index) 177 self.supervisor.ShardIndexCompleted(index)
176 if shard.returncode != 0: 178 if shard.returncode != 0:
177 self.supervisor.LogShardFailure(index) 179 self.supervisor.LogShardFailure(index)
178 180
179 181
180 class ShardingSupervisor(object): 182 class ShardingSupervisor(object):
cmp 2012/04/04 23:00:28 not sure where to add the docs, here or below, but
nsylvain 2012/04/04 23:24:34 Done.
181 """Supervisor object that handles the worker threads. 183 """Supervisor object that handles the worker threads.
182 184
183 Attributes: 185 Attributes:
184 test: Name of the test to shard. 186 test: Name of the test to shard.
185 num_shards: Total number of shards to split the test into. 187 num_shards: Total number of shards to split the test into.
186 num_runs: Total number of worker threads to create for running shards. 188 num_runs: Total number of worker threads to create for running shards.
187 color: Indicates which coloring mode to use in the output. 189 color: Indicates which coloring mode to use in the output.
188 original_order: True if shard output should be printed as it comes. 190 original_order: True if shard output should be printed as it comes.
189 prefix: True if each line should indicate the shard index. 191 prefix: True if each line should indicate the shard index.
190 retry_percent: Integer specifying the max percent of tests to retry. 192 retry_percent: Integer specifying the max percent of tests to retry.
191 gtest_args: The options to pass to gtest. 193 gtest_args: The options to pass to gtest.
192 failed_tests: List of statements from shard output indicating a failure. 194 failed_tests: List of statements from shard output indicating a failure.
193 failed_shards: List of shards that contained failing tests. 195 failed_shards: List of shards that contained failing tests.
194 shards_completed: List of flags indicating which shards have finished. 196 shards_completed: List of flags indicating which shards have finished.
195 shard_output: Buffer that stores the output from each shard. 197 shard_output: Buffer that stores the output from each shard.
196 test_counter: Stores the total number of tests run. 198 test_counter: Stores the total number of tests run.
199 suite_total_shards: Total number of shards the suite is split into if we
cmp 2012/04/04 23:00:28 we agreed that suite_*_shards and suite_shard_* co
nsylvain 2012/04/04 23:24:34 Done.
200 want to run only a subset of the suite.
201 suite_shard_index: Shard index to run if we want to run only a subset of
202 the suite.
197 """ 203 """
198 204
199 SHARD_COMPLETED = object() 205 SHARD_COMPLETED = object()
200 206
201 def __init__(self, test, num_shards, num_runs, color, original_order, 207 def __init__(self, test, num_shards, num_runs, color, original_order,
202 prefix, retry_percent, timeout, gtest_args): 208 prefix, retry_percent, timeout, suite_total_shards,
209 suite_shard_index, gtest_args):
203 """Inits ShardingSupervisor with given options and gtest arguments.""" 210 """Inits ShardingSupervisor with given options and gtest arguments."""
204 self.test = test 211 self.test = test
205 self.num_shards = num_shards 212 self.num_shards_to_run = num_shards
213 self.num_shards = num_shards * suite_total_shards
214 self.suite_shard_index = suite_shard_index
206 self.num_runs = num_runs 215 self.num_runs = num_runs
207 self.color = color 216 self.color = color
208 self.original_order = original_order 217 self.original_order = original_order
209 self.prefix = prefix 218 self.prefix = prefix
210 self.retry_percent = retry_percent 219 self.retry_percent = retry_percent
211 self.timeout = timeout 220 self.timeout = timeout
212 self.gtest_args = gtest_args 221 self.gtest_args = gtest_args
213 self.failed_tests = [] 222 self.failed_tests = []
214 self.failed_shards = [] 223 self.failed_shards = []
215 self.shards_completed = [False] * num_shards 224 self.shards_completed = [False] * self.num_shards_to_run
216 self.shard_output = [Queue.Queue() for _ in range(num_shards)] 225 self.shard_output = [Queue.Queue() for _ in range(self.num_shards_to_run)]
217 self.test_counter = itertools.count() 226 self.test_counter = itertools.count()
218 227
219 def ShardTest(self): 228 def ShardTest(self):
220 """Runs the test and manages the worker threads. 229 """Runs the test and manages the worker threads.
221 230
222 Runs the test and outputs a summary at the end. All the tests in the 231 Runs the test and outputs a summary at the end. All the tests in the
223 suite are run by creating (cores * runs_per_core) threads and 232 suite are run by creating (cores * runs_per_core) threads and
224 (cores * shards_per_core) shards. When all the worker threads have 233 (cores * shards_per_core) shards. When all the worker threads have
225 finished, the lines saved in failed_tests are printed again. If enabled, 234 finished, the lines saved in failed_tests are printed again. If enabled,
226 and failed tests that do not have FLAKY or FAILS in their names are run 235 and failed tests that do not have FLAKY or FAILS in their names are run
(...skipping 15 matching lines...) Expand all
242 251
243 test_start = re.compile( 252 test_start = re.compile(
244 ansi_regex + r"\[\s+RUN\s+\] " + ansi_regex + test_name_regex) 253 ansi_regex + r"\[\s+RUN\s+\] " + ansi_regex + test_name_regex)
245 test_ok = re.compile( 254 test_ok = re.compile(
246 ansi_regex + r"\[\s+OK\s+\] " + ansi_regex + test_name_regex) 255 ansi_regex + r"\[\s+OK\s+\] " + ansi_regex + test_name_regex)
247 test_fail = re.compile( 256 test_fail = re.compile(
248 ansi_regex + r"\[\s+FAILED\s+\] " + ansi_regex + test_name_regex) 257 ansi_regex + r"\[\s+FAILED\s+\] " + ansi_regex + test_name_regex)
249 258
250 workers = [] 259 workers = []
251 counter = Queue.Queue() 260 counter = Queue.Queue()
252 for i in range(self.num_shards): 261 start_point = self.num_shards_to_run * self.suite_shard_index
262 for i in range(start_point, start_point + self.num_shards_to_run):
253 counter.put(i) 263 counter.put(i)
254 264
255 for i in range(self.num_runs): 265 for i in range(self.num_runs):
256 worker = ShardRunner( 266 worker = ShardRunner(
257 self, counter, test_start, test_ok, test_fail) 267 self, counter, test_start, test_ok, test_fail)
258 worker.start() 268 worker.start()
259 workers.append(worker) 269 workers.append(worker)
260 if self.original_order: 270 if self.original_order:
261 for worker in workers: 271 for worker in workers:
262 worker.join() 272 worker.join()
(...skipping 25 matching lines...) Expand all
288 298
289 def LogShardFailure(self, index): 299 def LogShardFailure(self, index):
290 """Records that a test in the given shard has failed.""" 300 """Records that a test in the given shard has failed."""
291 self.failed_shards.append(index) 301 self.failed_shards.append(index)
292 302
293 def WaitForShards(self): 303 def WaitForShards(self):
294 """Prints the output from each shard in consecutive order, waiting for 304 """Prints the output from each shard in consecutive order, waiting for
295 the current shard to finish before starting on the next shard. 305 the current shard to finish before starting on the next shard.
296 """ 306 """
297 try: 307 try:
298 for shard_index in range(self.num_shards): 308 for shard_index in range(self.num_shards_to_run):
299 while True: 309 while True:
300 try: 310 try:
301 line = self.shard_output[shard_index].get(True, self.timeout) 311 line = self.shard_output[shard_index].get(True, self.timeout)
302 except Queue.Empty: 312 except Queue.Empty:
303 # Shard timed out, notice failure and move on. 313 # Shard timed out, notice failure and move on.
304 self.LogShardFailure(shard_index) 314 self.LogShardFailure(shard_index)
305 # TODO(maruel): Print last test. It'd be simpler to have the 315 # TODO(maruel): Print last test. It'd be simpler to have the
306 # processing in the main thread. 316 # processing in the main thread.
307 # TODO(maruel): Make sure the worker thread terminates. 317 # TODO(maruel): Make sure the worker thread terminates.
308 sys.stdout.write('TIMED OUT\n\n') 318 sys.stdout.write('TIMED OUT\n\n')
309 LogTestFailure( 319 LogTestFailure(
310 'FAILURE: SHARD %d TIMED OUT; %d seconds' % ( 320 'FAILURE: SHARD %d TIMED OUT; %d seconds' % (
311 shard_index, self.timeout)) 321 shard_index, self.timeout))
312 break 322 break
313 if line is self.SHARD_COMPLETED: 323 if line is self.SHARD_COMPLETED:
314 break 324 break
315 sys.stdout.write(line) 325 sys.stdout.write(line)
316 except: 326 except:
317 sys.stdout.flush() 327 sys.stdout.flush()
318 print >> sys.stderr, 'CAUGHT EXCEPTION: dumping remaining data:' 328 print >> sys.stderr, 'CAUGHT EXCEPTION: dumping remaining data:'
319 for shard_index in range(self.num_shards): 329 for shard_index in range(self.num_shards_to_run):
320 while True: 330 while True:
321 try: 331 try:
322 line = self.shard_output[shard_index].get(False) 332 line = self.shard_output[shard_index].get(False)
323 except Queue.Empty: 333 except Queue.Empty:
324 # Shard timed out, notice failure and move on. 334 # Shard timed out, notice failure and move on.
325 self.LogShardFailure(shard_index) 335 self.LogShardFailure(shard_index)
326 break 336 break
327 if line is self.SHARD_COMPLETED: 337 if line is self.SHARD_COMPLETED:
328 break 338 break
329 sys.stderr.write(line) 339 sys.stderr.write(line)
330 raise 340 raise
331 341
332 def LogOutputLine(self, index, line): 342 def LogOutputLine(self, index, line):
333 """Either prints the shard output line immediately or saves it in the 343 """Either prints the shard output line immediately or saves it in the
334 output buffer, depending on the settings. Also optionally adds a prefix. 344 output buffer, depending on the settings. Also optionally adds a prefix.
335 """ 345 """
346 # Fix up the index.
347 array_index = index - (self.num_shards_to_run * self.suite_shard_index)
336 if self.prefix: 348 if self.prefix:
337 line = "%i>%s" % (index, line) 349 line = "%i>%s" % (index, line)
338 if self.original_order: 350 if self.original_order:
339 sys.stdout.write(line) 351 sys.stdout.write(line)
340 else: 352 else:
341 self.shard_output[index].put(line) 353 self.shard_output[array_index].put(line)
342 354
343 def IncrementTestCount(self): 355 def IncrementTestCount(self):
344 """Increments the number of tests run. This is relevant to the 356 """Increments the number of tests run. This is relevant to the
345 --retry-percent option. 357 --retry-percent option.
346 """ 358 """
347 self.test_counter.next() 359 self.test_counter.next()
348 360
349 def ShardIndexCompleted(self, index): 361 def ShardIndexCompleted(self, index):
350 """Records that a shard has finished so the output from the next shard 362 """Records that a shard has finished so the output from the next shard
351 can now be printed. 363 can now be printed.
352 """ 364 """
353 self.shard_output[index].put(self.SHARD_COMPLETED) 365 # Fix up the index.
366 array_index = index - (self.num_shards_to_run * self.suite_shard_index)
367 self.shard_output[array_index].put(self.SHARD_COMPLETED)
354 368
355 def RetryFailedTests(self): 369 def RetryFailedTests(self):
356 """Reruns any failed tests serially and prints another summary of the 370 """Reruns any failed tests serially and prints another summary of the
357 results if no more than retry_percent failed. 371 results if no more than retry_percent failed.
358 """ 372 """
359 num_tests_run = self.test_counter.next() 373 num_tests_run = self.test_counter.next()
360 if len(self.failed_tests) > self.retry_percent * num_tests_run: 374 if len(self.failed_tests) > self.retry_percent * num_tests_run:
361 sys.stderr.write("\nNOT RETRYING FAILED TESTS (too many failed)\n") 375 sys.stderr.write("\nNOT RETRYING FAILED TESTS (too many failed)\n")
362 return 1 376 return 1
363 self.WriteText(sys.stderr, "\nRETRYING FAILED TESTS:\n", "\x1b[1;5;33m") 377 self.WriteText(sys.stderr, "\nRETRYING FAILED TESTS:\n", "\x1b[1;5;33m")
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after
440 "--retry-failed", action="store_true", 454 "--retry-failed", action="store_true",
441 help="retry tests that did not pass serially") 455 help="retry tests that did not pass serially")
442 parser.add_option( 456 parser.add_option(
443 "--retry-percent", type="int", 457 "--retry-percent", type="int",
444 default=SS_DEFAULT_RETRY_PERCENT, 458 default=SS_DEFAULT_RETRY_PERCENT,
445 help="ignore --retry-failed if more than this percent fail [0, 100]" 459 help="ignore --retry-failed if more than this percent fail [0, 100]"
446 " (default = %i)" % SS_DEFAULT_RETRY_PERCENT) 460 " (default = %i)" % SS_DEFAULT_RETRY_PERCENT)
447 parser.add_option( 461 parser.add_option(
448 "-t", "--timeout", type="int", default=SS_DEFAULT_TIMEOUT, 462 "-t", "--timeout", type="int", default=SS_DEFAULT_TIMEOUT,
449 help="timeout in seconds to wait for a shard (default=%default s)") 463 help="timeout in seconds to wait for a shard (default=%default s)")
464 parser.add_option(
465 "--suite-total-shards", type="int", default=SS_DEFAULT_SUITE_TOTAL_SHARDS,
466 help="number of shards when running only a subset of the suite")
cmp 2012/04/04 23:00:28 change to 'if running a subset, ...'
nsylvain 2012/04/04 23:24:34 rewrote that, but still not happy.
467 parser.add_option(
468 "--suite-shard-index", type="int", default=SS_DEFAULT_SUITE_SHARD_INDEX,
469 help="index of the shard to run when running only a subset of the suite")
cmp 2012/04/04 23:00:28 change to 'if running a subset, ...'
470
450 parser.disable_interspersed_args() 471 parser.disable_interspersed_args()
451 (options, args) = parser.parse_args() 472 (options, args) = parser.parse_args()
452 473
453 if not args: 474 if not args:
454 parser.error("You must specify a path to test!") 475 parser.error("You must specify a path to test!")
455 if not os.path.exists(args[0]): 476 if not os.path.exists(args[0]):
456 parser.error("%s does not exist!" % args[0]) 477 parser.error("%s does not exist!" % args[0])
457 478
458 num_cores = DetectNumCores() 479 num_cores = DetectNumCores()
459 480
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
491 if (options.runshard < 0 or options.runshard >= num_shards): 512 if (options.runshard < 0 or options.runshard >= num_shards):
492 parser.error("Invalid shard number given parameters!") 513 parser.error("Invalid shard number given parameters!")
493 shard = RunShard( 514 shard = RunShard(
494 args[0], num_shards, options.runshard, gtest_args, None, None) 515 args[0], num_shards, options.runshard, gtest_args, None, None)
495 shard.communicate() 516 shard.communicate()
496 return shard.poll() 517 return shard.poll()
497 518
498 # shard and run the whole test 519 # shard and run the whole test
499 ss = ShardingSupervisor( 520 ss = ShardingSupervisor(
500 args[0], num_shards, num_runs, options.color, options.original_order, 521 args[0], num_shards, num_runs, options.color, options.original_order,
501 options.prefix, options.retry_percent, options.timeout, gtest_args) 522 options.prefix, options.retry_percent, options.timeout,
523 options.suite_total_shards, options.suite_shard_index, gtest_args)
502 return ss.ShardTest() 524 return ss.ShardTest()
503 525
504 526
505 if __name__ == "__main__": 527 if __name__ == "__main__":
506 sys.exit(main()) 528 sys.exit(main())
OLDNEW
« no previous file with comments | « dummy_test.py ('k') | sharding_supervisor_unittest.py » ('j') | sharding_supervisor_unittest.py » ('J')

Powered by Google App Engine
This is Rietveld 408576698