OLD | NEW |
1 # coding=utf8 | 1 # coding=utf8 |
2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 """Collection of subprocess wrapper functions. | 5 """Collection of subprocess wrapper functions. |
6 | 6 |
7 In theory you shouldn't need anything else in subprocess, or this module failed. | 7 In theory you shouldn't need anything else in subprocess, or this module failed. |
8 """ | 8 """ |
9 | 9 |
10 import cStringIO | 10 import cStringIO |
(...skipping 157 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
168 raise CalledProcessError(None, args, kwargs.get('cwd'), None, None) | 168 raise CalledProcessError(None, args, kwargs.get('cwd'), None, None) |
169 if kwargs.get('cwd', None): | 169 if kwargs.get('cwd', None): |
170 tmp_str += '; cwd=%s' % kwargs['cwd'] | 170 tmp_str += '; cwd=%s' % kwargs['cwd'] |
171 logging.debug(tmp_str) | 171 logging.debug(tmp_str) |
172 | 172 |
173 self.stdout_cb = None | 173 self.stdout_cb = None |
174 self.stderr_cb = None | 174 self.stderr_cb = None |
175 self.stdin_is_void = False | 175 self.stdin_is_void = False |
176 self.stdout_is_void = False | 176 self.stdout_is_void = False |
177 self.stderr_is_void = False | 177 self.stderr_is_void = False |
| 178 self.cmd_str = tmp_str |
178 | 179 |
179 if kwargs.get('stdin') is VOID: | 180 if kwargs.get('stdin') is VOID: |
180 kwargs['stdin'] = open(os.devnull, 'r') | 181 kwargs['stdin'] = open(os.devnull, 'r') |
181 self.stdin_is_void = True | 182 self.stdin_is_void = True |
182 | 183 |
183 for stream in ('stdout', 'stderr'): | 184 for stream in ('stdout', 'stderr'): |
184 if kwargs.get(stream) in (VOID, os.devnull): | 185 if kwargs.get(stream) in (VOID, os.devnull): |
185 kwargs[stream] = open(os.devnull, 'w') | 186 kwargs[stream] = open(os.devnull, 'w') |
186 setattr(self, stream + '_is_void', True) | 187 setattr(self, stream + '_is_void', True) |
187 if callable(kwargs.get(stream)): | 188 if callable(kwargs.get(stream)): |
188 setattr(self, stream + '_cb', kwargs[stream]) | 189 setattr(self, stream + '_cb', kwargs[stream]) |
189 kwargs[stream] = PIPE | 190 kwargs[stream] = PIPE |
190 | 191 |
191 self.start = time.time() | 192 self.start = time.time() |
192 self.timeout = None | 193 self.timeout = None |
| 194 self.nag_timer = None |
193 self.shell = kwargs.get('shell', None) | 195 self.shell = kwargs.get('shell', None) |
194 # Silence pylint on MacOSX | 196 # Silence pylint on MacOSX |
195 self.returncode = None | 197 self.returncode = None |
196 | 198 |
197 try: | 199 try: |
198 super(Popen, self).__init__(args, **kwargs) | 200 super(Popen, self).__init__(args, **kwargs) |
199 except OSError, e: | 201 except OSError, e: |
200 if e.errno == errno.EAGAIN and sys.platform == 'cygwin': | 202 if e.errno == errno.EAGAIN and sys.platform == 'cygwin': |
201 # Convert fork() emulation failure into a CygwinRebaseError(). | 203 # Convert fork() emulation failure into a CygwinRebaseError(). |
202 raise CygwinRebaseError( | 204 raise CygwinRebaseError( |
(...skipping 18 matching lines...) Expand all Loading... |
221 effectiveness will be delayed accordingly. | 223 effectiveness will be delayed accordingly. |
222 """ | 224 """ |
223 # Queue of either of <threadname> when done or (<threadname>, data). In | 225 # Queue of either of <threadname> when done or (<threadname>, data). In |
224 # theory we would like to limit to ~64kb items to not cause large memory | 226 # theory we would like to limit to ~64kb items to not cause large memory |
225 # usage when the callback blocks. It is not done because it slows down | 227 # usage when the callback blocks. It is not done because it slows down |
226 # processing on OSX10.6 by a factor of 2x, making it even slower than | 228 # processing on OSX10.6 by a factor of 2x, making it even slower than |
227 # Windows! Revisit this decision if it becomes a problem, e.g. crash | 229 # Windows! Revisit this decision if it becomes a problem, e.g. crash |
228 # because of memory exhaustion. | 230 # because of memory exhaustion. |
229 queue = Queue.Queue() | 231 queue = Queue.Queue() |
230 done = threading.Event() | 232 done = threading.Event() |
| 233 timer = [] |
| 234 last_output = [time.time()] * 2 |
231 | 235 |
232 def write_stdin(): | 236 def write_stdin(): |
233 try: | 237 try: |
234 stdin_io = cStringIO.StringIO(input) | 238 stdin_io = cStringIO.StringIO(input) |
235 while True: | 239 while True: |
236 data = stdin_io.read(1024) | 240 data = stdin_io.read(1024) |
237 if data: | 241 if data: |
238 self.stdin.write(data) | 242 self.stdin.write(data) |
239 else: | 243 else: |
240 self.stdin.close() | 244 self.stdin.close() |
241 break | 245 break |
242 finally: | 246 finally: |
243 queue.put('stdin') | 247 queue.put('stdin') |
244 | 248 |
245 def _queue_pipe_read(pipe, name): | 249 def _queue_pipe_read(pipe, name): |
246 """Queues characters read from a pipe into a queue.""" | 250 """Queues characters read from a pipe into a queue.""" |
247 try: | 251 try: |
248 while True: | 252 while True: |
249 data = pipe.read(1) | 253 data = pipe.read(1) |
250 if not data: | 254 if not data: |
251 break | 255 break |
| 256 last_output[0] = time.time() |
252 queue.put((name, data)) | 257 queue.put((name, data)) |
253 finally: | 258 finally: |
254 queue.put(name) | 259 queue.put(name) |
255 | 260 |
| 261 def nag_fn(): |
| 262 now = time.time() |
| 263 if done.is_set(): |
| 264 return |
| 265 if last_output[0] == last_output[1]: |
| 266 logging.warn(' No output for %.0f seconds from command:' % ( |
| 267 now - last_output[1])) |
| 268 logging.warn(' %s' % self.cmd_str) |
| 269 # Use 0.1 fudge factor in case: |
| 270 # now ~= last_output[0] + self.nag_timer |
| 271 sleep_time = self.nag_timer + last_output[0] - now - 0.1 |
| 272 while sleep_time < 0: |
| 273 sleep_time += self.nag_timer |
| 274 last_output[1] = last_output[0] |
| 275 timer[0] = threading.Timer(sleep_time, nag_fn) |
| 276 timer[0].start() |
| 277 |
256 def timeout_fn(): | 278 def timeout_fn(): |
257 try: | 279 try: |
258 done.wait(self.timeout) | 280 done.wait(self.timeout) |
259 finally: | 281 finally: |
260 queue.put('timeout') | 282 queue.put('timeout') |
261 | 283 |
262 def wait_fn(): | 284 def wait_fn(): |
263 try: | 285 try: |
264 self.wait() | 286 self.wait() |
265 finally: | 287 finally: |
(...skipping 17 matching lines...) Expand all Loading... |
283 threads['stderr'] = threading.Thread( | 305 threads['stderr'] = threading.Thread( |
284 target=_queue_pipe_read, args=(self.stderr, 'stderr')) | 306 target=_queue_pipe_read, args=(self.stderr, 'stderr')) |
285 if input: | 307 if input: |
286 threads['stdin'] = threading.Thread(target=write_stdin) | 308 threads['stdin'] = threading.Thread(target=write_stdin) |
287 elif self.stdin: | 309 elif self.stdin: |
288 # Pipe but no input, make sure it's closed. | 310 # Pipe but no input, make sure it's closed. |
289 self.stdin.close() | 311 self.stdin.close() |
290 for t in threads.itervalues(): | 312 for t in threads.itervalues(): |
291 t.start() | 313 t.start() |
292 | 314 |
| 315 if self.nag_timer: |
| 316 timer.append(threading.Timer(self.nag_timer, nag_fn)) |
| 317 timer[0].start() |
| 318 |
293 timed_out = False | 319 timed_out = False |
294 try: | 320 try: |
295 # This thread needs to be optimized for speed. | 321 # This thread needs to be optimized for speed. |
296 while threads: | 322 while threads: |
297 item = queue.get() | 323 item = queue.get() |
298 if item[0] == 'stdout': | 324 if item[0] == 'stdout': |
299 self.stdout_cb(item[1]) | 325 self.stdout_cb(item[1]) |
300 elif item[0] == 'stderr': | 326 elif item[0] == 'stderr': |
301 self.stderr_cb(item[1]) | 327 self.stderr_cb(item[1]) |
302 else: | 328 else: |
303 # A thread terminated. | 329 # A thread terminated. |
304 threads[item].join() | 330 threads[item].join() |
305 del threads[item] | 331 del threads[item] |
306 if item == 'wait': | 332 if item == 'wait': |
307 # Terminate the timeout thread if necessary. | 333 # Terminate the timeout thread if necessary. |
308 done.set() | 334 done.set() |
309 elif item == 'timeout' and not timed_out and self.poll() is None: | 335 elif item == 'timeout' and not timed_out and self.poll() is None: |
310 logging.debug('Timed out after %fs: killing' % self.timeout) | 336 logging.debug('Timed out after %fs: killing' % self.timeout) |
311 self.kill() | 337 self.kill() |
312 timed_out = True | 338 timed_out = True |
313 finally: | 339 finally: |
314 # Stop the threads. | 340 # Stop the threads. |
315 done.set() | 341 done.set() |
| 342 if timer: |
| 343 timer[0].cancel() |
316 if 'wait' in threads: | 344 if 'wait' in threads: |
317 # Accelerate things, otherwise it would hang until the child process is | 345 # Accelerate things, otherwise it would hang until the child process is |
318 # done. | 346 # done. |
319 logging.debug('Killing child because of an exception') | 347 logging.debug('Killing child because of an exception') |
320 self.kill() | 348 self.kill() |
321 # Join threads. | 349 # Join threads. |
322 for thread in threads.itervalues(): | 350 for thread in threads.itervalues(): |
323 thread.join() | 351 thread.join() |
324 if timed_out: | 352 if timed_out: |
325 self.returncode = TIMED_OUT | 353 self.returncode = TIMED_OUT |
326 | 354 |
327 def communicate(self, input=None, timeout=None): # pylint: disable=W0221,W0622 | 355 # pylint: disable=W0221,W0622 |
| 356 def communicate(self, input=None, timeout=None, nag_timer=None): |
328 """Adds timeout and callbacks support. | 357 """Adds timeout and callbacks support. |
329 | 358 |
330 Returns (stdout, stderr) like subprocess.Popen().communicate(). | 359 Returns (stdout, stderr) like subprocess.Popen().communicate(). |
331 | 360 |
332 - The process will be killed after |timeout| seconds and returncode set to | 361 - The process will be killed after |timeout| seconds and returncode set to |
333 TIMED_OUT. | 362 TIMED_OUT. |
| 363 - If the subprocess runs for |nag_timer| seconds without producing terminal |
| 364 output, print a warning to stderr. |
334 """ | 365 """ |
335 self.timeout = timeout | 366 self.timeout = timeout |
336 if not self.timeout and not self.stdout_cb and not self.stderr_cb: | 367 self.nag_timer = nag_timer |
| 368 if (not self.timeout and not self.nag_timer and |
| 369 not self.stdout_cb and not self.stderr_cb): |
337 return super(Popen, self).communicate(input) | 370 return super(Popen, self).communicate(input) |
338 | 371 |
339 if self.timeout and self.shell: | 372 if self.timeout and self.shell: |
340 raise TypeError( | 373 raise TypeError( |
341 'Using timeout and shell simultaneously will cause a process leak ' | 374 'Using timeout and shell simultaneously will cause a process leak ' |
342 'since the shell will be killed instead of the child process.') | 375 'since the shell will be killed instead of the child process.') |
343 | 376 |
344 stdout = None | 377 stdout = None |
345 stderr = None | 378 stderr = None |
346 # Convert to a lambda to workaround python's deadlock. | 379 # Convert to a lambda to workaround python's deadlock. |
347 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait | 380 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait |
348 # When the pipe fills up, it would deadlock this process. | 381 # When the pipe fills up, it would deadlock this process. |
349 if self.stdout and not self.stdout_cb and not self.stdout_is_void: | 382 if self.stdout and not self.stdout_cb and not self.stdout_is_void: |
350 stdout = [] | 383 stdout = [] |
351 self.stdout_cb = stdout.append | 384 self.stdout_cb = stdout.append |
352 if self.stderr and not self.stderr_cb and not self.stderr_is_void: | 385 if self.stderr and not self.stderr_cb and not self.stderr_is_void: |
353 stderr = [] | 386 stderr = [] |
354 self.stderr_cb = stderr.append | 387 self.stderr_cb = stderr.append |
355 self._tee_threads(input) | 388 self._tee_threads(input) |
356 if stdout is not None: | 389 if stdout is not None: |
357 stdout = ''.join(stdout) | 390 stdout = ''.join(stdout) |
358 if stderr is not None: | 391 if stderr is not None: |
359 stderr = ''.join(stderr) | 392 stderr = ''.join(stderr) |
360 return (stdout, stderr) | 393 return (stdout, stderr) |
361 | 394 |
362 | 395 |
363 def communicate(args, timeout=None, **kwargs): | 396 def communicate(args, timeout=None, nag_timer=None, **kwargs): |
364 """Wraps subprocess.Popen().communicate() and add timeout support. | 397 """Wraps subprocess.Popen().communicate() and add timeout support. |
365 | 398 |
366 Returns ((stdout, stderr), returncode). | 399 Returns ((stdout, stderr), returncode). |
367 | 400 |
368 - The process will be killed after |timeout| seconds and returncode set to | 401 - The process will be killed after |timeout| seconds and returncode set to |
369 TIMED_OUT. | 402 TIMED_OUT. |
| 403 - If the subprocess runs for |nag_timer| seconds without producing terminal |
| 404 output, print a warning to stderr. |
370 - Automatically passes stdin content as input so do not specify stdin=PIPE. | 405 - Automatically passes stdin content as input so do not specify stdin=PIPE. |
371 """ | 406 """ |
372 stdin = kwargs.pop('stdin', None) | 407 stdin = kwargs.pop('stdin', None) |
373 if stdin is not None: | 408 if stdin is not None: |
374 if isinstance(stdin, basestring): | 409 if isinstance(stdin, basestring): |
375 # When stdin is passed as an argument, use it as the actual input data and | 410 # When stdin is passed as an argument, use it as the actual input data and |
376 # set the Popen() parameter accordingly. | 411 # set the Popen() parameter accordingly. |
377 kwargs['stdin'] = PIPE | 412 kwargs['stdin'] = PIPE |
378 else: | 413 else: |
379 kwargs['stdin'] = stdin | 414 kwargs['stdin'] = stdin |
380 stdin = None | 415 stdin = None |
381 | 416 |
382 proc = Popen(args, **kwargs) | 417 proc = Popen(args, **kwargs) |
383 if stdin: | 418 if stdin: |
384 return proc.communicate(stdin, timeout), proc.returncode | 419 return proc.communicate(stdin, timeout, nag_timer), proc.returncode |
385 else: | 420 else: |
386 return proc.communicate(None, timeout), proc.returncode | 421 return proc.communicate(None, timeout, nag_timer), proc.returncode |
387 | 422 |
388 | 423 |
389 def call(args, **kwargs): | 424 def call(args, **kwargs): |
390 """Emulates subprocess.call(). | 425 """Emulates subprocess.call(). |
391 | 426 |
392 Automatically convert stdout=PIPE or stderr=PIPE to VOID. | 427 Automatically convert stdout=PIPE or stderr=PIPE to VOID. |
393 In no case they can be returned since no code path raises | 428 In no case they can be returned since no code path raises |
394 subprocess2.CalledProcessError. | 429 subprocess2.CalledProcessError. |
395 """ | 430 """ |
396 if kwargs.get('stdout') == PIPE: | 431 if kwargs.get('stdout') == PIPE: |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
439 | 474 |
440 - Throws if return code is not 0. | 475 - Throws if return code is not 0. |
441 - Works even prior to python 2.7. | 476 - Works even prior to python 2.7. |
442 - Blocks stdin by default if not specified since no output will be visible. | 477 - Blocks stdin by default if not specified since no output will be visible. |
443 - As per doc, "The stdout argument is not allowed as it is used internally." | 478 - As per doc, "The stdout argument is not allowed as it is used internally." |
444 """ | 479 """ |
445 kwargs.setdefault('stdin', VOID) | 480 kwargs.setdefault('stdin', VOID) |
446 if 'stdout' in kwargs: | 481 if 'stdout' in kwargs: |
447 raise ValueError('stdout argument not allowed, it will be overridden.') | 482 raise ValueError('stdout argument not allowed, it will be overridden.') |
448 return check_call_out(args, stdout=PIPE, **kwargs)[0] | 483 return check_call_out(args, stdout=PIPE, **kwargs)[0] |
OLD | NEW |