Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(160)

Side by Side Diff: recipe_engine/stream_logdog.py

Issue 2265673002: Add LogDog / annotation protobuf support. (Closed) Base URL: https://github.com/luci/recipes-py@step-formal-struct
Patch Set: Stronger flush meta logic, moar test. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2015 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file.
4
5 """stream.StreamEngine implementation for LogDog, using Milo annotation
6 protobuf.
7 """
8
9 import collections
10 import contextlib
11 import copy
12 import datetime
13 import functools
14 import itertools
15 import os
16 import threading
17 import sys
18
19 from . import env
20 from . import stream
21 from . import util
22
23 import google.protobuf.message
24 import google.protobuf.timestamp_pb2 as timestamp_pb2
25 import libs.logdog.bootstrap
26 import libs.logdog.stream
27 import libs.logdog.streamname
28 import annotations_pb2 as pb
29
30
31 # The datetime for the epoch.
32 _EPOCH = datetime.datetime.utcfromtimestamp(0)
33
34 # The annotation stream ContentType.
35 #
36 # This must match the ContentType for the annotation binary protobuf, which
37 # is specified in "<luci-go>/common/proto/milo/util.go".
38 ANNOTATION_CONTENT_TYPE = 'text/x-chrome-infra-annotations; version=2'
39
40
41 class _Environment(object):
42 """Simulated system environment. The StreamEngine uses this to probe for
43 system parameters. By default, the environment will be derived from the
44 actual system.
45 """
46
47 def __init__(self, now_fn, argv, environ, cwd):
48 self._now_fn = now_fn
49 self.argv = argv
50 self.environ = environ
51 self.cwd = cwd
52
53 @property
54 def now(self):
55 return self._now_fn()
56
57 @classmethod
58 def real(cls):
59 """Returns (_Environment): An _Environment bound to the real system"""
60 return cls(
61 now_fn=datetime.datetime.now,
62 argv=sys.argv[:],
63 environ=dict(os.environ),
64 cwd=os.getcwd(),
65 )
66
67
class _StepStreamMeta(type):
  """Metaclass for StepStream that automatically notifies the engine of
  potential step data change if the step variable is accessed by an overridden
  (callback) method.

  The alternative to this is to hope that implementors remember to perform this
  check each time they modify the step, which could be a problem if the ABC
  methods change at some point in the future. This option concentrates all of
  the checking evil in one place.

  Classes built with this metaclass must define a "_step" property and have
  exactly one parent class. Wrapped methods run against instances that are
  expected to expose an "_engine" attribute providing
  "_notify_annotation_changed".
  """

  def __new__(mcs, name, parents, attrs):
    # Single inheritance only: "parents[0]" is consulted below to decide which
    # attributes are overrides.
    assert len(parents) == 1

    # Wrap implemented methods so that if they modify the StepStream's embedded
    # Step, they will automatically notify the StepStream's engine.
    attrs['_step'] = property(
        mcs._wrap_set_step_referenced(attrs['_step'].fget),
        doc=attrs['_step'].__doc__)
    # Python 2 dict iteration; replacing values for existing keys during
    # "iteritems" is safe because the key set does not change.
    for k, v in attrs.iteritems():
      # Only wrap non-dunder callables that override an attribute of the
      # parent class. Note that classmethod/staticmethod objects are not
      # themselves callable in Python 2, so they are left unwrapped.
      if k.startswith('__') or not (callable(v) and hasattr(parents[0], k)):
        continue
      attrs[k] = mcs._wrap_notify_annotation_changed(v)
    return super(_StepStreamMeta, mcs).__new__(mcs, name, parents, attrs)

  @classmethod
  def _wrap_set_step_referenced(mcs, fn):
    """Wraps the "_step" property to set a flag if referenced.

    This flag is used by functions wrapped via
    "_wrap_notify_annotation_changed" to detect if they should consider
    notifying the StreamEngine.
    """
    @functools.wraps(fn)
    def mark_referenced(wrapSelf, *args, **kwargs):
      # Record that "_step" was touched; inspected by "notify_after" below.
      wrapSelf._meta_step_referenced = True
      return fn(wrapSelf, *args, **kwargs)
    return mark_referenced

  @classmethod
  def _wrap_notify_annotation_changed(mcs, fn):
    """Wraps a function to automatically notify the StreamEngine when the Step
    changes.
    """
    @functools.wraps(fn)
    def notify_after(wrapSelf, *args, **kwargs):
      # Clear the flag, run the wrapped method, then notify the engine if the
      # method accessed "_step" — even if the method raised.
      wrapSelf._meta_step_referenced = False
      try:
        return fn(wrapSelf, *args, **kwargs)
      finally:
        if wrapSelf._meta_step_referenced:
          wrapSelf._engine._notify_annotation_changed()
    return notify_after
