Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(235)

Side by Side Diff: recipe_engine/stream_logdog.py

Issue 2265673002: Add LogDog / annotation protobuf support. (Closed) Base URL: https://github.com/luci/recipes-py@step-formal-struct
Patch Set: Code review comments. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2015 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file.
4
5 """stream.StreamEngine implementation for LogDog, using Milo annotation
6 protobuf.
7 """
8
import collections
import contextlib
import copy
import datetime
import functools
import os
import sys
import threading

from . import env
from . import stream
from . import util

import google.protobuf.message
import google.protobuf.timestamp_pb2 as timestamp_pb2
import libs.logdog.bootstrap
import libs.logdog.stream
import libs.logdog.streamname
import annotations_pb2 as pb
27
28
29 # The datetime for the epoch.
30 _EPOCH = datetime.datetime.utcfromtimestamp(0)
31
32 # The annotation stream ContentType.
33 #
34 # This must match the ContentType for the annotation binary protobuf, which
35 # is specified in "<luci-go>/common/proto/milo/util.go".
36 ANNOTATION_CONTENT_TYPE = 'text/x-chrome-infra-annotations; version=2'
37
38
39 class _Environment(object):
40 """Simulated system environment. The StreamEngine uses this to probe for
41 system parameters. By default, the environment will be derived from the
42 actual system.
43 """
44
45 def __init__(self, now_fn, argv, environ, cwd):
46 self._now_fn = now_fn
47 self.argv = argv
48 self.environ = environ
49 self.cwd = cwd
50
51 @property
52 def now(self):
53 return self._now_fn()
54
55 @classmethod
56 def probe(cls):
iannucci 2016/10/10 19:05:28 maybe `real`?
dnj 2016/10/10 22:56:23 Done.
57 return cls(
58 now_fn=datetime.datetime.now,
59 argv=sys.argv[:],
60 environ=dict(os.environ),
61 cwd=os.getcwd(),
62 )
63
64
65 def _check_annotation_changed(fn):
66 """Decorator that can be applied to a StepStream instance method to call
67 our bound Engine's _check after the function is finished.
68
69 This should decorate any method that modifies annotation state.
iannucci 2016/10/10 19:05:28 why not just build this into the annotation state?
dnj 2016/10/10 22:56:23 The only way to do that would be to have the proto
70 """
71 def check_after(inner, *args, **kwargs):
72 v = fn(inner, *args, **kwargs)
73 inner._engine._check()
74 return v
75 return check_after
76
77
class StreamEngine(stream.StreamEngine):
  """A stream.StreamEngine implementation that uses LogDog streams and Milo
  annotation protobufs.

  The generated LogDog streams will be (relative to "name_base"):
    /annotations
      The base annotations stream.

    /steps/<step_name>/
      Base stream name for a given step. Note that if multiple steps normalize
      to the same <step_name> value, an index will be appended, such as
      <step_name>_0. This can happen because the stream name component of a
      step is normalized, so two validly-independent steps ("My Thing" and
      "My_Thing") will both normalize to "My_Thing". In this case, the second
      one would have the stream name component, "My_Thing_1".

    /steps/<step_name>/stdout
      STDOUT stream for step "<step_name>".
    /steps/<step_name>/stderr
      STDERR stream for step "<step_name>".

    /steps/<step_name>/logs/<log_name>/<log_name_index>
      Stream name for a given step's logs. <log_name_index> is the index of the
      log with the given normalized name. This is similar to <step_name>, only
      the index is added as a separate stream name component.
  """

  # The name of the annotation stream.
  ANNOTATION_NAME = 'annotations'


  def __init__(self, client=None, streamserver_uri=None, name_base=None,
               ignore_triggers=False, environment=None):
    """Initializes a new LogDog/Annotation StreamEngine.

    Args:
      client (libs.logdog.stream.StreamClient or None): the LogDog stream
          client to use. If this is None, a new StreamClient will be
          instantiated when this StreamEngine is opened.
      streamserver_uri (str or None): The LogDog Butler stream server URI. See
          LogDog client library docs for details on supported protocols and
          format. This will only be used when "client" is None. If this is
          also None, a StreamClient will be created through probing.
      name_base (str or None): The default stream name prefix that will be
          added to generated LogDog stream names. If None, no prefix will be
          applied.
      ignore_triggers (bool): Triggers are not supported in LogDog annotation
          streams. If True, attempts to trigger will be silently ignored. If
          False, they will cause a NotImplementedError to be raised.
      environment (_Environment or None): The _Environment instance to use for
          operations. This will be None at production, but can be overridden
          here for testing.
    """

    self._client = client
    self._streamserver_uri = streamserver_uri
    self._name_base = _StreamName(name_base)
    self._ignore_triggers = ignore_triggers
    self._env = environment or _Environment.probe()

    # The current annotation state (_AnnotationState). Populated on "open",
    # cleared on "close"; also serves as the "is open" sentinel.
    self._astate = None

    self._annotation_stream = None
    self._annotation_monitor = None
    # Registry of open streams, closed in reverse order on engine "close".
    # NOTE(review): nothing currently inserts into this registry; confirm
    # whether step streams should be registered here on creation.
    self._streams = collections.OrderedDict()


  class TextStream(stream.StreamEngine.Stream):
    """A stream.StreamEngine.Stream that writes lines to a LogDog text
    stream file object.
    """

    def __init__(self, fd):
      """
      Args:
        fd: An open, writable LogDog text stream file object. This object
            takes ownership of it and closes it on "close".
      """
      super(StreamEngine.TextStream, self).__init__()
      self._fd = fd

    ##
    # Implement stream.StreamEngine.Stream
    ##

    def write_line(self, line):
      self._fd.write(line)
      self._fd.write('\n')

    def write_split(self, string):
      self._fd.write(string)
      # Terminate the write with a newline if it doesn't carry one already.
      if not string.endswith('\n'):
        self._fd.write('\n')

    def close(self):
      self._fd.close()


  class StepStream(stream.StreamEngine.StepStream):
    """An individual step stream."""

    def __init__(self, engine, step):
      """Initialize a new StepStream.

      Args:
        engine (StreamEngine): The StreamEngine that owns this StepStream.
        step (_AnnotationState.Step): The Step record that this Stream is
            managing.
      """
      # We will lazily create the STDOUT stream when the first data is
      # written.
      super(StreamEngine.StepStream, self).__init__()

      self._engine = engine
      self._step = step

      # We keep track of the log streams associated with this step, mapping
      # normalized stream name to the next index for that name.
      self._log_stream_index = {}

      # We will lazily instantiate our stdout stream when content is actually
      # written to it.
      self._stdout_stream = None

      # The retained step summary text. When generating failure details, this
      # will be consumed to populate their text field.
      self._summary_text = None

    @classmethod
    def create(cls, engine, step):
      """Returns (StepStream): A new stream whose step is marked RUNNING with
      its start timestamp set.
      """
      strm = cls(engine, step)

      # Start our step.
      strm._step.msg.status = pb.RUNNING
      engine._set_timestamp(strm._step.msg.started)

      return strm

    @_check_annotation_changed
    def _get_stdout(self):
      """Returns the step's STDOUT text stream, opening it on first use and
      recording its name in the step's annotation.
      """
      if self._stdout_stream is None:
        # Create a new STDOUT text stream.
        stream_name = self._step.stream_name_base.append('stdout')
        self._stdout_stream = self._engine._client.open_text(str(stream_name))

        self._step.msg.stdout_stream.name = str(stream_name)
      return self._stdout_stream

    ##
    # Implement stream.StreamEngine.Stream
    ##

    def write_line(self, line):
      stdout = self._get_stdout()
      stdout.write(line)
      stdout.write('\n')

    def write_split(self, string):
      stdout = self._get_stdout()
      stdout.write(string)
      if not string.endswith('\n'):
        stdout.write('\n')

    @_check_annotation_changed
    def close(self):
      if self._stdout_stream is not None:
        self._stdout_stream.close()

      # If we still have retained summary text, a failure, and no failure
      # detail text, copy it there.
      if self._summary_text is not None:
        if (self._step.msg.HasField('failure_details') and
            not self._step.msg.failure_details.text):
          self._step.msg.failure_details.text = self._summary_text

      # Close our Step.
      self._engine._close_step(self._step.msg)

    ##
    # Implement stream.StreamEngine.StepStream
    ##

    @_check_annotation_changed
    def new_log_stream(self, log_name):
      """Returns (StreamEngine.TextStream): A new log stream for this step,
      linked from the step's annotation.
      """
      # Generate the base normalized log stream name for this log.
      stream_name = self._step.stream_name_base.append('logs', log_name)

      # Add the log stream index to the end of the stream name, so repeated
      # log names within one step yield distinct streams (.../0, .../1, ...).
      index = self._log_stream_index.setdefault(str(stream_name), 0)
      self._log_stream_index[str(stream_name)] = index + 1
      stream_name = stream_name.append(str(index))

      # Create a new log stream for this name.
      fd = self._engine._client.open_text(str(stream_name))

      # Update our step to include the log stream.
      link = self._step.msg.other_links.add(label=log_name)
      link.logdog_stream.name = str(stream_name)

      return self._engine.TextStream(fd)

    @_check_annotation_changed
    def add_step_text(self, text):
      self._step.msg.text.append(text)

    @_check_annotation_changed
    def add_step_summary_text(self, text):
      # Summary text goes first; retain it for failure details (see "close").
      self._step.msg.text.insert(0, text)
      self._summary_text = text

    @_check_annotation_changed
    def add_step_link(self, name, url):
      self._step.msg.other_links.add(label=name, url=url)

    def reset_subannotation_state(self):
      pass

    @_check_annotation_changed
    def set_step_status(self, status):
      if status == 'SUCCESS':
        self._step.msg.status = pb.SUCCESS
      elif status == 'WARNING':
        # Warnings are modeled as a successful step with general failure
        # details attached.
        self._step.msg.status = pb.SUCCESS
        self._step.msg.failure_details.type = pb.FailureDetails.GENERAL
      elif status == 'FAILURE':
        self._step.msg.status = pb.FAILURE
        self._step.msg.failure_details.type = pb.FailureDetails.GENERAL
      elif status == 'EXCEPTION':
        self._step.msg.status = pb.FAILURE
        self._step.msg.failure_details.type = pb.FailureDetails.EXCEPTION
      else:
        raise ValueError('Unknown status [%s]' % (status,))

    @_check_annotation_changed
    def set_build_property(self, key, value):
      # BUGFIX: this previously referenced "self._engine._anno", an attribute
      # the engine never defines (its annotation state is "_astate"), and
      # passed the literal keyword "key=value", which would have registered a
      # property literally named "key". Expand the caller's key explicitly.
      self._engine._astate.update_properties(**{key: value})

    def trigger(self, trigger_spec):
      if self._engine._ignore_triggers:
        return
      raise NotImplementedError(
          'Stream-based triggering is not supported for LogDog. Please use '
          'a recipe module (e.g., buildbucket) directly for build scheduling.')


  def new_step_stream(self, step_config):
    """Returns (StepStream): A new step stream for "step_config".

    Raises:
      NotImplementedError: If the step enables subannotations.
    """
    # TODO(dnj): In the current iteration, subannotations are NOT supported.
    # In order to support them, they would have to be parsed out of the stream
    # and converted into Milo Annotation protobuf. This is a non-trivial
    # effort and may be a waste of time, as in a LogDog-enabled world, the
    # component emitting sub-annotations would actually just create its own
    # annotation stream and emit its own Milo protobuf.
    #
    # Components that emit subannotations and don't want to be converted to
    # use LogDog streams could bootstrap themselves through Annotee and let it
    # do the work.
    #
    # For now, though, we explicitly do NOT support LogDog running with
    # subannotations enabled.
    if step_config.allow_subannotations:
      raise NotImplementedError('Subannotations are not supported with LogDog '
                                'output.')

    strm = self.StepStream.create(self, self._astate.create_step(step_config))
    self._check()
    return strm

  def open(self):
    """Opens the engine: connects a client (if needed), opens the annotation
    datagram stream, and initializes annotation state.
    """
    # Initialize our client, if one is not provided.
    if self._client is None:
      if self._streamserver_uri:
        self._client = libs.logdog.stream.create(self._streamserver_uri)
      else:
        # Probe the stream client via Bootstrap.
        bootstrap = libs.logdog.bootstrap.probe()
        self._client = bootstrap.stream_client()

    annotation_stream_name = self._name_base.append(self.ANNOTATION_NAME)
    self._annotation_stream = self._client.open_datagram(
        str(annotation_stream_name),
        content_type=ANNOTATION_CONTENT_TYPE)

    self._annotation_monitor = _AnnotationMonitor(self._annotation_stream)

    # Initialize our open streams list.
    self._streams.clear()

    # Initialize our annotation state.
    self._astate = _AnnotationState.create(self._name_base,
                                           environment=self._env)
    self._astate.base.status = pb.RUNNING
    self._set_timestamp(self._astate.base.started)
    self._check()

  def close(self):
    """Closes the engine: shuts down any outstanding streams, closes the root
    step, flushes the annotation monitor, and clears all state.
    """
    assert self._astate is not None, 'StreamEngine is not open.'

    # Shut down any outstanding streams that may not have been closed for
    # whatever reason. Materialize the values first so the registry can be
    # safely mutated while we iterate.
    for s in reversed(list(self._streams.values())):
      s.close()

    # Close out our root Step. Manually check annotation state afterwards.
    self._close_step(self._astate.base)
    self._check()

    # Shut down our annotation monitor and close our annotation stream.
    self._annotation_monitor.flush_and_join()
    self._annotation_stream.close()

    # Clear our client and state. We are now closed.
    self._streams.clear()
    self._client = None
    self._astate = None

  def _check(self):
    """Pushes any modified annotation state to the annotation monitor.

    No-op when the engine is not open.
    """
    if self._astate is None:
      return

    step_data = self._astate.check()
    if step_data is not None:
      self._annotation_monitor.signal_update(step_data)

  def _set_timestamp(self, dst, dt=None):
    """Populates a timestamp_pb2.Timestamp, dst, with a datetime.

    Args:
      dst (timestamp_pb2.Timestamp): the timestamp protobuf that will be
          loaded with the time.
      dt (datetime.datetime or None): If not None, the datetime to load. If
          None, the current time (via now) will be used.
    """
    dt = dt or self._env.now

    # Convert to seconds (float) from epoch, then split into whole seconds
    # and the fractional remainder as nanoseconds.
    v = (dt - _EPOCH).total_seconds()

    dst.seconds = int(v)
    dst.nanos = int((v - dst.seconds) * 1000000000.0)  # Remainder as nanos.

  def _close_step(self, step):
    """Closes a step, and any open substeps, propagating status.

    If all of the substeps are already closed, this will do nothing. However,
    if any are open, it will close them with an infra failure state.

    If any substeps failed, the failure will be propagated to step.

    Args:
      step (pb.Step): The Step message to close.
    """
    # Close any open substeps, in case some of them didn't close.
    failed = []
    incomplete = []
    for sub in step.substep:
      if not sub.HasField('step'):
        # Not an embedded substep.
        continue

      # Did this step actually complete? It should have, by now, so if it
      # didn't, we'll be reporting an infra failure in "step".
      if sub.step.status not in (pb.SUCCESS, pb.FAILURE):
        incomplete.append(sub.step)

      # Close this substep. This may be a no-op, if the substep is already
      # closed.
      self._close_step(sub.step)

      # If a substep failed, propagate its failure status to "step".
      if sub.step.status == pb.FAILURE:
        failed.append(sub.step)

    # BUGFIX: a protobuf submessage field is never None, so the previous
    # "step.failure_details is None" test could never pass, and direct
    # assignment to a composite field raises in the protobuf Python API.
    # Presence is tested with "HasField" and population done via "CopyFrom".
    #
    # If we had any incomplete steps, mark that we failed.
    if incomplete:
      step.status = pb.FAILURE
      if not step.HasField('failure_details'):
        step.failure_details.CopyFrom(pb.FailureDetails(
            type=pb.FailureDetails.INFRA,
            text='Some substeps did not complete: %s' % (
                ', '.join(s.name for s in incomplete)),
        ))
    elif failed:
      step.status = pb.FAILURE
      if not step.HasField('failure_details'):
        # This step didn't successfully close, so propagate a failure.
        step.failure_details.CopyFrom(pb.FailureDetails(
            type=pb.FailureDetails.GENERAL,
            text='Some substeps failed: %s' % (
                ', '.join(s.name for s in failed)),
        ))

    # Now close "step". If it's still RUNNING, assume that it was successful.
    if step.status == pb.RUNNING:
      step.status = pb.SUCCESS
    if not step.HasField('ended'):
      self._set_timestamp(step.ended)
