| OLD | NEW |
| 1 # coding=utf8 | 1 # coding=utf8 |
| 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
| 4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
| 5 """Collection of subprocess wrapper functions. | 5 """Collection of subprocess wrapper functions. |
| 6 | 6 |
| 7 In theory you shouldn't need anything else in subprocess, or this module failed. | 7 In theory you shouldn't need anything else in subprocess, or this module failed. |
| 8 """ | 8 """ |
| 9 | 9 |
| 10 import cStringIO | 10 import cStringIO |
| (...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 125 # Requires modifications. | 125 # Requires modifications. |
| 126 env = env.copy() | 126 env = env.copy() |
| 127 def fix_lang(name): | 127 def fix_lang(name): |
| 128 if not is_english(name): | 128 if not is_english(name): |
| 129 env[name] = 'en_US.UTF-8' | 129 env[name] = 'en_US.UTF-8' |
| 130 fix_lang('LANG') | 130 fix_lang('LANG') |
| 131 fix_lang('LANGUAGE') | 131 fix_lang('LANGUAGE') |
| 132 return env | 132 return env |
| 133 | 133 |
| 134 | 134 |
| 135 class NagTimer(object): |
| 136 """ |
| 137 Triggers a callback when a time interval passes without an event being fired. |
| 138 |
| 139 For example, the event could be receiving terminal output from a subprocess; |
| 140 and the callback could print a warning to stderr that the subprocess appeared |
| 141 to be hung. |
| 142 """ |
| 143 def __init__(self, interval, cb): |
| 144 self.interval = interval |
| 145 self.cb = cb |
| 146 self.timer = threading.Timer(self.interval, self.fn) |
| 147 self.last_output = self.previous_last_output = 0 |
| 148 |
| 149 def start(self): |
| 150 self.last_output = self.previous_last_output = time.time() |
| 151 self.timer.start() |
| 152 |
| 153 def event(self): |
| 154 self.last_output = time.time() |
| 155 |
| 156 def fn(self): |
| 157 now = time.time() |
| 158 if self.last_output == self.previous_last_output: |
| 159 self.cb(now - self.previous_last_output) |
| 160 # Use 0.1 fudge factor, just in case |
| 161 # (self.last_output - now) is very close to zero. |
| 162 sleep_time = (self.last_output - now - 0.1) % self.interval |
| 163 self.previous_last_output = self.last_output |
| 164 self.timer = threading.Timer(sleep_time + 0.1, self.fn) |
| 165 self.timer.start() |
| 166 |
| 167 def cancel(self): |
| 168 self.timer.cancel() |
| 169 |
| 170 |
| 135 class Popen(subprocess.Popen): | 171 class Popen(subprocess.Popen): |
| 136 """Wraps subprocess.Popen() with various workarounds. | 172 """Wraps subprocess.Popen() with various workarounds. |
| 137 | 173 |
| 138 - Forces English output since it's easier to parse the stdout if it is always | 174 - Forces English output since it's easier to parse the stdout if it is always |
| 139 in English. | 175 in English. |
| 140 - Sets shell=True on windows by default. You can override this by forcing | 176 - Sets shell=True on windows by default. You can override this by forcing |
| 141 shell parameter to a value. | 177 shell parameter to a value. |
| 142 - Adds support for VOID to not buffer when not needed. | 178 - Adds support for VOID to not buffer when not needed. |
| 143 - Adds self.start property. | 179 - Adds self.start property. |
| 144 | 180 |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 185 if kwargs.get(stream) in (VOID, os.devnull): | 221 if kwargs.get(stream) in (VOID, os.devnull): |
| 186 kwargs[stream] = open(os.devnull, 'w') | 222 kwargs[stream] = open(os.devnull, 'w') |
| 187 setattr(self, stream + '_is_void', True) | 223 setattr(self, stream + '_is_void', True) |
| 188 if callable(kwargs.get(stream)): | 224 if callable(kwargs.get(stream)): |
| 189 setattr(self, stream + '_cb', kwargs[stream]) | 225 setattr(self, stream + '_cb', kwargs[stream]) |
| 190 kwargs[stream] = PIPE | 226 kwargs[stream] = PIPE |
| 191 | 227 |
| 192 self.start = time.time() | 228 self.start = time.time() |
| 193 self.timeout = None | 229 self.timeout = None |
| 194 self.nag_timer = None | 230 self.nag_timer = None |
| 231 self.nag_max = None |
| 195 self.shell = kwargs.get('shell', None) | 232 self.shell = kwargs.get('shell', None) |
| 196 # Silence pylint on MacOSX | 233 # Silence pylint on MacOSX |
| 197 self.returncode = None | 234 self.returncode = None |
| 198 | 235 |
| 199 try: | 236 try: |
| 200 super(Popen, self).__init__(args, **kwargs) | 237 super(Popen, self).__init__(args, **kwargs) |
| 201 except OSError, e: | 238 except OSError, e: |
| 202 if e.errno == errno.EAGAIN and sys.platform == 'cygwin': | 239 if e.errno == errno.EAGAIN and sys.platform == 'cygwin': |
| 203 # Convert fork() emulation failure into a CygwinRebaseError(). | 240 # Convert fork() emulation failure into a CygwinRebaseError(). |
| 204 raise CygwinRebaseError( | 241 raise CygwinRebaseError( |
| (...skipping 18 matching lines...) Expand all Loading... |
| 223 effectiveness will be delayed accordingly. | 260 effectiveness will be delayed accordingly. |
| 224 """ | 261 """ |
| 225 # Queue of either of <threadname> when done or (<threadname>, data). In | 262 # Queue of either of <threadname> when done or (<threadname>, data). In |
| 226 # theory we would like to limit to ~64kb items to not cause large memory | 263 # theory we would like to limit to ~64kb items to not cause large memory |
| 227 # usage when the callback blocks. It is not done because it slows down | 264 # usage when the callback blocks. It is not done because it slows down |
| 228 # processing on OSX10.6 by a factor of 2x, making it even slower than | 265 # processing on OSX10.6 by a factor of 2x, making it even slower than |
| 229 # Windows! Revisit this decision if it becomes a problem, e.g. crash | 266 # Windows! Revisit this decision if it becomes a problem, e.g. crash |
| 230 # because of memory exhaustion. | 267 # because of memory exhaustion. |
| 231 queue = Queue.Queue() | 268 queue = Queue.Queue() |
| 232 done = threading.Event() | 269 done = threading.Event() |
| 233 timer = [] | 270 nag = None |
| 234 last_output = [time.time()] * 2 | |
| 235 | 271 |
| 236 def write_stdin(): | 272 def write_stdin(): |
| 237 try: | 273 try: |
| 238 stdin_io = cStringIO.StringIO(input) | 274 stdin_io = cStringIO.StringIO(input) |
| 239 while True: | 275 while True: |
| 240 data = stdin_io.read(1024) | 276 data = stdin_io.read(1024) |
| 241 if data: | 277 if data: |
| 242 self.stdin.write(data) | 278 self.stdin.write(data) |
| 243 else: | 279 else: |
| 244 self.stdin.close() | 280 self.stdin.close() |
| 245 break | 281 break |
| 246 finally: | 282 finally: |
| 247 queue.put('stdin') | 283 queue.put('stdin') |
| 248 | 284 |
| 249 def _queue_pipe_read(pipe, name): | 285 def _queue_pipe_read(pipe, name): |
| 250 """Queues characters read from a pipe into a queue.""" | 286 """Queues characters read from a pipe into a queue.""" |
| 251 try: | 287 try: |
| 252 while True: | 288 while True: |
| 253 data = pipe.read(1) | 289 data = pipe.read(1) |
| 254 if not data: | 290 if not data: |
| 255 break | 291 break |
| 256 last_output[0] = time.time() | 292 if nag: |
| 293 nag.event() |
| 257 queue.put((name, data)) | 294 queue.put((name, data)) |
| 258 finally: | 295 finally: |
| 259 queue.put(name) | 296 queue.put(name) |
| 260 | 297 |
| 261 def nag_fn(): | |
| 262 now = time.time() | |
| 263 if done.is_set(): | |
| 264 return | |
| 265 if last_output[0] == last_output[1]: | |
| 266 logging.warn(' No output for %.0f seconds from command:' % ( | |
| 267 now - last_output[1])) | |
| 268 logging.warn(' %s' % self.cmd_str) | |
| 269 # Use 0.1 fudge factor in case: | |
| 270 # now ~= last_output[0] + self.nag_timer | |
| 271 sleep_time = self.nag_timer + last_output[0] - now - 0.1 | |
| 272 while sleep_time < 0: | |
| 273 sleep_time += self.nag_timer | |
| 274 last_output[1] = last_output[0] | |
| 275 timer[0] = threading.Timer(sleep_time, nag_fn) | |
| 276 timer[0].start() | |
| 277 | |
| 278 def timeout_fn(): | 298 def timeout_fn(): |
| 279 try: | 299 try: |
| 280 done.wait(self.timeout) | 300 done.wait(self.timeout) |
| 281 finally: | 301 finally: |
| 282 queue.put('timeout') | 302 queue.put('timeout') |
| 283 | 303 |
| 284 def wait_fn(): | 304 def wait_fn(): |
| 285 try: | 305 try: |
| 286 self.wait() | 306 self.wait() |
| 287 finally: | 307 finally: |
| (...skipping 18 matching lines...) Expand all Loading... |
| 306 target=_queue_pipe_read, args=(self.stderr, 'stderr')) | 326 target=_queue_pipe_read, args=(self.stderr, 'stderr')) |
| 307 if input: | 327 if input: |
| 308 threads['stdin'] = threading.Thread(target=write_stdin) | 328 threads['stdin'] = threading.Thread(target=write_stdin) |
| 309 elif self.stdin: | 329 elif self.stdin: |
| 310 # Pipe but no input, make sure it's closed. | 330 # Pipe but no input, make sure it's closed. |
| 311 self.stdin.close() | 331 self.stdin.close() |
| 312 for t in threads.itervalues(): | 332 for t in threads.itervalues(): |
| 313 t.start() | 333 t.start() |
| 314 | 334 |
| 315 if self.nag_timer: | 335 if self.nag_timer: |
| 316 timer.append(threading.Timer(self.nag_timer, nag_fn)) | 336 def _nag_cb(elapsed): |
| 317 timer[0].start() | 337 logging.warn(' No output for %.0f seconds from command:' % elapsed) |
| 338 logging.warn(' %s' % self.cmd_str) |
| 339 if (self.nag_max and |
| 340 int('%.0f' % (elapsed / self.nag_timer)) >= self.nag_max): |
| 341 queue.put('timeout') |
| 342 done.set() # Must do this so that timeout thread stops waiting. |
| 343 nag = NagTimer(self.nag_timer, _nag_cb) |
| 344 nag.start() |
| 318 | 345 |
| 319 timed_out = False | 346 timed_out = False |
| 320 try: | 347 try: |
| 321 # This thread needs to be optimized for speed. | 348 # This thread needs to be optimized for speed. |
| 322 while threads: | 349 while threads: |
| 323 item = queue.get() | 350 item = queue.get() |
| 324 if item[0] == 'stdout': | 351 if item[0] == 'stdout': |
| 325 self.stdout_cb(item[1]) | 352 self.stdout_cb(item[1]) |
| 326 elif item[0] == 'stderr': | 353 elif item[0] == 'stderr': |
| 327 self.stderr_cb(item[1]) | 354 self.stderr_cb(item[1]) |
| 328 else: | 355 else: |
| 329 # A thread terminated. | 356 # A thread terminated. |
| 330 threads[item].join() | 357 if item in threads: |
| 331 del threads[item] | 358 threads[item].join() |
| 359 del threads[item] |
| 332 if item == 'wait': | 360 if item == 'wait': |
| 333 # Terminate the timeout thread if necessary. | 361 # Terminate the timeout thread if necessary. |
| 334 done.set() | 362 done.set() |
| 335 elif item == 'timeout' and not timed_out and self.poll() is None: | 363 elif item == 'timeout' and not timed_out and self.poll() is None: |
| 336 logging.debug('Timed out after %fs: killing' % self.timeout) | 364 logging.debug('Timed out after %.0fs: killing' % ( |
| 365 time.time() - self.start)) |
| 337 self.kill() | 366 self.kill() |
| 338 timed_out = True | 367 timed_out = True |
| 339 finally: | 368 finally: |
| 340 # Stop the threads. | 369 # Stop the threads. |
| 341 done.set() | 370 done.set() |
| 342 if timer: | 371 if nag: |
| 343 timer[0].cancel() | 372 nag.cancel() |
| 344 if 'wait' in threads: | 373 if 'wait' in threads: |
| 345 # Accelerate things, otherwise it would hang until the child process is | 374 # Accelerate things, otherwise it would hang until the child process is |
| 346 # done. | 375 # done. |
| 347 logging.debug('Killing child because of an exception') | 376 logging.debug('Killing child because of an exception') |
| 348 self.kill() | 377 self.kill() |
| 349 # Join threads. | 378 # Join threads. |
| 350 for thread in threads.itervalues(): | 379 for thread in threads.itervalues(): |
| 351 thread.join() | 380 thread.join() |
| 352 if timed_out: | 381 if timed_out: |
| 353 self.returncode = TIMED_OUT | 382 self.returncode = TIMED_OUT |
| 354 | 383 |
| 355 # pylint: disable=W0221,W0622 | 384 # pylint: disable=W0221,W0622 |
| 356 def communicate(self, input=None, timeout=None, nag_timer=None): | 385 def communicate(self, input=None, timeout=None, nag_timer=None, |
| 386 nag_max=None): |
| 357 """Adds timeout and callbacks support. | 387 """Adds timeout and callbacks support. |
| 358 | 388 |
| 359 Returns (stdout, stderr) like subprocess.Popen().communicate(). | 389 Returns (stdout, stderr) like subprocess.Popen().communicate(). |
| 360 | 390 |
| 361 - The process will be killed after |timeout| seconds and returncode set to | 391 - The process will be killed after |timeout| seconds and returncode set to |
| 362 TIMED_OUT. | 392 TIMED_OUT. |
| 363 - If the subprocess runs for |nag_timer| seconds without producing terminal | 393 - If the subprocess runs for |nag_timer| seconds without producing terminal |
| 364 output, print a warning to stderr. | 394 output, print a warning to stderr. |
| 365 """ | 395 """ |
| 366 self.timeout = timeout | 396 self.timeout = timeout |
| 367 self.nag_timer = nag_timer | 397 self.nag_timer = nag_timer |
| 398 self.nag_max = nag_max |
| 368 if (not self.timeout and not self.nag_timer and | 399 if (not self.timeout and not self.nag_timer and |
| 369 not self.stdout_cb and not self.stderr_cb): | 400 not self.stdout_cb and not self.stderr_cb): |
| 370 return super(Popen, self).communicate(input) | 401 return super(Popen, self).communicate(input) |
| 371 | 402 |
| 372 if self.timeout and self.shell: | 403 if self.timeout and self.shell: |
| 373 raise TypeError( | 404 raise TypeError( |
| 374 'Using timeout and shell simultaneously will cause a process leak ' | 405 'Using timeout and shell simultaneously will cause a process leak ' |
| 375 'since the shell will be killed instead of the child process.') | 406 'since the shell will be killed instead of the child process.') |
| 376 | 407 |
| 377 stdout = None | 408 stdout = None |
| 378 stderr = None | 409 stderr = None |
| 379 # Convert to a lambda to workaround python's deadlock. | 410 # Convert to a lambda to workaround python's deadlock. |
| 380 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait | 411 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait |
| 381 # When the pipe fills up, it would deadlock this process. | 412 # When the pipe fills up, it would deadlock this process. |
| 382 if self.stdout and not self.stdout_cb and not self.stdout_is_void: | 413 if self.stdout and not self.stdout_cb and not self.stdout_is_void: |
| 383 stdout = [] | 414 stdout = [] |
| 384 self.stdout_cb = stdout.append | 415 self.stdout_cb = stdout.append |
| 385 if self.stderr and not self.stderr_cb and not self.stderr_is_void: | 416 if self.stderr and not self.stderr_cb and not self.stderr_is_void: |
| 386 stderr = [] | 417 stderr = [] |
| 387 self.stderr_cb = stderr.append | 418 self.stderr_cb = stderr.append |
| 388 self._tee_threads(input) | 419 self._tee_threads(input) |
| 389 if stdout is not None: | 420 if stdout is not None: |
| 390 stdout = ''.join(stdout) | 421 stdout = ''.join(stdout) |
| 391 if stderr is not None: | 422 if stderr is not None: |
| 392 stderr = ''.join(stderr) | 423 stderr = ''.join(stderr) |
| 393 return (stdout, stderr) | 424 return (stdout, stderr) |
| 394 | 425 |
| 395 | 426 |
| 396 def communicate(args, timeout=None, nag_timer=None, **kwargs): | 427 def communicate(args, timeout=None, nag_timer=None, nag_max=None, **kwargs): |
| 397 """Wraps subprocess.Popen().communicate() and add timeout support. | 428 """Wraps subprocess.Popen().communicate() and add timeout support. |
| 398 | 429 |
| 399 Returns ((stdout, stderr), returncode). | 430 Returns ((stdout, stderr), returncode). |
| 400 | 431 |
| 401 - The process will be killed after |timeout| seconds and returncode set to | 432 - The process will be killed after |timeout| seconds and returncode set to |
| 402 TIMED_OUT. | 433 TIMED_OUT. |
| 403 - If the subprocess runs for |nag_timer| seconds without producing terminal | 434 - If the subprocess runs for |nag_timer| seconds without producing terminal |
| 404 output, print a warning to stderr. | 435 output, print a warning to stderr. |
| 405 - Automatically passes stdin content as input so do not specify stdin=PIPE. | 436 - Automatically passes stdin content as input so do not specify stdin=PIPE. |
| 406 """ | 437 """ |
| (...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 474 | 505 |
| 475 - Throws if return code is not 0. | 506 - Throws if return code is not 0. |
| 476 - Works even prior to python 2.7. | 507 - Works even prior to python 2.7. |
| 477 - Blocks stdin by default if not specified since no output will be visible. | 508 - Blocks stdin by default if not specified since no output will be visible. |
| 478 - As per doc, "The stdout argument is not allowed as it is used internally." | 509 - As per doc, "The stdout argument is not allowed as it is used internally." |
| 479 """ | 510 """ |
| 480 kwargs.setdefault('stdin', VOID) | 511 kwargs.setdefault('stdin', VOID) |
| 481 if 'stdout' in kwargs: | 512 if 'stdout' in kwargs: |
| 482 raise ValueError('stdout argument not allowed, it will be overridden.') | 513 raise ValueError('stdout argument not allowed, it will be overridden.') |
| 483 return check_call_out(args, stdout=PIPE, **kwargs)[0] | 514 return check_call_out(args, stdout=PIPE, **kwargs)[0] |
| OLD | NEW |