120
121
122 class StreamEngine(stream.StreamEngine):
123 """A stream.StreamEngine implementation that uses Logdog streams and Milo
124 annotation protobufs.
125
126 The generated LogDog streams will be (relative to "name_base"):
127 /annotations
128 The base annotations stream.
129
130 /steps/<step_name>/
131 Base stream name for a given step. Note that if multiple steps normalize
132 to the same <step_name> value, an index will be appended, such as
133 <step_name>_0. This can happen because the stream name component of a
134 step is normalized, so two validly-independent steps ("My Thing" and
135 "My_Thing") will both normalize to "My_Thing". In this case, the second
136 one would have the stream name component, "My_Thing_1".
137
138 /steps/<step_name>/stdout
139 STDOUT stream for step "<step_name>".
140 /steps/<step_name>/stderr
141 STDERR stream for step "<step_name>".
142
143 /steps/<step_name>/logs/<log_name>/<log_name_index>
144 Stream name for a given step's logs. <log_name_index> is the index of the
145 log with the given normalized name. This is similar to <step_name>, only
146 the index is added as a separate stream name component.
147 """
148
149 # The name of the annotation stream.
150 ANNOTATION_NAME = 'annotations'
151
152 # The default amount of time in between annotation pushes.
153 DEFAULT_UPDATE_INTERVAL = datetime.timedelta(seconds=30)
154
155
  def __init__(self, client=None, streamserver_uri=None, name_base=None,
               ignore_triggers=False, environment=None, update_interval=None):
    """Initializes a new LogDog/Annotation StreamEngine.

    Args:
      client (libs.logdog.stream.StreamClient or None): the LogDog stream client
          to use. If this is None, a new StreamClient will be instantiated when
          this StreamEngine is opened.
      streamserver_uri (str or None): The LogDog Butler stream server URI. See
          LogDog client library docs for details on supported protocols and
          format. This will only be used when "client" is None. If this is also
          None, a StreamClient will be created through probing.
      name_base (str or None): The default stream name prefix that will be added
          to generated LogDog stream names. If None, no prefix will be applied.
      ignore_triggers (bool): Triggers are not supported in LogDog annotation
          streams. If True, attempts to trigger will be silently ignored. If
          False, they will cause a NotImplementedError to be raised.
      environment (_Environment or None): The _Environment instance to use for
          operations. This will be None at production, but can be overridden
          here for testing.
      update_interval (datetime.timedelta or None): The interval of time between
          annotation data pushes. If None, DEFAULT_UPDATE_INTERVAL will be
          used.
    """

    self._client = client
    self._streamserver_uri = streamserver_uri
    # Prefix for all generated stream names; wraps None as an empty name.
    self._name_base = _StreamName(name_base)
    self._ignore_triggers = ignore_triggers
    self._env = environment or _Environment.real()
    self._update_interval = update_interval or self.DEFAULT_UPDATE_INTERVAL

    # The current _AnnotationState; None whenever the engine is not open.
    self._astate = None

    # Annotation datagram stream and its monitor; both created in "open()".
    self._annotation_stream = None
    self._annotation_monitor = None
    # Open streams, closed in reverse order by "close()".
    # NOTE(review): nothing in this file ever inserts into "_streams" —
    # presumably step streams should register here; verify against callers.
    self._streams = collections.OrderedDict()
