Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(146)

Unified Diff: recipe_engine/stream.py

Issue 2265673002: Add LogDog / annotation protobuf support. (Closed) Base URL: https://github.com/luci/recipes-py@step-formal-struct
Patch Set: Stronger flush meta logic, moar test. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « recipe_engine/simulation_test.py ('k') | recipe_engine/stream_logdog.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: recipe_engine/stream.py
diff --git a/recipe_engine/stream.py b/recipe_engine/stream.py
index ce98d2079e88cc4d5f1a3ad443221e681992d483..4be99e2fcd754a39ca5e9c15051cdff511aaa3bb 100644
--- a/recipe_engine/stream.py
+++ b/recipe_engine/stream.py
@@ -22,6 +22,7 @@ import time
from . import env
from . import recipe_api
+from . import util
class StreamEngine(object):
@@ -104,10 +105,18 @@ class StreamEngine(object):
self.close()
-# Because StreamEngine has no observations (i.e. it is an F-Algebra), we can
-# form products. This code is entirely mechanical from the types (if we
-# had them formalized...).
class ProductStreamEngine(StreamEngine):
+ """A StreamEngine that forms the non-commutative product of two other
+ StreamEngines.
+
+ Because StreamEngine has no observations (i.e. it is an F-Algebra), we can
+ form products. This code is entirely mechanical from the types (if we
+ had them formalized...).
+
+ This product is non-commutative, meaning order matters. Specifically, an
+ exception in "engine_a" will prevent "engine_b" from being evaluated.
+ """
+
def __init__(self, engine_a, engine_b):
assert engine_a and engine_b
self._engine_a = engine_a
@@ -128,6 +137,7 @@ class ProductStreamEngine(StreamEngine):
self._stream_b.close()
class StepStream(Stream):
+ # pylint: disable=no-self-argument
def _void_product(method_name):
def inner(self, *args):
getattr(self._stream_a, method_name)(*args)
@@ -161,7 +171,81 @@ class ProductStreamEngine(StreamEngine):
self._engine_b.close()
-def _noop(*args, **kwargs):
+class MultiStreamEngine(StreamEngine):
+ """A StreamEngine consisting of one or more inner StreamEngines.
+
+ A call to this StreamEngine will be distributed to the inner StreamEngines.
+ Any exceptions that are caught during an inner handler will be deferred until
+ all inner handlers have been executed.
+ """
+
+ def __init__(self, base, *engines):
+ self._engines = (base,) + engines
+ assert None not in self._engines
+
+ @classmethod
+ def create(cls, *engines):
+ assert len(engines) > 0, 'At least one engine must be provided.'
+ if len(engines) == 1:
+ return engines[0]
+ return cls(engines[0], *engines[1:])
+
+ class Stream(StreamEngine.Stream):
+ def __init__(self, *streams):
+ assert all(streams)
+ self._streams = streams
+
+ def write_line(self, line):
+ util.map_defer_exceptions(lambda s: s.write_line(line), self._streams)
+
+ def close(self):
+ util.map_defer_exceptions(lambda s: s.close(), self._streams)
+
+ class StepStream(Stream):
+ # pylint: disable=no-self-argument
+ def _multiplex(method_name):
+ def inner(self, *args):
+ util.map_defer_exceptions(lambda s: getattr(s, method_name)(*args),
+ self._streams)
+ return inner
+
+ def new_log_stream(self, log_name):
+ log_streams = []
+ try:
+ for s in self._streams:
+ log_streams.append(s.new_log_stream(log_name))
+ return MultiStreamEngine.Stream(*log_streams)
+ except Exception:
+ # Close any opened log streams.
+ util.map_defer_exceptions(lambda ls: ls.close(), log_streams)
+ raise
+
+ add_step_text = _multiplex('add_step_text')
+ add_step_summary_text = _multiplex('add_step_summary_text')
+ add_step_link = _multiplex('add_step_link')
+ reset_subannotation_state = _multiplex('reset_subannotation_state')
+ set_step_status = _multiplex('set_step_status')
+ set_build_property = _multiplex('set_build_property')
+ trigger = _multiplex('trigger')
+
+ def new_step_stream(self, step_config):
+ return self.StepStream(
+ *(se.new_step_stream(step_config)
+ for se in self._engines))
+
+ def open(self):
+ for se in self._engines:
+ se.open()
+
+ def close(self):
+ util.map_defer_exceptions(lambda se: se.close(), self._engines)
+
+ def append_stream_engine(self, se):
+ assert isinstance(se, StreamEngine)
+ self._engines.append(se)
+
+
+def _noop(*_args, **_kwargs):
pass
class NoopStreamEngine(StreamEngine):
@@ -170,7 +254,7 @@ class NoopStreamEngine(StreamEngine):
close = _noop
class StepStream(Stream):
- def new_log_stream(self, log_name):
+ def new_log_stream(self, _log_name):
return NoopStreamEngine.Stream()
add_step_text = _noop
add_step_summary_text = _noop
@@ -193,8 +277,15 @@ class StreamEngineInvariants(StreamEngine):
def __init__(self):
self._streams = set()
+ @classmethod
+ def wrap(cls, other):
+ """Returns (ProductStreamEngine): A product applying invariants to "other".
+ """
+ return ProductStreamEngine(cls(), other)
+
class StepStream(StreamEngine.StepStream):
def __init__(self, engine, step_name):
+ super(StreamEngineInvariants.StepStream, self).__init__()
self._engine = engine
self._step_name = step_name
self._open = True
@@ -315,7 +406,7 @@ class AnnotatorStreamEngine(StreamEngine):
class StepStream(StreamEngine.StepStream):
def __init__(self, engine, outstream, step_name):
- super(StreamEngine.StepStream, self).__init__()
+ super(AnnotatorStreamEngine.StepStream, self).__init__()
self._engine = engine
self._outstream = outstream
« no previous file with comments | « recipe_engine/simulation_test.py ('k') | recipe_engine/stream_logdog.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698