462
463
464
465 class _AnnotationMonitor(object):
466 """The owner of the annotation datagram stream, sending annotation updates in
467 a controlled manner and buffering them when the content hasn't changed.
468
469 By default, since annotation state can change rapidly, minor annotation
470 changes are throttled such that they are only actually sent periodically.
471
472 New annotation state updates can be installed by calling `signal_update`.
473 After being started, the _AnnotationMonitor thread must be shut down by
474 calling its `flush_and_join` method.
475 """
476
477 # Flush interval for non-structural events.
478 _ANNOTATION_MONITOR_PERIOD = datetime.timedelta(seconds=30)
iannucci 2016/10/10 19:05:27 I think 5 might be better, if you're just trying t
dnj 2016/10/10 22:56:23 Yeah, or "+% increase" or rapidly iterating proper
479
480 def __init__(self, fd, flush_period=None):
481 self._fd = fd
482 self._flush_period = flush_period or self._ANNOTATION_MONITOR_PERIOD
483
484 # The following group of variables is protected by "_lock".
485 self._lock = threading.Lock()
486 self._current_data = None
487 self._flush_timer = None
488 self._last_flush_time = None
489 self._last_flush_data = None
490
491 def signal_update(self, step_data, structural=False):
492 """Updates the annotation state with new step data.
493
494 This updates our state to include new step data. The annotation monitor
495 thread will pick this up and dispatch it, either:
496 - Eventually, when the flush period completes, or
497 - Immediately, if this is a structural change.
498
499 TODO(dnj): Re-examine the use case for "structural" based on actual usage
500 and decide to remove / use it.
501
502 Args:
503 step_data (str): The updated binary annotation protobuf step data.
504 structural (bool): If True, this is a structural update and should be
505 pushed immediately.
506 """
507 with self._lock:
508 # Did our data actually change?
509 if step_data == self._last_flush_data:
510 # Nope, leave things as-is.
511 return
512
513 # This is new data. Is it structural? If so, flush immediately.
514 # If not, make sure our timer is running so it will eventually be flushed.
515 # Note that the timer may also suggest that we flush immediately.
516 now = datetime.datetime.now()
517 self._current_data = step_data
518 if structural or self._set_flush_timer_locked(now):
519 # We should flush immediately.
520 self._flush_now_locked(now)
521
522 def flush_and_join(self):
523 """Flushes any remaining updates and blocks until the monitor is complete.
524 """
525 # Mark that we're finished and signal our event.
526 with self._lock:
527 self._flush_now_locked(datetime.datetime.now())
528
529 @property
530 def latest(self):
531 with self._lock:
532 return self._last_flush_data
533
534 def _flush_now_locked(self, now):
535 # Clear any current flush timer.
536 self._clear_flush_timer_locked()
537
538 # Record this flush.
539 #
540 # We set the last flush time to now because even if we don't actually send
541 # data, we have responded to the flush request.
542 flush_data, self._current_data = self._current_data, None
543 self._last_flush_time = now
544
545 # If the data hasn't changed since the last flush, then don't actually
546 # do anything.
547 if flush_data is None or flush_data == self._last_flush_data:
548 return
549
550 self._last_flush_data = flush_data
551 self._fd.send(flush_data)
552
553 def _clear_flush_timer_locked(self):
554 if self._flush_timer is not None:
555 self._flush_timer.cancel()
556 self._flush_timer = None
557
558 def _set_flush_timer_locked(self, now):
559 if self._flush_timer is not None:
560 # Our flush timer is already running.
561 return False
562
563 if self._last_flush_time is None:
564 # We have never flushed before, so flush immediately.
565 return True
566
567 deadline = self._last_flush_time + self._flush_period
568 if deadline <= now:
569 # We're past our flush deadline, and should flush immediately.
570 return True
571
572 # Start our flush timer.
573 self._flush_timer = threading.Timer((deadline - now).total_seconds(),
574 self._flush_timer_expired)
575 self._flush_timer.daemon = True
576 self._flush_timer.start()
577
578 def _flush_timer_expired(self):
579 with self._lock:
580 self._flush_now_locked(datetime.datetime.now())
581
582
class _AnnotationState(object):
  """Manages an outer Milo annotation protobuf Step."""

  # Bookkeeping record for a step: its protobuf message, the base LogDog
  # stream name for streams belonging to it, and a stream-name -> count map
  # used to deduplicate substream names.
  Step = collections.namedtuple('Step', (
      'msg', 'stream_name_base', 'substream_name_index'))

  def __init__(self, base_step, stream_name_base):
    """
    Args:
      base_step (pb.Step): The root Step message that this state manages.
      stream_name_base (_StreamName): Base stream name for generated streams.
    """
    self._base = self.Step(
        msg=base_step,
        stream_name_base=stream_name_base,
        substream_name_index={})
    self._check_snapshot = None

    # The current step stack. This is built by updating state after new
    # steps' nesting levels.
    self._nest_stack = [self._base]

    # Index initial properties by name for constant-time update lookups.
    self._properties = dict((p.name, p) for p in self._base.msg.property)

  @classmethod
  def create(cls, stream_name_base, environment=None, properties=None):
    """Returns (_AnnotationState): A new state with a root "steps" Step
    populated from "environment" and "properties".
    """
    base = pb.Step()
    base.name = 'steps'
    base.status = pb.PENDING
    if environment:
      if environment.argv:
        base.command.command_line.extend(environment.argv)
      if environment.cwd:
        base.command.cwd = environment.cwd
      if environment.environ:
        base.command.environ.update(environment.environ)
    if properties:
      # "items()" (not py2-only "iteritems()") keeps this forward-compatible.
      for k, v in sorted(properties.items()):
        base.property.add(name=k, value=v)
    return cls(base, stream_name_base)

  @property
  def base(self):
    """pb.Step: The root Step message."""
    return self._base.msg

  def check(self):
    """Checks if the annotation state has been updated and, if so, returns it.

    After check returns, the latest annotation state will be used as the
    current snapshot for future checks.

    Returns (str/None): A serialized binary Step protobuf if modified, None
        otherwise.
    """
    if self._check_snapshot is None or self._check_snapshot != self.base:
      self._check_snapshot = copy.deepcopy(self.base)
      return self._check_snapshot.SerializeToString()
    return None

  def create_step(self, step_config):
    """Creates and returns (Step): a new substep registered under its parent.

    Args:
      step_config: Step configuration with "nest_level", "base_name", "cmd",
          "cwd", and "env" attributes.

    Raises:
      AssertionError: If "nest_level" is deeper than the current stack allows.
    """
    # Identify our parent Step by examining the nesting level. The first step
    # in the nest stack will always be the base (nesting level "-1", since
    # it's the parent of level 0). Since the step's "nest_level" is one more
    # than the parent, and we need to offset by 1 to reach the stack index,
    # they cancel each other out, so the nest level is the same as the
    # parent's stack index.
    assert step_config.nest_level < len(self._nest_stack), (
        'Invalid nest level %d (highest is %d)' % (
            step_config.nest_level, len(self._nest_stack)-1))

    # Clear any items in the nest stack that are deeper than the current
    # element.
    del self._nest_stack[step_config.nest_level+1:]
    parent = self._nest_stack[-1]

    # Create a stream name for this step. Even though step names are unique,
    # the normalized LogDog step name may overlap with a different step name.
    # We keep track of the step names we've issued to this step space and
    # add indexes if a conflict is identified.
    stream_name_base = parent.stream_name_base.append('steps',
                                                      step_config.base_name)
    index = parent.substream_name_index.setdefault(str(stream_name_base), 0)
    parent.substream_name_index[str(stream_name_base)] += 1
    if index > 0:
      stream_name_base += '_%d' % (index,)

    # Create and populate our new step.
    msg = parent.msg.substep.add().step
    msg.name = step_config.base_name
    msg.status = pb.PENDING
    if step_config.cmd:
      msg.command.command_line.extend(step_config.cmd)
    if step_config.cwd:
      msg.command.cwd = step_config.cwd
    if step_config.env:
      # BUGFIX: direct assignment to a protobuf composite/map field is not
      # allowed and would raise; populate it with "update", matching the
      # environ handling in "create" above.
      msg.command.environ.update(step_config.env)

    step = self.Step(
        msg=msg,
        stream_name_base=stream_name_base,
        substream_name_index={})
    self._nest_stack.append(step)
    return step

  def update_properties(self, **kwargs):
    """Updates a step's property values to incorporate kwargs."""
    for k, v in sorted(kwargs.items()):
      cur = self._properties.get(k)
      if cur is None:
        cur = self.base.property.add(name=k, value=str(v))
        self._properties[k] = cur
        continue

      # A Property message already exists for this key, so update its value.
      # Compare against the stringified value, since that is what is stored.
      value = str(v)
      if cur.value != value:
        cur.value = value