193
194
195 class TextStream(stream.StreamEngine.Stream):
196
197 def __init__(self, fd):
198 super(StreamEngine.TextStream, self).__init__()
199 self._fd = fd
200
201 ##
202 # Implement stream.StreamEngine.Stream
203 ##
204
205 def write_line(self, line):
206 self._fd.write(line)
207 self._fd.write('\n')
208
209 def write_split(self, string):
210 self._fd.write(string)
211 if not string.endswith('\n'):
212 self._fd.write('\n')
213
214 def close(self):
215 self._fd.close()
216
217
218 class StepStream(stream.StreamEngine.StepStream):
219 """An individual step stream."""
220
221 __metaclass__ = _StepStreamMeta
222
223 def __init__(self, engine, step):
224 """Initialize a new StepStream.
225
226 Args:
227 engine (StreamEngine): The StreamEngine that owns this StepStream.
228 step (pb.Step): The Step instance that this Stream is managing.
229 """
230 # We will lazily create the STDOUT stream when the first data is written.
231 super(StreamEngine.StepStream, self).__init__()
232
233 self._engine = engine
234
235 self._step_value = step
236 self._step_referenced = False
237
238 # We keep track of the log streams associated with this step.
239 self._log_stream_index = {}
240
241 # We will lazily instantiate our stdout stream when content is actually
242 # written to it.
243 self._stdout_stream = None
244
245 # The retained step summary text. When generating failure details, this
246 # will be consumed to populate their text field.
247 self._summary_text = None
248
249 @classmethod
250 def create(cls, engine, step):
251 strm = cls(engine, step)
252
253 # Start our step.
254 strm._step.msg.status = pb.RUNNING
255 engine._set_timestamp(strm._step.msg.started)
256
257 return strm
258
259 @property
260 def _step(self):
261 return self._step_value
262
263 def _get_stdout(self):
264 if self._stdout_stream is None:
265 # Create a new STDOUT text stream.
266 stream_name = self._step.stream_name_base.append('stdout')
267 self._stdout_stream = self._engine._client.open_text(str(stream_name))
268
269 self._step.msg.stdout_stream.name = str(stream_name)
270 return self._stdout_stream
271
272 ##
273 # Implement stream.StreamEngine.Stream
274 ##
275
276 def write_line(self, line):
277 stdout = self._get_stdout()
278 stdout.write(line)
279 stdout.write('\n')
280
281 def write_split(self, string):
282 stdout = self._get_stdout()
283 stdout.write(string)
284 if not string.endswith('\n'):
285 stdout.write('\n')
286
287 def close(self):
288 if self._stdout_stream is not None:
289 self._stdout_stream.close()
290
291 # If we still have retained summary text, a failure, and no failure detail
292 # text, copy it there.
293 if self._summary_text is not None:
294 if (self._step.msg.HasField('failure_details') and
295 not self._step.msg.failure_details.text):
296 self._step.msg.failure_details.text = self._summary_text
297
298 # Close our Step.
299 self._engine._close_step(self._step.msg)
300
301 ##
302 # Implement stream.StreamEngine.StepStream
303 ##
304
305 def new_log_stream(self, log_name):
306 # Generate the base normalized log stream name for this log.
307 stream_name = self._step.stream_name_base.append('logs', log_name)
308
309 # Add the log stream index to the end of the stream name.
310 index = self._log_stream_index.setdefault(str(stream_name), 0)
311 self._log_stream_index[str(stream_name)] = index + 1
312 stream_name = stream_name.append(str(index))
313
314 # Create a new log stream for this name.
315 fd = self._engine._client.open_text(str(stream_name))
316
317 # Update our step to include the log stream.
318 link = self._step.msg.other_links.add(label=log_name)
319 link.logdog_stream.name = str(stream_name)
320
321 return self._engine.TextStream(fd)
322
323 def add_step_text(self, text):
324 self._step.msg.text.append(text)
325
326 def add_step_summary_text(self, text):
327 self._step.msg.text.insert(0, text)
328 self._summary_text = text
329
330 def add_step_link(self, name, url):
331 self._step.msg.other_links.add(label=name, url=url)
332
333 def reset_subannotation_state(self):
334 pass
335
336 def set_step_status(self, status):
337 if status == 'SUCCESS':
338 self._step.msg.status = pb.SUCCESS
339 elif status == 'WARNING':
340 self._step.msg.status = pb.SUCCESS
341 self._step.msg.failure_details.type = pb.FailureDetails.GENERAL
342 elif status == 'FAILURE':
343 self._step.msg.status = pb.FAILURE
344 self._step.msg.failure_details.type=pb.FailureDetails.GENERAL
345 elif status == 'EXCEPTION':
346 self._step.msg.status = pb.FAILURE
347 self._step.msg.failure_details.type = pb.FailureDetails.EXCEPTION
348 else:
349 raise ValueError('Unknown status [%s]' % (status,))
350
351 def set_build_property(self, key, value):
352 self._engine._anno.update_properties(key=value)
353
354 def trigger(self, trigger_spec):
355 if self._engine._ignore_triggers:
356 return
357 raise NotImplementedError(
358 'Stream-based triggering is not supported for LogDog. Please use '
359 'a recipe module (e.g., buildbucket) directly for build scheduling.')
360
361
  def new_step_stream(self, step_config):
    """Creates a StepStream for "step_config", registers the step with the
    annotation state, and notifies the annotation monitor.

    Returns (StepStream): The new step stream, already marked RUNNING.

    Raises:
      NotImplementedError: If the step requests subannotation processing.
    """
    # TODO(dnj): In the current iteration, subannotations are NOT supported.
    # In order to support them, they would have to be parsed out of the stream
    # and converted into Milo Annotation protobuf. This is a non-trivial effort
    # and may be a waste of time, as in a LogDog-enabled world, the component
    # emitting sub-annotations would actually just create its own annotation
    # stream and emit its own Milo protobuf.
    #
    # Components that emit subannotations and don't want to be converted to use
    # LogDog streams could bootstrap themselves through Annotee and let it do
    # the work.
    #
    # For now, though, we explicitly do NOT support LogDog running with
    # subannotations enabled.
    if step_config.allow_subannotations:
      raise NotImplementedError('Subannotations are not supported with LogDog '
                                'output.')

    strm = self.StepStream.create(self, self._astate.create_step(step_config))
    self._notify_annotation_changed()
    return strm
