OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/env python |
| 2 # Copyright 2013 The Chromium Authors. All rights reserved. |
| 3 # Use of this source code is governed by a BSD-style license that can be |
| 4 # found in the LICENSE file. |
| 5 |
| 6 """Triggers a ton of fake jobs to test its handling under high load. |
| 7 |
Generates a histogram of the latencies to process the tasks and of the number
of retries.
| 10 """ |
| 11 |
| 12 import hashlib |
| 13 import json |
| 14 import logging |
| 15 import optparse |
| 16 import os |
| 17 import Queue |
| 18 import socket |
| 19 import StringIO |
| 20 import sys |
| 21 import threading |
| 22 import time |
| 23 import zipfile |
| 24 |
| 25 ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
| 26 |
| 27 sys.path.insert(0, ROOT_DIR) |
| 28 |
| 29 from third_party import colorama |
| 30 |
| 31 from utils import graph |
| 32 from utils import net |
| 33 from utils import threading_utils |
| 34 |
| 35 # Line too long (NN/80) |
| 36 # pylint: disable=C0301 |
| 37 |
| 38 |
| 39 TASK_OUTPUT = 'This task ran with great success' |
| 40 |
| 41 |
def print_results(results, columns, buckets):
  """Prints histograms summarizing the collected results.

  Items that are floats are task delays in seconds; any other item is an
  event/failure marker and is tallied by occurrence count.
  """
  delays = []
  events = []
  for item in results:
    (delays if isinstance(item, float) else events).append(item)

  print('%sDELAYS%s:' % (colorama.Fore.RED, colorama.Fore.RESET))
  histogram = graph.generate_histogram(delays, buckets)
  graph.print_histogram(histogram, columns, ' %.3f')
  print('')
  print('Total items : %d' % len(results))
  average = sum(delays) / len(delays) if delays else 0
  print('Average delay: %s' % graph.to_units(average))
  print('')

  if events:
    print('%sEVENTS%s:' % (colorama.Fore.RED, colorama.Fore.RESET))
    counts = {}
    for event in events:
      counts[event] = counts.get(event, 0) + 1
    graph.print_histogram(counts, columns, ' %s')
    print('')
| 65 |
| 66 |
def calculate_version(url):
  """Fetches the swarm_bot code zip at |url| and returns its SHA-1 digest.

  The digest is computed over the same fixed file list the server uses, so
  it matches the 'version' attribute the server expects from a bot.
  """
  # Cannot use url_open() since zipfile requires .seek().
  archive = zipfile.ZipFile(StringIO.StringIO(net.url_read(url)))
  # Keep the file list in sync with
  # https://code.google.com/p/swarming/source/browse/src/common/version.py?repo=swarming-server
  checksummed_files = (
    'slave_machine.py',
    'swarm_bot/local_test_runner.py',
    'common/__init__.py',
    'common/swarm_constants.py',
    'common/version.py',
    'common/test_request_message.py',
    'common/url_helper.py',
  )
  digest = hashlib.sha1()
  for name in checksummed_files:
    digest.update(archive.read(name))
  return digest.hexdigest()
