Index: subprocess2.py |
diff --git a/subprocess2.py b/subprocess2.py |
index e81798613cf6946789841df1dd4fce44417d9897..ac44555dcf5a038478ca172669035d974910382d 100644 |
--- a/subprocess2.py |
+++ b/subprocess2.py |
@@ -132,6 +132,42 @@ def get_english_env(env): |
return env |
+class NagTimer(object): |
+ """ |
+ Triggers a callback when a time interval passes without an event being fired. |
+ |
+ For example, the event could be receiving terminal output from a subprocess; |
+ and the callback could print a warning to stderr that the subprocess appeared |
+ to be hung. |
+ """ |
+ def __init__(self, interval, cb): |
+ self.interval = interval |
+ self.cb = cb |
+ self.timer = threading.Timer(self.interval, self.fn) |
+ self.last_output = self.previous_last_output = 0 |
+ |
+ def start(self): |
+ self.last_output = self.previous_last_output = time.time() |
+ self.timer.start() |
+ |
+ def event(self): |
+ self.last_output = time.time() |
+ |
+ def fn(self): |
+ now = time.time() |
+ if self.last_output == self.previous_last_output: |
+ self.cb(now - self.previous_last_output) |
+ # Use 0.1 fudge factor, just in case |
+ # (self.last_output - now) is very close to zero. |
+ sleep_time = (self.last_output - now - 0.1) % self.interval |
+ self.previous_last_output = self.last_output |
+ self.timer = threading.Timer(sleep_time + 0.1, self.fn) |
+ self.timer.start() |
+ |
+ def cancel(self): |
+ self.timer.cancel() |
+ |
+ |
class Popen(subprocess.Popen): |
"""Wraps subprocess.Popen() with various workarounds. |
@@ -192,6 +228,7 @@ class Popen(subprocess.Popen): |
self.start = time.time() |
self.timeout = None |
self.nag_timer = None |
+ self.nag_max = None |
self.shell = kwargs.get('shell', None) |
# Silence pylint on MacOSX |
self.returncode = None |
@@ -230,8 +267,7 @@ class Popen(subprocess.Popen): |
# because of memory exhaustion. |
queue = Queue.Queue() |
done = threading.Event() |
- timer = [] |
- last_output = [time.time()] * 2 |
+ nag = None |
def write_stdin(): |
try: |
@@ -253,28 +289,12 @@ class Popen(subprocess.Popen): |
data = pipe.read(1) |
if not data: |
break |
- last_output[0] = time.time() |
+ if nag: |
+ nag.event() |
queue.put((name, data)) |
finally: |
queue.put(name) |
- def nag_fn(): |
- now = time.time() |
- if done.is_set(): |
- return |
- if last_output[0] == last_output[1]: |
- logging.warn(' No output for %.0f seconds from command:' % ( |
- now - last_output[1])) |
- logging.warn(' %s' % self.cmd_str) |
- # Use 0.1 fudge factor in case: |
- # now ~= last_output[0] + self.nag_timer |
- sleep_time = self.nag_timer + last_output[0] - now - 0.1 |
- while sleep_time < 0: |
- sleep_time += self.nag_timer |
- last_output[1] = last_output[0] |
- timer[0] = threading.Timer(sleep_time, nag_fn) |
- timer[0].start() |
- |
def timeout_fn(): |
try: |
done.wait(self.timeout) |
@@ -313,8 +333,15 @@ class Popen(subprocess.Popen): |
t.start() |
if self.nag_timer: |
- timer.append(threading.Timer(self.nag_timer, nag_fn)) |
- timer[0].start() |
+ def _nag_cb(elapsed): |
+ logging.warn(' No output for %.0f seconds from command:' % elapsed) |
+ logging.warn(' %s' % self.cmd_str) |
+ if (self.nag_max and |
+ int('%.0f' % (elapsed / self.nag_timer)) >= self.nag_max): |
+ queue.put('timeout') |
+ done.set() # Must do this so that timeout thread stops waiting. |
+ nag = NagTimer(self.nag_timer, _nag_cb) |
+ nag.start() |
timed_out = False |
try: |
@@ -327,20 +354,22 @@ class Popen(subprocess.Popen): |
self.stderr_cb(item[1]) |
else: |
# A thread terminated. |
- threads[item].join() |
- del threads[item] |
+ if item in threads: |
+ threads[item].join() |
+ del threads[item] |
if item == 'wait': |
# Terminate the timeout thread if necessary. |
done.set() |
elif item == 'timeout' and not timed_out and self.poll() is None: |
- logging.debug('Timed out after %fs: killing' % self.timeout) |
+ logging.debug('Timed out after %.0fs: killing' % ( |
+ time.time() - self.start)) |
self.kill() |
timed_out = True |
finally: |
# Stop the threads. |
done.set() |
- if timer: |
- timer[0].cancel() |
+ if nag: |
+ nag.cancel() |
if 'wait' in threads: |
# Accelerate things, otherwise it would hang until the child process is |
# done. |
@@ -353,7 +382,8 @@ class Popen(subprocess.Popen): |
self.returncode = TIMED_OUT |
# pylint: disable=W0221,W0622 |
- def communicate(self, input=None, timeout=None, nag_timer=None): |
+ def communicate(self, input=None, timeout=None, nag_timer=None, |
+ nag_max=None): |
"""Adds timeout and callbacks support. |
Returns (stdout, stderr) like subprocess.Popen().communicate(). |
@@ -365,6 +395,7 @@ class Popen(subprocess.Popen): |
""" |
self.timeout = timeout |
self.nag_timer = nag_timer |
+ self.nag_max = nag_max |
if (not self.timeout and not self.nag_timer and |
not self.stdout_cb and not self.stderr_cb): |
return super(Popen, self).communicate(input) |
@@ -393,7 +424,7 @@ class Popen(subprocess.Popen): |
return (stdout, stderr) |
-def communicate(args, timeout=None, nag_timer=None, **kwargs): |
+def communicate(args, timeout=None, nag_timer=None, nag_max=None, **kwargs): |
"""Wraps subprocess.Popen().communicate() and add timeout support. |
Returns ((stdout, stderr), returncode). |