383
  def open(self):
    """Opens the StreamEngine.

    Ensures a LogDog stream client exists (creating one from
    "streamserver_uri", or probing via Bootstrap), opens the annotation
    datagram stream and its monitor, and initializes the annotation state
    with a RUNNING base step.
    """
    # Initialize our client, if one is not provided.
    if self._client is None:
      if self._streamserver_uri:
        self._client = libs.logdog.stream.create(self._streamserver_uri)
      else:
        # Probe the stream client via Bootstrap.
        bootstrap = libs.logdog.bootstrap.probe()
        self._client = bootstrap.stream_client()

    annotation_stream_name = self._name_base.append(self.ANNOTATION_NAME)
    self._annotation_stream = self._client.open_datagram(
        str(annotation_stream_name),
        content_type=ANNOTATION_CONTENT_TYPE)

    # The monitor throttles annotation pushes to "_update_interval".
    self._annotation_monitor = _AnnotationMonitor(
        self._env, self._annotation_stream, self._update_interval)

    # Initialize our open streams list.
    self._streams.clear()

    # Initialize our annotation state.
    self._astate = _AnnotationState.create(self._name_base,
                                           environment=self._env)
    self._astate.base.status = pb.RUNNING
    self._set_timestamp(self._astate.base.started)
    self._notify_annotation_changed()
411
  def close(self):
    """Closes the StreamEngine.

    Closes any remaining open streams, finalizes the root Step, flushes any
    pending annotation data, and shuts down the annotation monitor and
    datagram stream. The engine must currently be open.
    """
    assert self._astate is not None, 'StreamEngine is not open.'

    # Shut down any outstanding streams that may not have been closed for
    # whatever reason.
    for s in reversed(self._streams.values()):
      s.close()

    # Close out our root Step. Manually check annotation state afterwards.
    self._close_step(self._astate.base)
    self._notify_annotation_changed()

    # Shut down our annotation monitor and close our annotation stream.
    self._annotation_monitor.flush_and_join()
    self._annotation_stream.close()

    # Clear our client and state. We are now closed.
    self._streams.clear()
    self._client = None
    self._astate = None