| 86 |
| 87 |
class FakeSwarmBot(object):
  """A fake swarm_bot implementation simulating that it runs on a Comodore64.

  Polls the server for jobs, pretends to process them for self._duration
  seconds and posts back a canned result (TASK_OUTPUT). All the work happens
  on a daemon thread started from __init__; float delays and string event
  names are reported through the |events| queue.
  """
  def __init__(
      self, swarming_url, swarm_bot_hash, index, progress, duration,
      events, kill_event):
    # swarming_url: base URL of the Swarming server.
    # swarm_bot_hash: SHA-1 'version' to report, from calculate_version().
    # index: unique bot number, used for the machine id and thread name.
    # progress: threading_utils.Progress used for console reporting.
    # duration: how long (s) to fake processing each task.
    # events: Queue.Queue collecting per-task delays (float) or event names.
    # kill_event: threading_utils.Bit; when set, the bot thread exits.
    self._lock = threading.Lock()
    self._swarming = swarming_url
    self._index = index
    self._progress = progress
    self._duration = duration
    self._events = events
    self._kill_event = kill_event
    # Use an impossible hostname so fake bots cannot collide with real ones.
    self._machine_id = '%s-%d' % (socket.getfqdn().lower(), index)

    # Attributes mirror what a real slave sends on /poll_for_test. See
    # https://code.google.com/p/swarming/source/browse/src/swarm_bot/slave_machine.py?repo=swarming-server
    # and
    # https://chromium.googlesource.com/chromium/tools/build.git/ \
    # +/master/scripts/tools/swarm_bootstrap/swarm_bootstrap.py
    # for more details.
    self._attributes = {
      'dimensions': {
        # Use improbable values to reduce the chance of interferring with real
        # slaves.
        'bits': '36',
        'machine': os.uname()[4] + '-experimental',
        'os': ['Comodore64'],
      },
      'id': self._machine_id,
      'try_count': 0,
      'tag': self._machine_id,
      'version': swarm_bot_hash,
    }

    # The bot runs entirely on this daemon thread; __init__ returns at once.
    self._thread = threading.Thread(target=self._run, name='bot%d' % index)
    self._thread.daemon = True
    self._thread.start()

  def join(self):
    """Blocks until the bot thread terminates."""
    self._thread.join()

  def is_alive(self):
    """Returns True while the bot thread is still running."""
    return self._thread.is_alive()

  def _run(self):
    """Main bot loop: poll, fake the work, post the result; repeat."""
    try:
      self._progress.update_item('%d alive' % self._index, bots=1)
      while True:
        # Cooperative shutdown requested by main() on Ctrl-C.
        if self._kill_event.get():
          return
        data = {'attributes': json.dumps(self._attributes)}
        request = net.url_open(self._swarming + '/poll_for_test', data=data)
        if request is None:
          self._events.put('poll_for_test_empty')
          continue
        start = time.time()
        try:
          manifest = json.load(request)
        except ValueError:
          self._progress.update_item('Failed to poll')
          self._events.put('poll_for_test_invalid')
          continue

        commands = [c['function'] for c in manifest.get('commands', [])]
        if not commands:
          # Nothing to run; the server tells us when to poll again.
          self._events.put('sleep')
          time.sleep(manifest['come_back'])
          continue

        if commands == ['UpdateSlave']:
          # Calculate the proper SHA-1 and loop again.
          # This could happen if the Swarming server is upgraded while this
          # script runs.
          self._attributes['version'] = calculate_version(
              manifest['commands'][0]['args'])
          self._events.put('update_slave')
          continue

        if commands != ['StoreFiles', 'RunCommands']:
          self._progress.update_item(
              'Unexpected RPC call %s\n%s' % (commands, manifest))
          self._events.put('unknown_rpc')
          break

        # The normal way Swarming works is that it 'stores' a test_run.swarm
        # file and then defer control to swarm_bot/local_test_runner.py.
        store_cmd = manifest['commands'][0]
        assert len(store_cmd['args']) == 1, store_cmd['args']
        filepath, filename, test_run_content = store_cmd['args'][0]
        assert filepath == ''
        assert filename == 'test_run.swarm'
        assert 'local_test_runner.py' in manifest['commands'][1]['args'][0], (
            manifest['commands'][1])
        result_url = manifest['result_url']
        test_run = json.loads(test_run_content)
        assert result_url == test_run['result_url']
        ping_url = test_run['ping_url']
        ping_delay = test_run['ping_delay']
        self._progress.update_item('%d processing' % self._index, processing=1)

        # Fake activity and send pings as requested.
        while True:
          # NOTE(review): this computes elapsed-minus-duration, which is 0
          # until self._duration has elapsed, so 'not remaining' below breaks
          # out immediately and the ping branch can never fire. Presumably the
          # intent was max(0, self._duration - (time.time() - start)) with a
          # sleep capped at ping_delay — confirm before relying on pings.
          remaining = max(0, time.time() - start - self._duration)
          if remaining > ping_delay:
            result = net.url_read(ping_url)
            assert result == 'OK'
            remaining = max(0, time.time() - start - self._duration)
          if not remaining:
            break
          time.sleep(remaining)

        # Single-letter keys mirror the server's result upload form fields.
        data = {
          'c': test_run['configuration']['config_name'],
          'n': test_run['test_run_name'],
          'o': False,
          'result_output': TASK_OUTPUT,
          's': True,
          'x': '0',
        }
        result = net.url_read(manifest['result_url'], data=data)
        self._progress.update_item(
            '%d processed' % self._index, processing=-1, processed=1)
        if not result:
          self._events.put('result_url_fail')
        else:
          assert result == 'Successfully update the runner results.', result
          # Success: report the total processing latency for the histogram.
          self._events.put(time.time() - start)
    finally:
      try:
        # Unregister itself. Otherwise the server will have tons of fake slaves
        # that the admin will have to remove manually.
        response = net.url_open(
            self._swarming + '/delete_machine_stats',
            data=[('r', self._machine_id)])
        if not response:
          self._events.put('failed_unregister')
        else:
          response.read()
      finally:
        self._progress.update_item('%d quit' % self._index, bots=-1)
