OLD | NEW |
(Empty) | |
| 1 """An implementation of the Web Site Process Bus. |
| 2 |
| 3 This module is completely standalone, depending only on the stdlib. |
| 4 |
| 5 Web Site Process Bus |
| 6 -------------------- |
| 7 |
| 8 A Bus object is used to contain and manage site-wide behavior: |
| 9 daemonization, HTTP server start/stop, process reload, signal handling, |
| 10 drop privileges, PID file management, logging for all of these, |
| 11 and many more. |
| 12 |
| 13 In addition, a Bus object provides a place for each web framework |
| 14 to register code that runs in response to site-wide events (like |
| 15 process start and stop), or which controls or otherwise interacts with |
| 16 the site-wide components mentioned above. For example, a framework which |
| 17 uses file-based templates would add known template filenames to an |
| 18 autoreload component. |
| 19 |
| 20 Ideally, a Bus object will be flexible enough to be useful in a variety |
| 21 of invocation scenarios: |
| 22 |
| 23 1. The deployer starts a site from the command line via a |
| 24 framework-neutral deployment script; applications from multiple frameworks |
| 25 are mixed in a single site. Command-line arguments and configuration |
| 26 files are used to define site-wide components such as the HTTP server, |
| 27 WSGI component graph, autoreload behavior, signal handling, etc. |
| 28 2. The deployer starts a site via some other process, such as Apache; |
| 29 applications from multiple frameworks are mixed in a single site. |
| 30 Autoreload and signal handling (from Python at least) are disabled. |
| 31 3. The deployer starts a site via a framework-specific mechanism; |
| 32 for example, when running tests, exploring tutorials, or deploying |
| 33 single applications from a single framework. The framework controls |
| 34 which site-wide components are enabled as it sees fit. |
| 35 |
| 36 The Bus object in this package uses topic-based publish-subscribe |
| 37 messaging to accomplish all this. A few topic channels are built in |
| 38 ('start', 'stop', 'exit', 'graceful', 'log', and 'main'). Frameworks and |
| 39 site containers are free to define their own. If a message is sent to a |
| 40 channel that has not been defined or has no listeners, there is no effect. |
| 41 |
| 42 In general, there should only ever be a single Bus object per process. |
| 43 Frameworks and site containers share a single Bus object by publishing |
| 44 messages and subscribing listeners. |
| 45 |
| 46 The Bus object works as a finite state machine which models the current |
| 47 state of the process. Bus methods move it from one state to another; |
| 48 those methods then publish to subscribed listeners on the channel for |
| 49 the new state.:: |
| 50 |
                        O
                        |
                        V
       STOPPING --> STOPPED --> EXITING -> X
          A   A         |
          |    \___     |
          |        \    |
          |         V   V
        STARTED <-- STARTING
| 60 |
| 61 """ |
| 62 |
| 63 import atexit |
| 64 import os |
| 65 import sys |
| 66 import threading |
| 67 import time |
| 68 import traceback as _traceback |
| 69 import warnings |
| 70 |
| 71 from cherrypy._cpcompat import set |
| 72 |
# Remember the working directory at import time. If this module is imported
# early enough, that is the directory from which the startup script was run.
# _do_execv() changes back to it before execv()ing a new process, as a
# defense against the application having changed the current working
# directory (which could make sys.executable "not found" if sys.executable
# is a relative path, and/or cause other problems).
_startup_cwd = os.getcwd()
| 80 |
class ChannelFailures(Exception):
    """Collects the errors raised by listeners during Bus.publish().

    An instance is falsy until at least one exception has been recorded
    with handle_exception(); its string form is the repr() of every
    recorded exception joined by ``delimiter``.
    """

    # Separator placed between the recorded exceptions in str()/repr().
    delimiter = '\n'

    def __init__(self, *args, **kwargs):
        # Call the base initializer directly instead of through 'super';
        # Exceptions are old-style classes in Py2.4.
        # See http://www.cherrypy.org/ticket/959
        Exception.__init__(self, *args, **kwargs)
        self._exceptions = []

    def handle_exception(self):
        """Record the exception that is currently being handled."""
        current = sys.exc_info()[1]
        self._exceptions.append(current)

    def get_instances(self):
        """Return a copy of the list of recorded exception instances."""
        return list(self._exceptions)

    def __str__(self):
        rendered = [repr(instance) for instance in self.get_instances()]
        return self.delimiter.join(rendered)

    __repr__ = __str__

    def __bool__(self):
        # Truthy only once at least one failure has been recorded.
        return len(self._exceptions) > 0

    # Python 2 spelling of the truth-value hook.
    __nonzero__ = __bool__