432
433 def _notify_annotation_changed(self):
434 if self._astate is None:
435 return
436
437 step_data = self._astate.check()
438 if step_data is not None:
439 self._annotation_monitor.signal_update(step_data)
440
441 def _set_timestamp(self, dst, dt=None):
442 """Populates a timestamp_pb2.Timestamp, dst, with a datetime.
443
444 Args:
445 dst (timestamp_pb2.Timestamp): the timestamp protobuf that will be loaded
446 with the time.
447 dt (datetime.datetime or None): If not None, the datetime to load. If
448 None, the current time (via now) will be used.
449 """
450 dt = (dt) if dt else (self._env.now)
451
452 # Convert to milliseconds from epoch.
453 v = (dt - _EPOCH).total_seconds()
454
455 dst.seconds = int(v)
456 dst.nanos = int((v - dst.seconds) * 1000000000.0) # Remainder as nanos.
457
458 def _close_step(self, step):
459 """Closes a step, and any open substeps, propagating status.
460
461 If all of the substeps are already closed, this will do nothing. However, if
462 any are open, it will close them with an infra failure state.
463
464 If any substeps failed, the failure will be propagated to step.
465
466 Args:
467 step (pb.Step): The Step message to close.
468 """
469 # Close any open substeps, in case some of them didn't close.
470 failed = []
471 incomplete = []
472 for sub in step.substep:
473 if not sub.HasField('step'):
474 # Not an embedded substep.
475 continue
476
477 # Did this step actually complete? It should have, by now, so if it didn't
478 # we'll be reporting an infra failure in "step".
479 if sub.step.status not in (pb.SUCCESS, pb.FAILURE):
480 incomplete.append(sub.step)
481
482 # Close this substep. This may be a no-op, if the substep is already
483 # closed.
484 self._close_step(sub.step)
485
486 # If a substep failed, propagate its failure status to "step".
487 if sub.step.status == pb.FAILURE:
488 failed.append(sub.step)
489
490 # If we had any incomplete steps, mark that we failed.
491 if incomplete:
492 step.status = pb.FAILURE
493 if step.failure_details is None:
494 step.failure_details = pb.FailureDetails(
495 type=pb.FailureDetails.INFRA,
496 text='Some substeps did not complete: %s' % (
497 ', '.join(s.name for s in incomplete)),
498 )
499 elif failed:
500 step.status = pb.FAILURE
501 if step.failure_details is None:
502 # This step didn't successfully close, so propagate an infra failure.
503 step.failure_details = pb.FailureDetails(
504 type=pb.FailureDetails.GENERAL,
505 text='Some substeps failed: %s' % (
506 ', '.join(s.name for s in failed)),
507 )
508
509 # Now close "step". If it's still RUNNING, assume that it was successful.
510 if step.status == pb.RUNNING:
511 step.status = pb.SUCCESS
512 if not step.HasField('ended'):
513 self._set_timestamp(step.ended)
514
515
516
517 class _AnnotationMonitor(object):
518 """The owner of the annotation datagram stream, sending annotation updates in
519 a controlled manner and buffering them when the content hasn't changed.
520
521 By default, since annotation state can change rapidly, minor annotation
522 changes are throttled such that they are only actually sent periodically.
523
524 New annotation state updates can be installed by calling `signal_update`.
525 After being started, the _AnnotationMonitor thread must be shut down by
526 calling its `flush_and_join` method.
527 """
528
529 def __init__(self, env, fd, flush_period):
530 self._env = env
531 self._fd = fd
532 self._flush_period = flush_period
533
534 # The following group of variables is protected by "_lock".
535 self._lock = threading.Lock()
536 self._current_data = None
537 self._flush_timer = None
538 self._last_flush_time = None
539 self._last_flush_data = None
540
541 def signal_update(self, step_data, structural=False):
542 """Updates the annotation state with new step data.
543
544 This updates our state to include new step data. The annotation monitor
545 will pick this up and dispatch it, either:
546 - Eventually, when the flush period completes, or
547 - Immediately, if this is a structural change.
548
549 TODO(dnj): Re-examine the use case for "structural" based on actual usage
550 and decide to remove / use it.
551
552 Args:
553 step_data (str): The updated binary annotation protobuf step data.
554 structural (bool): If True, this is a structural update and should be
555 pushed immediately.
556 """
557 with self._lock:
558 # Did our data actually change?
559 if step_data == self._last_flush_data:
560 # Nope, leave things as-is.
561 return
562
563 # This is new data. Is it structural? If so, flush immediately.
564 # If not, make sure our timer is running so it will eventually be flushed.
565 # Note that the timer may also suggest that we flush immediately if we're
566 # already past our last flush interval.
567 now = self._env.now
568 self._current_data = step_data
569 if structural or self._set_flush_timer_locked(now):
570 # We should flush immediately.
571 self._flush_now_locked(now)
572
573 def flush_and_join(self):
574 """Flushes any remaining updates and blocks until the monitor is complete.
575 """
576 # Mark that we're finished and signal our event.
577 with self._lock:
578 self._flush_now_locked(self._env.now)
579
580 @property
581 def latest(self):
582 with self._lock:
583 return self._last_flush_data
584
585 def _flush_now_locked(self, now):
586 # Clear any current flush timer.
587 self._clear_flush_timer_locked()
588
589 # Record this flush.
590 #
591 # We set the last flush time to now because even if we don't actually send
592 # data, we have responded to the flush request.
593 flush_data, self._current_data = self._current_data, None
594 self._last_flush_time = now
595
596 # If the data hasn't changed since the last flush, then don't actually
597 # do anything.
598 if flush_data is None or flush_data == self._last_flush_data:
599 return
600
601 self._last_flush_data = flush_data
602 self._fd.send(flush_data)
603
604 def _clear_flush_timer_locked(self):
605 if self._flush_timer is not None:
606 self._flush_timer.cancel()
607 self._flush_timer = None
608
609 def _set_flush_timer_locked(self, now):
610 if self._flush_timer is not None:
611 # Our flush timer is already running.
612 return False
613
614 if self._last_flush_time is None:
615 # We have never flushed before, so flush immediately.
616 return True
617
618 deadline = self._last_flush_time + self._flush_period
619 if deadline <= now:
620 # We're past our flush deadline, and should flush immediately.
621 return True
622
623 # Start our flush timer.
624 self._flush_timer = threading.Timer((deadline - now).total_seconds(),
625 self._flush_timer_expired)
626 self._flush_timer.daemon = True
627 self._flush_timer.start()
628
629 def _flush_timer_expired(self):
630 with self._lock:
631 self._flush_now_locked(self._env.now)
632
633
class _AnnotationState(object):
  """Manages an outer Milo annotation protobuf Step.

  Tracks the root Step message, the nesting stack used to attach substeps,
  and the set of build properties, and detects modifications via snapshot
  comparison (see "check").
  """

  # Bookkeeping record for one step: the pb.Step message, the base
  # _StreamName for streams belonging to the step, and a map used to
  # de-duplicate substream names ({stream name => number issued}).
  Step = collections.namedtuple('Step', (
      'msg', 'stream_name_base', 'substream_name_index'))

  def __init__(self, base_step, stream_name_base):
    self._base = self.Step(
        msg=base_step,
        stream_name_base=stream_name_base,
        substream_name_index={})
    # Deep copy of the last state returned by "check"; None until the first
    # check.
    self._check_snapshot = None

    # The current step stack. This is built by updating state after new steps'
    # nesting levels.
    self._nest_stack = [self._base]

    # Index initial properties.
    self._properties = {p.name: p for p in self._base.msg.property}

  @classmethod
  def create(cls, stream_name_base, environment=None, properties=None):
    """Returns (_AnnotationState): A new state whose base Step is populated
    from "environment" (command line, cwd, environ) and "properties".
    """
    base = pb.Step()
    base.name = 'steps'
    base.status = pb.PENDING
    if environment:
      if environment.argv:
        base.command.command_line.extend(environment.argv)
      if environment.cwd:
        base.command.cwd = environment.cwd
      if environment.environ:
        base.command.environ.update(environment.environ)
    if properties:
      for k, v in sorted(properties.iteritems()):
        base.property.add(name=k, value=v)
    return cls(base, stream_name_base)

  @property
  def base(self):
    """The root pb.Step message."""
    return self._base.msg

  def check(self):
    """Checks if the annotation state has been updated and, if so, returns it.

    After check returns, the latest annotation state will be used as the current
    snapshot for future checks.

    Returns (str/None): A serialized binary Step protobuf if modified, None
      otherwise.
    """
    if self._check_snapshot is None or self._check_snapshot != self.base:
      # Deep-copy so later mutations of "base" don't alias the snapshot.
      self._check_snapshot = copy.deepcopy(self.base)
      return self._check_snapshot.SerializeToString()
    return None

  def create_step(self, step_config):
    """Creates a new substep under the step at "step_config.nest_level" and
    pushes it onto the nest stack.

    Returns (Step): The new Step record.
    """
    # Identify our parent Step by examining the nesting level. The first step
    # in the nest stack will always be the base (nesting level "-1", since it's
    # the parent of level 0). Since the step's "nest_level" is one more than the
    # parent, and we need to offset by 1 to reach the stack index, they cancel
    # each other out, so the nest level is the same as the parent's stack index.
    assert step_config.nest_level < len(self._nest_stack), (
        'Invalid nest level %d (highest is %d)' % (
            step_config.nest_level, len(self._nest_stack)-1))

    # Clear any items in the nest stack that are deeper than the current
    # element.
    del(self._nest_stack[step_config.nest_level+1:])
    parent = self._nest_stack[-1]

    # Create a stream name for this step. Even though step names are unique,
    # the normalized LogDog step name may overlap with a different step name.
    # We keep track of the step names we've issued to this step space and
    # add indexes if a conflict is identified.
    stream_name_base = parent.stream_name_base.append('steps',
                                                      step_config.base_name)
    index = parent.substream_name_index.setdefault(str(stream_name_base), 0)
    parent.substream_name_index[str(stream_name_base)] += 1
    if index > 0:
      # "+=" routes through _StreamName.augment, re-normalizing the result.
      stream_name_base += '_%d' % (index,)

    # Create and populate our new step.
    msg = parent.msg.substep.add().step
    msg.name = step_config.base_name
    msg.status = pb.PENDING
    if step_config.cmd:
      msg.command.command_line.extend(step_config.cmd)
    if step_config.cwd:
      msg.command.cwd = step_config.cwd
    if step_config.env:
      # Fixed: protobuf composite/map fields cannot be assigned directly
      # ("msg.command.environ = step_config.env" raises); populate in place,
      # matching the "environ.update(...)" usage in "create" above.
      msg.command.environ.update(step_config.env)

    step = self.Step(
        msg=msg,
        stream_name_base=stream_name_base,
        substream_name_index={})
    self._nest_stack.append(step)
    return step

  def update_properties(self, **kwargs):
    """Updates a Step's property values to incorporate kwargs."""
    for k, v in sorted(kwargs.iteritems()):
      cur = self._properties.get(k)
      if cur is None:
        cur = self.base.property.add(name=k, value=str(v))
        self._properties[k] = cur
        continue

      # A Property message already exists for this key, so update its value.
      # NOTE(review): "cur.value" is always a str while "v" may not be, so
      # this comparison can report inequality for equal values (e.g., 1 vs
      # "1") — confirm callers always pass strings.
      if cur.value != v:
        cur.value = str(v)
