Index: recipe_engine/stream.py |
diff --git a/recipe_engine/stream.py b/recipe_engine/stream.py |
index ce98d2079e88cc4d5f1a3ad443221e681992d483..4be99e2fcd754a39ca5e9c15051cdff511aaa3bb 100644 |
--- a/recipe_engine/stream.py |
+++ b/recipe_engine/stream.py |
@@ -22,6 +22,7 @@ import time |
from . import env |
from . import recipe_api |
+from . import util |
class StreamEngine(object): |
@@ -104,10 +105,18 @@ class StreamEngine(object): |
self.close() |
-# Because StreamEngine has no observations (i.e. it is an F-Algebra), we can |
-# form products. This code is entirely mechanical from the types (if we |
-# had them formalized...). |
class ProductStreamEngine(StreamEngine): |
+ """A StreamEngine that forms the non-commutative product of two other |
+ StreamEngines. |
+ |
+ Because StreamEngine has no observations (i.e. it is an F-Algebra), we can |
+ form products. This code is entirely mechanical from the types (if we |
+ had them formalized...). |
+ |
+ This product is non-commutative, meaning order matters. Specifically, an |
+ exception in "engine_a" will prevent "engine_b" from being evaluated. |
+ """ |
+ |
def __init__(self, engine_a, engine_b): |
assert engine_a and engine_b |
self._engine_a = engine_a |
@@ -128,6 +137,7 @@ class ProductStreamEngine(StreamEngine): |
self._stream_b.close() |
class StepStream(Stream): |
+ # pylint: disable=no-self-argument |
def _void_product(method_name): |
def inner(self, *args): |
getattr(self._stream_a, method_name)(*args) |
@@ -161,7 +171,81 @@ class ProductStreamEngine(StreamEngine): |
self._engine_b.close() |
-def _noop(*args, **kwargs): |
+class MultiStreamEngine(StreamEngine): |
+ """A StreamEngine consisting of one or more inner StreamEngines. |
+ |
+ A call to this StreamEngine will be distributed to the inner StreamEngines. |
+ Any exceptions that are caught during an inner handler will be deferred until |
+ all inner handlers have been executed. |
+ """ |
+ |
+ def __init__(self, base, *engines): |
+ self._engines = (base,) + engines |
+ assert None not in self._engines |
+ |
+ @classmethod |
+ def create(cls, *engines): |
+ assert len(engines) > 0, 'At least one engine must be provided.' |
+ if len(engines) == 1: |
+ return engines[0] |
+ return cls(engines[0], *engines[1:]) |
+ |
+ class Stream(StreamEngine.Stream): |
+ def __init__(self, *streams): |
+ assert all(streams) |
+ self._streams = streams |
+ |
+ def write_line(self, line): |
+ util.map_defer_exceptions(lambda s: s.write_line(line), self._streams) |
+ |
+ def close(self): |
+ util.map_defer_exceptions(lambda s: s.close(), self._streams) |
+ |
+ class StepStream(Stream): |
+ # pylint: disable=no-self-argument |
+ def _multiplex(method_name): |
+ def inner(self, *args): |
+ util.map_defer_exceptions(lambda s: getattr(s, method_name)(*args), |
+ self._streams) |
+ return inner |
+ |
+ def new_log_stream(self, log_name): |
+ log_streams = [] |
+ try: |
+ for s in self._streams: |
+ log_streams.append(s.new_log_stream(log_name)) |
+ return MultiStreamEngine.Stream(*log_streams) |
+ except Exception: |
+ # Close any opened log streams. |
+ util.map_defer_exceptions(lambda ls: ls.close(), log_streams) |
+ raise |
+ |
+ add_step_text = _multiplex('add_step_text') |
+ add_step_summary_text = _multiplex('add_step_summary_text') |
+ add_step_link = _multiplex('add_step_link') |
+ reset_subannotation_state = _multiplex('reset_subannotation_state') |
+ set_step_status = _multiplex('set_step_status') |
+ set_build_property = _multiplex('set_build_property') |
+ trigger = _multiplex('trigger') |
+ |
+ def new_step_stream(self, step_config): |
+ return self.StepStream( |
+ *(se.new_step_stream(step_config) |
+ for se in self._engines)) |
+ |
+ def open(self): |
+ for se in self._engines: |
+ se.open() |
+ |
+ def close(self): |
+ util.map_defer_exceptions(lambda se: se.close(), self._engines) |
+ |
+ def append_stream_engine(self, se): |
+ assert isinstance(se, StreamEngine) |
+ self._engines.append(se) |
+ |
+ |
+def _noop(*_args, **_kwargs): |
pass |
class NoopStreamEngine(StreamEngine): |
@@ -170,7 +254,7 @@ class NoopStreamEngine(StreamEngine): |
close = _noop |
class StepStream(Stream): |
- def new_log_stream(self, log_name): |
+ def new_log_stream(self, _log_name): |
return NoopStreamEngine.Stream() |
add_step_text = _noop |
add_step_summary_text = _noop |
@@ -193,8 +277,15 @@ class StreamEngineInvariants(StreamEngine): |
def __init__(self): |
self._streams = set() |
+ @classmethod |
+ def wrap(cls, other): |
+ """Returns (ProductStreamEngine): A product applying invariants to "other". |
+ """ |
+ return ProductStreamEngine(cls(), other) |
+ |
class StepStream(StreamEngine.StepStream): |
def __init__(self, engine, step_name): |
+ super(StreamEngineInvariants.StepStream, self).__init__() |
self._engine = engine |
self._step_name = step_name |
self._open = True |
@@ -315,7 +406,7 @@ class AnnotatorStreamEngine(StreamEngine): |
class StepStream(StreamEngine.StepStream): |
def __init__(self, engine, outstream, step_name): |
- super(StreamEngine.StepStream, self).__init__() |
+ super(AnnotatorStreamEngine.StepStream, self).__init__() |
self._engine = engine |
self._outstream = outstream |