Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(290)

Side by Side Diff: subprocess2.py

Issue 14826003: Refactor nag functionality in to NagTimer class. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/depot_tools
Patch Set: Fix nag_max Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « gclient_utils.py ('k') | tests/gclient_scm_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # coding=utf8 1 # coding=utf8
2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be 3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file. 4 # found in the LICENSE file.
5 """Collection of subprocess wrapper functions. 5 """Collection of subprocess wrapper functions.
6 6
7 In theory you shouldn't need anything else in subprocess, or this module failed. 7 In theory you shouldn't need anything else in subprocess, or this module failed.
8 """ 8 """
9 9
10 import cStringIO 10 import cStringIO
(...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after
125 # Requires modifications. 125 # Requires modifications.
126 env = env.copy() 126 env = env.copy()
127 def fix_lang(name): 127 def fix_lang(name):
128 if not is_english(name): 128 if not is_english(name):
129 env[name] = 'en_US.UTF-8' 129 env[name] = 'en_US.UTF-8'
130 fix_lang('LANG') 130 fix_lang('LANG')
131 fix_lang('LANGUAGE') 131 fix_lang('LANGUAGE')
132 return env 132 return env
133 133
134 134
135 class NagTimer(object):
136 """
137 Triggers a callback when a time interval passes without an event being fired.
138
139 For example, the event could be receiving terminal output from a subprocess;
140 and the callback could print a warning to stderr that the subprocess appeared
141 to be hung.
142 """
143 def __init__(self, interval, cb):
144 self.interval = interval
145 self.cb = cb
146 self.timer = threading.Timer(self.interval, self.fn)
147 self.last_output = self.previous_last_output = 0
148
149 def start(self):
150 self.last_output = self.previous_last_output = time.time()
151 self.timer.start()
152
153 def event(self):
154 self.last_output = time.time()
155
156 def fn(self):
157 now = time.time()
158 if self.last_output == self.previous_last_output:
159 self.cb(now - self.previous_last_output)
160 # Use 0.1 fudge factor, just in case
161 # (self.last_output - now) is very close to zero.
162 sleep_time = (self.last_output - now - 0.1) % self.interval
163 self.previous_last_output = self.last_output
164 self.timer = threading.Timer(sleep_time + 0.1, self.fn)
165 self.timer.start()
166
167 def cancel(self):
168 self.timer.cancel()
169
170
135 class Popen(subprocess.Popen): 171 class Popen(subprocess.Popen):
136 """Wraps subprocess.Popen() with various workarounds. 172 """Wraps subprocess.Popen() with various workarounds.
137 173
138 - Forces English output since it's easier to parse the stdout if it is always 174 - Forces English output since it's easier to parse the stdout if it is always
139 in English. 175 in English.
140 - Sets shell=True on windows by default. You can override this by forcing 176 - Sets shell=True on windows by default. You can override this by forcing
141 shell parameter to a value. 177 shell parameter to a value.
142 - Adds support for VOID to not buffer when not needed. 178 - Adds support for VOID to not buffer when not needed.
143 - Adds self.start property. 179 - Adds self.start property.
144 180
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
185 if kwargs.get(stream) in (VOID, os.devnull): 221 if kwargs.get(stream) in (VOID, os.devnull):
186 kwargs[stream] = open(os.devnull, 'w') 222 kwargs[stream] = open(os.devnull, 'w')
187 setattr(self, stream + '_is_void', True) 223 setattr(self, stream + '_is_void', True)
188 if callable(kwargs.get(stream)): 224 if callable(kwargs.get(stream)):
189 setattr(self, stream + '_cb', kwargs[stream]) 225 setattr(self, stream + '_cb', kwargs[stream])
190 kwargs[stream] = PIPE 226 kwargs[stream] = PIPE
191 227
192 self.start = time.time() 228 self.start = time.time()
193 self.timeout = None 229 self.timeout = None
194 self.nag_timer = None 230 self.nag_timer = None
231 self.nag_max = None
195 self.shell = kwargs.get('shell', None) 232 self.shell = kwargs.get('shell', None)
196 # Silence pylint on MacOSX 233 # Silence pylint on MacOSX
197 self.returncode = None 234 self.returncode = None
198 235
199 try: 236 try:
200 super(Popen, self).__init__(args, **kwargs) 237 super(Popen, self).__init__(args, **kwargs)
201 except OSError, e: 238 except OSError, e:
202 if e.errno == errno.EAGAIN and sys.platform == 'cygwin': 239 if e.errno == errno.EAGAIN and sys.platform == 'cygwin':
203 # Convert fork() emulation failure into a CygwinRebaseError(). 240 # Convert fork() emulation failure into a CygwinRebaseError().
204 raise CygwinRebaseError( 241 raise CygwinRebaseError(
(...skipping 18 matching lines...) Expand all
223 effectiveness will be delayed accordingly. 260 effectiveness will be delayed accordingly.
224 """ 261 """
225 # Queue of either of <threadname> when done or (<threadname>, data). In 262 # Queue of either of <threadname> when done or (<threadname>, data). In
226 # theory we would like to limit to ~64kb items to not cause large memory 263 # theory we would like to limit to ~64kb items to not cause large memory
227 # usage when the callback blocks. It is not done because it slows down 264 # usage when the callback blocks. It is not done because it slows down
228 # processing on OSX10.6 by a factor of 2x, making it even slower than 265 # processing on OSX10.6 by a factor of 2x, making it even slower than
229 # Windows! Revisit this decision if it becomes a problem, e.g. crash 266 # Windows! Revisit this decision if it becomes a problem, e.g. crash
230 # because of memory exhaustion. 267 # because of memory exhaustion.
231 queue = Queue.Queue() 268 queue = Queue.Queue()
232 done = threading.Event() 269 done = threading.Event()
233 timer = [] 270 nag = None
234 last_output = [time.time()] * 2
235 271
236 def write_stdin(): 272 def write_stdin():
237 try: 273 try:
238 stdin_io = cStringIO.StringIO(input) 274 stdin_io = cStringIO.StringIO(input)
239 while True: 275 while True:
240 data = stdin_io.read(1024) 276 data = stdin_io.read(1024)
241 if data: 277 if data:
242 self.stdin.write(data) 278 self.stdin.write(data)
243 else: 279 else:
244 self.stdin.close() 280 self.stdin.close()
245 break 281 break
246 finally: 282 finally:
247 queue.put('stdin') 283 queue.put('stdin')
248 284
249 def _queue_pipe_read(pipe, name): 285 def _queue_pipe_read(pipe, name):
250 """Queues characters read from a pipe into a queue.""" 286 """Queues characters read from a pipe into a queue."""
251 try: 287 try:
252 while True: 288 while True:
253 data = pipe.read(1) 289 data = pipe.read(1)
254 if not data: 290 if not data:
255 break 291 break
256 last_output[0] = time.time() 292 if nag:
293 nag.event()
257 queue.put((name, data)) 294 queue.put((name, data))
258 finally: 295 finally:
259 queue.put(name) 296 queue.put(name)
260 297
261 def nag_fn():
262 now = time.time()
263 if done.is_set():
264 return
265 if last_output[0] == last_output[1]:
266 logging.warn(' No output for %.0f seconds from command:' % (
267 now - last_output[1]))
268 logging.warn(' %s' % self.cmd_str)
269 # Use 0.1 fudge factor in case:
270 # now ~= last_output[0] + self.nag_timer
271 sleep_time = self.nag_timer + last_output[0] - now - 0.1
272 while sleep_time < 0:
273 sleep_time += self.nag_timer
274 last_output[1] = last_output[0]
275 timer[0] = threading.Timer(sleep_time, nag_fn)
276 timer[0].start()
277
278 def timeout_fn(): 298 def timeout_fn():
279 try: 299 try:
280 done.wait(self.timeout) 300 done.wait(self.timeout)
281 finally: 301 finally:
282 queue.put('timeout') 302 queue.put('timeout')
283 303
284 def wait_fn(): 304 def wait_fn():
285 try: 305 try:
286 self.wait() 306 self.wait()
287 finally: 307 finally:
(...skipping 18 matching lines...) Expand all
306 target=_queue_pipe_read, args=(self.stderr, 'stderr')) 326 target=_queue_pipe_read, args=(self.stderr, 'stderr'))
307 if input: 327 if input:
308 threads['stdin'] = threading.Thread(target=write_stdin) 328 threads['stdin'] = threading.Thread(target=write_stdin)
309 elif self.stdin: 329 elif self.stdin:
310 # Pipe but no input, make sure it's closed. 330 # Pipe but no input, make sure it's closed.
311 self.stdin.close() 331 self.stdin.close()
312 for t in threads.itervalues(): 332 for t in threads.itervalues():
313 t.start() 333 t.start()
314 334
315 if self.nag_timer: 335 if self.nag_timer:
316 timer.append(threading.Timer(self.nag_timer, nag_fn)) 336 def _nag_cb(elapsed):
317 timer[0].start() 337 logging.warn(' No output for %.0f seconds from command:' % elapsed)
338 logging.warn(' %s' % self.cmd_str)
339 if (self.nag_max and
340 int('%.0f' % (elapsed / self.nag_timer)) >= self.nag_max):
341 queue.put('timeout')
342 done.set() # Must do this so that timeout thread stops waiting.
343 nag = NagTimer(self.nag_timer, _nag_cb)
344 nag.start()
318 345
319 timed_out = False 346 timed_out = False
320 try: 347 try:
321 # This thread needs to be optimized for speed. 348 # This thread needs to be optimized for speed.
322 while threads: 349 while threads:
323 item = queue.get() 350 item = queue.get()
324 if item[0] == 'stdout': 351 if item[0] == 'stdout':
325 self.stdout_cb(item[1]) 352 self.stdout_cb(item[1])
326 elif item[0] == 'stderr': 353 elif item[0] == 'stderr':
327 self.stderr_cb(item[1]) 354 self.stderr_cb(item[1])
328 else: 355 else:
329 # A thread terminated. 356 # A thread terminated.
330 threads[item].join() 357 if item in threads:
331 del threads[item] 358 threads[item].join()
359 del threads[item]
332 if item == 'wait': 360 if item == 'wait':
333 # Terminate the timeout thread if necessary. 361 # Terminate the timeout thread if necessary.
334 done.set() 362 done.set()
335 elif item == 'timeout' and not timed_out and self.poll() is None: 363 elif item == 'timeout' and not timed_out and self.poll() is None:
336 logging.debug('Timed out after %fs: killing' % self.timeout) 364 logging.debug('Timed out after %.0fs: killing' % (
365 time.time() - self.start))
337 self.kill() 366 self.kill()
338 timed_out = True 367 timed_out = True
339 finally: 368 finally:
340 # Stop the threads. 369 # Stop the threads.
341 done.set() 370 done.set()
342 if timer: 371 if nag:
343 timer[0].cancel() 372 nag.cancel()
344 if 'wait' in threads: 373 if 'wait' in threads:
345 # Accelerate things, otherwise it would hang until the child process is 374 # Accelerate things, otherwise it would hang until the child process is
346 # done. 375 # done.
347 logging.debug('Killing child because of an exception') 376 logging.debug('Killing child because of an exception')
348 self.kill() 377 self.kill()
349 # Join threads. 378 # Join threads.
350 for thread in threads.itervalues(): 379 for thread in threads.itervalues():
351 thread.join() 380 thread.join()
352 if timed_out: 381 if timed_out:
353 self.returncode = TIMED_OUT 382 self.returncode = TIMED_OUT
354 383
355 # pylint: disable=W0221,W0622 384 # pylint: disable=W0221,W0622
356 def communicate(self, input=None, timeout=None, nag_timer=None): 385 def communicate(self, input=None, timeout=None, nag_timer=None,
386 nag_max=None):
357 """Adds timeout and callbacks support. 387 """Adds timeout and callbacks support.
358 388
359 Returns (stdout, stderr) like subprocess.Popen().communicate(). 389 Returns (stdout, stderr) like subprocess.Popen().communicate().
360 390
361 - The process will be killed after |timeout| seconds and returncode set to 391 - The process will be killed after |timeout| seconds and returncode set to
362 TIMED_OUT. 392 TIMED_OUT.
363 - If the subprocess runs for |nag_timer| seconds without producing terminal 393 - If the subprocess runs for |nag_timer| seconds without producing terminal
364 output, print a warning to stderr. 394 output, print a warning to stderr.
365 """ 395 """
366 self.timeout = timeout 396 self.timeout = timeout
367 self.nag_timer = nag_timer 397 self.nag_timer = nag_timer
398 self.nag_max = nag_max
368 if (not self.timeout and not self.nag_timer and 399 if (not self.timeout and not self.nag_timer and
369 not self.stdout_cb and not self.stderr_cb): 400 not self.stdout_cb and not self.stderr_cb):
370 return super(Popen, self).communicate(input) 401 return super(Popen, self).communicate(input)
371 402
372 if self.timeout and self.shell: 403 if self.timeout and self.shell:
373 raise TypeError( 404 raise TypeError(
374 'Using timeout and shell simultaneously will cause a process leak ' 405 'Using timeout and shell simultaneously will cause a process leak '
375 'since the shell will be killed instead of the child process.') 406 'since the shell will be killed instead of the child process.')
376 407
377 stdout = None 408 stdout = None
378 stderr = None 409 stderr = None
379 # Convert to a lambda to workaround python's deadlock. 410 # Convert to a lambda to workaround python's deadlock.
380 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait 411 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
381 # When the pipe fills up, it would deadlock this process. 412 # When the pipe fills up, it would deadlock this process.
382 if self.stdout and not self.stdout_cb and not self.stdout_is_void: 413 if self.stdout and not self.stdout_cb and not self.stdout_is_void:
383 stdout = [] 414 stdout = []
384 self.stdout_cb = stdout.append 415 self.stdout_cb = stdout.append
385 if self.stderr and not self.stderr_cb and not self.stderr_is_void: 416 if self.stderr and not self.stderr_cb and not self.stderr_is_void:
386 stderr = [] 417 stderr = []
387 self.stderr_cb = stderr.append 418 self.stderr_cb = stderr.append
388 self._tee_threads(input) 419 self._tee_threads(input)
389 if stdout is not None: 420 if stdout is not None:
390 stdout = ''.join(stdout) 421 stdout = ''.join(stdout)
391 if stderr is not None: 422 if stderr is not None:
392 stderr = ''.join(stderr) 423 stderr = ''.join(stderr)
393 return (stdout, stderr) 424 return (stdout, stderr)
394 425
395 426
396 def communicate(args, timeout=None, nag_timer=None, **kwargs): 427 def communicate(args, timeout=None, nag_timer=None, nag_max=None, **kwargs):
397 """Wraps subprocess.Popen().communicate() and add timeout support. 428 """Wraps subprocess.Popen().communicate() and add timeout support.
398 429
399 Returns ((stdout, stderr), returncode). 430 Returns ((stdout, stderr), returncode).
400 431
401 - The process will be killed after |timeout| seconds and returncode set to 432 - The process will be killed after |timeout| seconds and returncode set to
402 TIMED_OUT. 433 TIMED_OUT.
403 - If the subprocess runs for |nag_timer| seconds without producing terminal 434 - If the subprocess runs for |nag_timer| seconds without producing terminal
404 output, print a warning to stderr. 435 output, print a warning to stderr.
405 - Automatically passes stdin content as input so do not specify stdin=PIPE. 436 - Automatically passes stdin content as input so do not specify stdin=PIPE.
406 """ 437 """
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after
474 505
475 - Throws if return code is not 0. 506 - Throws if return code is not 0.
476 - Works even prior to python 2.7. 507 - Works even prior to python 2.7.
477 - Blocks stdin by default if not specified since no output will be visible. 508 - Blocks stdin by default if not specified since no output will be visible.
478 - As per doc, "The stdout argument is not allowed as it is used internally." 509 - As per doc, "The stdout argument is not allowed as it is used internally."
479 """ 510 """
480 kwargs.setdefault('stdin', VOID) 511 kwargs.setdefault('stdin', VOID)
481 if 'stdout' in kwargs: 512 if 'stdout' in kwargs:
482 raise ValueError('stdout argument not allowed, it will be overridden.') 513 raise ValueError('stdout argument not allowed, it will be overridden.')
483 return check_call_out(args, stdout=PIPE, **kwargs)[0] 514 return check_call_out(args, stdout=PIPE, **kwargs)[0]
OLDNEW
« no previous file with comments | « gclient_utils.py ('k') | tests/gclient_scm_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698