| 108 |
# The bus tracks its lifecycle with named flag objects held on ``states``.
class _StateEnum(object):
    """Namespace whose State-valued attributes learn their own name."""

    class State(object):
        """A single bus state; ``name`` is filled in on assignment."""

        name = None

        def __repr__(self):
            return "states.%s" % (self.name,)

    def __setattr__(self, key, value):
        # Stamp a State with the attribute name it is being bound to, so
        # that its repr() reads e.g. "states.STARTED".
        if isinstance(value, self.State):
            value.name = key
        super(_StateEnum, self).__setattr__(key, value)


states = _StateEnum()
# Create the five lifecycle states in their original order.
for _state_name in ('STOPPED', 'STARTING', 'STARTED', 'STOPPING', 'EXITING'):
    setattr(states, _state_name, states.State())
del _state_name
| 126 |
| 127 |
# Work out how many file descriptors Bus._set_cloexec() should sweep
# (this value becomes the default for Bus.max_cloexec_files).
try:
    import fcntl
except ImportError:
    # No fcntl on this platform: 0 disables the close-on-exec sweep.
    max_files = 0
else:
    try:
        max_files = os.sysconf('SC_OPEN_MAX')
    except (AttributeError, ValueError, OSError):
        # os.sysconf may be missing altogether (AttributeError), may not
        # know the 'SC_OPEN_MAX' name (ValueError), or the host may not
        # support the setting (OSError). Fall back to a conventional limit
        # rather than failing at import time.
        max_files = 1024