694
695
696 class _StreamName(object):
697 """An immutable validated wrapper for a LogDog stream name."""
698
699 def __init__(self, base):
700 if base is not None:
701 libs.logdog.streamname.validate_stream_name(base)
702 self._base = base
703
704 def append(self, *components):
705 """Returns (_StreamName): A new _StreamName instance with components added.
706
707 Each component in "components" will become a new normalized stream name
708 component. Conseqeuntly, any separators (/) in the components will be
709 replaced with underscores.
710
711 Args:
712 components: the path components to append to this _StreamName.
713 """
714 if len(components) == 0:
715 return self
716
717 components = [self._normalize(self._flatten(p))
718 for p in reversed(components)]
719 if self._base:
720 components.append(self._base)
721 return type(self)('/'.join(reversed(components)))
722
723 def augment(self, val):
724 """Returns (_StreamName): A new _StreamName with "val" appended.
725
726 This generates a new, normalized _StreamName with the contents of "val"
727 appended to the end. For example:
728
729 Original: "foo/bar"
730 Append "baz qux": "foo/barbaz_qux"
731 """
732 if not val:
733 return self
734 val = self._flatten(val)
735 if self._base:
736 val = self._base + val
737 return type(self)(self._normalize(val))
738
739 def __iadd__(self, val):
740 return self.augment(val)
741
742 @staticmethod
743 def _flatten(v):
744 return v.replace('/', '_')
745
746 @staticmethod
747 def _normalize(v):
748 return libs.logdog.streamname.normalize(v, prefix='s_')
749
750 def __str__(self):
751 if not self._base:
752 raise ValueError('Cannot generate string from empty StreamName.')
753 return self._base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698