Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(9)

Side by Side Diff: third_party/cherrypy/process/wspbus.py

Issue 9368042: Add CherryPy to third_party. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build/
Patch Set: '' Created 8 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « third_party/cherrypy/process/win32.py ('k') | third_party/cherrypy/scaffold/__init__.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Property Changes:
Added: svn:eol-style
+ LF
OLDNEW
(Empty)
1 """An implementation of the Web Site Process Bus.
2
3 This module is completely standalone, depending only on the stdlib.
4
5 Web Site Process Bus
6 --------------------
7
8 A Bus object is used to contain and manage site-wide behavior:
9 daemonization, HTTP server start/stop, process reload, signal handling,
10 drop privileges, PID file management, logging for all of these,
11 and many more.
12
13 In addition, a Bus object provides a place for each web framework
14 to register code that runs in response to site-wide events (like
15 process start and stop), or which controls or otherwise interacts with
16 the site-wide components mentioned above. For example, a framework which
17 uses file-based templates would add known template filenames to an
18 autoreload component.
19
20 Ideally, a Bus object will be flexible enough to be useful in a variety
21 of invocation scenarios:
22
23 1. The deployer starts a site from the command line via a
24 framework-neutral deployment script; applications from multiple frameworks
25 are mixed in a single site. Command-line arguments and configuration
26 files are used to define site-wide components such as the HTTP server,
27 WSGI component graph, autoreload behavior, signal handling, etc.
28 2. The deployer starts a site via some other process, such as Apache;
29 applications from multiple frameworks are mixed in a single site.
30 Autoreload and signal handling (from Python at least) are disabled.
31 3. The deployer starts a site via a framework-specific mechanism;
32 for example, when running tests, exploring tutorials, or deploying
33 single applications from a single framework. The framework controls
34 which site-wide components are enabled as it sees fit.
35
36 The Bus object in this package uses topic-based publish-subscribe
37 messaging to accomplish all this. A few topic channels are built in
38 ('start', 'stop', 'exit', 'graceful', 'log', and 'main'). Frameworks and
39 site containers are free to define their own. If a message is sent to a
40 channel that has not been defined or has no listeners, there is no effect.
41
42 In general, there should only ever be a single Bus object per process.
43 Frameworks and site containers share a single Bus object by publishing
44 messages and subscribing listeners.
45
46 The Bus object works as a finite state machine which models the current
47 state of the process. Bus methods move it from one state to another;
48 those methods then publish to subscribed listeners on the channel for
49 the new state.::
50
51 O
52 |
53 V
54 STOPPING --> STOPPED --> EXITING -> X
55 A A |
56 | \___ |
57 | \ |
58 | V V
59 STARTED <-- STARTING
60
61 """
62
63 import atexit
64 import os
65 import sys
66 import threading
67 import time
68 import traceback as _traceback
69 import warnings
70
71 from cherrypy._cpcompat import set
72
73 # Here I save the value of os.getcwd(), which, if I am imported early enough,
74 # will be the directory from which the startup script was run. This is needed
75 # by _do_execv(), to change back to the original directory before execv()ing a
76 # new process. This is a defense against the application having changed the
77 # current working directory (which could make sys.executable "not found" if
78 # sys.executable is a relative-path, and/or cause other problems).
79 _startup_cwd = os.getcwd()
80
81 class ChannelFailures(Exception):
82 """Exception raised when errors occur in a listener during Bus.publish()."""
83 delimiter = '\n'
84
85 def __init__(self, *args, **kwargs):
86 # Don't use 'super' here; Exceptions are old-style in Py2.4
87 # See http://www.cherrypy.org/ticket/959
88 Exception.__init__(self, *args, **kwargs)
89 self._exceptions = list()
90
91 def handle_exception(self):
92 """Append the current exception to self."""
93 self._exceptions.append(sys.exc_info()[1])
94
95 def get_instances(self):
96 """Return a list of seen exception instances."""
97 return self._exceptions[:]
98
99 def __str__(self):
100 exception_strings = map(repr, self.get_instances())
101 return self.delimiter.join(exception_strings)
102
103 __repr__ = __str__
104
105 def __bool__(self):
106 return bool(self._exceptions)
107 __nonzero__ = __bool__
108
109 # Use a flag to indicate the state of the bus.
110 class _StateEnum(object):
111 class State(object):
112 name = None
113 def __repr__(self):
114 return "states.%s" % self.name
115
116 def __setattr__(self, key, value):
117 if isinstance(value, self.State):
118 value.name = key
119 object.__setattr__(self, key, value)
120 states = _StateEnum()
121 states.STOPPED = states.State()
122 states.STARTING = states.State()
123 states.STARTED = states.State()
124 states.STOPPING = states.State()
125 states.EXITING = states.State()
126
127
128 try:
129 import fcntl
130 except ImportError:
131 max_files = 0
132 else:
133 try:
134 max_files = os.sysconf('SC_OPEN_MAX')
135 except AttributeError:
136 max_files = 1024
137
138
139 class Bus(object):
140 """Process state-machine and messenger for HTTP site deployment.
141
142 All listeners for a given channel are guaranteed to be called even
143 if others at the same channel fail. Each failure is logged, but
144 execution proceeds on to the next listener. The only way to stop all
145 processing from inside a listener is to raise SystemExit and stop the
146 whole server.
147 """
148
149 states = states
150 state = states.STOPPED
151 execv = False
152 max_cloexec_files = max_files
153
154 def __init__(self):
155 self.execv = False
156 self.state = states.STOPPED
157 self.listeners = dict(
158 [(channel, set()) for channel
159 in ('start', 'stop', 'exit', 'graceful', 'log', 'main')])
160 self._priorities = {}
161
162 def subscribe(self, channel, callback, priority=None):
163 """Add the given callback at the given channel (if not present)."""
164 if channel not in self.listeners:
165 self.listeners[channel] = set()
166 self.listeners[channel].add(callback)
167
168 if priority is None:
169 priority = getattr(callback, 'priority', 50)
170 self._priorities[(channel, callback)] = priority
171
172 def unsubscribe(self, channel, callback):
173 """Discard the given callback (if present)."""
174 listeners = self.listeners.get(channel)
175 if listeners and callback in listeners:
176 listeners.discard(callback)
177 del self._priorities[(channel, callback)]
178
179 def publish(self, channel, *args, **kwargs):
180 """Return output of all subscribers for the given channel."""
181 if channel not in self.listeners:
182 return []
183
184 exc = ChannelFailures()
185 output = []
186
187 items = [(self._priorities[(channel, listener)], listener)
188 for listener in self.listeners[channel]]
189 try:
190 items.sort(key=lambda item: item[0])
191 except TypeError:
192 # Python 2.3 had no 'key' arg, but that doesn't matter
193 # since it could sort dissimilar types just fine.
194 items.sort()
195 for priority, listener in items:
196 try:
197 output.append(listener(*args, **kwargs))
198 except KeyboardInterrupt:
199 raise
200 except SystemExit:
201 e = sys.exc_info()[1]
202 # If we have previous errors ensure the exit code is non-zero
203 if exc and e.code == 0:
204 e.code = 1
205 raise
206 except:
207 exc.handle_exception()
208 if channel == 'log':
209 # Assume any further messages to 'log' will fail.
210 pass
211 else:
212 self.log("Error in %r listener %r" % (channel, listener),
213 level=40, traceback=True)
214 if exc:
215 raise exc
216 return output
217
218 def _clean_exit(self):
219 """An atexit handler which asserts the Bus is not running."""
220 if self.state != states.EXITING:
221 warnings.warn(
222 "The main thread is exiting, but the Bus is in the %r state; "
223 "shutting it down automatically now. You must either call "
224 "bus.block() after start(), or call bus.exit() before the "
225 "main thread exits." % self.state, RuntimeWarning)
226 self.exit()
227
228 def start(self):
229 """Start all services."""
230 atexit.register(self._clean_exit)
231
232 self.state = states.STARTING
233 self.log('Bus STARTING')
234 try:
235 self.publish('start')
236 self.state = states.STARTED
237 self.log('Bus STARTED')
238 except (KeyboardInterrupt, SystemExit):
239 raise
240 except:
241 self.log("Shutting down due to error in start listener:",
242 level=40, traceback=True)
243 e_info = sys.exc_info()[1]
244 try:
245 self.exit()
246 except:
247 # Any stop/exit errors will be logged inside publish().
248 pass
249 # Re-raise the original error
250 raise e_info
251
252 def exit(self):
253 """Stop all services and prepare to exit the process."""
254 exitstate = self.state
255 try:
256 self.stop()
257
258 self.state = states.EXITING
259 self.log('Bus EXITING')
260 self.publish('exit')
261 # This isn't strictly necessary, but it's better than seeing
262 # "Waiting for child threads to terminate..." and then nothing.
263 self.log('Bus EXITED')
264 except:
265 # This method is often called asynchronously (whether thread,
266 # signal handler, console handler, or atexit handler), so we
267 # can't just let exceptions propagate out unhandled.
268 # Assume it's been logged and just die.
269 os._exit(70) # EX_SOFTWARE
270
271 if exitstate == states.STARTING:
272 # exit() was called before start() finished, possibly due to
273 # Ctrl-C because a start listener got stuck. In this case,
274 # we could get stuck in a loop where Ctrl-C never exits the
275 # process, so we just call os.exit here.
276 os._exit(70) # EX_SOFTWARE
277
278 def restart(self):
279 """Restart the process (may close connections).
280
281 This method does not restart the process from the calling thread;
282 instead, it stops the bus and asks the main thread to call execv.
283 """
284 self.execv = True
285 self.exit()
286
287 def graceful(self):
288 """Advise all services to reload."""
289 self.log('Bus graceful')
290 self.publish('graceful')
291
292 def block(self, interval=0.1):
293 """Wait for the EXITING state, KeyboardInterrupt or SystemExit.
294
295 This function is intended to be called only by the main thread.
296 After waiting for the EXITING state, it also waits for all threads
297 to terminate, and then calls os.execv if self.execv is True. This
298 design allows another thread to call bus.restart, yet have the main
299 thread perform the actual execv call (required on some platforms).
300 """
301 try:
302 self.wait(states.EXITING, interval=interval, channel='main')
303 except (KeyboardInterrupt, IOError):
304 # The time.sleep call might raise
305 # "IOError: [Errno 4] Interrupted function call" on KBInt.
306 self.log('Keyboard Interrupt: shutting down bus')
307 self.exit()
308 except SystemExit:
309 self.log('SystemExit raised: shutting down bus')
310 self.exit()
311 raise
312
313 # Waiting for ALL child threads to finish is necessary on OS X.
314 # See http://www.cherrypy.org/ticket/581.
315 # It's also good to let them all shut down before allowing
316 # the main thread to call atexit handlers.
317 # See http://www.cherrypy.org/ticket/751.
318 self.log("Waiting for child threads to terminate...")
319 for t in threading.enumerate():
320 if t != threading.currentThread() and t.isAlive():
321 # Note that any dummy (external) threads are always daemonic.
322 if hasattr(threading.Thread, "daemon"):
323 # Python 2.6+
324 d = t.daemon
325 else:
326 d = t.isDaemon()
327 if not d:
328 self.log("Waiting for thread %s." % t.getName())
329 t.join()
330
331 if self.execv:
332 self._do_execv()
333
334 def wait(self, state, interval=0.1, channel=None):
335 """Poll for the given state(s) at intervals; publish to channel."""
336 if isinstance(state, (tuple, list)):
337 states = state
338 else:
339 states = [state]
340
341 def _wait():
342 while self.state not in states:
343 time.sleep(interval)
344 self.publish(channel)
345
346 # From http://psyco.sourceforge.net/psycoguide/bugs.html:
347 # "The compiled machine code does not include the regular polling
348 # done by Python, meaning that a KeyboardInterrupt will not be
349 # detected before execution comes back to the regular Python
350 # interpreter. Your program cannot be interrupted if caught
351 # into an infinite Psyco-compiled loop."
352 try:
353 sys.modules['psyco'].cannotcompile(_wait)
354 except (KeyError, AttributeError):
355 pass
356
357 _wait()
358
359 def _do_execv(self):
360 """Re-execute the current process.
361
362 This must be called from the main thread, because certain platforms
363 (OS X) don't allow execv to be called in a child thread very well.
364 """
365 args = sys.argv[:]
366 self.log('Re-spawning %s' % ' '.join(args))
367
368 if sys.platform[:4] == 'java':
369 from _systemrestart import SystemRestart
370 raise SystemRestart
371 else:
372 args.insert(0, sys.executable)
373 if sys.platform == 'win32':
374 args = ['"%s"' % arg for arg in args]
375
376 os.chdir(_startup_cwd)
377 if self.max_cloexec_files:
378 self._set_cloexec()
379 os.execv(sys.executable, args)
380
381 def _set_cloexec(self):
382 """Set the CLOEXEC flag on all open files (except stdin/out/err).
383
384 If self.max_cloexec_files is an integer (the default), then on
385 platforms which support it, it represents the max open files setting
386 for the operating system. This function will be called just before
387 the process is restarted via os.execv() to prevent open files
388 from persisting into the new process.
389
390 Set self.max_cloexec_files to 0 to disable this behavior.
391 """
392 for fd in range(3, self.max_cloexec_files): # skip stdin/out/err
393 try:
394 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
395 except IOError:
396 continue
397 fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
398
399 def stop(self):
400 """Stop all services."""
401 self.state = states.STOPPING
402 self.log('Bus STOPPING')
403 self.publish('stop')
404 self.state = states.STOPPED
405 self.log('Bus STOPPED')
406
407 def start_with_callback(self, func, args=None, kwargs=None):
408 """Start 'func' in a new thread T, then start self (and return T)."""
409 if args is None:
410 args = ()
411 if kwargs is None:
412 kwargs = {}
413 args = (func,) + args
414
415 def _callback(func, *a, **kw):
416 self.wait(states.STARTED)
417 func(*a, **kw)
418 t = threading.Thread(target=_callback, args=args, kwargs=kwargs)
419 t.setName('Bus Callback ' + t.getName())
420 t.start()
421
422 self.start()
423
424 return t
425
426 def log(self, msg="", level=20, traceback=False):
427 """Log the given message. Append the last traceback if requested."""
428 if traceback:
429 msg += "\n" + "".join(_traceback.format_exception(*sys.exc_info()))
430 self.publish('log', msg, level)
431
432 bus = Bus()
OLDNEW
« no previous file with comments | « third_party/cherrypy/process/win32.py ('k') | third_party/cherrypy/scaffold/__init__.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698