OLD | NEW |
(Empty) | |
| 1 # Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. |
| 4 |
| 5 """stream.StreamEngine implementation for LogDog, using Milo annotation |
| 6 protobuf. |
| 7 """ |
| 8 |
| 9 import collections |
| 10 import contextlib |
| 11 import copy |
| 12 import datetime |
| 13 import functools |
| 14 import itertools |
| 15 import os |
| 16 import threading |
| 17 import sys |
| 18 |
| 19 from . import env |
| 20 from . import stream |
| 21 from . import util |
| 22 |
| 23 import google.protobuf.message |
| 24 import google.protobuf.timestamp_pb2 as timestamp_pb2 |
| 25 import libs.logdog.bootstrap |
| 26 import libs.logdog.stream |
| 27 import libs.logdog.streamname |
| 28 import annotations_pb2 as pb |
| 29 |
| 30 |
| 31 # The datetime for the epoch. |
| 32 _EPOCH = datetime.datetime.utcfromtimestamp(0) |
| 33 |
| 34 # The annotation stream ContentType. |
| 35 # |
| 36 # This must match the ContentType for the annotation binary protobuf, which |
| 37 # is specified in "<luci-go>/common/proto/milo/util.go". |
| 38 ANNOTATION_CONTENT_TYPE = 'text/x-chrome-infra-annotations; version=2' |
| 39 |
| 40 |
| 41 class _Environment(object): |
| 42 """Simulated system environment. The StreamEngine uses this to probe for |
| 43 system parameters. By default, the environment will be derived from the |
| 44 actual system. |
| 45 """ |
| 46 |
| 47 def __init__(self, now_fn, argv, environ, cwd): |
| 48 self._now_fn = now_fn |
| 49 self.argv = argv |
| 50 self.environ = environ |
| 51 self.cwd = cwd |
| 52 |
| 53 @property |
| 54 def now(self): |
| 55 return self._now_fn() |
| 56 |
| 57 @classmethod |
| 58 def real(cls): |
| 59 """Returns (_Environment): An _Environment bound to the real system""" |
| 60 return cls( |
| 61 now_fn=datetime.datetime.now, |
| 62 argv=sys.argv[:], |
| 63 environ=dict(os.environ), |
| 64 cwd=os.getcwd(), |
| 65 ) |
| 66 |
| 67 |
| 68 class _StepStreamMeta(type): |
| 69 """Metaclass for StepStream that automatically notifies the engine of |
| 70 potential step data change if the step variable is accessed by an overridden |
| 71 (callback) method. |
| 72 |
| 73 The alternative to this is to hope that implementors remember to perform this |
| 74 check each time they modify the step, which could be a problem if the ABC |
| 75 methods change at some point in the future. This option concentrates all of |
| 76 the checking evil in one place. |
| 77 """ |
| 78 |
| 79 def __new__(mcs, name, parents, attrs): |
| 80 assert len(parents) == 1 |
| 81 |
| 82 # Wrap implemented methods so that if they modify the StepStream's embedded |
| 83 # Step, they will automatically notify the StepStream's engine. |
| 84 attrs['_step'] = property( |
| 85 mcs._wrap_set_step_referenced(attrs['_step'].fget), |
| 86 doc=attrs['_step'].__doc__) |
| 87 for k, v in attrs.iteritems(): |
| 88 if k.startswith('__') or not (callable(v) and hasattr(parents[0], k)): |
| 89 continue |
| 90 attrs[k] = mcs._wrap_notify_annotation_changed(v) |
| 91 return super(_StepStreamMeta, mcs).__new__(mcs, name, parents, attrs) |
| 92 |
| 93 @classmethod |
| 94 def _wrap_set_step_referenced(mcs, fn): |
| 95 """Wraps the "_step" property to set a flag if referenced. |
| 96 |
| 97 This flag is used by functions wrapped via "_wrap_notify_annotated_changed" |
| 98 to detect if they should consider notifying the StreamEngine. |
| 99 """ |
| 100 @functools.wraps(fn) |
| 101 def mark_referenced(wrapSelf, *args, **kwargs): |
| 102 wrapSelf._meta_step_referenced = True |
| 103 return fn(wrapSelf, *args, **kwargs) |
| 104 return mark_referenced |
| 105 |
| 106 @classmethod |
| 107 def _wrap_notify_annotation_changed(mcs, fn): |
| 108 """Wraps a function to automatically notify the StreamEngine when the Step |
| 109 changes. |
| 110 """ |
| 111 @functools.wraps(fn) |
| 112 def notify_after(wrapSelf, *args, **kwargs): |
| 113 wrapSelf._meta_step_referenced = False |
| 114 try: |
| 115 return fn(wrapSelf, *args, **kwargs) |
| 116 finally: |
| 117 if wrapSelf._meta_step_referenced: |
| 118 wrapSelf._engine._notify_annotation_changed() |
| 119 return notify_after |
| 120 |
| 121 |
| 122 class StreamEngine(stream.StreamEngine): |
| 123 """A stream.StreamEngine implementation that uses Logdog streams and Milo |
| 124 annotation protobufs. |
| 125 |
| 126 The generated LogDog streams will be (relative to "name_base"): |
| 127 /annotations |
| 128 The base annotations stream. |
| 129 |
| 130 /steps/<step_name>/ |
| 131 Base stream name for a given step. Note that if multiple steps normalize |
| 132 to the same <step_name> value, an index will be appended, such as |
| 133 <step_name>_0. This can happen because the stream name component of a |
| 134 step is normalized, so two validly-independent steps ("My Thing" and |
| 135 "My_Thing") will both normalize to "My_Thing". In this case, the second |
| 136 one would have the stream name component, "My_Thing_1". |
| 137 |
| 138 /steps/<step_name>/stdout |
| 139 STDOUT stream for step "<step_name>". |
| 140 /steps/<step_name>/stderr |
| 141 STDOUT stream for step "<step_name>". |
| 142 |
| 143 /steps/<step_name>/logs/<log_name>/<log_name_index> |
| 144 Stream name for a given step's logs. <log_name_index> is the index of the |
| 145 log with the given normalized name. This is similar to <step_name>, only |
| 146 the index is added as a separate stream name component. |
| 147 """ |
| 148 |
| 149 # The name of the annotation stream. |
| 150 ANNOTATION_NAME = 'annotations' |
| 151 |
| 152 # The default amount of time in between anotation pushes. |
| 153 DEFAULT_UPDATE_INTERVAL = datetime.timedelta(seconds=30) |
| 154 |
| 155 |
| 156 def __init__(self, client=None, streamserver_uri=None, name_base=None, |
| 157 ignore_triggers=False, environment=None, update_interval=None): |
| 158 """Initializes a new LogDog/Annotation StreamEngine. |
| 159 |
| 160 Args: |
| 161 client (libs.logdog.stream.StreamClient or None): the LogDog stream client |
| 162 to use. If this is None, a new StreamClient will be instantiated when |
| 163 this StreamEngine is opened. |
| 164 streamserver_uri (str or None): The LogDog Butler stream server URI. See |
| 165 LogDog client library docs for details on supported protocols and |
| 166 format. This will only be used when "client" is None. If this is also |
| 167 None, a StreamClient will be created through probing. |
| 168 name_base (str or None): The default stream name prefix that will be added |
| 169 to generated LogDog stream names. If None, no prefix will be applied. |
| 170 ignore_triggers (bool): Triggers are not supported in LogDog annotation |
| 171 streams. If True, attempts to trigger will be silently ignored. If |
| 172 False, they will cause a NotImplementedError to be raised. |
| 173 environment (_Environment or None): The _Environment instance to use for |
| 174 operations. This will be None at production, but can be overridden |
| 175 here for testing. |
| 176 update_interval (datetime.timedelta or None): The interval of time between |
| 177 annotation data pushes. If None, DEFAULT_UPDATE_INTERVAL will be |
| 178 used. |
| 179 """ |
| 180 |
| 181 self._client = client |
| 182 self._streamserver_uri = streamserver_uri |
| 183 self._name_base = _StreamName(name_base) |
| 184 self._ignore_triggers = ignore_triggers |
| 185 self._env = environment or _Environment.real() |
| 186 self._update_interval = update_interval or self.DEFAULT_UPDATE_INTERVAL |
| 187 |
| 188 self._astate = None |
| 189 |
| 190 self._annotation_stream = None |
| 191 self._annotation_monitor = None |
| 192 self._streams = collections.OrderedDict() |
| 193 |
| 194 |
| 195 class TextStream(stream.StreamEngine.Stream): |
| 196 |
| 197 def __init__(self, fd): |
| 198 super(StreamEngine.TextStream, self).__init__() |
| 199 self._fd = fd |
| 200 |
| 201 ## |
| 202 # Implement stream.StreamEngine.Stream |
| 203 ## |
| 204 |
| 205 def write_line(self, line): |
| 206 self._fd.write(line) |
| 207 self._fd.write('\n') |
| 208 |
| 209 def write_split(self, string): |
| 210 self._fd.write(string) |
| 211 if not string.endswith('\n'): |
| 212 self._fd.write('\n') |
| 213 |
| 214 def close(self): |
| 215 self._fd.close() |
| 216 |
| 217 |
| 218 class StepStream(stream.StreamEngine.StepStream): |
| 219 """An individual step stream.""" |
| 220 |
| 221 __metaclass__ = _StepStreamMeta |
| 222 |
| 223 def __init__(self, engine, step): |
| 224 """Initialize a new StepStream. |
| 225 |
| 226 Args: |
| 227 engine (StreamEngine): The StreamEngine that owns this StepStream. |
| 228 step (pb.Step): The Step instance that this Stream is managing. |
| 229 """ |
| 230 # We will lazily create the STDOUT stream when the first data is written. |
| 231 super(StreamEngine.StepStream, self).__init__() |
| 232 |
| 233 self._engine = engine |
| 234 |
| 235 self._step_value = step |
| 236 self._step_referenced = False |
| 237 |
| 238 # We keep track of the log streams associated with this step. |
| 239 self._log_stream_index = {} |
| 240 |
| 241 # We will lazily instantiate our stdout stream when content is actually |
| 242 # written to it. |
| 243 self._stdout_stream = None |
| 244 |
| 245 # The retained step summary text. When generating failure details, this |
| 246 # will be consumed to populate their text field. |
| 247 self._summary_text = None |
| 248 |
| 249 @classmethod |
| 250 def create(cls, engine, step): |
| 251 strm = cls(engine, step) |
| 252 |
| 253 # Start our step. |
| 254 strm._step.msg.status = pb.RUNNING |
| 255 engine._set_timestamp(strm._step.msg.started) |
| 256 |
| 257 return strm |
| 258 |
| 259 @property |
| 260 def _step(self): |
| 261 return self._step_value |
| 262 |
| 263 def _get_stdout(self): |
| 264 if self._stdout_stream is None: |
| 265 # Create a new STDOUT text stream. |
| 266 stream_name = self._step.stream_name_base.append('stdout') |
| 267 self._stdout_stream = self._engine._client.open_text(str(stream_name)) |
| 268 |
| 269 self._step.msg.stdout_stream.name = str(stream_name) |
| 270 return self._stdout_stream |
| 271 |
| 272 ## |
| 273 # Implement stream.StreamEngine.Stream |
| 274 ## |
| 275 |
| 276 def write_line(self, line): |
| 277 stdout = self._get_stdout() |
| 278 stdout.write(line) |
| 279 stdout.write('\n') |
| 280 |
| 281 def write_split(self, string): |
| 282 stdout = self._get_stdout() |
| 283 stdout.write(string) |
| 284 if not string.endswith('\n'): |
| 285 stdout.write('\n') |
| 286 |
| 287 def close(self): |
| 288 if self._stdout_stream is not None: |
| 289 self._stdout_stream.close() |
| 290 |
| 291 # If we still have retained summary text, a failure, and no failure detail |
| 292 # text, copy it there. |
| 293 if self._summary_text is not None: |
| 294 if (self._step.msg.HasField('failure_details') and |
| 295 not self._step.msg.failure_details.text): |
| 296 self._step.msg.failure_details.text = self._summary_text |
| 297 |
| 298 # Close our Step. |
| 299 self._engine._close_step(self._step.msg) |
| 300 |
| 301 ## |
| 302 # Implement stream.StreamEngine.StepStream |
| 303 ## |
| 304 |
| 305 def new_log_stream(self, log_name): |
| 306 # Generate the base normalized log stream name for this log. |
| 307 stream_name = self._step.stream_name_base.append('logs', log_name) |
| 308 |
| 309 # Add the log stream index to the end of the stream name. |
| 310 index = self._log_stream_index.setdefault(str(stream_name), 0) |
| 311 self._log_stream_index[str(stream_name)] = index + 1 |
| 312 stream_name = stream_name.append(str(index)) |
| 313 |
| 314 # Create a new log stream for this name. |
| 315 fd = self._engine._client.open_text(str(stream_name)) |
| 316 |
| 317 # Update our step to include the log stream. |
| 318 link = self._step.msg.other_links.add(label=log_name) |
| 319 link.logdog_stream.name = str(stream_name) |
| 320 |
| 321 return self._engine.TextStream(fd) |
| 322 |
| 323 def add_step_text(self, text): |
| 324 self._step.msg.text.append(text) |
| 325 |
| 326 def add_step_summary_text(self, text): |
| 327 self._step.msg.text.insert(0, text) |
| 328 self._summary_text = text |
| 329 |
| 330 def add_step_link(self, name, url): |
| 331 self._step.msg.other_links.add(label=name, url=url) |
| 332 |
| 333 def reset_subannotation_state(self): |
| 334 pass |
| 335 |
| 336 def set_step_status(self, status): |
| 337 if status == 'SUCCESS': |
| 338 self._step.msg.status = pb.SUCCESS |
| 339 elif status == 'WARNING': |
| 340 self._step.msg.status = pb.SUCCESS |
| 341 self._step.msg.failure_details.type = pb.FailureDetails.GENERAL |
| 342 elif status == 'FAILURE': |
| 343 self._step.msg.status = pb.FAILURE |
| 344 self._step.msg.failure_details.type=pb.FailureDetails.GENERAL |
| 345 elif status == 'EXCEPTION': |
| 346 self._step.msg.status = pb.FAILURE |
| 347 self._step.msg.failure_details.type = pb.FailureDetails.EXCEPTION |
| 348 else: |
| 349 raise ValueError('Unknown status [%s]' % (status,)) |
| 350 |
| 351 def set_build_property(self, key, value): |
| 352 self._engine._anno.update_properties(key=value) |
| 353 |
| 354 def trigger(self, trigger_spec): |
| 355 if self._engine._ignore_triggers: |
| 356 return |
| 357 raise NotImplementedError( |
| 358 'Stream-based triggering is not supported for LogDog. Please use ' |
| 359 'a recipe module (e.g., buildbucket) directly for build scheduling.') |
| 360 |
| 361 |
| 362 def new_step_stream(self, step_config): |
| 363 # TODO(dnj): In the current iteration, subannotations are NOT supported. |
| 364 # In order to support them, they would have to be parsed out of the stream |
| 365 # and converted into Milo Annotation protobuf. This is a non-trivial effort |
| 366 # and may be a waste of time, as in a LogDog-enabled world, the component |
| 367 # emitting sub-annotations would actually just create its own annotation |
| 368 # stream and emit its own Milo protobuf. |
| 369 # |
| 370 # Components that emit subannotations and don't want to be converted to use |
| 371 # LogDog streams could bootstrap themselves through Annotee and let it do |
| 372 # the work. |
| 373 # |
| 374 # For now, though, we explicitly do NOT support LogDog running with |
| 375 # subannotations enabled. |
| 376 if step_config.allow_subannotations: |
| 377 raise NotImplementedError('Subannotations are not supported with LogDog ' |
| 378 'output.') |
| 379 |
| 380 strm = self.StepStream.create(self, self._astate.create_step(step_config)) |
| 381 self._notify_annotation_changed() |
| 382 return strm |
| 383 |
| 384 def open(self): |
| 385 # Initialize our client, if one is not provided. |
| 386 if self._client is None: |
| 387 if self._streamserver_uri: |
| 388 self._client = libs.logdog.stream.create(self._streamserver_uri) |
| 389 else: |
| 390 # Probe the stream client via Bootstrap. |
| 391 bootstrap = libs.logdog.bootstrap.probe() |
| 392 self._client = bootstrap.stream_client() |
| 393 |
| 394 annotation_stream_name = self._name_base.append(self.ANNOTATION_NAME) |
| 395 self._annotation_stream = self._client.open_datagram( |
| 396 str(annotation_stream_name), |
| 397 content_type=ANNOTATION_CONTENT_TYPE) |
| 398 |
| 399 self._annotation_monitor = _AnnotationMonitor( |
| 400 self._env, self._annotation_stream, self._update_interval) |
| 401 |
| 402 # Initialize our open streams list. |
| 403 self._streams.clear() |
| 404 |
| 405 # Initialize our annotation state. |
| 406 self._astate = _AnnotationState.create(self._name_base, |
| 407 environment=self._env) |
| 408 self._astate.base.status = pb.RUNNING |
| 409 self._set_timestamp(self._astate.base.started) |
| 410 self._notify_annotation_changed() |
| 411 |
| 412 def close(self): |
| 413 assert self._astate is not None, 'StreamEngine is not open.' |
| 414 |
| 415 # Shut down any outstanding streams that may not have been closed for |
| 416 # whatever reason. |
| 417 for s in reversed(self._streams.values()): |
| 418 s.close() |
| 419 |
| 420 # Close out our root Step. Manually check annotation state afterwards. |
| 421 self._close_step(self._astate.base) |
| 422 self._notify_annotation_changed() |
| 423 |
| 424 # Shut down our annotation monitor and close our annotation stream. |
| 425 self._annotation_monitor.flush_and_join() |
| 426 self._annotation_stream.close() |
| 427 |
| 428 # Clear our client and state. We are now closed. |
| 429 self._streams.clear() |
| 430 self._client = None |
| 431 self._astate = None |
| 432 |
| 433 def _notify_annotation_changed(self): |
| 434 if self._astate is None: |
| 435 return |
| 436 |
| 437 step_data = self._astate.check() |
| 438 if step_data is not None: |
| 439 self._annotation_monitor.signal_update(step_data) |
| 440 |
| 441 def _set_timestamp(self, dst, dt=None): |
| 442 """Populates a timestamp_pb2.Timestamp, dst, with a datetime. |
| 443 |
| 444 Args: |
| 445 dst (timestamp_pb2.Timestamp): the timestamp protobuf that will be loaded |
| 446 with the time. |
| 447 dt (datetime.datetime or None): If not None, the datetime to load. If |
| 448 None, the current time (via now) will be used. |
| 449 """ |
| 450 dt = (dt) if dt else (self._env.now) |
| 451 |
| 452 # Convert to milliseconds from epoch. |
| 453 v = (dt - _EPOCH).total_seconds() |
| 454 |
| 455 dst.seconds = int(v) |
| 456 dst.nanos = int((v - dst.seconds) * 1000000000.0) # Remainder as nanos. |
| 457 |
| 458 def _close_step(self, step): |
| 459 """Closes a step, and any open substeps, propagating status. |
| 460 |
| 461 If all of the substeps are already closed, this will do nothing. However, if |
| 462 any are open, it will close them with an infra failure state. |
| 463 |
| 464 If any substeps failed, the failure will be propagated to step. |
| 465 |
| 466 Args: |
| 467 step (pb.Step): The Step message to close. |
| 468 """ |
| 469 # Close any open substeps, in case some of them didn't close. |
| 470 failed = [] |
| 471 incomplete = [] |
| 472 for sub in step.substep: |
| 473 if not sub.HasField('step'): |
| 474 # Not an embedded substep. |
| 475 continue |
| 476 |
| 477 # Did this step actually complete? It should have, by now, so if it didn't |
| 478 # we'll be reporting an infra failure in "step". |
| 479 if sub.step.status not in (pb.SUCCESS, pb.FAILURE): |
| 480 incomplete.append(sub.step) |
| 481 |
| 482 # Close this substep. This may be a no-op, if the substep is already |
| 483 # closed. |
| 484 self._close_step(sub.step) |
| 485 |
| 486 # If a substep failed, propagate its failure status to "step". |
| 487 if sub.step.status == pb.FAILURE: |
| 488 failed.append(sub.step) |
| 489 |
| 490 # If we had any incomplete steps, mark that we failed. |
| 491 if incomplete: |
| 492 step.status = pb.FAILURE |
| 493 if step.failure_details is None: |
| 494 step.failure_details = pb.FailureDetails( |
| 495 type=pb.FailureDetails.INFRA, |
| 496 text='Some substeps did not complete: %s' % ( |
| 497 ', '.join(s.name for s in incomplete)), |
| 498 ) |
| 499 elif failed: |
| 500 step.status = pb.FAILURE |
| 501 if step.failure_details is None: |
| 502 # This step didn't successfully close, so propagate an infra failure. |
| 503 step.failure_details = pb.FailureDetails( |
| 504 type=pb.FailureDetails.GENERAL, |
| 505 text='Some substeps failed: %s' % ( |
| 506 ', '.join(s.name for s in failed)), |
| 507 ) |
| 508 |
| 509 # Now close "step". If it's still RUNNING, assume that it was successful. |
| 510 if step.status == pb.RUNNING: |
| 511 step.status = pb.SUCCESS |
| 512 if not step.HasField('ended'): |
| 513 self._set_timestamp(step.ended) |
| 514 |
| 515 |
| 516 |
| 517 class _AnnotationMonitor(object): |
| 518 """The owner of the annotation datagram stream, sending annotation updates in |
| 519 a controlled manner and buffering them when the content hasn't changed. |
| 520 |
| 521 By default, since annotation state can change rapidly, minor annotation |
| 522 changes are throttled such that they are only actually sent periodically. |
| 523 |
| 524 New annotation state updates can be installed by calling `signal_update`. |
| 525 After being started, the _AnnotationMonitor thread must be shut down by |
| 526 calling its `flush_and_join` method. |
| 527 """ |
| 528 |
| 529 def __init__(self, env, fd, flush_period): |
| 530 self._env = env |
| 531 self._fd = fd |
| 532 self._flush_period = flush_period |
| 533 |
| 534 # The following group of variables is protected by "_lock". |
| 535 self._lock = threading.Lock() |
| 536 self._current_data = None |
| 537 self._flush_timer = None |
| 538 self._last_flush_time = None |
| 539 self._last_flush_data = None |
| 540 |
| 541 def signal_update(self, step_data, structural=False): |
| 542 """Updates the annotation state with new step data. |
| 543 |
| 544 This updates our state to include new step data. The annotation monitor |
| 545 will pick this up and dispatch it, either: |
| 546 - Eventually, when the flush period completes, or |
| 547 - Immediately, if this is a structural change. |
| 548 |
| 549 TODO(dnj): Re-examine the use case for "structural" based on actual usage |
| 550 and decide to remove / use it. |
| 551 |
| 552 Args: |
| 553 step_data (str): The updated binary annotation protobuf step data. |
| 554 structural (bool): If True, this is a structural update and should be |
| 555 pushed immediately. |
| 556 """ |
| 557 with self._lock: |
| 558 # Did our data actually change? |
| 559 if step_data == self._last_flush_data: |
| 560 # Nope, leave things as-is. |
| 561 return |
| 562 |
| 563 # This is new data. Is it structural? If so, flush immediately. |
| 564 # If not, make sure our timer is running so it will eventually be flushed. |
| 565 # Note that the timer may also suggest that we flush immediately if we're |
| 566 # already past our last flush interval. |
| 567 now = self._env.now |
| 568 self._current_data = step_data |
| 569 if structural or self._set_flush_timer_locked(now): |
| 570 # We should flush immediately. |
| 571 self._flush_now_locked(now) |
| 572 |
| 573 def flush_and_join(self): |
| 574 """Flushes any remaining updates and blocks until the monitor is complete. |
| 575 """ |
| 576 # Mark that we're finished and signal our event. |
| 577 with self._lock: |
| 578 self._flush_now_locked(self._env.now) |
| 579 |
| 580 @property |
| 581 def latest(self): |
| 582 with self._lock: |
| 583 return self._last_flush_data |
| 584 |
| 585 def _flush_now_locked(self, now): |
| 586 # Clear any current flush timer. |
| 587 self._clear_flush_timer_locked() |
| 588 |
| 589 # Record this flush. |
| 590 # |
| 591 # We set the last flush time to now because even if we don't actually send |
| 592 # data, we have responded to the flush request. |
| 593 flush_data, self._current_data = self._current_data, None |
| 594 self._last_flush_time = now |
| 595 |
| 596 # If the data hasn't changed since the last flush, then don't actually |
| 597 # do anything. |
| 598 if flush_data is None or flush_data == self._last_flush_data: |
| 599 return |
| 600 |
| 601 self._last_flush_data = flush_data |
| 602 self._fd.send(flush_data) |
| 603 |
| 604 def _clear_flush_timer_locked(self): |
| 605 if self._flush_timer is not None: |
| 606 self._flush_timer.cancel() |
| 607 self._flush_timer = None |
| 608 |
| 609 def _set_flush_timer_locked(self, now): |
| 610 if self._flush_timer is not None: |
| 611 # Our flush timer is already running. |
| 612 return False |
| 613 |
| 614 if self._last_flush_time is None: |
| 615 # We have never flushed before, so flush immediately. |
| 616 return True |
| 617 |
| 618 deadline = self._last_flush_time + self._flush_period |
| 619 if deadline <= now: |
| 620 # We're past our flush deadline, and should flush immediately. |
| 621 return True |
| 622 |
| 623 # Start our flush timer. |
| 624 self._flush_timer = threading.Timer((deadline - now).total_seconds(), |
| 625 self._flush_timer_expired) |
| 626 self._flush_timer.daemon = True |
| 627 self._flush_timer.start() |
| 628 |
| 629 def _flush_timer_expired(self): |
| 630 with self._lock: |
| 631 self._flush_now_locked(self._env.now) |
| 632 |
| 633 |
| 634 class _AnnotationState(object): |
| 635 """Manages an outer Milo annotation protobuf Step.""" |
| 636 |
| 637 Step = collections.namedtuple('Step', ( |
| 638 'msg', 'stream_name_base', 'substream_name_index')) |
| 639 |
| 640 def __init__(self, base_step, stream_name_base): |
| 641 self._base = self.Step( |
| 642 msg=base_step, |
| 643 stream_name_base=stream_name_base, |
| 644 substream_name_index={}) |
| 645 self._check_snapshot = None |
| 646 |
| 647 # The current step stack. This is built by updating state after new steps' |
| 648 # nesting levels. |
| 649 self._nest_stack = [self._base] |
| 650 |
| 651 # Index initial properties. |
| 652 self._properties = {p.name: p for p in self._base.msg.property} |
| 653 |
| 654 @classmethod |
| 655 def create(cls, stream_name_base, environment=None, properties=None): |
| 656 base = pb.Step() |
| 657 base.name = 'steps' |
| 658 base.status = pb.PENDING |
| 659 if environment: |
| 660 if environment.argv: |
| 661 base.command.command_line.extend(environment.argv) |
| 662 if environment.cwd: |
| 663 base.command.cwd = environment.cwd |
| 664 if environment.environ: |
| 665 base.command.environ.update(environment.environ) |
| 666 if properties: |
| 667 for k, v in sorted(properties.iteritems()): |
| 668 base.property.add(name=k, value=v) |
| 669 return cls(base, stream_name_base) |
| 670 |
| 671 @property |
| 672 def base(self): |
| 673 return self._base.msg |
| 674 |
| 675 def check(self): |
| 676 """Checks if the annotation state has been updated and, if so, returns it. |
| 677 |
| 678 After check returns, the latest annotation state will be used as the current |
| 679 snapshot for future checks. |
| 680 |
| 681 Returns (str/None): A serialized binary Step protobuf if modified, None |
| 682 otherwise. |
| 683 """ |
| 684 if self._check_snapshot is None or self._check_snapshot != self.base: |
| 685 self._check_snapshot = copy.deepcopy(self.base) |
| 686 return self._check_snapshot.SerializeToString() |
| 687 return None |
| 688 |
| 689 def create_step(self, step_config): |
| 690 # Identify our parent Step by examining the nesting level. The first step |
| 691 # in the nest stack will always be the base (nesting level "-1", since it's |
| 692 # the parent of level 0). Since the step's "nest_level" is one more than the |
| 693 # parent, and we need to offset by 1 to reach the stack index, they cancel |
| 694 # each other out, so the nest level is the same as the parent's stack index. |
| 695 assert step_config.nest_level < len(self._nest_stack), ( |
| 696 'Invalid nest level %d (highest is %d)' % ( |
| 697 step_config.nest_level, len(self._nest_stack)-1)) |
| 698 |
| 699 # Clear any items in the nest stack that are deeper than the current |
| 700 # element. |
| 701 del(self._nest_stack[step_config.nest_level+1:]) |
| 702 parent = self._nest_stack[-1] |
| 703 |
| 704 # Create a stream name for this step. Even though step names are unique, |
| 705 # the normalized LogDog step name may overlap with a different step name. |
| 706 # We keep track of the step names we've issued to this step space and |
| 707 # add indexes if a conflict is identified. |
| 708 stream_name_base = parent.stream_name_base.append('steps', |
| 709 step_config.base_name) |
| 710 index = parent.substream_name_index.setdefault(str(stream_name_base), 0) |
| 711 parent.substream_name_index[str(stream_name_base)] += 1 |
| 712 if index > 0: |
| 713 stream_name_base += '_%d' % (index,) |
| 714 |
| 715 # Create and populate our new step. |
| 716 msg = parent.msg.substep.add().step |
| 717 msg.name = step_config.base_name |
| 718 msg.status = pb.PENDING |
| 719 if step_config.cmd: |
| 720 msg.command.command_line.extend(step_config.cmd) |
| 721 if step_config.cwd: |
| 722 msg.command.cwd = step_config.cwd |
| 723 if step_config.env: |
| 724 msg.command.environ = step_config.env |
| 725 |
| 726 step = self.Step( |
| 727 msg=msg, |
| 728 stream_name_base=stream_name_base, |
| 729 substream_name_index={}) |
| 730 self._nest_stack.append(step) |
| 731 return step |
| 732 |
| 733 def update_properties(self, **kwargs): |
| 734 """Updates a Step's property values to incorporate kwargs.""" |
| 735 for k, v in sorted(kwargs.iteritems()): |
| 736 cur = self._properties.get(k) |
| 737 if cur is None: |
| 738 cur = self.base.property.add(name=k, value=str(v)) |
| 739 self._properties[k] = cur |
| 740 continue |
| 741 |
| 742 # A Property message already exists for this key, so update its value. |
| 743 if cur.value != v: |
| 744 cur.value = str(v) |
| 745 |
| 746 |
| 747 class _StreamName(object): |
| 748 """An immutable validated wrapper for a LogDog stream name.""" |
| 749 |
| 750 def __init__(self, base): |
| 751 if base is not None: |
| 752 libs.logdog.streamname.validate_stream_name(base) |
| 753 self._base = base |
| 754 |
| 755 def append(self, *components): |
| 756 """Returns (_StreamName): A new _StreamName instance with components added. |
| 757 |
| 758 Each component in "components" will become a new normalized stream name |
| 759 component. Conseqeuntly, any separators (/) in the components will be |
| 760 replaced with underscores. |
| 761 |
| 762 Args: |
| 763 components: the path components to append to this _StreamName. |
| 764 """ |
| 765 if len(components) == 0: |
| 766 return self |
| 767 |
| 768 components = [self._normalize(self._flatten(p)) |
| 769 for p in reversed(components)] |
| 770 if self._base: |
| 771 components.append(self._base) |
| 772 return type(self)('/'.join(reversed(components))) |
| 773 |
| 774 def augment(self, val): |
| 775 """Returns (_StreamName): A new _StreamName with "val" appended. |
| 776 |
| 777 This generates a new, normalized _StreamName with the contents of "val" |
| 778 appended to the end. For example: |
| 779 |
| 780 Original: "foo/bar" |
| 781 Append "baz qux": "foo/barbaz_qux" |
| 782 """ |
| 783 if not val: |
| 784 return self |
| 785 val = self._flatten(val) |
| 786 if self._base: |
| 787 val = self._base + val |
| 788 return type(self)(self._normalize(val)) |
| 789 |
| 790 def __iadd__(self, val): |
| 791 return self.augment(val) |
| 792 |
| 793 @staticmethod |
| 794 def _flatten(v): |
| 795 return v.replace('/', '_') |
| 796 |
| 797 @staticmethod |
| 798 def _normalize(v): |
| 799 return libs.logdog.streamname.normalize(v, prefix='s_') |
| 800 |
| 801 def __str__(self): |
| 802 if not self._base: |
| 803 raise ValueError('Cannot generate string from empty StreamName.') |
| 804 return self._base |
OLD | NEW |