OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/env python |
| 2 # Copyright 2015 The LUCI Authors. All rights reserved. |
| 3 # Use of this source code is governed under the Apache License, Version 2.0 |
| 4 # that can be found in the LICENSE file. |
| 5 |
| 6 import collections |
| 7 import contextlib |
| 8 import datetime |
| 9 import json |
| 10 import threading |
| 11 import time |
| 12 import unittest |
| 13 import StringIO |
| 14 |
| 15 import test_env |
| 16 |
| 17 import libs.logdog.stream |
| 18 import libs.logdog.varint |
| 19 from google.protobuf import json_format as jsonpb |
| 20 from recipe_engine import recipe_api |
| 21 from recipe_engine import stream |
| 22 from recipe_engine import stream_logdog |
| 23 |
| 24 |
| 25 import annotations_pb2 as pb |
| 26 |
| 27 |
| 28 def _translate_annotation_datagram(dg): |
| 29 """Translate annotation datagram binary data into a Python dict modeled after |
| 30 the JSONPB projection of the datagram. |
| 31 |
| 32 This is chosen because it allows for easy idiomatic equality assertions in |
| 33 test cases. |
| 34 |
| 35 Args: |
| 36 dg (str): The serialized annotation pb.Step datagram. |
| 37 """ |
| 38 msg = pb.Step() |
| 39 msg.ParseFromString(dg) |
| 40 return json.loads(jsonpb.MessageToJson(msg)) |
| 41 |
| 42 |
| 43 class _TestStreamClient(libs.logdog.stream.StreamClient): |
| 44 """A testing StreamClient that retains all data written to it.""" |
| 45 |
| 46 class Stream(object): |
| 47 """A file-like object that is explicitly aware of LogDog stream protocol.""" |
| 48 |
| 49 def __init__(self, stream_client): |
| 50 self._client = stream_client |
| 51 self._buf = StringIO.StringIO() |
| 52 self._header = None |
| 53 self._final_data = None |
| 54 self._data_offset = None |
| 55 |
| 56 def write(self, data): |
| 57 self._buf.write(data) |
| 58 self._attempt_registration() |
| 59 |
| 60 def close(self): |
| 61 # If we never parsed our header, register that we are incomplete. |
| 62 if self._header is None: |
| 63 self._client._register_incomplete(self) |
| 64 |
| 65 self._final_data = self.data |
| 66 self._buf.close() |
| 67 |
| 68 @contextlib.contextmanager |
| 69 def _read_from(self, offset): |
| 70 # Seek to the specified offset. |
| 71 self._buf.seek(offset, mode=0) |
| 72 try: |
| 73 yield self._buf |
| 74 finally: |
| 75 # Seek back to he end of the stream so future writes will append. |
| 76 self._buf.seek(0, mode=2) |
| 77 |
| 78 def _attempt_registration(self): |
| 79 # Only need to register once. |
| 80 if self._header is not None: |
| 81 return |
| 82 |
| 83 # Can we parse a full LogDog stream header? |
| 84 # |
| 85 # This means pulling: |
| 86 # - The LogDog Butler header. |
| 87 # - The header size varint. |
| 88 # - The header JSON blob, which needs to be decoded. |
| 89 with self._read_from(0) as fd: |
| 90 # Read 'result' bytes. |
| 91 magic_data = fd.read(len(libs.logdog.stream.BUTLER_MAGIC)) |
| 92 if len(magic_data) != len(libs.logdog.stream.BUTLER_MAGIC): |
| 93 # Incomplete magic number, cannot complete registration. |
| 94 return |
| 95 count = len(magic_data) |
| 96 |
| 97 try: |
| 98 size, varint_count = libs.logdog.varint.read_uvarint(fd) |
| 99 except ValueError: |
| 100 # Incomplete varint, cannot complete registration. |
| 101 return |
| 102 count += varint_count |
| 103 |
| 104 header_data = fd.read(size) |
| 105 if len(header_data) != size: |
| 106 # Incomplete header, cannot complete registration. |
| 107 return |
| 108 count += size |
| 109 |
| 110 # Parse the header as JSON. |
| 111 self._header = json.loads(header_data) |
| 112 self._data_offset = count # (varint + header size) |
| 113 self._client._register_stream(self, self._header) |
| 114 |
| 115 @property |
| 116 def data(self): |
| 117 # If we have already cached our data (on close), return it directly. |
| 118 if self._final_data is not None: |
| 119 return self._final_data |
| 120 |
| 121 # Load our data from our live buffer. |
| 122 if self._data_offset is None: |
| 123 # No header has been read, so there is no data. |
| 124 return None |
| 125 with self._read_from(self._data_offset) as fd: |
| 126 return fd.read() |
| 127 |
| 128 |
| 129 _StreamEntry = collections.namedtuple('_StreamEntry', ( |
| 130 's', 'type', 'content_type')) |
| 131 |
| 132 _DATAGRAM_CONTENT_TRANSLATE = { |
| 133 stream_logdog.ANNOTATION_CONTENT_TYPE: _translate_annotation_datagram, |
| 134 } |
| 135 |
| 136 |
| 137 def __init__(self): |
| 138 super(_TestStreamClient, self).__init__() |
| 139 self.streams = {} |
| 140 self.incomplete = [] |
| 141 self.unregistered = {} |
| 142 |
| 143 @classmethod |
| 144 def _create(cls, value): |
| 145 raise NotImplementedError('Instances must be created manually.') |
| 146 |
| 147 def _connect_raw(self): |
| 148 s = self.Stream(self) |
| 149 self.unregistered[id(s)] = s |
| 150 return s |
| 151 |
| 152 def get(self, name): |
| 153 se = self.streams[name] |
| 154 data = se.s.data |
| 155 |
| 156 if se.type == libs.logdog.stream.StreamParams.TEXT: |
| 157 # Return text stream data as a list of lines. We use unicode because it |
| 158 # fits in with the JSON dump from 'all_streams'. |
| 159 return [unicode(l) for l in data.splitlines()] |
| 160 elif se.type == libs.logdog.stream.StreamParams.BINARY: |
| 161 raise NotImplementedError('No support for fetching binary stream data.') |
| 162 elif se.type == libs.logdog.stream.StreamParams.DATAGRAM: |
| 163 # Return datagram stream data as a list of datagrams. |
| 164 sio = StringIO.StringIO(data) |
| 165 datagrams = [] |
| 166 while sio.tell() < sio.len: |
| 167 size, _ = libs.logdog.varint.read_uvarint(sio) |
| 168 dg = sio.read(size) |
| 169 if len(dg) != size: |
| 170 raise ValueError('Incomplete datagram (%d != %d)' % (len(dg), size)) |
| 171 |
| 172 # If this datagram is a known type (e.g., protobuf), transform it into |
| 173 # JSONPB. |
| 174 translator = self._DATAGRAM_CONTENT_TRANSLATE.get(se.content_type) |
| 175 if translator is not None: |
| 176 dg = translator(dg) |
| 177 datagrams.append(dg) |
| 178 |
| 179 sio.close() |
| 180 return dg |
| 181 else: |
| 182 raise ValueError('Unknown stream type [%s]' % (se.type,)) |
| 183 |
| 184 def all_streams(self): |
| 185 return dict((name, self.get(name)) for name in self.streams.iterkeys()) |
| 186 |
| 187 @property |
| 188 def stream_names(self): |
| 189 return set(self.streams.iterkeys()) |
| 190 |
| 191 def _remove_from_unregistered(self, s): |
| 192 if id(s) not in self.unregistered: |
| 193 raise KeyError('Stream is not known to be unregistered.') |
| 194 del(self.unregistered[id(s)]) |
| 195 |
| 196 def _register_stream(self, s, header): |
| 197 name = header.get('name') |
| 198 if name in self.streams: |
| 199 raise KeyError('Duplicate stream [%s]' % (name,)) |
| 200 |
| 201 self._remove_from_unregistered(s) |
| 202 self.streams[name] = self._StreamEntry( |
| 203 s=s, |
| 204 type=header['type'], |
| 205 content_type=header.get('contentType'), |
| 206 ) |
| 207 |
| 208 def _register_incomplete(self, s): |
| 209 self._remove_from_unregistered(s) |
| 210 self.incomplete.append(s) |
| 211 |
| 212 |
| 213 class EnvironmentTest(unittest.TestCase): |
| 214 """Simple test to assert that _Environment, which is stubbed during our tests, |
| 215 actually works.""" |
| 216 |
| 217 def testRealEnvironment(self): |
| 218 stream_logdog._Environment.real() |
| 219 |
| 220 |
| 221 class StreamEngineTest(unittest.TestCase): |
| 222 |
| 223 def setUp(self): |
| 224 self.client = _TestStreamClient() |
| 225 self.now = datetime.datetime(2106, 6, 12, 1, 2, 3) |
| 226 self.env = stream_logdog._Environment( |
| 227 now_fn=lambda: self.now, |
| 228 argv=[], |
| 229 environ={}, |
| 230 cwd=None, |
| 231 ) |
| 232 self.maxDiff = 1024*1024 |
| 233 |
| 234 |
| 235 @contextlib.contextmanager |
| 236 def _new_stream_engine(self, **kwargs): |
| 237 kwargs.setdefault('client', self.client) |
| 238 kwargs.setdefault('environment', self.env) |
| 239 |
| 240 # Initialize and open a StreamEngine. |
| 241 se = stream_logdog.StreamEngine(**kwargs) |
| 242 se.open() |
| 243 yield se |
| 244 |
| 245 # Close the StreamEngine after we're done with it. |
| 246 self._advance_time() |
| 247 se.close() |
| 248 |
| 249 @contextlib.contextmanager |
| 250 def _step_stream(self, se, **kwargs): |
| 251 # Initialize and yield a new step stream. |
| 252 self._advance_time() |
| 253 step_stream = se.new_step_stream(recipe_api.StepConfig.create(**kwargs)) |
| 254 yield step_stream |
| 255 |
| 256 # Close the step stream when we're done with it. |
| 257 self._advance_time() |
| 258 step_stream.close() |
| 259 |
| 260 @contextlib.contextmanager |
| 261 def _log_stream(self, step_stream, name): |
| 262 # Initialize and yield a new log stream. |
| 263 log_stream = step_stream.new_log_stream(name) |
| 264 yield log_stream |
| 265 |
| 266 # Close the log stream when we're done with it. |
| 267 log_stream.close() |
| 268 |
| 269 def _advance_time(self): |
| 270 self.now += datetime.timedelta(seconds=1) |
| 271 |
| 272 def testEmptyStreamEngine(self): |
| 273 self.env.argv = ['fake_program', 'arg0', 'arg1'] |
| 274 self.env.environ['foo'] = 'bar' |
| 275 self.env.cwd = 'CWD' |
| 276 |
| 277 with self._new_stream_engine() as se: |
| 278 pass |
| 279 |
| 280 self.assertEqual(self.client.all_streams(), { |
| 281 u'annotations': { |
| 282 u'name': u'steps', |
| 283 u'status': u'SUCCESS', |
| 284 u'started': u'2106-06-12T01:02:03Z', |
| 285 u'ended': u'2106-06-12T01:02:04Z', |
| 286 u'command': { |
| 287 u'commandLine': [u'fake_program', u'arg0', u'arg1'], |
| 288 u'cwd': u'CWD', |
| 289 u'environ': {u'foo': u'bar'}, |
| 290 }, |
| 291 }, |
| 292 }) |
| 293 |
| 294 def testIncrementalUpdates(self): |
| 295 self.env.argv = ['fake_program', 'arg0', 'arg1'] |
| 296 self.env.environ['foo'] = 'bar' |
| 297 self.env.cwd = 'CWD' |
| 298 |
| 299 # Create a StreamEngine with an update interval that will trigger each time |
| 300 # _advance_time is called. |
| 301 with self._new_stream_engine( |
| 302 update_interval=datetime.timedelta(seconds=1)) as se: |
| 303 # Initial stream state (no steps). |
| 304 self.assertEqual(self.client.all_streams(), { |
| 305 u'annotations': { |
| 306 u'name': u'steps', |
| 307 u'started': u'2106-06-12T01:02:03Z', |
| 308 u'command': { |
| 309 u'commandLine': [u'fake_program', u'arg0', u'arg1'], |
| 310 u'cwd': u'CWD', |
| 311 u'environ': {u'foo': u'bar'}, |
| 312 }, |
| 313 }, |
| 314 }) |
| 315 |
| 316 with self._step_stream(se, name='foo') as st: |
| 317 pass |
| 318 |
| 319 # Stream state (foo). |
| 320 self.assertEqual(self.client.all_streams(), { |
| 321 u'annotations': { |
| 322 u'name': u'steps', |
| 323 u'started': u'2106-06-12T01:02:03Z', |
| 324 u'command': { |
| 325 u'commandLine': [u'fake_program', u'arg0', u'arg1'], |
| 326 u'cwd': u'CWD', |
| 327 u'environ': {u'foo': u'bar'}, |
| 328 }, |
| 329 |
| 330 u'substep': [ |
| 331 |
| 332 {u'step': { |
| 333 u'name': u'foo', |
| 334 u'status': u'SUCCESS', |
| 335 u'started': u'2106-06-12T01:02:04Z', |
| 336 u'ended': u'2106-06-12T01:02:05Z', |
| 337 }}, |
| 338 ], |
| 339 }, |
| 340 }) |
| 341 |
| 342 with self._step_stream(se, name='bar') as st: |
| 343 pass |
| 344 |
| 345 # Stream state (bar). |
| 346 self.assertEqual(self.client.all_streams(), { |
| 347 u'annotations': { |
| 348 u'name': u'steps', |
| 349 u'started': u'2106-06-12T01:02:03Z', |
| 350 u'command': { |
| 351 u'commandLine': [u'fake_program', u'arg0', u'arg1'], |
| 352 u'cwd': u'CWD', |
| 353 u'environ': {u'foo': u'bar'}, |
| 354 }, |
| 355 |
| 356 u'substep': [ |
| 357 |
| 358 {u'step': { |
| 359 u'name': u'foo', |
| 360 u'status': u'SUCCESS', |
| 361 u'started': u'2106-06-12T01:02:04Z', |
| 362 u'ended': u'2106-06-12T01:02:05Z', |
| 363 }}, |
| 364 |
| 365 {u'step': { |
| 366 u'name': u'bar', |
| 367 u'status': u'SUCCESS', |
| 368 u'started': u'2106-06-12T01:02:06Z', |
| 369 u'ended': u'2106-06-12T01:02:07Z', |
| 370 }}, |
| 371 ], |
| 372 }, |
| 373 }) |
| 374 |
| 375 # Final stream state. |
| 376 self.assertEqual(self.client.all_streams(), { |
| 377 u'annotations': { |
| 378 u'name': u'steps', |
| 379 u'status': u'SUCCESS', |
| 380 u'started': u'2106-06-12T01:02:03Z', |
| 381 u'ended': u'2106-06-12T01:02:08Z', |
| 382 u'command': { |
| 383 u'commandLine': [u'fake_program', u'arg0', u'arg1'], |
| 384 u'cwd': u'CWD', |
| 385 u'environ': {u'foo': u'bar'}, |
| 386 }, |
| 387 |
| 388 u'substep': [ |
| 389 |
| 390 {u'step': { |
| 391 u'name': u'foo', |
| 392 u'status': u'SUCCESS', |
| 393 u'started': u'2106-06-12T01:02:04Z', |
| 394 u'ended': u'2106-06-12T01:02:05Z', |
| 395 }}, |
| 396 |
| 397 {u'step': { |
| 398 u'name': u'bar', |
| 399 u'status': u'SUCCESS', |
| 400 u'started': u'2106-06-12T01:02:06Z', |
| 401 u'ended': u'2106-06-12T01:02:07Z', |
| 402 }}, |
| 403 ], |
| 404 }, |
| 405 }) |
| 406 |
| 407 def testBasicStream(self): |
| 408 self.env.argv = ['fake_program', 'arg0', 'arg1'] |
| 409 self.env.environ['foo'] = 'bar' |
| 410 self.env.cwd = 'CWD' |
| 411 |
| 412 with self._new_stream_engine(name_base='test/base') as se: |
| 413 with self._step_stream(se, |
| 414 name='first step', |
| 415 cmd=['first', 'step'], |
| 416 cwd='FIRST_CWD') as step: |
| 417 step.add_step_text('Sup') |
| 418 step.add_step_text('Dawg?') |
| 419 step.write_line('STDOUT for first step.') |
| 420 step.write_line('(Another line)') |
| 421 step.add_step_summary_text('Everything is great.') |
| 422 step.add_step_link('example 1', 'http://example.com/1') |
| 423 step.add_step_link('example 2', 'http://example.com/2') |
| 424 step.set_step_status('SUCCESS') |
| 425 |
| 426 with self._step_stream(se, name='second step') as step: |
| 427 step.set_step_status('SUCCESS') |
| 428 step.write_split('multiple\nlines\nof\ntext') |
| 429 |
| 430 # Create two log streams with the same name to test indexing. |
| 431 # |
| 432 # Note that "log stream" is an invalid LogDog stream name, so this |
| 433 # will also test normalization. |
| 434 with self._log_stream(step, 'log stream') as ls: |
| 435 ls.write_split('foo\nbar\nbaz\n') |
| 436 with self._log_stream(step, 'log stream') as ls: |
| 437 ls.write_split('qux\nquux\n') |
| 438 |
| 439 # This is a different stream name, but will normalize to the same log |
| 440 # stream name as 'second/step', so this will test that we disambiguate |
| 441 # the log stream names. |
| 442 with self._step_stream(se, name='second/step') as step: |
| 443 pass |
| 444 |
| 445 self.assertEqual(self.client.all_streams(), { |
| 446 u'test/base/annotations': { |
| 447 u'name': u'steps', |
| 448 u'status': u'SUCCESS', |
| 449 u'started': u'2106-06-12T01:02:03Z', |
| 450 u'ended': u'2106-06-12T01:02:10Z', |
| 451 u'command': { |
| 452 u'commandLine': [u'fake_program', u'arg0', u'arg1'], |
| 453 u'cwd': u'CWD', |
| 454 u'environ': {u'foo': u'bar'}, |
| 455 }, |
| 456 u'substep': [ |
| 457 |
| 458 {u'step': { |
| 459 u'name': u'first step', |
| 460 u'status': u'SUCCESS', |
| 461 u'started': u'2106-06-12T01:02:04Z', |
| 462 u'ended': u'2106-06-12T01:02:05Z', |
| 463 u'command': { |
| 464 u'commandLine': [u'first', u'step'], |
| 465 u'cwd': u'FIRST_CWD', |
| 466 }, |
| 467 u'stdoutStream': { |
| 468 u'name': u'test/base/steps/first_step/stdout', |
| 469 }, |
| 470 u'text': [u'Everything is great.', u'Sup', u'Dawg?'], |
| 471 u'otherLinks': [ |
| 472 { |
| 473 u'label': u'example 1', |
| 474 u'url': u'http://example.com/1', |
| 475 }, |
| 476 { |
| 477 u'label': u'example 2', |
| 478 u'url': u'http://example.com/2', |
| 479 }, |
| 480 ], |
| 481 }}, |
| 482 |
| 483 {u'step': { |
| 484 u'name': u'second step', |
| 485 u'status': u'SUCCESS', |
| 486 u'started': u'2106-06-12T01:02:06Z', |
| 487 u'ended': u'2106-06-12T01:02:07Z', |
| 488 u'stdoutStream': { |
| 489 u'name': u'test/base/steps/second_step/stdout', |
| 490 }, |
| 491 u'otherLinks': [ |
| 492 { |
| 493 u'label': u'log stream', |
| 494 u'logdogStream': { |
| 495 u'name': u'test/base/steps/second_step/logs/log_stream/0', |
| 496 }, |
| 497 }, |
| 498 { |
| 499 u'label': u'log stream', |
| 500 u'logdogStream': { |
| 501 u'name': u'test/base/steps/second_step/logs/log_stream/1', |
| 502 }, |
| 503 }, |
| 504 ], |
| 505 }}, |
| 506 |
| 507 {u'step': { |
| 508 u'name': u'second/step', |
| 509 u'status': u'SUCCESS', |
| 510 u'started': u'2106-06-12T01:02:08Z', |
| 511 u'ended': u'2106-06-12T01:02:09Z', |
| 512 }}, |
| 513 ], |
| 514 }, |
| 515 |
| 516 u'test/base/steps/first_step/stdout': [ |
| 517 u'STDOUT for first step.', |
| 518 u'(Another line)', |
| 519 ], |
| 520 |
| 521 u'test/base/steps/second_step/stdout': [ |
| 522 u'multiple', |
| 523 u'lines', |
| 524 u'of', |
| 525 u'text', |
| 526 ], |
| 527 |
| 528 u'test/base/steps/second_step/logs/log_stream/0': [ |
| 529 u'foo', |
| 530 u'bar', |
| 531 u'baz', |
| 532 ], |
| 533 |
| 534 u'test/base/steps/second_step/logs/log_stream/1': [ |
| 535 u'qux', |
| 536 u'quux', |
| 537 ], |
| 538 }) |
| 539 |
| 540 def testWarningBasicStream(self): |
| 541 with self._new_stream_engine() as se: |
| 542 with self._step_stream(se, name='isuck') as step: |
| 543 step.add_step_summary_text('Something went wrong.') |
| 544 step.set_step_status('WARNING') |
| 545 |
| 546 self.assertEqual(self.client.all_streams(), { |
| 547 u'annotations': { |
| 548 u'name': u'steps', |
| 549 u'status': u'SUCCESS', |
| 550 u'started': u'2106-06-12T01:02:03Z', |
| 551 u'ended': u'2106-06-12T01:02:06Z', |
| 552 u'substep': [ |
| 553 |
| 554 {u'step': { |
| 555 u'name': u'isuck', |
| 556 u'status': u'SUCCESS', |
| 557 u'failureDetails': { |
| 558 u'text': u'Something went wrong.', |
| 559 }, |
| 560 u'started': u'2106-06-12T01:02:04Z', |
| 561 u'ended': u'2106-06-12T01:02:05Z', |
| 562 u'text': [u'Something went wrong.'], |
| 563 }}, |
| 564 ], |
| 565 }, |
| 566 }) |
| 567 |
| 568 def testFailedBasicStream(self): |
| 569 with self._new_stream_engine() as se: |
| 570 with self._step_stream(se, name='isuck') as step: |
| 571 step.add_step_summary_text('Oops I failed.') |
| 572 step.set_step_status('FAILURE') |
| 573 |
| 574 with self._step_stream(se, name='irock') as step: |
| 575 pass |
| 576 |
| 577 self.assertEqual(self.client.all_streams(), { |
| 578 u'annotations': { |
| 579 u'name': u'steps', |
| 580 u'status': u'FAILURE', |
| 581 u'started': u'2106-06-12T01:02:03Z', |
| 582 u'ended': u'2106-06-12T01:02:08Z', |
| 583 u'substep': [ |
| 584 |
| 585 {u'step': { |
| 586 u'name': u'isuck', |
| 587 u'status': u'FAILURE', |
| 588 u'failureDetails': { |
| 589 u'text': u'Oops I failed.', |
| 590 }, |
| 591 u'started': u'2106-06-12T01:02:04Z', |
| 592 u'ended': u'2106-06-12T01:02:05Z', |
| 593 u'text': [u'Oops I failed.'], |
| 594 }}, |
| 595 |
| 596 {u'step': { |
| 597 u'name': u'irock', |
| 598 u'status': u'SUCCESS', |
| 599 u'started': u'2106-06-12T01:02:06Z', |
| 600 u'ended': u'2106-06-12T01:02:07Z', |
| 601 }}, |
| 602 ], |
| 603 }, |
| 604 }) |
| 605 |
| 606 def testNestedStream(self): |
| 607 with self._new_stream_engine() as se: |
| 608 # parent |
| 609 with self._step_stream(se, name='parent') as step: |
| 610 step.write_line('I am the parent.') |
| 611 |
| 612 # parent."child 1" |
| 613 with self._step_stream(se, |
| 614 name='child 1', |
| 615 step_nest_level=1) as step: |
| 616 step.write_line('I am child #1.') |
| 617 |
| 618 # parent."child 1"."grandchild" |
| 619 with self._step_stream(se, |
| 620 name='grandchild', |
| 621 step_nest_level=2) as step: |
| 622 step.write_line("I am child #1's child.") |
| 623 |
| 624 # parent."child 2". Mark this child as failed. This should not propagate |
| 625 # to the parent, since it has an explicit status. |
| 626 with self._step_stream(se, |
| 627 name='child 2', |
| 628 step_nest_level=1) as step: |
| 629 step.write_line('I am child #2.') |
| 630 |
| 631 # parent."child 2". Mark this child as failed. This should not propagate |
| 632 # to the parent, since it has an explicit status. |
| 633 with self._step_stream(se, name='friend') as step: |
| 634 step.write_line("I am the parent's friend.") |
| 635 |
| 636 self.assertEqual(self.client.all_streams(), { |
| 637 u'annotations': { |
| 638 u'name': u'steps', |
| 639 u'status': u'SUCCESS', |
| 640 u'started': u'2106-06-12T01:02:03Z', |
| 641 u'ended': u'2106-06-12T01:02:14Z', |
| 642 u'substep': [ |
| 643 |
| 644 {u'step': { |
| 645 u'name': u'parent', |
| 646 u'status': u'SUCCESS', |
| 647 u'started': u'2106-06-12T01:02:04Z', |
| 648 u'ended': u'2106-06-12T01:02:05Z', |
| 649 u'stdoutStream': { |
| 650 u'name': u'steps/parent/stdout', |
| 651 }, |
| 652 u'substep': [ |
| 653 |
| 654 {u'step': { |
| 655 u'name': u'child 1', |
| 656 u'status': u'SUCCESS', |
| 657 u'started': u'2106-06-12T01:02:06Z', |
| 658 u'ended': u'2106-06-12T01:02:07Z', |
| 659 u'stdoutStream': { |
| 660 u'name': u'steps/parent/steps/child_1/stdout', |
| 661 }, |
| 662 u'substep': [ |
| 663 |
| 664 {u'step': { |
| 665 u'name': u'grandchild', |
| 666 u'status': u'SUCCESS', |
| 667 u'started': u'2106-06-12T01:02:08Z', |
| 668 u'ended': u'2106-06-12T01:02:09Z', |
| 669 u'stdoutStream': { |
| 670 u'name': u'steps/parent/steps/child_1/' |
| 671 'steps/grandchild/stdout', |
| 672 }, |
| 673 }}, |
| 674 ], |
| 675 }}, |
| 676 |
| 677 {u'step': { |
| 678 u'name': u'child 2', |
| 679 u'status': u'SUCCESS', |
| 680 u'started': u'2106-06-12T01:02:10Z', |
| 681 u'ended': u'2106-06-12T01:02:11Z', |
| 682 u'stdoutStream': { |
| 683 u'name': u'steps/parent/steps/child_2/stdout', |
| 684 }, |
| 685 }}, |
| 686 ], |
| 687 }}, |
| 688 |
| 689 {u'step': { |
| 690 u'name': u'friend', |
| 691 u'status': u'SUCCESS', |
| 692 u'started': u'2106-06-12T01:02:12Z', |
| 693 u'ended': u'2106-06-12T01:02:13Z', |
| 694 u'stdoutStream': { |
| 695 u'name': u'steps/friend/stdout', |
| 696 }, |
| 697 }}, |
| 698 ], |
| 699 }, |
| 700 |
| 701 u'steps/parent/stdout': [u'I am the parent.'], |
| 702 u'steps/parent/steps/child_1/stdout': [u'I am child #1.'], |
| 703 u'steps/parent/steps/child_1/steps/grandchild/stdout': [ |
| 704 u"I am child #1's child."], |
| 705 u'steps/parent/steps/child_2/stdout': [u'I am child #2.'], |
| 706 u'steps/friend/stdout': [u"I am the parent's friend."], |
| 707 }) |
| 708 |
| 709 def testTriggersRaiseException(self): |
| 710 with self._new_stream_engine() as se: |
| 711 with self._step_stream(se, name='trigger') as step: |
| 712 with self.assertRaises(NotImplementedError): |
| 713 step.trigger('trigger spec') |
| 714 |
| 715 def testTriggersIgnored(self): |
| 716 with self._new_stream_engine(ignore_triggers=True) as se: |
| 717 with self._step_stream(se, name='trigger') as step: |
| 718 step.trigger('trigger spec') |
| 719 |
| 720 def testNoSubannotations(self): |
| 721 with self._new_stream_engine(ignore_triggers=True) as se: |
| 722 with self.assertRaises(NotImplementedError): |
| 723 se.new_step_stream(recipe_api.StepConfig.create( |
| 724 name='uses subannotations', |
| 725 allow_subannotations=True, |
| 726 )) |
| 727 |
| 728 def testInvalidStepStatusRaisesValueError(self): |
| 729 with self._new_stream_engine() as se: |
| 730 with self._step_stream(se, name='trigger') as step: |
| 731 with self.assertRaises(ValueError): |
| 732 step.set_step_status('OHAI') |
| 733 |
| 734 |
| 735 class AnnotationMonitorTest(unittest.TestCase): |
| 736 """Tests the stream_logdog._AnnotationMonitor directly.""" |
| 737 |
| 738 # A small timedelta, sufficient to block but fast enough to not make the |
| 739 # test slow. |
| 740 _SMALL_TIME_DELTA = datetime.timedelta(milliseconds=5) |
| 741 |
| 742 class _DatagramBuffer(object): |
| 743 |
| 744 def __init__(self): |
| 745 self.datagrams = [] |
| 746 self.data_event = threading.Event() |
| 747 |
| 748 def send(self, dg): |
| 749 self.datagrams.append(dg) |
| 750 self.data_event.set() |
| 751 |
| 752 def __len__(self): |
| 753 return len(self.datagrams) |
| 754 |
| 755 @property |
| 756 def latest(self): |
| 757 if self.datagrams: |
| 758 return self.datagrams[-1] |
| 759 return None |
| 760 |
| 761 def wait_for_data(self): |
| 762 self.data_event.wait() |
| 763 self.data_event.clear() |
| 764 return self.latest |
| 765 |
| 766 |
| 767 def setUp(self): |
| 768 self.db = self._DatagramBuffer() |
| 769 self.now = datetime.datetime(2106, 6, 12, 1, 2, 3) |
| 770 self.env = stream_logdog._Environment( |
| 771 now_fn=lambda: self.now, |
| 772 argv=[], |
| 773 environ={}, |
| 774 cwd=None, |
| 775 ) |
| 776 |
| 777 @contextlib.contextmanager |
| 778 def _annotation_monitor(self, flush_period=None): |
| 779 # Use a really high flush period. This should never naturally trigger during |
| 780 # a test case. |
| 781 flush_period = flush_period or datetime.timedelta(hours=1) |
| 782 |
| 783 am = stream_logdog._AnnotationMonitor(self.env, self.db, flush_period) |
| 784 try: |
| 785 yield am |
| 786 finally: |
| 787 am.flush_and_join() |
| 788 |
| 789 with am._lock: |
| 790 # Assert that our timer has been shut down. |
| 791 self.assertIsNone(am._flush_timer) |
| 792 # Assert that there is no buffered data. |
| 793 self.assertIsNone(am._current_data) |
| 794 |
| 795 def testMonitorStartsAndJoinsWithNoData(self): |
| 796 with self._annotation_monitor() as am: |
| 797 pass |
| 798 |
| 799 # No datagrams should have been sent. |
| 800 self.assertIsNone(self.db.latest) |
| 801 self.assertEqual(len(self.db.datagrams), 0) |
| 802 |
| 803 def testMonitorBuffersAndSendsData(self): |
| 804 with self._annotation_monitor() as am: |
| 805 # The first piece of data should have been immediately sent. |
| 806 am.signal_update('initial') |
| 807 self.assertEqual(self.db.wait_for_data(), 'initial') |
| 808 |
| 809 # Subsequent iterations should not send data, but should start the flush |
| 810 # timer and buffer the latest data. |
| 811 with am._lock: |
| 812 self.assertIsNone(am._flush_timer) |
| 813 for i in xrange(10): |
| 814 am.signal_update('test%d' % (i,)) |
| 815 time.sleep(self._SMALL_TIME_DELTA.total_seconds()) |
| 816 with am._lock: |
| 817 self.assertEqual(am._current_data, 'test9') |
| 818 self.assertIsNotNone(am._flush_timer) |
| 819 |
| 820 # Pretend the timer triggered. We should receive the latest buffered data. |
| 821 am._flush_timer_expired() |
| 822 self.assertEqual(self.db.wait_for_data(), 'test9') |
| 823 with am._lock: |
| 824 # No more timer or buffered data. |
| 825 self.assertIsNone(am._flush_timer) |
| 826 self.assertIsNone(am._current_data) |
| 827 |
| 828 # Send one last chunk of data, but don't let the timer expire. This will |
| 829 # be sent on final flush. |
| 830 am.signal_update('final') |
| 831 with am._lock: |
| 832 self.assertIsNotNone(am._flush_timer) |
| 833 |
| 834 # Assert that the final chunk of data was sent. |
| 835 self.assertEqual(self.db.latest, 'final') |
| 836 |
| 837 # Only three datagrams should have been sent. |
| 838 self.assertEqual(len(self.db.datagrams), 3) |
| 839 |
| 840 def testMonitorIgnoresDuplicateData(self): |
| 841 with self._annotation_monitor() as am: |
| 842 # Get initial data out of the way. |
| 843 am.signal_update('initial') |
| 844 self.assertEqual(self.db.wait_for_data(), 'initial') |
| 845 |
| 846 # Send the same thing. It should not be buffered. |
| 847 am.signal_update('initial') |
| 848 with am._lock: |
| 849 self.assertIsNone(am._flush_timer) |
| 850 self.assertIsNone(am._current_data) |
| 851 |
| 852 # Only one datagrams should have been sent. |
| 853 self.assertEqual(len(self.db.datagrams), 1) |
| 854 |
| 855 def testStructuralUpdateSendsImmediately(self): |
| 856 with self._annotation_monitor() as am: |
| 857 # Get initial data out of the way. |
| 858 am.signal_update('initial') |
| 859 self.assertEqual(self.db.wait_for_data(), 'initial') |
| 860 |
| 861 # Send a structural update. It should send immediately. |
| 862 am.signal_update('test', structural=True) |
| 863 self.assertEqual(self.db.wait_for_data(), 'test') |
| 864 |
| 865 # Send a duplicate structural update. It should be ignored. |
| 866 am.signal_update('test', structural=True) |
| 867 with am._lock: |
| 868 self.assertIsNone(am._flush_timer) |
| 869 self.assertIsNone(am._current_data) |
| 870 |
| 871 # Only two datagrams should have been sent. |
| 872 self.assertEqual(len(self.db.datagrams), 2) |
| 873 |
| 874 def testFlushesPeriodically(self): |
| 875 with self._annotation_monitor(flush_period=self._SMALL_TIME_DELTA) as am: |
| 876 # Get initial data out of the way. |
| 877 am.signal_update('initial') |
| 878 self.assertEqual(self.db.wait_for_data(), 'initial') |
| 879 |
| 880 # Send a structural update. It should send immediately. |
| 881 am.signal_update('test') |
| 882 self.assertEqual(self.db.wait_for_data(), 'test') |
| 883 |
| 884 # Only two datagrams should have been sent. |
| 885 self.assertEqual(len(self.db.datagrams), 2) |
| 886 |
| 887 |
| 888 class AnnotationStateTest(unittest.TestCase): |
| 889 """Tests the stream_logdog._AnnotationState directly.""" |
| 890 |
| 891 def setUp(self): |
| 892 self.env = stream_logdog._Environment( |
| 893 None, |
| 894 argv=['command', 'arg0', 'arg1'], |
| 895 cwd='path/to/cwd', |
| 896 environ={ |
| 897 'foo': 'bar', |
| 898 'FOO': 'baz', |
| 899 }, |
| 900 ) |
| 901 self.astate = stream_logdog._AnnotationState.create( |
| 902 stream_logdog._StreamName('strean/name'), |
| 903 environment=self.env, |
| 904 properties={'foo': 'bar'}, |
| 905 ) |
| 906 |
| 907 def testFirstCheckReturnsData(self): |
| 908 # The first check should return data. |
| 909 self.assertIsNotNone(self.astate.check()) |
| 910 # The second will, since nothing has changed. |
| 911 self.assertIsNone(self.astate.check()) |
| 912 |
| 913 def testCanCreateAndGetStep(self): |
| 914 # Root step. |
| 915 base = self.astate.base |
| 916 self.astate.create_step(recipe_api.StepConfig.create(name='first')) |
| 917 self.assertEqual(len(base.substep), 1) |
| 918 self.assertEqual(base.substep[0].step.name, 'first') |
| 919 self.assertIsNotNone(self.astate.check()) |
| 920 |
| 921 # Child step. |
| 922 self.astate.create_step(recipe_api.StepConfig.create( |
| 923 name='first child', |
| 924 step_nest_level=1)) |
| 925 self.assertEqual(len(base.substep), 1) |
| 926 self.assertEqual(len(base.substep[0].step.substep), 1) |
| 927 self.assertEqual(base.substep[0].step.substep[0].step.name, 'first child') |
| 928 self.assertIsNotNone(self.astate.check()) |
| 929 |
| 930 # Sibling step to 'first'. |
| 931 self.astate.create_step(recipe_api.StepConfig.create(name='second')) |
| 932 self.assertEqual(len(base.substep), 2) |
| 933 self.assertEqual(base.substep[1].step.name, 'second') |
| 934 self.assertIsNotNone(self.astate.check()) |
| 935 |
| 936 def testCanUpdateProperties(self): |
| 937 self.astate.update_properties(foo='baz', qux='quux') |
| 938 self.assertEqual(list(self.astate.base.property), [ |
| 939 pb.Step.Property(name='foo', value='baz'), |
| 940 pb.Step.Property(name='qux', value='quux'), |
| 941 ]) |
| 942 |
| 943 |
| 944 class StreamNameTest(unittest.TestCase): |
| 945 """Tests the stream_logdog._StreamName directly.""" |
| 946 |
| 947 def testEmptyStreamNameRaisesValueError(self): |
| 948 sn = stream_logdog._StreamName(None) |
| 949 with self.assertRaises(ValueError): |
| 950 str(sn) |
| 951 |
| 952 def testInvalidBaseRaisesValueError(self): |
| 953 with self.assertRaises(ValueError): |
| 954 stream_logdog._StreamName('!!! invalid !!!') |
| 955 |
| 956 def testAppendComponents(self): |
| 957 sn = stream_logdog._StreamName('base') |
| 958 self.assertEqual(str(sn.append()), 'base') |
| 959 self.assertEqual(str(sn.append('foo')), 'base/foo') |
| 960 self.assertEqual(str(sn.append('foo', 'bar')), 'base/foo/bar') |
| 961 self.assertEqual(str(sn.append('foo', 'bar/baz')), 'base/foo/bar_baz') |
| 962 |
| 963 def testAugment(self): |
| 964 sn = stream_logdog._StreamName('base') |
| 965 self.assertEqual(str(sn.augment('')), 'base') |
| 966 self.assertEqual(str(sn.augment('foo')), 'basefoo') |
| 967 self.assertEqual(str(sn.augment('foo/bar baz')), 'basefoo_bar_baz') |
| 968 |
| 969 def testAppendInvalidStreamNameNormalizes(self): |
| 970 sn = stream_logdog._StreamName('base') |
| 971 sn = sn.append('#!!! stream name !!!') |
| 972 self.assertEqual(str(sn), 'base/s______stream_name____') |
| 973 |
| 974 def testAugmentInvalidStreamNameNormalizes(self): |
| 975 sn = stream_logdog._StreamName('base') |
| 976 self.assertEqual(str(sn.augment(' !!! other !!! ')), 'base_____other_____') |
| 977 |
| 978 |
| 979 if __name__ == '__main__': |
| 980 unittest.main() |
OLD | NEW |