| 137 |
| 138 |
class Bus(object):
    """Process state-machine and messenger for HTTP site deployment.

    All listeners for a given channel are guaranteed to be called even
    if others at the same channel fail. Each failure is logged, but
    execution proceeds on to the next listener. The only way to stop all
    processing from inside a listener is to raise SystemExit and stop the
    whole server.
    """

    # The module-level state enumeration, exposed on the class.
    states = states
    # Current state of the bus; always one of the members of ``states``.
    state = states.STOPPED
    # When True, block() re-executes the process after the bus has exited.
    execv = False
    # Exclusive upper bound of the file descriptors that _set_cloexec()
    # flags close-on-exec before a re-exec; 0 disables that step.
    max_cloexec_files = max_files

    def __init__(self):
        """Create a stopped bus with the built-in channels and no listeners."""
        self.execv = False
        self.state = states.STOPPED
        self.listeners = dict(
            [(channel, set()) for channel
             in ('start', 'stop', 'exit', 'graceful', 'log', 'main')])
        # Maps (channel, callback) -> priority; lower values run first.
        self._priorities = {}

    def subscribe(self, channel, callback, priority=None):
        """Add the given callback at the given channel (if not present).

        If ``priority`` is None, the callback's own ``priority`` attribute
        is used, defaulting to 50. The channel is created if it is unknown.
        """
        if channel not in self.listeners:
            self.listeners[channel] = set()
        self.listeners[channel].add(callback)

        if priority is None:
            priority = getattr(callback, 'priority', 50)
        self._priorities[(channel, callback)] = priority

    def unsubscribe(self, channel, callback):
        """Discard the given callback (if present)."""
        listeners = self.listeners.get(channel)
        if listeners and callback in listeners:
            listeners.discard(callback)
            del self._priorities[(channel, callback)]

    def publish(self, channel, *args, **kwargs):
        """Return output of all subscribers for the given channel.

        Listeners are called in ascending priority order with ``*args`` and
        ``**kwargs``. Returns the list of their return values, or an empty
        list if the channel is unknown. KeyboardInterrupt and SystemExit
        raised by a listener propagate immediately; any other error is
        recorded (and logged, except on the 'log' channel) and the
        remaining listeners still run. If any errors were recorded, a
        ChannelFailures exception is raised after all listeners have run.
        """
        if channel not in self.listeners:
            return []

        exc = ChannelFailures()
        output = []

        # Snapshot the listeners so that callbacks may (un)subscribe freely
        # while the channel is being published.
        items = [(self._priorities[(channel, listener)], listener)
                 for listener in self.listeners[channel]]
        try:
            items.sort(key=lambda item: item[0])
        except TypeError:
            # Python 2.3 had no 'key' arg, but that doesn't matter
            # since it could sort dissimilar types just fine.
            items.sort()
        for priority, listener in items:
            try:
                output.append(listener(*args, **kwargs))
            except KeyboardInterrupt:
                raise
            except SystemExit:
                e = sys.exc_info()[1]
                # If we have previous errors ensure the exit code is non-zero
                if exc and e.code == 0:
                    e.code = 1
                raise
            except:
                exc.handle_exception()
                if channel == 'log':
                    # Assume any further messages to 'log' will fail.
                    pass
                else:
                    self.log("Error in %r listener %r" % (channel, listener),
                             level=40, traceback=True)
        if exc:
            raise exc
        return output

    def _clean_exit(self):
        """An atexit handler which asserts the Bus is not running."""
        if self.state != states.EXITING:
            warnings.warn(
                "The main thread is exiting, but the Bus is in the %r state; "
                "shutting it down automatically now. You must either call "
                "bus.block() after start(), or call bus.exit() before the "
                "main thread exits." % self.state, RuntimeWarning)
            self.exit()

    def start(self):
        """Start all services.

        Moves the bus to STARTING, publishes 'start' and then moves to
        STARTED. If a start listener fails, the error is logged, exit() is
        attempted and the original error is re-raised.
        """
        atexit.register(self._clean_exit)

        self.state = states.STARTING
        self.log('Bus STARTING')
        try:
            self.publish('start')
            self.state = states.STARTED
            self.log('Bus STARTED')
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.log("Shutting down due to error in start listener:",
                     level=40, traceback=True)
            # Capture the error now; exit() below may replace exc_info.
            e_info = sys.exc_info()[1]
            try:
                self.exit()
            except:
                # Any stop/exit errors will be logged inside publish().
                pass
            # Re-raise the original error
            raise e_info

    def exit(self):
        """Stop all services and prepare to exit the process."""
        # Remember the state on entry; stop() below overwrites self.state.
        exitstate = self.state
        try:
            self.stop()

            self.state = states.EXITING
            self.log('Bus EXITING')
            self.publish('exit')
            # This isn't strictly necessary, but it's better than seeing
            # "Waiting for child threads to terminate..." and then nothing.
            self.log('Bus EXITED')
        except:
            # This method is often called asynchronously (whether thread,
            # signal handler, console handler, or atexit handler), so we
            # can't just let exceptions propagate out unhandled.
            # Assume it's been logged and just die.
            os._exit(70)  # EX_SOFTWARE

        if exitstate == states.STARTING:
            # exit() was called before start() finished, possibly due to
            # Ctrl-C because a start listener got stuck. In this case,
            # we could get stuck in a loop where Ctrl-C never exits the
            # process, so we just call os.exit here.
            os._exit(70)  # EX_SOFTWARE

    def restart(self):
        """Restart the process (may close connections).

        This method does not restart the process from the calling thread;
        instead, it stops the bus and asks the main thread to call execv.
        """
        self.execv = True
        self.exit()

    def graceful(self):
        """Advise all services to reload."""
        self.log('Bus graceful')
        self.publish('graceful')

    def block(self, interval=0.1):
        """Wait for the EXITING state, KeyboardInterrupt or SystemExit.

        This function is intended to be called only by the main thread.
        After waiting for the EXITING state, it also waits for all threads
        to terminate, and then calls os.execv if self.execv is True. This
        design allows another thread to call bus.restart, yet have the main
        thread perform the actual execv call (required on some platforms).

        ``interval`` is the polling period, in seconds, passed to wait().
        """
        try:
            self.wait(states.EXITING, interval=interval, channel='main')
        except (KeyboardInterrupt, IOError):
            # The time.sleep call might raise
            # "IOError: [Errno 4] Interrupted function call" on KBInt.
            self.log('Keyboard Interrupt: shutting down bus')
            self.exit()
        except SystemExit:
            self.log('SystemExit raised: shutting down bus')
            self.exit()
            raise

        # Waiting for ALL child threads to finish is necessary on OS X.
        # See http://www.cherrypy.org/ticket/581.
        # It's also good to let them all shut down before allowing
        # the main thread to call atexit handlers.
        # See http://www.cherrypy.org/ticket/751.
        self.log("Waiting for child threads to terminate...")
        for t in threading.enumerate():
            # Thread.isAlive() was removed in Python 3.9; prefer is_alive()
            # (available since Python 2.6) and fall back to the old
            # spelling only on interpreters that lack it.
            is_alive = getattr(t, 'is_alive', None) or t.isAlive
            if t != threading.currentThread() and is_alive():
                # Note that any dummy (external) threads are always daemonic.
                if hasattr(threading.Thread, "daemon"):
                    # Python 2.6+
                    d = t.daemon
                else:
                    d = t.isDaemon()
                if not d:
                    self.log("Waiting for thread %s." % t.getName())
                    t.join()

        if self.execv:
            self._do_execv()

    def wait(self, state, interval=0.1, channel=None):
        """Poll for the given state(s) at intervals; publish to channel.

        ``state`` may be a single state or a tuple/list of states. Until
        self.state is one of them, this sleeps for ``interval`` seconds
        and then publishes (with no arguments) to ``channel``.
        """
        # Named 'wanted' so as not to shadow the module-level 'states'.
        if isinstance(state, (tuple, list)):
            wanted = state
        else:
            wanted = [state]

        def _wait():
            while self.state not in wanted:
                time.sleep(interval)
                self.publish(channel)

        # From http://psyco.sourceforge.net/psycoguide/bugs.html:
        # "The compiled machine code does not include the regular polling
        # done by Python, meaning that a KeyboardInterrupt will not be
        # detected before execution comes back to the regular Python
        # interpreter. Your program cannot be interrupted if caught
        # into an infinite Psyco-compiled loop."
        try:
            sys.modules['psyco'].cannotcompile(_wait)
        except (KeyError, AttributeError):
            pass

        _wait()

    def _do_execv(self):
        """Re-execute the current process.

        This must be called from the main thread, because certain platforms
        (OS X) don't allow execv to be called in a child thread very well.
        """
        args = sys.argv[:]
        self.log('Re-spawning %s' % ' '.join(args))

        if sys.platform[:4] == 'java':
            from _systemrestart import SystemRestart
            raise SystemRestart
        else:
            args.insert(0, sys.executable)
            if sys.platform == 'win32':
                args = ['"%s"' % arg for arg in args]

            # Return to the directory the process was started from, in
            # case sys.executable or the script path is relative.
            os.chdir(_startup_cwd)
            if self.max_cloexec_files:
                self._set_cloexec()
            os.execv(sys.executable, args)

    def _set_cloexec(self):
        """Set the CLOEXEC flag on all open files (except stdin/out/err).

        If self.max_cloexec_files is an integer (the default), then on
        platforms which support it, it represents the max open files setting
        for the operating system. This function will be called just before
        the process is restarted via os.execv() to prevent open files
        from persisting into the new process.

        Set self.max_cloexec_files to 0 to disable this behavior.
        """
        for fd in range(3, self.max_cloexec_files):  # skip stdin/out/err
            try:
                flags = fcntl.fcntl(fd, fcntl.F_GETFD)
            except IOError:
                # Reading the flags failed for this descriptor; skip it.
                continue
            fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)

    def stop(self):
        """Stop all services."""
        self.state = states.STOPPING
        self.log('Bus STOPPING')
        self.publish('stop')
        self.state = states.STOPPED
        self.log('Bus STOPPED')

    def start_with_callback(self, func, args=None, kwargs=None):
        """Start 'func' in a new thread T, then start self (and return T).

        The thread waits for the STARTED state before calling
        ``func(*args, **kwargs)``.
        """
        if args is None:
            args = ()
        if kwargs is None:
            kwargs = {}
        args = (func,) + args

        def _callback(func, *a, **kw):
            self.wait(states.STARTED)
            func(*a, **kw)
        t = threading.Thread(target=_callback, args=args, kwargs=kwargs)
        t.setName('Bus Callback ' + t.getName())
        t.start()

        self.start()

        return t

    def log(self, msg="", level=20, traceback=False):
        """Log the given message. Append the last traceback if requested.

        The message and level are published on the 'log' channel.
        """
        if traceback:
            msg += "\n" + "".join(_traceback.format_exception(*sys.exc_info()))
        self.publish('log', msg, level)


# Module-level Bus instance.
bus = Bus()
OLD | NEW |