| 235 |
| 236 |
def main():
  """Entry point: spawns the fake bots, tracks progress and prints stats.

  Parses the command line, starts --slaves FakeSwarmBot instances against
  --swarming, waits for them (Ctrl-C sets the kill bit), then prints the
  latency/event histograms and optionally dumps raw results to --dump.

  Returns:
    0 on success (used as the process exit code).
  """
  colorama.init()
  parser = optparse.OptionParser(description=sys.modules[__name__].__doc__)
  parser.add_option(
      '-S', '--swarming',
      metavar='URL', default='',
      help='Swarming server to use')

  group = optparse.OptionGroup(parser, 'Load generated')
  group.add_option(
      '--slaves', type='int', default=300, metavar='N',
      help='Number of swarm bot slaves, default: %default')
  group.add_option(
      '-c', '--consume', type='float', default=60., metavar='N',
      help='Duration (s) for consuming a request, default: %default')
  parser.add_option_group(group)

  group = optparse.OptionGroup(parser, 'Display options')
  group.add_option(
      '--columns', type='int', default=graph.get_console_width(), metavar='N',
      help='For histogram display, default:%default')
  group.add_option(
      '--buckets', type='int', default=20, metavar='N',
      help='Number of buckets for histogram display, default:%default')
  parser.add_option_group(group)

  parser.add_option(
      '--dump', metavar='FOO.JSON', help='Dumps to json file')
  parser.add_option(
      '-v', '--verbose', action='store_true', help='Enables logging')

  options, args = parser.parse_args()
  logging.basicConfig(level=logging.INFO if options.verbose else logging.FATAL)
  if args:
    parser.error('Unsupported args: %s' % args)
  options.swarming = options.swarming.rstrip('/')
  if not options.swarming:
    parser.error('--swarming is required.')
  if options.consume <= 0:
    parser.error('Needs --consume > 0. 0.01 is a valid value.')

  print(
      'Running %d slaves, each task lasting %.1fs' % (
        options.slaves, options.consume))
  print('Ctrl-C to exit.')
  print('[processing/processed/bots]')
  columns = [('processing', 0), ('processed', 0), ('bots', 0)]
  progress = threading_utils.Progress(columns)
  events = Queue.Queue()
  start = time.time()
  kill_event = threading_utils.Bit()
  swarm_bot_hash = calculate_version(options.swarming + '/get_slave_code')
  slaves = [
    FakeSwarmBot(
        options.swarming, swarm_bot_hash, i, progress, options.consume,
        events, kill_event)
    for i in range(options.slaves)
  ]
  try:
    # Wait for all the slaves to come alive.
    while not all(s.is_alive() for s in slaves):
      time.sleep(0.01)
    progress.update_item('Ready to run')
    while slaves:
      progress.print_update()
      time.sleep(0.01)
      # The slaves could be told to die.
      slaves = [s for s in slaves if s.is_alive()]
  except KeyboardInterrupt:
    kill_event.set()

  progress.update_item('Waiting for slaves to quit.', raw=True)
  progress.update_item('')
  while slaves:
    progress.print_update()
    slaves = [s for s in slaves if s.is_alive()]
  # At this point, progress is not used anymore.
  print('')
  print('Ran for %.1fs.' % (time.time() - start))
  print('')
  # events.queue is the Queue's internal collections.deque. All producer
  # threads have quit by now, so snapshotting it without the lock is safe.
  # Materialize it as a list: json.dump() cannot serialize a deque, so
  # passing events.queue directly made --dump raise TypeError.
  results = list(events.queue)
  print_results(results, options.columns, options.buckets)
  if options.dump:
    with open(options.dump, 'w') as f:
      json.dump(results, f, separators=(',',':'))
  return 0
| 323 |
| 324 |
if __name__ == '__main__':
  # Propagate main()'s return value as the process exit code.
  sys.exit(main())
OLD | NEW |