Index: sharding_supervisor.py |
=================================================================== |
--- sharding_supervisor.py (revision 130689) |
+++ sharding_supervisor.py (working copy) |
@@ -42,6 +42,8 @@ |
SS_DEFAULT_RUNS_PER_CORE = 1 # num_workers = cores * RUNS_PER_CORE |
SS_DEFAULT_RETRY_PERCENT = 5 # --retry-failed ignored if more than 5% fail |
SS_DEFAULT_TIMEOUT = 530 # Slightly less than buildbot's default 600 seconds |
+SS_DEFAULT_TOTAL_SLAVES = 1 # run the whole suite. |
cmp
2012/04/05 19:07:49
yes!
|
+SS_DEFAULT_SLAVE_INDEX = 0 # run the tests for the first slave. |
def DetectNumCores(): |
@@ -66,7 +68,7 @@ |
return SS_DEFAULT_NUM_CORES |
-def RunShard(test, num_shards, index, gtest_args, stdout, stderr): |
+def RunShard(test, total_shards, index, gtest_args, stdout, stderr): |
cmp
2012/04/05 19:07:49
thank you!
|
"""Runs a single test shard in a subprocess. |
Returns: |
@@ -75,7 +77,7 @@ |
args = [test] |
args.extend(gtest_args) |
env = os.environ.copy() |
- env["GTEST_TOTAL_SHARDS"] = str(num_shards) |
+ env["GTEST_TOTAL_SHARDS"] = str(total_shards) |
env["GTEST_SHARD_INDEX"] = str(index) |
# Use a unique log file for each shard |
@@ -155,7 +157,7 @@ |
chars = cStringIO.StringIO() |
shard_running = True |
shard = RunShard( |
- self.supervisor.test, self.supervisor.num_shards, index, |
+ self.supervisor.test, self.supervisor.total_shards, index, |
self.supervisor.gtest_args, subprocess.PIPE, subprocess.STDOUT) |
while shard_running: |
char = shard.stdout.read(1) |
@@ -182,7 +184,7 @@ |
Attributes: |
test: Name of the test to shard. |
- num_shards: Total number of shards to split the test into. |
+ num_shards_to_run: Total number of shards to split the test into. |
num_runs: Total number of worker threads to create for running shards. |
color: Indicates which coloring mode to use in the output. |
original_order: True if shard output should be printed as it comes. |
@@ -194,15 +196,33 @@ |
shards_completed: List of flags indicating which shards have finished. |
shard_output: Buffer that stores the output from each shard. |
test_counter: Stores the total number of tests run. |
+ total_slaves: Total number of slaves running this test. |
+ slave_index: Current slave to run tests for. |
+ |
+ If total_slaves is set, we run only a subset of the tests. This is meant to be |
+ used when we want to shard across machines as well as across cpus. In that |
+ case the number of shards to execute will be the same, but they will be |
+ smaller, as the total number of shards in the test suite will be multiplied |
+ by 'total_slaves'. |
+ |
+ For example, if you are on a quad core machine, the sharding supervisor by |
+ default will use 20 shards for the whole suite. However, if you set |
+ total_slaves to 2, it will split the suite in 40 shards and will only run |
+ shards [0-19] or shards [20-39] depending if you set slave_index to 0 or 1. |
cmp
2012/04/05 19:07:49
will help a lot, thanks
|
""" |
SHARD_COMPLETED = object() |
- def __init__(self, test, num_shards, num_runs, color, original_order, |
- prefix, retry_percent, timeout, gtest_args): |
+ def __init__(self, test, num_shards_to_run, num_runs, color, original_order, |
+ prefix, retry_percent, timeout, total_slaves, slave_index, |
+ gtest_args): |
"""Inits ShardingSupervisor with given options and gtest arguments.""" |
self.test = test |
- self.num_shards = num_shards |
+ # Number of shards to run locally. |
+ self.num_shards_to_run = num_shards_to_run |
+ # Total shards in the test suite running across all slaves. |
+ self.total_shards = num_shards_to_run * total_slaves |
+ self.slave_index = slave_index |
self.num_runs = num_runs |
self.color = color |
self.original_order = original_order |
@@ -212,8 +232,8 @@ |
self.gtest_args = gtest_args |
self.failed_tests = [] |
self.failed_shards = [] |
- self.shards_completed = [False] * num_shards |
- self.shard_output = [Queue.Queue() for _ in range(num_shards)] |
+ self.shards_completed = [False] * self.num_shards_to_run |
+ self.shard_output = [Queue.Queue() for _ in range(self.num_shards_to_run)] |
self.test_counter = itertools.count() |
def ShardTest(self): |
@@ -249,7 +269,8 @@ |
workers = [] |
counter = Queue.Queue() |
- for i in range(self.num_shards): |
+ start_point = self.num_shards_to_run * self.slave_index |
+ for i in range(start_point, start_point + self.num_shards_to_run): |
counter.put(i) |
for i in range(self.num_runs): |
@@ -295,7 +316,7 @@ |
the current shard to finish before starting on the next shard. |
""" |
try: |
- for shard_index in range(self.num_shards): |
+ for shard_index in range(self.num_shards_to_run): |
while True: |
try: |
line = self.shard_output[shard_index].get(True, self.timeout) |
@@ -316,7 +337,7 @@ |
except: |
sys.stdout.flush() |
print >> sys.stderr, 'CAUGHT EXCEPTION: dumping remaining data:' |
- for shard_index in range(self.num_shards): |
+ for shard_index in range(self.num_shards_to_run): |
while True: |
try: |
line = self.shard_output[shard_index].get(False) |
@@ -333,12 +354,14 @@ |
"""Either prints the shard output line immediately or saves it in the |
output buffer, depending on the settings. Also optionally adds a prefix. |
""" |
+ # Fix up the index. |
+ array_index = index - (self.num_shards_to_run * self.slave_index) |
if self.prefix: |
line = "%i>%s" % (index, line) |
if self.original_order: |
sys.stdout.write(line) |
else: |
- self.shard_output[index].put(line) |
+ self.shard_output[array_index].put(line) |
def IncrementTestCount(self): |
"""Increments the number of tests run. This is relevant to the |
@@ -350,7 +373,9 @@ |
"""Records that a shard has finished so the output from the next shard |
can now be printed. |
""" |
- self.shard_output[index].put(self.SHARD_COMPLETED) |
+ # Fix up the index. |
+ array_index = index - (self.num_shards_to_run * self.slave_index) |
+ self.shard_output[array_index].put(self.SHARD_COMPLETED) |
def RetryFailedTests(self): |
"""Reruns any failed tests serially and prints another summary of the |
@@ -447,6 +472,13 @@ |
parser.add_option( |
"-t", "--timeout", type="int", default=SS_DEFAULT_TIMEOUT, |
help="timeout in seconds to wait for a shard (default=%default s)") |
+ parser.add_option( |
+ "--total-slaves", type="int", default=SS_DEFAULT_TOTAL_SLAVES, |
+ help="if running a subset, number of slaves sharing the test") |
+ parser.add_option( |
+ "--slave-index", type="int", default=SS_DEFAULT_SLAVE_INDEX, |
+ help="if running a subset, index of the slave to run tests for") |
+ |
parser.disable_interspersed_args() |
(options, args) = parser.parse_args() |
@@ -459,7 +491,7 @@ |
if options.shards_per_core < 1: |
parser.error("You must have at least 1 shard per core!") |
- num_shards = num_cores * options.shards_per_core |
+ num_shards_to_run = num_cores * options.shards_per_core |
if options.runs_per_core < 1: |
parser.error("You must have at least 1 run per core!") |
@@ -488,17 +520,18 @@ |
if options.runshard != None: |
# run a single shard and exit |
- if (options.runshard < 0 or options.runshard >= num_shards): |
+ if (options.runshard < 0 or options.runshard >= num_shards_to_run): |
parser.error("Invalid shard number given parameters!") |
shard = RunShard( |
- args[0], num_shards, options.runshard, gtest_args, None, None) |
+ args[0], num_shards_to_run, options.runshard, gtest_args, None, None) |
shard.communicate() |
return shard.poll() |
# shard and run the whole test |
ss = ShardingSupervisor( |
- args[0], num_shards, num_runs, options.color, options.original_order, |
- options.prefix, options.retry_percent, options.timeout, gtest_args) |
+ args[0], num_shards_to_run, num_runs, options.color, |
+ options.original_order, options.prefix, options.retry_percent, |
+ options.timeout, options.total_slaves, options.slave_index, gtest_args) |
return ss.ShardTest() |