| Index: third_party/cherrypy/process/wspbus.py
|
| ===================================================================
|
| --- third_party/cherrypy/process/wspbus.py (revision 0)
|
| +++ third_party/cherrypy/process/wspbus.py (revision 0)
|
| @@ -0,0 +1,432 @@
|
| +"""An implementation of the Web Site Process Bus.
|
| +
|
| +This module is completely standalone, depending only on the stdlib.
|
| +
|
| +Web Site Process Bus
|
| +--------------------
|
| +
|
| +A Bus object is used to contain and manage site-wide behavior:
|
| +daemonization, HTTP server start/stop, process reload, signal handling,
|
| +drop privileges, PID file management, logging for all of these,
|
| +and many more.
|
| +
|
| +In addition, a Bus object provides a place for each web framework
|
| +to register code that runs in response to site-wide events (like
|
| +process start and stop), or which controls or otherwise interacts with
|
| +the site-wide components mentioned above. For example, a framework which
|
| +uses file-based templates would add known template filenames to an
|
| +autoreload component.
|
| +
|
| +Ideally, a Bus object will be flexible enough to be useful in a variety
|
| +of invocation scenarios:
|
| +
|
| + 1. The deployer starts a site from the command line via a
|
| + framework-neutral deployment script; applications from multiple frameworks
|
| + are mixed in a single site. Command-line arguments and configuration
|
| + files are used to define site-wide components such as the HTTP server,
|
| + WSGI component graph, autoreload behavior, signal handling, etc.
|
| + 2. The deployer starts a site via some other process, such as Apache;
|
| + applications from multiple frameworks are mixed in a single site.
|
| + Autoreload and signal handling (from Python at least) are disabled.
|
| + 3. The deployer starts a site via a framework-specific mechanism;
|
| + for example, when running tests, exploring tutorials, or deploying
|
| + single applications from a single framework. The framework controls
|
| + which site-wide components are enabled as it sees fit.
|
| +
|
| +The Bus object in this package uses topic-based publish-subscribe
|
| +messaging to accomplish all this. A few topic channels are built in
|
| +('start', 'stop', 'exit', 'graceful', 'log', and 'main'). Frameworks and
|
| +site containers are free to define their own. If a message is sent to a
|
| +channel that has not been defined or has no listeners, there is no effect.
|
| +
|
| +In general, there should only ever be a single Bus object per process.
|
| +Frameworks and site containers share a single Bus object by publishing
|
| +messages and subscribing listeners.
|
| +
|
| +The Bus object works as a finite state machine which models the current
|
| +state of the process. Bus methods move it from one state to another;
|
| +those methods then publish to subscribed listeners on the channel for
|
| +the new state.::
|
| +
|
| + O
|
| + |
|
| + V
|
| + STOPPING --> STOPPED --> EXITING -> X
|
| + A A |
|
| + | \___ |
|
| + | \ |
|
| + | V V
|
| + STARTED <-- STARTING
|
| +
|
| +"""
|
| +
|
| +import atexit
|
| +import os
|
| +import sys
|
| +import threading
|
| +import time
|
| +import traceback as _traceback
|
| +import warnings
|
| +
|
| +from cherrypy._cpcompat import set
|
| +
|
| +# Here I save the value of os.getcwd(), which, if I am imported early enough,
|
| +# will be the directory from which the startup script was run. This is needed
|
| +# by _do_execv(), to change back to the original directory before execv()ing a
|
| +# new process. This is a defense against the application having changed the
|
| +# current working directory (which could make sys.executable "not found" if
|
| +# sys.executable is a relative-path, and/or cause other problems).
|
| +_startup_cwd = os.getcwd()
|
| +
|
| +class ChannelFailures(Exception):
|
| + """Exception raised when errors occur in a listener during Bus.publish()."""
|
| + delimiter = '\n'
|
| +
|
| + def __init__(self, *args, **kwargs):
|
| + # Don't use 'super' here; Exceptions are old-style in Py2.4
|
| + # See http://www.cherrypy.org/ticket/959
|
| + Exception.__init__(self, *args, **kwargs)
|
| + self._exceptions = list()
|
| +
|
| + def handle_exception(self):
|
| + """Append the current exception to self."""
|
| + self._exceptions.append(sys.exc_info()[1])
|
| +
|
| + def get_instances(self):
|
| + """Return a list of seen exception instances."""
|
| + return self._exceptions[:]
|
| +
|
| + def __str__(self):
|
| + exception_strings = map(repr, self.get_instances())
|
| + return self.delimiter.join(exception_strings)
|
| +
|
| + __repr__ = __str__
|
| +
|
| + def __bool__(self):
|
| + return bool(self._exceptions)
|
| + __nonzero__ = __bool__
|
| +
|
| +# Use a flag to indicate the state of the bus.
|
| +class _StateEnum(object):
|
| + class State(object):
|
| + name = None
|
| + def __repr__(self):
|
| + return "states.%s" % self.name
|
| +
|
| + def __setattr__(self, key, value):
|
| + if isinstance(value, self.State):
|
| + value.name = key
|
| + object.__setattr__(self, key, value)
|
| +states = _StateEnum()
|
| +states.STOPPED = states.State()
|
| +states.STARTING = states.State()
|
| +states.STARTED = states.State()
|
| +states.STOPPING = states.State()
|
| +states.EXITING = states.State()
|
| +
|
| +
|
| +try:
|
| + import fcntl
|
| +except ImportError:
|
| + max_files = 0
|
| +else:
|
| + try:
|
| + max_files = os.sysconf('SC_OPEN_MAX')
|
| + except AttributeError:
|
| + max_files = 1024
|
| +
|
| +
|
| +class Bus(object):
|
| + """Process state-machine and messenger for HTTP site deployment.
|
| +
|
| + All listeners for a given channel are guaranteed to be called even
|
| + if others at the same channel fail. Each failure is logged, but
|
| + execution proceeds on to the next listener. The only way to stop all
|
| + processing from inside a listener is to raise SystemExit and stop the
|
| + whole server.
|
| + """
|
| +
|
| + states = states
|
| + state = states.STOPPED
|
| + execv = False
|
| + max_cloexec_files = max_files
|
| +
|
| + def __init__(self):
|
| + self.execv = False
|
| + self.state = states.STOPPED
|
| + self.listeners = dict(
|
| + [(channel, set()) for channel
|
| + in ('start', 'stop', 'exit', 'graceful', 'log', 'main')])
|
| + self._priorities = {}
|
| +
|
| + def subscribe(self, channel, callback, priority=None):
|
| + """Add the given callback at the given channel (if not present)."""
|
| + if channel not in self.listeners:
|
| + self.listeners[channel] = set()
|
| + self.listeners[channel].add(callback)
|
| +
|
| + if priority is None:
|
| + priority = getattr(callback, 'priority', 50)
|
| + self._priorities[(channel, callback)] = priority
|
| +
|
| + def unsubscribe(self, channel, callback):
|
| + """Discard the given callback (if present)."""
|
| + listeners = self.listeners.get(channel)
|
| + if listeners and callback in listeners:
|
| + listeners.discard(callback)
|
| + del self._priorities[(channel, callback)]
|
| +
|
| + def publish(self, channel, *args, **kwargs):
|
| + """Return output of all subscribers for the given channel."""
|
| + if channel not in self.listeners:
|
| + return []
|
| +
|
| + exc = ChannelFailures()
|
| + output = []
|
| +
|
| + items = [(self._priorities[(channel, listener)], listener)
|
| + for listener in self.listeners[channel]]
|
| + try:
|
| + items.sort(key=lambda item: item[0])
|
| + except TypeError:
|
| + # Python 2.3 had no 'key' arg, but that doesn't matter
|
| + # since it could sort dissimilar types just fine.
|
| + items.sort()
|
| + for priority, listener in items:
|
| + try:
|
| + output.append(listener(*args, **kwargs))
|
| + except KeyboardInterrupt:
|
| + raise
|
| + except SystemExit:
|
| + e = sys.exc_info()[1]
|
| + # If we have previous errors ensure the exit code is non-zero
|
| + if exc and e.code == 0:
|
| + e.code = 1
|
| + raise
|
| + except:
|
| + exc.handle_exception()
|
| + if channel == 'log':
|
| + # Assume any further messages to 'log' will fail.
|
| + pass
|
| + else:
|
| + self.log("Error in %r listener %r" % (channel, listener),
|
| + level=40, traceback=True)
|
| + if exc:
|
| + raise exc
|
| + return output
|
| +
|
| + def _clean_exit(self):
|
| + """An atexit handler which asserts the Bus is not running."""
|
| + if self.state != states.EXITING:
|
| + warnings.warn(
|
| + "The main thread is exiting, but the Bus is in the %r state; "
|
| + "shutting it down automatically now. You must either call "
|
| + "bus.block() after start(), or call bus.exit() before the "
|
| + "main thread exits." % self.state, RuntimeWarning)
|
| + self.exit()
|
| +
|
| + def start(self):
|
| + """Start all services."""
|
| + atexit.register(self._clean_exit)
|
| +
|
| + self.state = states.STARTING
|
| + self.log('Bus STARTING')
|
| + try:
|
| + self.publish('start')
|
| + self.state = states.STARTED
|
| + self.log('Bus STARTED')
|
| + except (KeyboardInterrupt, SystemExit):
|
| + raise
|
| + except:
|
| + self.log("Shutting down due to error in start listener:",
|
| + level=40, traceback=True)
|
| + e_info = sys.exc_info()[1]
|
| + try:
|
| + self.exit()
|
| + except:
|
| + # Any stop/exit errors will be logged inside publish().
|
| + pass
|
| + # Re-raise the original error
|
| + raise e_info
|
| +
|
| + def exit(self):
|
| + """Stop all services and prepare to exit the process."""
|
| + exitstate = self.state
|
| + try:
|
| + self.stop()
|
| +
|
| + self.state = states.EXITING
|
| + self.log('Bus EXITING')
|
| + self.publish('exit')
|
| + # This isn't strictly necessary, but it's better than seeing
|
| + # "Waiting for child threads to terminate..." and then nothing.
|
| + self.log('Bus EXITED')
|
| + except:
|
| + # This method is often called asynchronously (whether thread,
|
| + # signal handler, console handler, or atexit handler), so we
|
| + # can't just let exceptions propagate out unhandled.
|
| + # Assume it's been logged and just die.
|
| + os._exit(70) # EX_SOFTWARE
|
| +
|
| + if exitstate == states.STARTING:
|
| + # exit() was called before start() finished, possibly due to
|
| + # Ctrl-C because a start listener got stuck. In this case,
|
| + # we could get stuck in a loop where Ctrl-C never exits the
|
| + # process, so we just call os.exit here.
|
| + os._exit(70) # EX_SOFTWARE
|
| +
|
| + def restart(self):
|
| + """Restart the process (may close connections).
|
| +
|
| + This method does not restart the process from the calling thread;
|
| + instead, it stops the bus and asks the main thread to call execv.
|
| + """
|
| + self.execv = True
|
| + self.exit()
|
| +
|
| + def graceful(self):
|
| + """Advise all services to reload."""
|
| + self.log('Bus graceful')
|
| + self.publish('graceful')
|
| +
|
| + def block(self, interval=0.1):
|
| + """Wait for the EXITING state, KeyboardInterrupt or SystemExit.
|
| +
|
| + This function is intended to be called only by the main thread.
|
| + After waiting for the EXITING state, it also waits for all threads
|
| + to terminate, and then calls os.execv if self.execv is True. This
|
| + design allows another thread to call bus.restart, yet have the main
|
| + thread perform the actual execv call (required on some platforms).
|
| + """
|
| + try:
|
| + self.wait(states.EXITING, interval=interval, channel='main')
|
| + except (KeyboardInterrupt, IOError):
|
| + # The time.sleep call might raise
|
| + # "IOError: [Errno 4] Interrupted function call" on KBInt.
|
| + self.log('Keyboard Interrupt: shutting down bus')
|
| + self.exit()
|
| + except SystemExit:
|
| + self.log('SystemExit raised: shutting down bus')
|
| + self.exit()
|
| + raise
|
| +
|
| + # Waiting for ALL child threads to finish is necessary on OS X.
|
| + # See http://www.cherrypy.org/ticket/581.
|
| + # It's also good to let them all shut down before allowing
|
| + # the main thread to call atexit handlers.
|
| + # See http://www.cherrypy.org/ticket/751.
|
| + self.log("Waiting for child threads to terminate...")
|
| + for t in threading.enumerate():
|
| + if t != threading.currentThread() and t.isAlive():
|
| + # Note that any dummy (external) threads are always daemonic.
|
| + if hasattr(threading.Thread, "daemon"):
|
| + # Python 2.6+
|
| + d = t.daemon
|
| + else:
|
| + d = t.isDaemon()
|
| + if not d:
|
| + self.log("Waiting for thread %s." % t.getName())
|
| + t.join()
|
| +
|
| + if self.execv:
|
| + self._do_execv()
|
| +
|
| + def wait(self, state, interval=0.1, channel=None):
|
| + """Poll for the given state(s) at intervals; publish to channel."""
|
| + if isinstance(state, (tuple, list)):
|
| + states = state
|
| + else:
|
| + states = [state]
|
| +
|
| + def _wait():
|
| + while self.state not in states:
|
| + time.sleep(interval)
|
| + self.publish(channel)
|
| +
|
| + # From http://psyco.sourceforge.net/psycoguide/bugs.html:
|
| + # "The compiled machine code does not include the regular polling
|
| + # done by Python, meaning that a KeyboardInterrupt will not be
|
| + # detected before execution comes back to the regular Python
|
| + # interpreter. Your program cannot be interrupted if caught
|
| + # into an infinite Psyco-compiled loop."
|
| + try:
|
| + sys.modules['psyco'].cannotcompile(_wait)
|
| + except (KeyError, AttributeError):
|
| + pass
|
| +
|
| + _wait()
|
| +
|
| + def _do_execv(self):
|
| + """Re-execute the current process.
|
| +
|
| + This must be called from the main thread, because certain platforms
|
| + (OS X) don't allow execv to be called in a child thread very well.
|
| + """
|
| + args = sys.argv[:]
|
| + self.log('Re-spawning %s' % ' '.join(args))
|
| +
|
| + if sys.platform[:4] == 'java':
|
| + from _systemrestart import SystemRestart
|
| + raise SystemRestart
|
| + else:
|
| + args.insert(0, sys.executable)
|
| + if sys.platform == 'win32':
|
| + args = ['"%s"' % arg for arg in args]
|
| +
|
| + os.chdir(_startup_cwd)
|
| + if self.max_cloexec_files:
|
| + self._set_cloexec()
|
| + os.execv(sys.executable, args)
|
| +
|
| + def _set_cloexec(self):
|
| + """Set the CLOEXEC flag on all open files (except stdin/out/err).
|
| +
|
| + If self.max_cloexec_files is an integer (the default), then on
|
| + platforms which support it, it represents the max open files setting
|
| + for the operating system. This function will be called just before
|
| + the process is restarted via os.execv() to prevent open files
|
| + from persisting into the new process.
|
| +
|
| + Set self.max_cloexec_files to 0 to disable this behavior.
|
| + """
|
| + for fd in range(3, self.max_cloexec_files): # skip stdin/out/err
|
| + try:
|
| + flags = fcntl.fcntl(fd, fcntl.F_GETFD)
|
| + except IOError:
|
| + continue
|
| + fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
|
| +
|
| + def stop(self):
|
| + """Stop all services."""
|
| + self.state = states.STOPPING
|
| + self.log('Bus STOPPING')
|
| + self.publish('stop')
|
| + self.state = states.STOPPED
|
| + self.log('Bus STOPPED')
|
| +
|
| + def start_with_callback(self, func, args=None, kwargs=None):
|
| + """Start 'func' in a new thread T, then start self (and return T)."""
|
| + if args is None:
|
| + args = ()
|
| + if kwargs is None:
|
| + kwargs = {}
|
| + args = (func,) + args
|
| +
|
| + def _callback(func, *a, **kw):
|
| + self.wait(states.STARTED)
|
| + func(*a, **kw)
|
| + t = threading.Thread(target=_callback, args=args, kwargs=kwargs)
|
| + t.setName('Bus Callback ' + t.getName())
|
| + t.start()
|
| +
|
| + self.start()
|
| +
|
| + return t
|
| +
|
| + def log(self, msg="", level=20, traceback=False):
|
| + """Log the given message. Append the last traceback if requested."""
|
| + if traceback:
|
| + msg += "\n" + "".join(_traceback.format_exception(*sys.exc_info()))
|
| + self.publish('log', msg, level)
|
| +
|
| +bus = Bus()
|
|
|
| Property changes on: third_party/cherrypy/process/wspbus.py
|
| ___________________________________________________________________
|
| Added: svn:eol-style
|
| + LF
|
|
|
|
|