745
746
class _StreamName(object):
  """An immutable validated wrapper for a LogDog stream name."""

  def __init__(self, base):
    # "base" may be None, representing an empty stream name; any non-None
    # value must be a valid LogDog stream name.
    if base is not None:
      libs.logdog.streamname.validate_stream_name(base)
    self._base = base

  def append(self, *components):
    """Returns (_StreamName): A new _StreamName instance with components added.

    Each component in "components" becomes its own normalized stream name
    component; consequently, any separators (/) within a component are
    replaced with underscores.

    Args:
      components: the path components to append to this _StreamName.
    """
    if not components:
      return self

    parts = [self._base] if self._base else []
    parts.extend(self._normalize(self._flatten(c)) for c in components)
    return type(self)('/'.join(parts))

  def augment(self, val):
    """Returns (_StreamName): A new _StreamName with "val" appended.

    This generates a new, normalized _StreamName with the contents of "val"
    appended to the end. For example:

      Original: "foo/bar"
      Append "baz qux": "foo/barbaz_qux"
    """
    if not val:
      return self

    flattened = self._flatten(val)
    combined = (self._base + flattened) if self._base else flattened
    return type(self)(self._normalize(combined))

  def __iadd__(self, val):
    return self.augment(val)

  @staticmethod
  def _flatten(v):
    # Collapse path separators so "v" contributes a single component.
    return v.replace('/', '_')

  @staticmethod
  def _normalize(v):
    return libs.logdog.streamname.normalize(v, prefix='s_')

  def __str__(self):
    if not self._base:
      raise ValueError('Cannot generate string from empty StreamName.')
    return self._base
OLDNEW
« no previous file with comments | « recipe_engine/stream.py ('k') | recipe_engine/third_party/requests/requests-2.10.0.dist-info/INSTALLER » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698