OLD | NEW |
---|---|
(Empty) | |
1 # Copyright 2015 The LUCI Authors. All rights reserved. | |
2 # Use of this source code is governed under the Apache License, Version 2.0 | |
3 # that can be found in the LICENSE file. | |
4 | |
5 """stream.StreamEngine implementation for LogDog, using Milo annotation | |
6 protobuf. | |
7 """ | |
8 | |
9 import collections | |
10 import contextlib | |
11 import copy | |
12 import datetime | |
13 import os | |
14 import threading | |
15 import sys | |
16 | |
17 from . import env | |
18 from . import stream | |
19 from . import util | |
20 | |
21 import google.protobuf.message | |
22 import google.protobuf.timestamp_pb2 as timestamp_pb2 | |
23 import libs.logdog.bootstrap | |
24 import libs.logdog.stream | |
25 import libs.logdog.streamname | |
26 import annotations_pb2 as pb | |
27 | |
28 | |
29 # The datetime for the epoch. | |
30 _EPOCH = datetime.datetime.utcfromtimestamp(0) | |
31 | |
32 # The annotation stream ContentType. | |
33 # | |
34 # This must match the ContentType for the annotation binary protobuf, which | |
35 # is specified in "<luci-go>/common/proto/milo/util.go". | |
36 ANNOTATION_CONTENT_TYPE = 'text/x-chrome-infra-annotations; version=2' | |
37 | |
38 | |
39 class _Environment(object): | |
40 """Simulated system environment. The StreamEngine uses this to probe for | |
41 system parameters. By default, the environment will be derived from the | |
42 actual system. | |
43 """ | |
44 | |
45 def __init__(self, now_fn, argv, environ, cwd): | |
46 self._now_fn = now_fn | |
47 self.argv = argv | |
48 self.environ = environ | |
49 self.cwd = cwd | |
50 | |
51 @property | |
52 def now(self): | |
53 return self._now_fn() | |
54 | |
55 @classmethod | |
56 def probe(cls): | |
iannucci
2016/10/10 19:05:28
maybe `real`?
dnj
2016/10/10 22:56:23
Done.
| |
57 return cls( | |
58 now_fn=datetime.datetime.now, | |
59 argv=sys.argv[:], | |
60 environ=dict(os.environ), | |
61 cwd=os.getcwd(), | |
62 ) | |
63 | |
64 | |
65 def _check_annotation_changed(fn): | |
66 """Decorator that can be applied to a StepStream instance method to call | |
67 our bound Engine's _check after the function is finished. | |
68 | |
69 This should decorate any method that modifies annotation state. | |
iannucci
2016/10/10 19:05:28
why not just build this into the annotation state?
dnj
2016/10/10 22:56:23
The only way to do that would be to have the proto
| |
70 """ | |
71 def check_after(inner, *args, **kwargs): | |
72 v = fn(inner, *args, **kwargs) | |
73 inner._engine._check() | |
74 return v | |
75 return check_after | |
76 | |
77 | |
78 class StreamEngine(stream.StreamEngine): | |
79 """A stream.StreamEngine implementation that uses Logdog streams and Milo | |
80 annotation protobufs. | |
81 | |
82 The generated LogDog streams will be (relative to "name_base"): | |
83 /annotations | |
84 The base annotations stream. | |
85 | |
86 /steps/<step_name>/ | |
87 Base stream name for a given step. Note that if multiple steps normalize | |
88 to the same <step_name> value, an index will be appended, such as | |
89 <step_name>_0. This can happen because the stream name component of a | |
90 step is normalized, so two validly-independent steps ("My Thing" and | |
91 "My_Thing") will both normalize to "My_Thing". In this case, the second | |
92 one would have the stream name component, "My_Thing_1". | |
93 | |
94 /steps/<step_name>/stdout | |
95 STDOUT stream for step "<step_name>". | |
96 /steps/<step_name>/stderr | |
97 STDOUT stream for step "<step_name>". | |
98 | |
99 /steps/<step_name>/logs/<log_name>/<log_name_index> | |
100 Stream name for a given step's logs. <log_name_index> is the index of the | |
101 log with the given normalized name. This is similar to <step_name>, only | |
102 the index is added as a separate stream name component. | |
103 """ | |
104 | |
105 # The name of the annotation stream. | |
106 ANNOTATION_NAME = 'annotations' | |
107 | |
108 | |
109 def __init__(self, client=None, streamserver_uri=None, name_base=None, | |
110 ignore_triggers=False, environment=None): | |
111 """Initializes a new LogDog/Annotation StreamEngine. | |
112 | |
113 Args: | |
114 client (libs.logdog.stream.StreamClient or None): the LogDog stream client | |
115 to use. If this is None, a new StreamClient will be instantiated when | |
116 this StreamEngine is opened. | |
117 streamserver_uri (str or None): The LogDog Butler stream server URI. See | |
118 LogDog client library docs for details on supported protocols and | |
119 format. This will only be used when "client" is None. If this is also | |
120 None, a StreamClient will be created through probing. | |
121 name_base (str or None): The default stream name prefix that will be added | |
122 to generated LogDog stream names. If None, no prefix will be applied. | |
iannucci
2016/10/10 19:05:27
does it actually make sense for this to be None?
dnj
2016/10/10 22:56:23
Depends on the intended layout of the stream. ATM
| |
123 ignore_triggers (bool): Triggers are not supported in LogDog annotation | |
124 streams. If True, attempts to trigger will be silently ignored. If | |
125 False, they will cause a NotImplementedError to be raised. | |
126 environment (_Environment or None): The _Environment instance to use for | |
127 operations. This will be None at production, but can be overridden | |
128 here for testing. | |
129 """ | |
130 | |
131 self._client = client | |
132 self._streamserver_uri = streamserver_uri | |
133 self._name_base = _StreamName(name_base) | |
134 self._ignore_triggers = ignore_triggers | |
135 self._env = environment or _Environment.probe() | |
136 | |
137 self._astate = None | |
138 | |
139 self._annotation_stream = None | |
140 self._annotation_monitor = None | |
141 self._streams = collections.OrderedDict() | |
142 | |
143 | |
144 class TextStream(stream.StreamEngine.Stream): | |
145 | |
146 def __init__(self, fd): | |
147 super(StreamEngine.TextStream, self).__init__() | |
148 self._fd = fd | |
149 | |
150 ## | |
151 # Implement stream.StreamEngine.Stream | |
152 ## | |
153 | |
154 def write_line(self, line): | |
155 self._fd.write(line) | |
156 self._fd.write('\n') | |
157 | |
158 def write_split(self, string): | |
159 self._fd.write(string) | |
160 if not string.endswith('\n'): | |
161 self._fd.write('\n') | |
162 | |
163 def close(self): | |
164 self._fd.close() | |
165 | |
166 | |
167 class StepStream(stream.StreamEngine.StepStream): | |
168 """An individual step stream.""" | |
169 | |
170 def __init__(self, engine, step): | |
171 """Initialize a new StepStream. | |
172 | |
173 Args: | |
174 engine (StreamEngine): The StreamEngine that owns this StepStream. | |
175 step (pb.Step): The Step instance that this Stream is managing. | |
176 """ | |
177 # We will lazily create the STDOUT stream when the first data is written. | |
178 super(StreamEngine.StepStream, self).__init__() | |
179 | |
180 self._engine = engine | |
181 self._step = step | |
182 | |
183 # We keep track of the log streams associated with this step. | |
184 self._log_stream_index = {} | |
185 | |
186 # We will lazily instantiate our stdout stream when content is actually | |
187 # written to it. | |
188 self._stdout_stream = None | |
189 | |
190 # The retained step summary text. When generating failure details, this | |
191 # will be consumed to populate their text field. | |
192 self._summary_text = None | |
193 | |
194 @classmethod | |
195 def create(cls, engine, step): | |
196 strm = cls(engine, step) | |
197 | |
198 # Start our step. | |
199 strm._step.msg.status = pb.RUNNING | |
200 engine._set_timestamp(strm._step.msg.started) | |
201 | |
202 return strm | |
203 | |
204 @_check_annotation_changed | |
205 def _get_stdout(self): | |
206 if self._stdout_stream is None: | |
207 # Create a new STDOUT text stream. | |
208 stream_name = self._step.stream_name_base.append('stdout') | |
209 self._stdout_stream = self._engine._client.open_text(str(stream_name)) | |
210 | |
211 self._step.msg.stdout_stream.name = str(stream_name) | |
212 return self._stdout_stream | |
213 | |
214 ## | |
215 # Implement stream.StreamEngine.Stream | |
216 ## | |
217 | |
218 def write_line(self, line): | |
219 stdout = self._get_stdout() | |
220 stdout.write(line) | |
221 stdout.write('\n') | |
222 | |
223 def write_split(self, string): | |
224 stdout = self._get_stdout() | |
225 stdout.write(string) | |
226 if not string.endswith('\n'): | |
227 stdout.write('\n') | |
228 | |
229 @_check_annotation_changed | |
230 def close(self): | |
231 if self._stdout_stream is not None: | |
232 self._stdout_stream.close() | |
233 | |
234 # If we still have retained summary text, a failure, and no failure detail | |
235 # text, copy it there. | |
236 if self._summary_text is not None: | |
237 if (self._step.msg.HasField('failure_details') and | |
238 not self._step.msg.failure_details.text): | |
239 self._step.msg.failure_details.text = self._summary_text | |
240 | |
241 # Close our Step. | |
242 self._engine._close_step(self._step.msg) | |
243 | |
244 ## | |
245 # Implement stream.StreamEngine.StepStream | |
246 ## | |
247 | |
248 @_check_annotation_changed | |
249 def new_log_stream(self, log_name): | |
250 # Generate the base normalized log stream name for this log. | |
251 stream_name = self._step.stream_name_base.append('logs', log_name) | |
252 | |
253 # Add the log stream index to the end of the stream name. | |
254 index = self._log_stream_index.setdefault(str(stream_name), 0) | |
255 self._log_stream_index[str(stream_name)] = index + 1 | |
256 stream_name = stream_name.append(str(index)) | |
257 | |
258 # Create a new log stream for this name. | |
259 fd = self._engine._client.open_text(str(stream_name)) | |
260 | |
261 # Update our step to include the log stream. | |
262 link = self._step.msg.other_links.add(label=log_name) | |
263 link.logdog_stream.name = str(stream_name) | |
264 | |
265 return self._engine.TextStream(fd) | |
266 | |
267 @_check_annotation_changed | |
268 def add_step_text(self, text): | |
269 self._step.msg.text.append(text) | |
270 | |
271 @_check_annotation_changed | |
272 def add_step_summary_text(self, text): | |
273 self._step.msg.text.insert(0, text) | |
274 self._summary_text = text | |
275 | |
276 @_check_annotation_changed | |
277 def add_step_link(self, name, url): | |
278 self._step.msg.other_links.add(label=name, url=url) | |
279 | |
280 def reset_subannotation_state(self): | |
281 pass | |
282 | |
283 @_check_annotation_changed | |
284 def set_step_status(self, status): | |
285 if status == 'SUCCESS': | |
286 self._step.msg.status = pb.SUCCESS | |
287 elif status == 'WARNING': | |
288 self._step.msg.status = pb.SUCCESS | |
289 self._step.msg.failure_details.type = pb.FailureDetails.GENERAL | |
290 elif status == 'FAILURE': | |
291 self._step.msg.status = pb.FAILURE | |
292 self._step.msg.failure_details.type=pb.FailureDetails.GENERAL | |
293 elif status == 'EXCEPTION': | |
294 self._step.msg.status = pb.FAILURE | |
295 self._step.msg.failure_details.type = pb.FailureDetails.EXCEPTION | |
296 else: | |
297 raise ValueError('Unknown status [%s]' % (status,)) | |
298 | |
299 @_check_annotation_changed | |
300 def set_build_property(self, key, value): | |
301 self._engine._anno.update_properties(key=value) | |
302 | |
303 def trigger(self, trigger_spec): | |
304 if self._engine._ignore_triggers: | |
305 return | |
306 raise NotImplementedError( | |
307 'Stream-based triggering is not supported for LogDog. Please use ' | |
308 'a recipe module (e.g., buildbucket) directly for build scheduling.') | |
309 | |
310 | |
311 def new_step_stream(self, step_config): | |
312 # TODO(dnj): In the current iteration, subannotations are NOT supported. | |
313 # In order to support them, they would have to be parsed out of the stream | |
314 # and converted into Milo Annotation protobuf. This is a non-trivial effort | |
315 # and may be a waste of time, as in a LogDog-enabled world, the component | |
316 # emitting sub-annotations would actually just create its own annotation | |
317 # stream and emit its own Milo protobuf. | |
318 # | |
319 # Components that emit subannotations and don't want to be converted to use | |
320 # LogDog streams could bootstrap themselves through Annotee and let it do | |
321 # the work. | |
322 # | |
323 # For now, though, we explicitly do NOT support LogDog running with | |
324 # subannotations enabled. | |
325 if step_config.allow_subannotations: | |
326 raise NotImplementedError('Subannotations are not supported with LogDog ' | |
327 'output.') | |
328 | |
329 strm = self.StepStream.create(self, self._astate.create_step(step_config)) | |
330 self._check() | |
331 return strm | |
332 | |
333 def open(self): | |
334 # Initialize our client, if one is not provided. | |
335 if self._client is None: | |
336 if self._streamserver_uri: | |
337 self._client = libs.logdog.stream.create(self._streamserver_uri) | |
338 else: | |
339 # Probe the stream client via Bootstrap. | |
340 bootstrap = libs.logdog.bootstrap.probe() | |
341 self._client = bootstrap.stream_client() | |
342 | |
343 annotation_stream_name = self._name_base.append(self.ANNOTATION_NAME) | |
344 self._annotation_stream = self._client.open_datagram( | |
345 str(annotation_stream_name), | |
346 content_type=ANNOTATION_CONTENT_TYPE) | |
347 | |
348 self._annotation_monitor = _AnnotationMonitor(self._annotation_stream) | |
349 | |
350 # Initialize our open streams list. | |
351 self._streams.clear() | |
352 | |
353 # Initialize our annotation state. | |
354 self._astate = _AnnotationState.create(self._name_base, | |
355 environment=self._env) | |
356 self._astate.base.status = pb.RUNNING | |
357 self._set_timestamp(self._astate.base.started) | |
358 self._check() | |
359 | |
360 def close(self): | |
361 assert self._astate is not None, 'StreamEngine is not open.' | |
362 | |
363 # Shut down any outstanding streams that may not have been closed for | |
364 # whatever reason. | |
365 for s in reversed(self._streams.values()): | |
366 s.close() | |
367 | |
368 # Close out our root Step. Manually check annotation state afterwards. | |
369 self._close_step(self._astate.base) | |
370 self._check() | |
371 | |
372 # Shut down our annotation monitor and close our annotation stream. | |
373 self._annotation_monitor.flush_and_join() | |
374 self._annotation_stream.close() | |
375 | |
376 # Clear our client and state. We are now closed. | |
377 self._streams.clear() | |
378 self._client = None | |
379 self._astate = None | |
380 | |
381 def _check(self): | |
382 if self._astate is None: | |
383 return | |
384 | |
385 step_data = self._astate.check() | |
386 if step_data is not None: | |
387 self._annotation_monitor.signal_update(step_data) | |
388 | |
389 def _set_timestamp(self, dst, dt=None): | |
390 """Populates a timestamp_pb2.Timestamp, dst, with a datetime. | |
391 | |
392 Args: | |
393 dst (timestamp_pb2.Timestamp): the timestamp protobuf that will be loaded | |
394 with the time. | |
395 dt (datetime.datetime or None): If not None, the datetime to load. If | |
396 None, the current time (via now) will be used. | |
397 """ | |
398 dt = (dt) if dt else (self._env.now) | |
iannucci
2016/10/10 19:05:27
why the extra parens?
dnj
2016/10/10 22:56:23
I think it makes the one-line "X if Y else Z" clea
| |
399 | |
400 # Convert to milliseconds from epoch. | |
401 v = (dt - _EPOCH).total_seconds() | |
402 | |
403 dst.seconds = int(v) | |
404 dst.nanos = int((v - dst.seconds) * 1000000000.0) # Remainder as nanos. | |
405 | |
406 def _close_step(self, step): | |
iannucci
2016/10/10 19:05:28
really unfortunate that this logic has to be here.
dnj
2016/10/10 22:56:23
Realistically the engine has already implemented t
| |
407 """Closes a step, and any open substeps, propagating status. | |
408 | |
409 If all of the substeps are already closed, this will do nothing. However, if | |
410 any are open, it will close them with an infra failure state. | |
411 | |
412 If any substeps failed, the failure will be propagated to step. | |
413 | |
414 Args: | |
415 step (pb.Step): The Step message to close. | |
416 """ | |
417 # Close any open substeps, in case some of them didn't close. | |
418 failed = [] | |
419 incomplete = [] | |
420 for sub in step.substep: | |
421 if not sub.HasField('step'): | |
422 # Not an embedded substep. | |
423 continue | |
424 | |
425 # Did this step actually complete? It should have, by now, so if it didn't | |
426 # we'll be reporting an infra failure in "step". | |
427 if sub.step.status not in (pb.SUCCESS, pb.FAILURE): | |
428 incomplete.append(sub.step) | |
429 | |
430 # Close this substep. This may be a no-op, if the substep is already | |
431 # closed. | |
432 self._close_step(sub.step) | |
433 | |
434 # If a substep failed, propagate its failure status to "step". | |
435 if sub.step.status == pb.FAILURE: | |
436 failed.append(sub.step) | |
437 | |
438 # If we had any incomplete steps, mark that we failed. | |
439 if incomplete: | |
440 step.status = pb.FAILURE | |
441 if step.failure_details is None: | |
442 step.failure_details = pb.FailureDetails( | |
443 type=pb.FailureDetails.INFRA, | |
444 text='Some substeps did not complete: %s' % ( | |
445 ', '.join(s.name for s in incomplete)), | |
446 ) | |
447 elif failed: | |
448 step.status = pb.FAILURE | |
449 if step.failure_details is None: | |
450 # This step didn't successfully close, so propagate an infra failure. | |
451 step.failure_details = pb.FailureDetails( | |
452 type=pb.FailureDetails.GENERAL, | |
453 text='Some substeps failed: %s' % ( | |
454 ', '.join(s.name for s in failed)), | |
455 ) | |
456 | |
457 # Now close "step". If it's still RUNNING, assume that it was successful. | |
458 if step.status == pb.RUNNING: | |
459 step.status = pb.SUCCESS | |
460 if not step.HasField('ended'): | |
461 self._set_timestamp(step.ended) | |
462 | |
463 | |
464 | |
465 class _AnnotationMonitor(object): | |
466 """The owner of the annotation datagram stream, sending annotation updates in | |
467 a controlled manner and buffering them when the content hasn't changed. | |
468 | |
469 By default, since annotation state can change rapidly, minor annotation | |
470 changes are throttled such that they are only actually sent periodically. | |
471 | |
472 New annotation state updates can be installed by calling `signal_update`. | |
473 After being started, the _AnnotationMonitor thread must be shut down by | |
474 calling its `flush_and_join` method. | |
475 """ | |
476 | |
477 # Flush interval for non-structural events. | |
478 _ANNOTATION_MONITOR_PERIOD = datetime.timedelta(seconds=30) | |
iannucci
2016/10/10 19:05:27
I think 5 might be better, if you're just trying t
dnj
2016/10/10 22:56:23
Yeah, or "+% increase" or rapidly iterating proper
| |
479 | |
480 def __init__(self, fd, flush_period=None): | |
481 self._fd = fd | |
482 self._flush_period = flush_period or self._ANNOTATION_MONITOR_PERIOD | |
483 | |
484 # The following group of variables is protected by "_lock". | |
485 self._lock = threading.Lock() | |
486 self._current_data = None | |
487 self._flush_timer = None | |
488 self._last_flush_time = None | |
489 self._last_flush_data = None | |
490 | |
491 def signal_update(self, step_data, structural=False): | |
492 """Updates the annotation state with new step data. | |
493 | |
494 This updates our state to include new step data. The annotation monitor | |
495 thread will pick this up and dispatch it, either: | |
496 - Eventually, when the flush period completes, or | |
497 - Immediately, if this is a structural change. | |
498 | |
499 TODO(dnj): Re-examine the use case for "structural" based on actual usage | |
500 and decide to remove / use it. | |
501 | |
502 Args: | |
503 step_data (str): The updated binary annotation protobuf step data. | |
504 structural (bool): If True, this is a structural update and should be | |
505 pushed immediately. | |
506 """ | |
507 with self._lock: | |
508 # Did our data actually change? | |
509 if step_data == self._last_flush_data: | |
510 # Nope, leave things as-is. | |
511 return | |
512 | |
513 # This is new data. Is it structural? If so, flush immediately. | |
514 # If not, make sure our timer is running so it will eventually be flushed. | |
515 # Note that the timer may also suggest that we flush immediately. | |
516 now = datetime.datetime.now() | |
517 self._current_data = step_data | |
518 if structural or self._set_flush_timer_locked(now): | |
519 # We should flush immediately. | |
520 self._flush_now_locked(now) | |
521 | |
522 def flush_and_join(self): | |
523 """Flushes any remaining updates and blocks until the monitor is complete. | |
524 """ | |
525 # Mark that we're finished and signal our event. | |
526 with self._lock: | |
527 self._flush_now_locked(datetime.datetime.now()) | |
528 | |
529 @property | |
530 def latest(self): | |
531 with self._lock: | |
532 return self._last_flush_data | |
533 | |
534 def _flush_now_locked(self, now): | |
535 # Clear any current flush timer. | |
536 self._clear_flush_timer_locked() | |
537 | |
538 # Record this flush. | |
539 # | |
540 # We set the last flush time to now because even if we don't actually send | |
541 # data, we have responded to the flush request. | |
542 flush_data, self._current_data = self._current_data, None | |
543 self._last_flush_time = now | |
544 | |
545 # If the data hasn't changed since the last flush, then don't actually | |
546 # do anything. | |
547 if flush_data is None or flush_data == self._last_flush_data: | |
548 return | |
549 | |
550 self._last_flush_data = flush_data | |
551 self._fd.send(flush_data) | |
552 | |
553 def _clear_flush_timer_locked(self): | |
554 if self._flush_timer is not None: | |
555 self._flush_timer.cancel() | |
556 self._flush_timer = None | |
557 | |
558 def _set_flush_timer_locked(self, now): | |
559 if self._flush_timer is not None: | |
560 # Our flush timer is already running. | |
561 return False | |
562 | |
563 if self._last_flush_time is None: | |
564 # We have never flushed before, so flush immediately. | |
565 return True | |
566 | |
567 deadline = self._last_flush_time + self._flush_period | |
568 if deadline <= now: | |
569 # We're past our flush deadline, and should flush immediately. | |
570 return True | |
571 | |
572 # Start our flush timer. | |
573 self._flush_timer = threading.Timer((deadline - now).total_seconds(), | |
574 self._flush_timer_expired) | |
575 self._flush_timer.daemon = True | |
576 self._flush_timer.start() | |
577 | |
578 def _flush_timer_expired(self): | |
579 with self._lock: | |
580 self._flush_now_locked(datetime.datetime.now()) | |
581 | |
582 | |
583 class _AnnotationState(object): | |
584 """Manages an outer Milo annotation protobuf Step.""" | |
585 | |
586 Step = collections.namedtuple('Step', ( | |
587 'msg', 'stream_name_base', 'substream_name_index')) | |
588 | |
589 def __init__(self, base_step, stream_name_base): | |
590 self._base = self.Step( | |
591 msg=base_step, | |
592 stream_name_base=stream_name_base, | |
593 substream_name_index={}) | |
594 self._check_snapshot = None | |
595 | |
596 # The current step stack. This is built by updating state after new steps' | |
597 # nesting levels. | |
598 self._nest_stack = [self._base] | |
599 | |
600 # Index initial properties. | |
601 self._properties = dict((p.name, p) for p in self._base.msg.property) | |
iannucci
2016/10/10 19:05:28
dict comprehensions work too btw.
Do we guarantee
dnj
2016/10/10 22:56:23
Acknowledged.
| |
602 | |
603 @classmethod | |
604 def create(cls, stream_name_base, environment=None, properties=None): | |
605 base = pb.Step() | |
606 base.name = 'steps' | |
607 base.status = pb.PENDING | |
608 if environment: | |
609 if environment.argv: | |
610 base.command.command_line.extend(environment.argv) | |
611 if environment.cwd: | |
612 base.command.cwd = environment.cwd | |
613 if environment.environ: | |
614 base.command.environ.update(environment.environ) | |
615 if properties: | |
616 for k, v in sorted(properties.iteritems()): | |
617 base.property.add(name=k, value=v) | |
618 return cls(base, stream_name_base) | |
619 | |
620 @property | |
621 def base(self): | |
622 return self._base.msg | |
623 | |
624 def check(self): | |
625 """Checks if the annotation state has been updated and, if so, returns it. | |
626 | |
627 After check returns, the latest annotation state will be used as the current | |
628 snapshot for future checks. | |
629 | |
630 Returns (str/None): A serialized binary Step protobuf if modified, None | |
631 otherwise. | |
632 """ | |
633 if self._check_snapshot is None or self._check_snapshot != self.base: | |
634 self._check_snapshot = copy.deepcopy(self.base) | |
635 return self._check_snapshot.SerializeToString() | |
636 return None | |
637 | |
638 def create_step(self, step_config): | |
639 # Identify our parent Step by examining the nesting level. The first step | |
640 # in the nest stack will always be the base (nesting level "-1", since it's | |
641 # the parent of level 0). Since the step's "nest_level" is one more than the | |
642 # parent, and we need to offset by 1 to reach the stack index, they cancel | |
643 # each other out, so the nest level is the same as the parent's stack index. | |
644 assert step_config.nest_level < len(self._nest_stack), ( | |
645 'Invalid nest level %d (highest is %d)' % ( | |
646 step_config.nest_level, len(self._nest_stack)-1)) | |
647 | |
648 # Clear any items in the nest stack that are deeper than the current | |
649 # element. | |
650 del(self._nest_stack[step_config.nest_level+1:]) | |
651 parent = self._nest_stack[-1] | |
652 | |
653 # Create a stream name for this step. Even though step names are unique, | |
654 # the normalized LogDog step name may overlap with a different step name. | |
655 # We keep track of the step names we've issued to this step space and | |
656 # add indexes if a conflict is identified. | |
657 stream_name_base = parent.stream_name_base.append('steps', | |
658 step_config.base_name) | |
659 index = parent.substream_name_index.setdefault(str(stream_name_base), 0) | |
660 parent.substream_name_index[str(stream_name_base)] += 1 | |
661 if index > 0: | |
662 stream_name_base += '_%d' % (index,) | |
663 | |
664 # Create and populate our new step. | |
665 msg = parent.msg.substep.add().step | |
666 msg.name = step_config.base_name | |
667 msg.status = pb.PENDING | |
668 if step_config.cmd: | |
669 msg.command.command_line.extend(step_config.cmd) | |
670 if step_config.cwd: | |
671 msg.command.cwd = step_config.cwd | |
672 if step_config.env: | |
673 msg.command.environ = step_config.env | |
674 | |
675 step = self.Step( | |
676 msg=msg, | |
677 stream_name_base=stream_name_base, | |
678 substream_name_index={}) | |
679 self._nest_stack.append(step) | |
680 return step | |
681 | |
682 def update_properties(self, **kwargs): | |
683 """Updates a set's property values to incorporate kwargs.""" | |
iannucci
2016/10/10 19:05:27
s/set/step?
dnj
2016/10/10 22:56:23
Done.
| |
684 for k, v in sorted(kwargs.iteritems()): | |
685 cur = self._properties.get(k) | |
686 if cur is None: | |
687 cur = self.base.property.add(name=k, value=str(v)) | |
688 self._properties[k] = cur | |
689 continue | |
690 | |
691 # A Property message already exists for this key, so update its value. | |
692 if cur.value != v: | |
693 cur.value = str(v) | |
694 | |
695 | |
696 class _StreamName(object): | |
697 """An immutable validated wrapper for a LogDog stream name.""" | |
698 | |
699 def __init__(self, base): | |
700 if base is not None: | |
701 libs.logdog.streamname.validate_stream_name(base) | |
702 self._base = base | |
703 | |
704 def append(self, *components): | |
705 """Returns (_StreamName): A new _StreamName instance with components added. | |
706 | |
707 Each component in "components" will become a new normalized stream name | |
708 component. Conseqeuntly, any separators (/) in the components will be | |
709 replaced with underscores. | |
710 | |
711 Args: | |
712 components: the path components to append to this _StreamName. | |
713 """ | |
714 if len(components) == 0: | |
715 return self | |
716 | |
717 components = [self._normalize(self._flatten(p)) | |
718 for p in reversed(components)] | |
719 if self._base: | |
720 components.append(self._base) | |
721 return type(self)('/'.join(reversed(components))) | |
722 | |
723 def augment(self, val): | |
724 """Returns (_StreamName): A new _StreamName with "val" appended. | |
725 | |
726 This generates a new, normalized _StreamName with the contents of "val" | |
727 appended to the end. For example: | |
728 | |
729 Original: "foo/bar" | |
730 Append "baz qux": "foo/barbaz_qux" | |
731 """ | |
732 if not val: | |
733 return self | |
734 val = self._flatten(val) | |
735 if self._base: | |
736 val = self._base + val | |
737 return type(self)(self._normalize(val)) | |
738 | |
739 def __iadd__(self, val): | |
740 return self.augment(val) | |
741 | |
742 @staticmethod | |
743 def _flatten(v): | |
744 return v.replace('/', '_') | |
745 | |
746 @staticmethod | |
747 def _normalize(v): | |
748 return libs.logdog.streamname.normalize(v, prefix='s_') | |
749 | |
750 def __str__(self): | |
751 if not self._base: | |
752 raise ValueError('Cannot generate string from empty StreamName.') | |
753 return self._base | |
OLD | NEW |