Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(306)

Side by Side Diff: scripts/slave/swarming/swarming_run_shim.py

Issue 139343011: Add swarming_run_shim.py to run swarming tasks as annotated tasks. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Address comments Created 6 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 #!/usr/bin/env python
2 # Copyright 2014 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
5
6 """Drives tests on Swarming. Both trigger and collect results.
7
8 This is the shim that is called through buildbot.
9 """
10
11 import logging
12 import optparse
13 import os
14 import subprocess
15 import sys
16 import threading
17 import Queue
18
19 from common import chromium_utils
20 from common import find_depot_tools # pylint: disable=W0611
21
22 from common import annotator
23 from slave.swarming import swarming_utils
24
25 # From depot tools/
26 import fix_encoding
27
28
29 def v0_3(
30 client, swarming_server, isolate_server, priority, dimensions,
31 task_name, isolated_hash, env, shards):
32 """Handles swarm_client/swarming.py starting 7c543276f08.
33
34 It was rolled in src on r237619 on 2013-11-27.
35 """
36 cmd = [
37 sys.executable,
38 os.path.join(client, 'swarming.py'),
39 'run',
40 '--swarming', swarming_server,
41 '--isolate-server', isolate_server,
42 '--priority', str(priority),
43 '--shards', str(shards),
44 '--task-name', task_name,
45 isolated_hash,
46 ]
47 for name, value in dimensions.iteritems():
48 if name != 'os':
49 cmd.extend(('--dimension', name, value))
50 else:
51 # Sadly, older version of swarming.py need special handling of os.
52 old_value = [
53 k for k, v in swarming_utils.OS_MAPPING.iteritems() if v == value
54 ]
55 assert len(old_value) == 1
56 cmd.extend(('--os', old_value[0]))
57
58 # Enable profiling on the -dev server.
59 if '-dev' in swarming_server:
60 cmd.append('--profile')
61 for name, value in env.iteritems():
62 cmd.extend(('--env', name, value))
63 return cmd
64
65
66 def v0_4(
67 client, swarming_server, isolate_server, priority, dimensions,
68 task_name, isolated_hash, env, shards):
69 """Handles swarm_client/swarming.py starting b39e8cf08c.
70
71 It was rolled in src on r246113 on 2014-01-21.
72 """
73 cmd = [
74 sys.executable,
75 os.path.join(client, 'swarming.py'),
76 'run',
77 '--swarming', swarming_server,
78 '--isolate-server', isolate_server,
79 '--priority', str(priority),
80 '--shards', str(shards),
81 '--task-name', task_name,
82 isolated_hash,
83 ]
84 for name, value in dimensions.iteritems():
85 cmd.extend(('--dimension', name, value))
86 # Enable profiling on the -dev server.
87 if '-dev' in swarming_server:
88 cmd.append('--profile')
89 for name, value in env.iteritems():
90 cmd.extend(('--env', name, value))
91 return cmd
92
93
94 def stream_process(cmd):
95 """Calls process cmd and yields its output.
96
97 This is not the most efficient nor safe way to do it but it is only meant to
98 be run on linux so it should be fine. Fix if necessary.
99 """
100 p = subprocess.Popen(
101 cmd, bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
102 try:
103 while True:
104 try:
105 i = p.stdout.readline()
106 if i:
107 yield i
108 continue
109 except OSError:
110 pass
111 if p.poll() is not None:
112 break
113 yield p.returncode
114 finally:
115 if p.poll() is None:
116 p.kill()
117
118
119 def drive_one(
120 client, version, swarming_server, isolate_server, priority, dimensions,
121 task_name, isolated_hash, env, shards, out):
122 """Executes the proper handler based on the code layout and --version support.
123 """
124 def send_back(l):
125 out.put((task_name, l))
126 if version < (0, 4):
127 cmd = v0_3(
128 client, swarming_server, isolate_server, priority, dimensions,
129 task_name, isolated_hash, env, shards)
130 else:
131 cmd = v0_4(
132 client, swarming_server, isolate_server, priority, dimensions,
133 task_name, isolated_hash, env, shards)
134 try:
135 for i in stream_process(cmd):
136 send_back(i)
137 except Exception as e:
138 send_back(e)
139
140
141 def drive_many(
142 client, version, swarming_server, isolate_server, priority, dimensions,
143 steps):
144 logging.info(
145 'drive_many(%s, %s, %s, %s, %s, %s, %s)',
146 client, version, swarming_server, isolate_server, priority, dimensions,
147 steps)
148 return _drive_many(
149 client, version, swarming_server, isolate_server, priority, dimensions,
150 steps, Queue.Queue())
151
152
153 def step_name_to_cursor(x):
154 """The cursor is buildbot's step name. It is only the base test name for
155 simplicity.
156
157 But the swarming task name is longer, it is
158 "<name>/<dimensions>/<isolated hash>".
159 """
160 return x.split('/', 1)[0]
161
162
163 def _drive_many(
164 client, version, swarming_server, isolate_server, priority, dimensions,
165 steps, out):
166 """Internal version, exposed so it can be hooked in test."""
167 stream = annotator.AdvancedAnnotationStream(sys.stdout, False)
ghost stip (do not use) 2014/01/24 22:47:26 stream.honor_zero_return_code() if you want the re
168 for step_name in sorted(steps):
169 # Seeds the step first before doing the cursors otherwise it is interleaved
170 # in the logs of other steps.
171 stream.seed_step(step_name)
172
173 threads = []
174 # Create the boxes in buildbot in order for consistency.
175 steps_annotations = {}
176 for step_name, isolated_hash in sorted(steps.iteritems()):
177 env = {}
178 # TODO(maruel): Propagate GTEST_FILTER.
179 #if gtest_filter not in (None, '', '.', '*'):
180 # env['GTEST_FILTER'] = gtest_filter
181 shards = swarming_utils.TESTS_SHARDS.get(step_name, 1)
182 # This will be the key in steps_annotations.
183 task_name = '%s/%s/%s' % (step_name, dimensions['os'], isolated_hash)
184 t = threading.Thread(
185 target=drive_one,
186 args=(client, version, swarming_server, isolate_server, priority,
187 dimensions, task_name, isolated_hash, env, shards, out))
188 t.daemon = True
189 t.start()
190 threads.append(t)
191 steps_annotations[task_name] = annotator.AdvancedAnnotationStep(
192 sys.stdout, False)
193 items = task_name.split('/', 2)
194 assert step_name == items[0]
195 assert step_name == step_name_to_cursor(task_name)
196 # It is important data to surface through buildbot.
197 stream.step_cursor(step_name)
ghost stip (do not use) 2014/01/24 22:47:26 steps_annotations[task_name].step_started()
M-A Ruel 2014/01/25 02:12:20 It's not really necessary.
198 steps_annotations[task_name].step_text(items[1])
199 steps_annotations[task_name].step_text(items[2])
200 collect(stream, steps_annotations, out)
201 return 0
202
203
204 def collect(stream, steps_annotations, out):
205 last_cursor = None
206 while steps_annotations:
207 try:
208 # Polling FTW.
209 packet = out.get(timeout=1)
210 except Queue.Empty:
211 continue
212 task_name, item = packet
213 if isinstance(item, int):
214 # Signals it's completed.
215 if last_cursor != task_name:
ghost stip (do not use) 2014/01/24 22:47:26 might want to take lines 215-217, make it into a s
M-A Ruel 2014/01/25 02:12:20 Cleaned up.
216 stream.step_cursor(step_name_to_cursor(task_name))
217 last_cursor = task_name
218 if item:
219 steps_annotations[task_name].step_failure()
220 steps_annotations[task_name].step_closed()
221 del steps_annotations[task_name]
222 last_cursor = None
223 elif isinstance(item, Exception):
224 if last_cursor != task_name:
225 stream.step_cursor(step_name_to_cursor(task_name))
226 last_cursor = task_name
227 steps_annotations[task_name].step_failure()
228 del steps_annotations[task_name]
229 last_cursor = None
230 else:
231 assert isinstance(item, str), item
232 if last_cursor != task_name:
233 stream.step_cursor(step_name_to_cursor(task_name))
234 last_cursor = task_name
235 sys.stdout.write(item)
236 out.task_done()
237
238
239 def determine_steps_to_run(isolated_hashes, default_swarming_tests, testfilter):
240 """Returns a dict of test:hash for the test that should be run through
241 Swarming.
242
243 This is done by looking at the build properties to figure out what should be
244 run.
245 """
246 logging.info(
247 'determine_steps_to_run(%s, %s, %s)',
248 isolated_hashes, default_swarming_tests, testfilter)
249 # TODO(maruel): Support gtest filter.
250 def should_run(name):
251 return (
252 ((name in default_swarming_tests or not default_swarming_tests) and
253 'defaulttests' in testfilter) or
254 (name + '_swarm' in testfilter))
255
256 return dict(
257 (name, isolated_hash)
258 for name, isolated_hash in isolated_hashes.iteritems()
259 if should_run(name))
260
261
262 def process_build_properties(options):
263 """Converts build properties and factory properties into expected flags."""
264 # target_os is not defined when using a normal builder, contrary to a
265 # xx_swarm_triggered buildbot<->swarming builder, and it's not needed since
266 # the OS match, it's defined in builder/tester configurations.
267 slave_os = options.build_properties.get('target_os', sys.platform)
268 priority = swarming_utils.build_to_priority(options.build_properties)
269 steps = determine_steps_to_run(
270 options.build_properties.get('swarm_hashes', {}),
271 options.build_properties.get('run_default_swarm_tests', []),
272 options.build_properties.get('testfilter', ['defaulttests']))
273 return slave_os, priority, steps
274
275
276 def main(args):
277 """Note: this is solely to run the current master's code and can totally
278 differ from the underlying script flags.
279
280 To update these flags:
281 - Update the following code to support both the previous flag and the new
282 flag.
283 - Change scripts/master/factory/swarm_commands.py to pass the new flag.
284 - Restart all the masters using swarming.
285 - Remove the old flag from this code.
286 """
287 client = swarming_utils.find_client(os.getcwd())
288 if not client:
289 print >> sys.stderr, 'Failed to find swarm(ing)_client'
290 return 1
291 version = swarming_utils.get_version(client)
292 if version < (0, 3):
293 print >> sys.stderr, (
294 '%s is version %s which is too old. Please run the test locally' %
295 (client, '.'.join(version)))
296 return 1
297
298 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__)
299 parser.add_option('--verbose', action='store_true')
300 parser.add_option('--swarming')
301 parser.add_option('--isolate-server')
302 chromium_utils.AddPropertiesOptions(parser)
303 options, args = parser.parse_args(args)
304 if args:
305 parser.error('Unsupported args: %s' % args)
306 if not options.swarming or not options.isolate_server:
307 parser.error('Require both --swarming and --isolate-server')
308
309 logging.basicConfig(level=logging.DEBUG if options.verbose else logging.ERROR)
310 # Loads the other flags implicitly.
311 slave_os, priority, steps = process_build_properties(options)
312 logging.info('To run: %s, %s, %s', slave_os, priority, steps)
313 if not steps:
314 # TODO(maruel): Returns a warning so it's clear that something is not
315 # normal. Not sure how to do this.
ghost stip (do not use) 2014/01/24 22:47:26 annotator.AdvancedAnnotationStep(sys.stdout, False
M-A Ruel 2014/01/25 02:12:20 From reading annotator.py, I don't think this work
316 print('Nothing to trigger')
317 return 0
318 print('Selected tests:')
319 print('\n'.join(' %s' % s for s in sorted(steps)))
320 selected_os = swarming_utils.OS_MAPPING[slave_os]
321 print('Selected OS: %s' % selected_os)
322 return drive_many(
323 client,
324 version,
325 options.swarming,
326 options.isolate_server,
327 priority,
328 {'os': selected_os},
329 steps)
330
331
332 if __name__ == '__main__':
333 fix_encoding.fix_encoding()
334 sys.exit(main(sys.argv[1:]))
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698