OLD | NEW |
1 # coding=utf8 | 1 # coding=utf8 |
2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 """Collection of subprocess wrapper functions. | 5 """Collection of subprocess wrapper functions. |
6 | 6 |
7 In theory you shouldn't need anything else in subprocess, or this module failed. | 7 In theory you shouldn't need anything else in subprocess, or this module failed. |
8 """ | 8 """ |
9 | 9 |
10 import cStringIO | 10 import cStringIO |
(...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
125 # Requires modifications. | 125 # Requires modifications. |
126 env = env.copy() | 126 env = env.copy() |
127 def fix_lang(name): | 127 def fix_lang(name): |
128 if not is_english(name): | 128 if not is_english(name): |
129 env[name] = 'en_US.UTF-8' | 129 env[name] = 'en_US.UTF-8' |
130 fix_lang('LANG') | 130 fix_lang('LANG') |
131 fix_lang('LANGUAGE') | 131 fix_lang('LANGUAGE') |
132 return env | 132 return env |
133 | 133 |
134 | 134 |
| 135 class NagTimer(object): |
| 136 """ |
| 137 Triggers a callback when a time interval passes without an event being fired. |
| 138 |
| 139 For example, the event could be receiving terminal output from a subprocess; |
| 140 and the callback could print a warning to stderr that the subprocess appeared |
| 141 to be hung. |
| 142 """ |
| 143 def __init__(self, interval, cb): |
| 144 self.interval = interval |
| 145 self.cb = cb |
| 146 self.timer = threading.Timer(self.interval, self.fn) |
| 147 self.last_output = self.previous_last_output = 0 |
| 148 |
| 149 def start(self): |
| 150 self.last_output = self.previous_last_output = time.time() |
| 151 self.timer.start() |
| 152 |
| 153 def event(self): |
| 154 self.last_output = time.time() |
| 155 |
| 156 def fn(self): |
| 157 now = time.time() |
| 158 if self.last_output == self.previous_last_output: |
| 159 self.cb(now - self.previous_last_output) |
| 160 # Use 0.1 fudge factor, just in case |
| 161 # (self.last_output - now) is very close to zero. |
| 162 sleep_time = (self.last_output - now - 0.1) % self.interval |
| 163 self.previous_last_output = self.last_output |
| 164 self.timer = threading.Timer(sleep_time + 0.1, self.fn) |
| 165 self.timer.start() |
| 166 |
| 167 def cancel(self): |
| 168 self.timer.cancel() |
| 169 |
| 170 |
135 class Popen(subprocess.Popen): | 171 class Popen(subprocess.Popen): |
136 """Wraps subprocess.Popen() with various workarounds. | 172 """Wraps subprocess.Popen() with various workarounds. |
137 | 173 |
138 - Forces English output since it's easier to parse the stdout if it is always | 174 - Forces English output since it's easier to parse the stdout if it is always |
139 in English. | 175 in English. |
140 - Sets shell=True on windows by default. You can override this by forcing | 176 - Sets shell=True on windows by default. You can override this by forcing |
141 shell parameter to a value. | 177 shell parameter to a value. |
142 - Adds support for VOID to not buffer when not needed. | 178 - Adds support for VOID to not buffer when not needed. |
143 - Adds self.start property. | 179 - Adds self.start property. |
144 | 180 |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
185 if kwargs.get(stream) in (VOID, os.devnull): | 221 if kwargs.get(stream) in (VOID, os.devnull): |
186 kwargs[stream] = open(os.devnull, 'w') | 222 kwargs[stream] = open(os.devnull, 'w') |
187 setattr(self, stream + '_is_void', True) | 223 setattr(self, stream + '_is_void', True) |
188 if callable(kwargs.get(stream)): | 224 if callable(kwargs.get(stream)): |
189 setattr(self, stream + '_cb', kwargs[stream]) | 225 setattr(self, stream + '_cb', kwargs[stream]) |
190 kwargs[stream] = PIPE | 226 kwargs[stream] = PIPE |
191 | 227 |
192 self.start = time.time() | 228 self.start = time.time() |
193 self.timeout = None | 229 self.timeout = None |
194 self.nag_timer = None | 230 self.nag_timer = None |
| 231 self.nag_max = None |
195 self.shell = kwargs.get('shell', None) | 232 self.shell = kwargs.get('shell', None) |
196 # Silence pylint on MacOSX | 233 # Silence pylint on MacOSX |
197 self.returncode = None | 234 self.returncode = None |
198 | 235 |
199 try: | 236 try: |
200 super(Popen, self).__init__(args, **kwargs) | 237 super(Popen, self).__init__(args, **kwargs) |
201 except OSError, e: | 238 except OSError, e: |
202 if e.errno == errno.EAGAIN and sys.platform == 'cygwin': | 239 if e.errno == errno.EAGAIN and sys.platform == 'cygwin': |
203 # Convert fork() emulation failure into a CygwinRebaseError(). | 240 # Convert fork() emulation failure into a CygwinRebaseError(). |
204 raise CygwinRebaseError( | 241 raise CygwinRebaseError( |
(...skipping 18 matching lines...) Expand all Loading... |
223 effectiveness will be delayed accordingly. | 260 effectiveness will be delayed accordingly. |
224 """ | 261 """ |
225 # Queue of either of <threadname> when done or (<threadname>, data). In | 262 # Queue of either of <threadname> when done or (<threadname>, data). In |
226 # theory we would like to limit to ~64kb items to not cause large memory | 263 # theory we would like to limit to ~64kb items to not cause large memory |
227 # usage when the callback blocks. It is not done because it slows down | 264 # usage when the callback blocks. It is not done because it slows down |
228 # processing on OSX10.6 by a factor of 2x, making it even slower than | 265 # processing on OSX10.6 by a factor of 2x, making it even slower than |
229 # Windows! Revisit this decision if it becomes a problem, e.g. crash | 266 # Windows! Revisit this decision if it becomes a problem, e.g. crash |
230 # because of memory exhaustion. | 267 # because of memory exhaustion. |
231 queue = Queue.Queue() | 268 queue = Queue.Queue() |
232 done = threading.Event() | 269 done = threading.Event() |
233 timer = [] | 270 nag = None |
234 last_output = [time.time()] * 2 | |
235 | 271 |
236 def write_stdin(): | 272 def write_stdin(): |
237 try: | 273 try: |
238 stdin_io = cStringIO.StringIO(input) | 274 stdin_io = cStringIO.StringIO(input) |
239 while True: | 275 while True: |
240 data = stdin_io.read(1024) | 276 data = stdin_io.read(1024) |
241 if data: | 277 if data: |
242 self.stdin.write(data) | 278 self.stdin.write(data) |
243 else: | 279 else: |
244 self.stdin.close() | 280 self.stdin.close() |
245 break | 281 break |
246 finally: | 282 finally: |
247 queue.put('stdin') | 283 queue.put('stdin') |
248 | 284 |
249 def _queue_pipe_read(pipe, name): | 285 def _queue_pipe_read(pipe, name): |
250 """Queues characters read from a pipe into a queue.""" | 286 """Queues characters read from a pipe into a queue.""" |
251 try: | 287 try: |
252 while True: | 288 while True: |
253 data = pipe.read(1) | 289 data = pipe.read(1) |
254 if not data: | 290 if not data: |
255 break | 291 break |
256 last_output[0] = time.time() | 292 if nag: |
| 293 nag.event() |
257 queue.put((name, data)) | 294 queue.put((name, data)) |
258 finally: | 295 finally: |
259 queue.put(name) | 296 queue.put(name) |
260 | 297 |
261 def nag_fn(): | |
262 now = time.time() | |
263 if done.is_set(): | |
264 return | |
265 if last_output[0] == last_output[1]: | |
266 logging.warn(' No output for %.0f seconds from command:' % ( | |
267 now - last_output[1])) | |
268 logging.warn(' %s' % self.cmd_str) | |
269 # Use 0.1 fudge factor in case: | |
270 # now ~= last_output[0] + self.nag_timer | |
271 sleep_time = self.nag_timer + last_output[0] - now - 0.1 | |
272 while sleep_time < 0: | |
273 sleep_time += self.nag_timer | |
274 last_output[1] = last_output[0] | |
275 timer[0] = threading.Timer(sleep_time, nag_fn) | |
276 timer[0].start() | |
277 | |
278 def timeout_fn(): | 298 def timeout_fn(): |
279 try: | 299 try: |
280 done.wait(self.timeout) | 300 done.wait(self.timeout) |
281 finally: | 301 finally: |
282 queue.put('timeout') | 302 queue.put('timeout') |
283 | 303 |
284 def wait_fn(): | 304 def wait_fn(): |
285 try: | 305 try: |
286 self.wait() | 306 self.wait() |
287 finally: | 307 finally: |
(...skipping 18 matching lines...) Expand all Loading... |
306 target=_queue_pipe_read, args=(self.stderr, 'stderr')) | 326 target=_queue_pipe_read, args=(self.stderr, 'stderr')) |
307 if input: | 327 if input: |
308 threads['stdin'] = threading.Thread(target=write_stdin) | 328 threads['stdin'] = threading.Thread(target=write_stdin) |
309 elif self.stdin: | 329 elif self.stdin: |
310 # Pipe but no input, make sure it's closed. | 330 # Pipe but no input, make sure it's closed. |
311 self.stdin.close() | 331 self.stdin.close() |
312 for t in threads.itervalues(): | 332 for t in threads.itervalues(): |
313 t.start() | 333 t.start() |
314 | 334 |
315 if self.nag_timer: | 335 if self.nag_timer: |
316 timer.append(threading.Timer(self.nag_timer, nag_fn)) | 336 def _nag_cb(elapsed): |
317 timer[0].start() | 337 logging.warn(' No output for %.0f seconds from command:' % elapsed) |
| 338 logging.warn(' %s' % self.cmd_str) |
| 339 if (self.nag_max and |
| 340 int('%.0f' % (elapsed / self.nag_timer)) >= self.nag_max): |
| 341 queue.put('timeout') |
| 342 done.set() # Must do this so that timeout thread stops waiting. |
| 343 nag = NagTimer(self.nag_timer, _nag_cb) |
| 344 nag.start() |
318 | 345 |
319 timed_out = False | 346 timed_out = False |
320 try: | 347 try: |
321 # This thread needs to be optimized for speed. | 348 # This thread needs to be optimized for speed. |
322 while threads: | 349 while threads: |
323 item = queue.get() | 350 item = queue.get() |
324 if item[0] == 'stdout': | 351 if item[0] == 'stdout': |
325 self.stdout_cb(item[1]) | 352 self.stdout_cb(item[1]) |
326 elif item[0] == 'stderr': | 353 elif item[0] == 'stderr': |
327 self.stderr_cb(item[1]) | 354 self.stderr_cb(item[1]) |
328 else: | 355 else: |
329 # A thread terminated. | 356 # A thread terminated. |
330 threads[item].join() | 357 if item in threads: |
331 del threads[item] | 358 threads[item].join() |
| 359 del threads[item] |
332 if item == 'wait': | 360 if item == 'wait': |
333 # Terminate the timeout thread if necessary. | 361 # Terminate the timeout thread if necessary. |
334 done.set() | 362 done.set() |
335 elif item == 'timeout' and not timed_out and self.poll() is None: | 363 elif item == 'timeout' and not timed_out and self.poll() is None: |
336 logging.debug('Timed out after %fs: killing' % self.timeout) | 364 logging.debug('Timed out after %.0fs: killing' % ( |
| 365 time.time() - self.start)) |
337 self.kill() | 366 self.kill() |
338 timed_out = True | 367 timed_out = True |
339 finally: | 368 finally: |
340 # Stop the threads. | 369 # Stop the threads. |
341 done.set() | 370 done.set() |
342 if timer: | 371 if nag: |
343 timer[0].cancel() | 372 nag.cancel() |
344 if 'wait' in threads: | 373 if 'wait' in threads: |
345 # Accelerate things, otherwise it would hang until the child process is | 374 # Accelerate things, otherwise it would hang until the child process is |
346 # done. | 375 # done. |
347 logging.debug('Killing child because of an exception') | 376 logging.debug('Killing child because of an exception') |
348 self.kill() | 377 self.kill() |
349 # Join threads. | 378 # Join threads. |
350 for thread in threads.itervalues(): | 379 for thread in threads.itervalues(): |
351 thread.join() | 380 thread.join() |
352 if timed_out: | 381 if timed_out: |
353 self.returncode = TIMED_OUT | 382 self.returncode = TIMED_OUT |
354 | 383 |
355 # pylint: disable=W0221,W0622 | 384 # pylint: disable=W0221,W0622 |
356 def communicate(self, input=None, timeout=None, nag_timer=None): | 385 def communicate(self, input=None, timeout=None, nag_timer=None, |
| 386 nag_max=None): |
357 """Adds timeout and callbacks support. | 387 """Adds timeout and callbacks support. |
358 | 388 |
359 Returns (stdout, stderr) like subprocess.Popen().communicate(). | 389 Returns (stdout, stderr) like subprocess.Popen().communicate(). |
360 | 390 |
361 - The process will be killed after |timeout| seconds and returncode set to | 391 - The process will be killed after |timeout| seconds and returncode set to |
362 TIMED_OUT. | 392 TIMED_OUT. |
363 - If the subprocess runs for |nag_timer| seconds without producing terminal | 393 - If the subprocess runs for |nag_timer| seconds without producing terminal |
364 output, print a warning to stderr. | 394 output, print a warning to stderr. |
365 """ | 395 """ |
366 self.timeout = timeout | 396 self.timeout = timeout |
367 self.nag_timer = nag_timer | 397 self.nag_timer = nag_timer |
| 398 self.nag_max = nag_max |
368 if (not self.timeout and not self.nag_timer and | 399 if (not self.timeout and not self.nag_timer and |
369 not self.stdout_cb and not self.stderr_cb): | 400 not self.stdout_cb and not self.stderr_cb): |
370 return super(Popen, self).communicate(input) | 401 return super(Popen, self).communicate(input) |
371 | 402 |
372 if self.timeout and self.shell: | 403 if self.timeout and self.shell: |
373 raise TypeError( | 404 raise TypeError( |
374 'Using timeout and shell simultaneously will cause a process leak ' | 405 'Using timeout and shell simultaneously will cause a process leak ' |
375 'since the shell will be killed instead of the child process.') | 406 'since the shell will be killed instead of the child process.') |
376 | 407 |
377 stdout = None | 408 stdout = None |
378 stderr = None | 409 stderr = None |
379 # Convert to a lambda to workaround python's deadlock. | 410 # Convert to a lambda to workaround python's deadlock. |
380 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait | 411 # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait |
381 # When the pipe fills up, it would deadlock this process. | 412 # When the pipe fills up, it would deadlock this process. |
382 if self.stdout and not self.stdout_cb and not self.stdout_is_void: | 413 if self.stdout and not self.stdout_cb and not self.stdout_is_void: |
383 stdout = [] | 414 stdout = [] |
384 self.stdout_cb = stdout.append | 415 self.stdout_cb = stdout.append |
385 if self.stderr and not self.stderr_cb and not self.stderr_is_void: | 416 if self.stderr and not self.stderr_cb and not self.stderr_is_void: |
386 stderr = [] | 417 stderr = [] |
387 self.stderr_cb = stderr.append | 418 self.stderr_cb = stderr.append |
388 self._tee_threads(input) | 419 self._tee_threads(input) |
389 if stdout is not None: | 420 if stdout is not None: |
390 stdout = ''.join(stdout) | 421 stdout = ''.join(stdout) |
391 if stderr is not None: | 422 if stderr is not None: |
392 stderr = ''.join(stderr) | 423 stderr = ''.join(stderr) |
393 return (stdout, stderr) | 424 return (stdout, stderr) |
394 | 425 |
395 | 426 |
396 def communicate(args, timeout=None, nag_timer=None, **kwargs): | 427 def communicate(args, timeout=None, nag_timer=None, nag_max=None, **kwargs): |
397 """Wraps subprocess.Popen().communicate() and add timeout support. | 428 """Wraps subprocess.Popen().communicate() and add timeout support. |
398 | 429 |
399 Returns ((stdout, stderr), returncode). | 430 Returns ((stdout, stderr), returncode). |
400 | 431 |
401 - The process will be killed after |timeout| seconds and returncode set to | 432 - The process will be killed after |timeout| seconds and returncode set to |
402 TIMED_OUT. | 433 TIMED_OUT. |
403 - If the subprocess runs for |nag_timer| seconds without producing terminal | 434 - If the subprocess runs for |nag_timer| seconds without producing terminal |
404 output, print a warning to stderr. | 435 output, print a warning to stderr. |
405 - Automatically passes stdin content as input so do not specify stdin=PIPE. | 436 - Automatically passes stdin content as input so do not specify stdin=PIPE. |
406 """ | 437 """ |
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
474 | 505 |
475 - Throws if return code is not 0. | 506 - Throws if return code is not 0. |
476 - Works even prior to python 2.7. | 507 - Works even prior to python 2.7. |
477 - Blocks stdin by default if not specified since no output will be visible. | 508 - Blocks stdin by default if not specified since no output will be visible. |
478 - As per doc, "The stdout argument is not allowed as it is used internally." | 509 - As per doc, "The stdout argument is not allowed as it is used internally." |
479 """ | 510 """ |
480 kwargs.setdefault('stdin', VOID) | 511 kwargs.setdefault('stdin', VOID) |
481 if 'stdout' in kwargs: | 512 if 'stdout' in kwargs: |
482 raise ValueError('stdout argument not allowed, it will be overridden.') | 513 raise ValueError('stdout argument not allowed, it will be overridden.') |
483 return check_call_out(args, stdout=PIPE, **kwargs)[0] | 514 return check_call_out(args, stdout=PIPE, **kwargs)[0] |
OLD | NEW |