Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(340)

Side by Side Diff: subprocess2.py

Issue 14798003: Add nag_timer. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/depot_tools
Patch Set: 80 chars Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | tests/subprocess2_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # coding=utf8 1 # coding=utf8
2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be 3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file. 4 # found in the LICENSE file.
5 """Collection of subprocess wrapper functions. 5 """Collection of subprocess wrapper functions.
6 6
7 In theory you shouldn't need anything else in subprocess, or this module failed. 7 In theory you shouldn't need anything else in subprocess, or this module failed.
8 """ 8 """
9 9
10 import cStringIO 10 import cStringIO
(...skipping 157 matching lines...) Expand 10 before | Expand all | Expand 10 after
168 raise CalledProcessError(None, args, kwargs.get('cwd'), None, None) 168 raise CalledProcessError(None, args, kwargs.get('cwd'), None, None)
169 if kwargs.get('cwd', None): 169 if kwargs.get('cwd', None):
170 tmp_str += '; cwd=%s' % kwargs['cwd'] 170 tmp_str += '; cwd=%s' % kwargs['cwd']
171 logging.debug(tmp_str) 171 logging.debug(tmp_str)
172 172
173 self.stdout_cb = None 173 self.stdout_cb = None
174 self.stderr_cb = None 174 self.stderr_cb = None
175 self.stdin_is_void = False 175 self.stdin_is_void = False
176 self.stdout_is_void = False 176 self.stdout_is_void = False
177 self.stderr_is_void = False 177 self.stderr_is_void = False
178 self.cmd_str = tmp_str
178 179
179 if kwargs.get('stdin') is VOID: 180 if kwargs.get('stdin') is VOID:
180 kwargs['stdin'] = open(os.devnull, 'r') 181 kwargs['stdin'] = open(os.devnull, 'r')
181 self.stdin_is_void = True 182 self.stdin_is_void = True
182 183
183 for stream in ('stdout', 'stderr'): 184 for stream in ('stdout', 'stderr'):
184 if kwargs.get(stream) in (VOID, os.devnull): 185 if kwargs.get(stream) in (VOID, os.devnull):
185 kwargs[stream] = open(os.devnull, 'w') 186 kwargs[stream] = open(os.devnull, 'w')
186 setattr(self, stream + '_is_void', True) 187 setattr(self, stream + '_is_void', True)
187 if callable(kwargs.get(stream)): 188 if callable(kwargs.get(stream)):
188 setattr(self, stream + '_cb', kwargs[stream]) 189 setattr(self, stream + '_cb', kwargs[stream])
189 kwargs[stream] = PIPE 190 kwargs[stream] = PIPE
190 191
191 self.start = time.time() 192 self.start = time.time()
192 self.timeout = None 193 self.timeout = None
194 self.nag_timer = None
193 self.shell = kwargs.get('shell', None) 195 self.shell = kwargs.get('shell', None)
194 # Silence pylint on MacOSX 196 # Silence pylint on MacOSX
195 self.returncode = None 197 self.returncode = None
196 198
197 try: 199 try:
198 super(Popen, self).__init__(args, **kwargs) 200 super(Popen, self).__init__(args, **kwargs)
199 except OSError, e: 201 except OSError, e:
200 if e.errno == errno.EAGAIN and sys.platform == 'cygwin': 202 if e.errno == errno.EAGAIN and sys.platform == 'cygwin':
201 # Convert fork() emulation failure into a CygwinRebaseError(). 203 # Convert fork() emulation failure into a CygwinRebaseError().
202 raise CygwinRebaseError( 204 raise CygwinRebaseError(
(...skipping 18 matching lines...) Expand all
221 effectiveness will be delayed accordingly. 223 effectiveness will be delayed accordingly.
222 """ 224 """
223 # Queue of either of <threadname> when done or (<threadname>, data). In 225 # Queue of either of <threadname> when done or (<threadname>, data). In
224 # theory we would like to limit to ~64kb items to not cause large memory 226 # theory we would like to limit to ~64kb items to not cause large memory
225 # usage when the callback blocks. It is not done because it slows down 227 # usage when the callback blocks. It is not done because it slows down
226 # processing on OSX10.6 by a factor of 2x, making it even slower than 228 # processing on OSX10.6 by a factor of 2x, making it even slower than
227 # Windows! Revisit this decision if it becomes a problem, e.g. crash 229 # Windows! Revisit this decision if it becomes a problem, e.g. crash
228 # because of memory exhaustion. 230 # because of memory exhaustion.
229 queue = Queue.Queue() 231 queue = Queue.Queue()
230 done = threading.Event() 232 done = threading.Event()
233 timer = []
234 last_output = [time.time()] * 2
231 235
232 def write_stdin(): 236 def write_stdin():
233 try: 237 try:
234 stdin_io = cStringIO.StringIO(input) 238 stdin_io = cStringIO.StringIO(input)
235 while True: 239 while True:
236 data = stdin_io.read(1024) 240 data = stdin_io.read(1024)
237 if data: 241 if data:
238 self.stdin.write(data) 242 self.stdin.write(data)
239 else: 243 else:
240 self.stdin.close() 244 self.stdin.close()
241 break 245 break
242 finally: 246 finally:
243 queue.put('stdin') 247 queue.put('stdin')
244 248
245 def _queue_pipe_read(pipe, name): 249 def _queue_pipe_read(pipe, name):
246 """Queues characters read from a pipe into a queue.""" 250 """Queues characters read from a pipe into a queue."""
247 try: 251 try:
248 while True: 252 while True:
249 data = pipe.read(1) 253 data = pipe.read(1)
250 if not data: 254 if not data:
251 break 255 break
256 last_output[0] = time.time()
252 queue.put((name, data)) 257 queue.put((name, data))
253 finally: 258 finally:
254 queue.put(name) 259 queue.put(name)
255 260
261 def nag_fn():
262 now = time.time()
263 if done.is_set():
264 return
265 if last_output[0] == last_output[1]:
266 logging.warn(' No output for %.0f seconds from command:' % (
267 now - last_output[1]))
268 logging.warn(' %s' % self.cmd_str)
269 # Use 0.1 fudge factor in case:
270 # now ~= last_output[0] + self.nag_timer
271 sleep_time = self.nag_timer + last_output[0] - now - 0.1
272 while sleep_time < 0:
273 sleep_time += self.nag_timer
274 last_output[1] = last_output[0]
275 timer[0] = threading.Timer(sleep_time, nag_fn)
276 timer[0].start()
277
256 def timeout_fn(): 278 def timeout_fn():
257 try: 279 try:
258 done.wait(self.timeout) 280 done.wait(self.timeout)
259 finally: 281 finally:
260 queue.put('timeout') 282 queue.put('timeout')
261 283
262 def wait_fn(): 284 def wait_fn():
263 try: 285 try:
264 self.wait() 286 self.wait()
265 finally: 287 finally:
(...skipping 17 matching lines...) Expand all
283 threads['stderr'] = threading.Thread( 305 threads['stderr'] = threading.Thread(
284 target=_queue_pipe_read, args=(self.stderr, 'stderr')) 306 target=_queue_pipe_read, args=(self.stderr, 'stderr'))
285 if input: 307 if input:
286 threads['stdin'] = threading.Thread(target=write_stdin) 308 threads['stdin'] = threading.Thread(target=write_stdin)
287 elif self.stdin: 309 elif self.stdin:
288 # Pipe but no input, make sure it's closed. 310 # Pipe but no input, make sure it's closed.
289 self.stdin.close() 311 self.stdin.close()
290 for t in threads.itervalues(): 312 for t in threads.itervalues():
291 t.start() 313 t.start()
292 314
315 if self.nag_timer:
316 timer.append(threading.Timer(self.nag_timer, nag_fn))
317 timer[0].start()
318
293 timed_out = False 319 timed_out = False
294 try: 320 try:
295 # This thread needs to be optimized for speed. 321 # This thread needs to be optimized for speed.
296 while threads: 322 while threads:
297 item = queue.get() 323 item = queue.get()
298 if item[0] == 'stdout': 324 if item[0] == 'stdout':
299 self.stdout_cb(item[1]) 325 self.stdout_cb(item[1])
300 elif item[0] == 'stderr': 326 elif item[0] == 'stderr':
301 self.stderr_cb(item[1]) 327 self.stderr_cb(item[1])
302 else: 328 else:
303 # A thread terminated. 329 # A thread terminated.
304 threads[item].join() 330 threads[item].join()
305 del threads[item] 331 del threads[item]
306 if item == 'wait': 332 if item == 'wait':
307 # Terminate the timeout thread if necessary. 333 # Terminate the timeout thread if necessary.
308 done.set() 334 done.set()
309 elif item == 'timeout' and not timed_out and self.poll() is None: 335 elif item == 'timeout' and not timed_out and self.poll() is None:
310 logging.debug('Timed out after %fs: killing' % self.timeout) 336 logging.debug('Timed out after %fs: killing' % self.timeout)
311 self.kill() 337 self.kill()
312 timed_out = True 338 timed_out = True
313 finally: 339 finally:
314 # Stop the threads. 340 # Stop the threads.
315 done.set() 341 done.set()
342 if timer:
343 timer[0].cancel()
316 if 'wait' in threads: 344 if 'wait' in threads:
317 # Accelerate things, otherwise it would hang until the child process is 345 # Accelerate things, otherwise it would hang until the child process is
318 # done. 346 # done.
319 logging.debug('Killing child because of an exception') 347 logging.debug('Killing child because of an exception')
320 self.kill() 348 self.kill()
321 # Join threads. 349 # Join threads.
322 for thread in threads.itervalues(): 350 for thread in threads.itervalues():
323 thread.join() 351 thread.join()
324 if timed_out: 352 if timed_out:
325 self.returncode = TIMED_OUT 353 self.returncode = TIMED_OUT
326 354
327 def communicate(self, input=None, timeout=None): # pylint: disable=W0221,W0622 355 # pylint: disable=W0221,W0622
356 def communicate(self, input=None, timeout=None, nag_timer=None):
328 """Adds timeout and callbacks support. 357 """Adds timeout and callbacks support.
329 358
330 Returns (stdout, stderr) like subprocess.Popen().communicate(). 359 Returns (stdout, stderr) like subprocess.Popen().communicate().
331 360
332 - The process will be killed after |timeout| seconds and returncode set to 361 - The process will be killed after |timeout| seconds and returncode set to
333 TIMED_OUT. 362 TIMED_OUT.
363 - If the subprocess runs for |nag_timer| seconds without producing terminal
364 output, print a warning to stderr.
334 """ 365 """
335 self.timeout = timeout 366 self.timeout = timeout
336 if not self.timeout and not self.stdout_cb and not self.stderr_cb: 367 self.nag_timer = nag_timer
368 if (not self.timeout and not self.nag_timer and
369 not self.stdout_cb and not self.stderr_cb):
337 return super(Popen, self).communicate(input) 370 return super(Popen, self).communicate(input)
338 371
339 if self.timeout and self.shell: 372 if self.timeout and self.shell:
340 raise TypeError( 373 raise TypeError(
341 'Using timeout and shell simultaneously will cause a process leak ' 374 'Using timeout and shell simultaneously will cause a process leak '
342 'since the shell will be killed instead of the child process.') 375 'since the shell will be killed instead of the child process.')
343 376
344 stdout = None 377 stdout = None
345 stderr = None 378 stderr = None
346 # Convert to a lambda to workaround python's deadlock. 379 # Convert to a lambda to workaround python's deadlock.
347 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait 380 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
348 # When the pipe fills up, it would deadlock this process. 381 # When the pipe fills up, it would deadlock this process.
349 if self.stdout and not self.stdout_cb and not self.stdout_is_void: 382 if self.stdout and not self.stdout_cb and not self.stdout_is_void:
350 stdout = [] 383 stdout = []
351 self.stdout_cb = stdout.append 384 self.stdout_cb = stdout.append
352 if self.stderr and not self.stderr_cb and not self.stderr_is_void: 385 if self.stderr and not self.stderr_cb and not self.stderr_is_void:
353 stderr = [] 386 stderr = []
354 self.stderr_cb = stderr.append 387 self.stderr_cb = stderr.append
355 self._tee_threads(input) 388 self._tee_threads(input)
356 if stdout is not None: 389 if stdout is not None:
357 stdout = ''.join(stdout) 390 stdout = ''.join(stdout)
358 if stderr is not None: 391 if stderr is not None:
359 stderr = ''.join(stderr) 392 stderr = ''.join(stderr)
360 return (stdout, stderr) 393 return (stdout, stderr)
361 394
362 395
363 def communicate(args, timeout=None, **kwargs): 396 def communicate(args, timeout=None, nag_timer=None, **kwargs):
364 """Wraps subprocess.Popen().communicate() and add timeout support. 397 """Wraps subprocess.Popen().communicate() and add timeout support.
365 398
366 Returns ((stdout, stderr), returncode). 399 Returns ((stdout, stderr), returncode).
367 400
368 - The process will be killed after |timeout| seconds and returncode set to 401 - The process will be killed after |timeout| seconds and returncode set to
369 TIMED_OUT. 402 TIMED_OUT.
403 - If the subprocess runs for |nag_timer| seconds without producing terminal
404 output, print a warning to stderr.
370 - Automatically passes stdin content as input so do not specify stdin=PIPE. 405 - Automatically passes stdin content as input so do not specify stdin=PIPE.
371 """ 406 """
372 stdin = kwargs.pop('stdin', None) 407 stdin = kwargs.pop('stdin', None)
373 if stdin is not None: 408 if stdin is not None:
374 if isinstance(stdin, basestring): 409 if isinstance(stdin, basestring):
375 # When stdin is passed as an argument, use it as the actual input data and 410 # When stdin is passed as an argument, use it as the actual input data and
376 # set the Popen() parameter accordingly. 411 # set the Popen() parameter accordingly.
377 kwargs['stdin'] = PIPE 412 kwargs['stdin'] = PIPE
378 else: 413 else:
379 kwargs['stdin'] = stdin 414 kwargs['stdin'] = stdin
380 stdin = None 415 stdin = None
381 416
382 proc = Popen(args, **kwargs) 417 proc = Popen(args, **kwargs)
383 if stdin: 418 if stdin:
384 return proc.communicate(stdin, timeout), proc.returncode 419 return proc.communicate(stdin, timeout, nag_timer), proc.returncode
385 else: 420 else:
386 return proc.communicate(None, timeout), proc.returncode 421 return proc.communicate(None, timeout, nag_timer), proc.returncode
387 422
388 423
389 def call(args, **kwargs): 424 def call(args, **kwargs):
390 """Emulates subprocess.call(). 425 """Emulates subprocess.call().
391 426
392 Automatically convert stdout=PIPE or stderr=PIPE to VOID. 427 Automatically convert stdout=PIPE or stderr=PIPE to VOID.
393 In no case they can be returned since no code path raises 428 In no case they can be returned since no code path raises
394 subprocess2.CalledProcessError. 429 subprocess2.CalledProcessError.
395 """ 430 """
396 if kwargs.get('stdout') == PIPE: 431 if kwargs.get('stdout') == PIPE:
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
439 474
440 - Throws if return code is not 0. 475 - Throws if return code is not 0.
441 - Works even prior to python 2.7. 476 - Works even prior to python 2.7.
442 - Blocks stdin by default if not specified since no output will be visible. 477 - Blocks stdin by default if not specified since no output will be visible.
443 - As per doc, "The stdout argument is not allowed as it is used internally." 478 - As per doc, "The stdout argument is not allowed as it is used internally."
444 """ 479 """
445 kwargs.setdefault('stdin', VOID) 480 kwargs.setdefault('stdin', VOID)
446 if 'stdout' in kwargs: 481 if 'stdout' in kwargs:
447 raise ValueError('stdout argument not allowed, it will be overridden.') 482 raise ValueError('stdout argument not allowed, it will be overridden.')
448 return check_call_out(args, stdout=PIPE, **kwargs)[0] 483 return check_call_out(args, stdout=PIPE, **kwargs)[0]
OLDNEW
« no previous file with comments | « no previous file | tests/subprocess2_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698