OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/env python |
| 2 # Copyright 2015 The LUCI Authors. All rights reserved. |
| 3 # Use of this source code is governed under the Apache License, Version 2.0 |
| 4 # that can be found in the LICENSE file. |
| 5 |
| 6 import collections |
| 7 import contextlib |
| 8 import datetime |
| 9 import json |
| 10 import threading |
| 11 import time |
| 12 import unittest |
| 13 import StringIO |
| 14 |
| 15 import test_env |
| 16 |
| 17 import libs.logdog.stream |
| 18 import libs.logdog.varint |
| 19 from google.protobuf import json_format as jsonpb |
| 20 from recipe_engine import recipe_api |
| 21 from recipe_engine import stream |
| 22 from recipe_engine import stream_logdog |
| 23 |
| 24 |
| 25 import annotations_pb2 as pb |
| 26 |
| 27 |
| 28 def _translate_annotation_datagram(dg): |
| 29 """Translate annotation datagram binary data into a Python dict modeled after |
| 30 the JSONPB projection of the datagram. |
| 31 |
| 32 This is chosen because it allows for easy idiomatic equality assertions in |
| 33 test cases. |
| 34 |
| 35 Args: |
| 36 dg (str): The serialized annotation pb.Step datagram. |
| 37 """ |
| 38 msg = pb.Step() |
| 39 msg.ParseFromString(dg) |
| 40 return json.loads(jsonpb.MessageToJson(msg)) |
| 41 |
| 42 |
| 43 class _TestStreamClient(libs.logdog.stream.StreamClient): |
| 44 """A testing StreamClient that retains all data written to it.""" |
| 45 |
| 46 class Stream(object): |
| 47 """A file-like object that is explicitly aware of LogDog stream protocol.""" |
| 48 |
| 49 def __init__(self, stream_client): |
| 50 self._client = stream_client |
| 51 self._buf = StringIO.StringIO() |
| 52 self._header = None |
| 53 self._final_data = None |
| 54 self._data_offset = None |
| 55 |
| 56 def write(self, data): |
| 57 self._buf.write(data) |
| 58 self._attempt_registration() |
| 59 |
| 60 def close(self): |
| 61 # If we never parsed our header, register that we are incomplete. |
| 62 if self._header is None: |
| 63 self._client._register_incomplete(self) |
| 64 |
| 65 self._final_data = self.data |
| 66 self._buf.close() |
| 67 |
| 68 @contextlib.contextmanager |
| 69 def _read_from(self, offset): |
| 70 # Seek to the specified offset. |
| 71 self._buf.seek(offset, mode=0) |
| 72 try: |
| 73 yield self._buf |
| 74 finally: |
| 75 # Seek back to he end of the stream so future writes will append. |
| 76 self._buf.seek(0, mode=2) |
| 77 |
| 78 def _attempt_registration(self): |
| 79 # Only need to register once. |
| 80 if self._header is not None: |
| 81 return |
| 82 |
| 83 # Can we parse a full LogDog stream header? |
| 84 # |
| 85 # This means pulling: |
| 86 # - The LogDog Butler header. |
| 87 # - The header size varint. |
| 88 # - The header JSON blob, which needs to be decoded. |
| 89 with self._read_from(0) as fd: |
| 90 # Read 'result' bytes. |
| 91 magic_data = fd.read(len(libs.logdog.stream.BUTLER_MAGIC)) |
| 92 if len(magic_data) != len(libs.logdog.stream.BUTLER_MAGIC): |
| 93 # Incomplete magic number, cannot complete registration. |
| 94 return |
| 95 count = len(magic_data) |
| 96 |
| 97 try: |
| 98 size, varint_count = libs.logdog.varint.read_uvarint(fd) |
| 99 except ValueError: |
| 100 # Incomplete varint, cannot complete registration. |
| 101 return |
| 102 count += varint_count |
| 103 |
| 104 header_data = fd.read(size) |
| 105 if len(header_data) != size: |
| 106 # Incomplete header, cannot complete registration. |
| 107 return |
| 108 count += size |
| 109 |
| 110 # Parse the header as JSON. |
| 111 self._header = json.loads(header_data) |
| 112 self._data_offset = count # (varint + header size) |
| 113 self._client._register_stream(self, self._header) |
| 114 |
| 115 @property |
| 116 def data(self): |
| 117 # If we have already cached our data (on close), return it directly. |
| 118 if self._final_data is not None: |
| 119 return self._final_data |
| 120 |
| 121 # Load our data from our live buffer. |
| 122 if self._data_offset is None: |
| 123 # No header has been read, so there is no data. |
| 124 return None |
| 125 with self._read_from(self._data_offset) as fd: |
| 126 return fd.read() |
| 127 |
| 128 |
| 129 _StreamEntry = collections.namedtuple('_StreamEntry', ( |
| 130 's', 'type', 'content_type')) |
| 131 |
| 132 _DATAGRAM_CONTENT_TRANSLATE = { |
| 133 stream_logdog.ANNOTATION_CONTENT_TYPE: _translate_annotation_datagram, |
| 134 } |
| 135 |
| 136 |
| 137 def __init__(self): |
| 138 super(_TestStreamClient, self).__init__() |
| 139 self.streams = {} |
| 140 self.incomplete = [] |
| 141 self.unregistered = {} |
| 142 |
| 143 @classmethod |
| 144 def _create(cls, value): |
| 145 raise NotImplementedError('Instances must be created manually.') |
| 146 |
| 147 def _connect_raw(self): |
| 148 s = self.Stream(self) |
| 149 self.unregistered[id(s)] = s |
| 150 return s |
| 151 |
| 152 def get(self, name): |
| 153 se = self.streams[name] |
| 154 data = se.s.data |
| 155 |
| 156 if se.type == libs.logdog.stream.StreamParams.TEXT: |
| 157 # Return text stream data as a list of lines. We use unicode because it |
| 158 # fits in with the JSON dump from 'all_streams'. |
| 159 return [unicode(l) for l in data.splitlines()] |
| 160 elif se.type == libs.logdog.stream.StreamParams.BINARY: |
| 161 raise NotImplementedError('No support for fetching binary stream data.') |
| 162 elif se.type == libs.logdog.stream.StreamParams.DATAGRAM: |
| 163 # Return datagram stream data as a list of datagrams. |
| 164 sio = StringIO.StringIO(data) |
| 165 datagrams = [] |
| 166 while sio.tell() < sio.len: |
| 167 size, _ = libs.logdog.varint.read_uvarint(sio) |
| 168 dg = sio.read(size) |
| 169 if len(dg) != size: |
| 170 raise ValueError('Incomplete datagram (%d != %d)' % (len(dg), size)) |
| 171 |
| 172 # If this datagram is a known type (e.g., protobuf), transform it into |
| 173 # JSONPB. |
| 174 translator = self._DATAGRAM_CONTENT_TRANSLATE.get(se.content_type) |
| 175 if translator is not None: |
| 176 dg = translator(dg) |
| 177 datagrams.append(dg) |
| 178 |
| 179 sio.close() |
| 180 return dg |
| 181 else: |
| 182 raise ValueError('Unknown stream type [%s]' % (se.type,)) |
| 183 |
| 184 def all_streams(self): |
| 185 return dict((name, self.get(name)) for name in self.streams.iterkeys()) |
| 186 |
| 187 @property |
| 188 def stream_names(self): |
| 189 return set(self.streams.iterkeys()) |
| 190 |
| 191 def _remove_from_unregistered(self, s): |
| 192 if id(s) not in self.unregistered: |
| 193 raise KeyError('Stream is not known to be unregistered.') |
| 194 del(self.unregistered[id(s)]) |
| 195 |
| 196 def _register_stream(self, s, header): |
| 197 name = header.get('name') |
| 198 if name in self.streams: |
| 199 raise KeyError('Duplicate stream [%s]' % (name,)) |
| 200 |
| 201 self._remove_from_unregistered(s) |
| 202 self.streams[name] = self._StreamEntry( |
| 203 s=s, |
| 204 type=header['type'], |
| 205 content_type=header.get('contentType'), |
| 206 ) |
| 207 |
| 208 def _register_incomplete(self, s): |
| 209 self._remove_from_unregistered(s) |
| 210 self.incomplete.append(s) |
| 211 |
| 212 |
| 213 class EnvironmentTest(unittest.TestCase): |
| 214 """Simple test to assert that _Environment, which is stubbed during our tests, |
| 215 actually works.""" |
| 216 |
| 217 def testEnvironmentProbes(self): |
| 218 stream_logdog._Environment.probe() |
| 219 |
| 220 |
| 221 class StreamEngineTest(unittest.TestCase): |
| 222 |
| 223 def setUp(self): |
| 224 self.client = _TestStreamClient() |
| 225 self.now = datetime.datetime(2106, 6, 12, 1, 2, 3) |
| 226 self.env = stream_logdog._Environment( |
| 227 now_fn=lambda: self.now, |
| 228 argv=[], |
| 229 environ={}, |
| 230 cwd=None, |
| 231 ) |
| 232 self.maxDiff = 1024*1024 |
| 233 |
| 234 |
| 235 @contextlib.contextmanager |
| 236 def _new_stream_engine(self, **kwargs): |
| 237 kwargs.setdefault('client', self.client) |
| 238 kwargs.setdefault('environment', self.env) |
| 239 |
| 240 # Initialize and open a StreamEngine. |
| 241 se = stream_logdog.StreamEngine(**kwargs) |
| 242 se.open() |
| 243 yield se |
| 244 |
| 245 # Close the StreamEngine after we're done with it. |
| 246 self._advance_time() |
| 247 se.close() |
| 248 |
| 249 @contextlib.contextmanager |
| 250 def _step_stream(self, se, **kwargs): |
| 251 # Initialize and yield a new step stream. |
| 252 self._advance_time() |
| 253 step_stream = se.new_step_stream(recipe_api.StepConfig.create(**kwargs)) |
| 254 yield step_stream |
| 255 |
| 256 # Close the step stream when we're done with it. |
| 257 self._advance_time() |
| 258 step_stream.close() |
| 259 |
| 260 @contextlib.contextmanager |
| 261 def _log_stream(self, step_stream, name): |
| 262 # Initialize and yield a new log stream. |
| 263 log_stream = step_stream.new_log_stream(name) |
| 264 yield log_stream |
| 265 |
| 266 # Close the log stream when we're done with it. |
| 267 log_stream.close() |
| 268 |
| 269 def _advance_time(self): |
| 270 self.now += datetime.timedelta(seconds=1) |
| 271 |
| 272 def testEmptyStreamEngine(self): |
| 273 self.env.argv = ['fake_program', 'arg0', 'arg1'] |
| 274 self.env.environ['foo'] = 'bar' |
| 275 self.env.cwd = 'CWD' |
| 276 |
| 277 with self._new_stream_engine() as se: |
| 278 pass |
| 279 |
| 280 self.assertEqual(self.client.all_streams(), { |
| 281 u'annotations': { |
| 282 u'name': u'steps', |
| 283 u'status': u'SUCCESS', |
| 284 u'started': u'2106-06-12T01:02:03Z', |
| 285 u'ended': u'2106-06-12T01:02:04Z', |
| 286 u'command': { |
| 287 u'commandLine': [u'fake_program', u'arg0', u'arg1'], |
| 288 u'cwd': u'CWD', |
| 289 u'environ': {u'foo': u'bar'}, |
| 290 }, |
| 291 }, |
| 292 }) |
| 293 |
| 294 def testBasicStream(self): |
| 295 self.env.argv = ['fake_program', 'arg0', 'arg1'] |
| 296 self.env.environ['foo'] = 'bar' |
| 297 self.env.cwd = 'CWD' |
| 298 |
| 299 with self._new_stream_engine(name_base='test/base') as se: |
| 300 with self._step_stream(se, |
| 301 name='first step', |
| 302 cmd=['first', 'step'], |
| 303 cwd='FIRST_CWD') as step: |
| 304 step.add_step_text('Sup') |
| 305 step.add_step_text('Dawg?') |
| 306 step.write_line('STDOUT for first step.') |
| 307 step.write_line('(Another line)') |
| 308 step.add_step_summary_text('Everything is great.') |
| 309 step.add_step_link('example 1', 'http://example.com/1') |
| 310 step.add_step_link('example 2', 'http://example.com/2') |
| 311 step.set_step_status('SUCCESS') |
| 312 |
| 313 with self._step_stream(se, name='second step') as step: |
| 314 step.set_step_status('SUCCESS') |
| 315 step.write_split('multiple\nlines\nof\ntext') |
| 316 |
| 317 # Create two log streams with the same name to test indexing. |
| 318 # |
| 319 # Note that "log stream" is an invalid LogDog stream name, so this |
| 320 # will also test normalization. |
| 321 with self._log_stream(step, 'log stream') as ls: |
| 322 ls.write_split('foo\nbar\nbaz\n') |
| 323 with self._log_stream(step, 'log stream') as ls: |
| 324 ls.write_split('qux\nquux\n') |
| 325 |
| 326 # This is a different stream name, but will normalize to the same log |
| 327 # stream name as 'second/step', so this will test that we disambiguate |
| 328 # the log stream names. |
| 329 with self._step_stream(se, name='second/step') as step: |
| 330 pass |
| 331 |
| 332 self.assertEqual(self.client.all_streams(), { |
| 333 u'test/base/annotations': { |
| 334 u'name': u'steps', |
| 335 u'status': u'SUCCESS', |
| 336 u'started': u'2106-06-12T01:02:03Z', |
| 337 u'ended': u'2106-06-12T01:02:10Z', |
| 338 u'command': { |
| 339 u'commandLine': [u'fake_program', u'arg0', u'arg1'], |
| 340 u'cwd': u'CWD', |
| 341 u'environ': {u'foo': u'bar'}, |
| 342 }, |
| 343 u'substep': [ |
| 344 |
| 345 {u'step': { |
| 346 u'name': u'first step', |
| 347 u'status': u'SUCCESS', |
| 348 u'started': u'2106-06-12T01:02:04Z', |
| 349 u'ended': u'2106-06-12T01:02:05Z', |
| 350 u'command': { |
| 351 u'commandLine': [u'first', u'step'], |
| 352 u'cwd': u'FIRST_CWD', |
| 353 }, |
| 354 u'stdoutStream': { |
| 355 u'name': u'test/base/steps/first_step/stdout', |
| 356 }, |
| 357 u'text': [u'Everything is great.', u'Sup', u'Dawg?'], |
| 358 u'otherLinks': [ |
| 359 { |
| 360 u'label': u'example 1', |
| 361 u'url': u'http://example.com/1', |
| 362 }, |
| 363 { |
| 364 u'label': u'example 2', |
| 365 u'url': u'http://example.com/2', |
| 366 }, |
| 367 ], |
| 368 }}, |
| 369 |
| 370 {u'step': { |
| 371 u'name': u'second step', |
| 372 u'status': u'SUCCESS', |
| 373 u'started': u'2106-06-12T01:02:06Z', |
| 374 u'ended': u'2106-06-12T01:02:07Z', |
| 375 u'stdoutStream': { |
| 376 u'name': u'test/base/steps/second_step/stdout', |
| 377 }, |
| 378 u'otherLinks': [ |
| 379 { |
| 380 u'label': u'log stream', |
| 381 u'logdogStream': { |
| 382 u'name': u'test/base/steps/second_step/logs/log_stream/0', |
| 383 }, |
| 384 }, |
| 385 { |
| 386 u'label': u'log stream', |
| 387 u'logdogStream': { |
| 388 u'name': u'test/base/steps/second_step/logs/log_stream/1', |
| 389 }, |
| 390 }, |
| 391 ], |
| 392 }}, |
| 393 |
| 394 {u'step': { |
| 395 u'name': u'second/step', |
| 396 u'status': u'SUCCESS', |
| 397 u'started': u'2106-06-12T01:02:08Z', |
| 398 u'ended': u'2106-06-12T01:02:09Z', |
| 399 }}, |
| 400 ], |
| 401 }, |
| 402 |
| 403 u'test/base/steps/first_step/stdout': [ |
| 404 u'STDOUT for first step.', |
| 405 u'(Another line)', |
| 406 ], |
| 407 |
| 408 u'test/base/steps/second_step/stdout': [ |
| 409 u'multiple', |
| 410 u'lines', |
| 411 u'of', |
| 412 u'text', |
| 413 ], |
| 414 |
| 415 u'test/base/steps/second_step/logs/log_stream/0': [ |
| 416 u'foo', |
| 417 u'bar', |
| 418 u'baz', |
| 419 ], |
| 420 |
| 421 u'test/base/steps/second_step/logs/log_stream/1': [ |
| 422 u'qux', |
| 423 u'quux', |
| 424 ], |
| 425 }) |
| 426 |
| 427 def testWarningBasicStream(self): |
| 428 with self._new_stream_engine() as se: |
| 429 with self._step_stream(se, name='isuck') as step: |
| 430 step.add_step_summary_text('Something went wrong.') |
| 431 step.set_step_status('WARNING') |
| 432 |
| 433 self.assertEqual(self.client.all_streams(), { |
| 434 u'annotations': { |
| 435 u'name': u'steps', |
| 436 u'status': u'SUCCESS', |
| 437 u'started': u'2106-06-12T01:02:03Z', |
| 438 u'ended': u'2106-06-12T01:02:06Z', |
| 439 u'substep': [ |
| 440 |
| 441 {u'step': { |
| 442 u'name': u'isuck', |
| 443 u'status': u'SUCCESS', |
| 444 u'failureDetails': { |
| 445 u'text': u'Something went wrong.', |
| 446 }, |
| 447 u'started': u'2106-06-12T01:02:04Z', |
| 448 u'ended': u'2106-06-12T01:02:05Z', |
| 449 u'text': [u'Something went wrong.'], |
| 450 }}, |
| 451 ], |
| 452 }, |
| 453 }) |
| 454 |
| 455 def testFailedBasicStream(self): |
| 456 with self._new_stream_engine() as se: |
| 457 with self._step_stream(se, name='isuck') as step: |
| 458 step.add_step_summary_text('Oops I failed.') |
| 459 step.set_step_status('FAILURE') |
| 460 |
| 461 with self._step_stream(se, name='irock') as step: |
| 462 pass |
| 463 |
| 464 self.assertEqual(self.client.all_streams(), { |
| 465 u'annotations': { |
| 466 u'name': u'steps', |
| 467 u'status': u'FAILURE', |
| 468 u'started': u'2106-06-12T01:02:03Z', |
| 469 u'ended': u'2106-06-12T01:02:08Z', |
| 470 u'substep': [ |
| 471 |
| 472 {u'step': { |
| 473 u'name': u'isuck', |
| 474 u'status': u'FAILURE', |
| 475 u'failureDetails': { |
| 476 u'text': u'Oops I failed.', |
| 477 }, |
| 478 u'started': u'2106-06-12T01:02:04Z', |
| 479 u'ended': u'2106-06-12T01:02:05Z', |
| 480 u'text': [u'Oops I failed.'], |
| 481 }}, |
| 482 |
| 483 {u'step': { |
| 484 u'name': u'irock', |
| 485 u'status': u'SUCCESS', |
| 486 u'started': u'2106-06-12T01:02:06Z', |
| 487 u'ended': u'2106-06-12T01:02:07Z', |
| 488 }}, |
| 489 ], |
| 490 }, |
| 491 }) |
| 492 |
| 493 def testNestedStream(self): |
| 494 with self._new_stream_engine() as se: |
| 495 # parent |
| 496 with self._step_stream(se, name='parent') as step: |
| 497 step.write_line('I am the parent.') |
| 498 |
| 499 # parent."child 1" |
| 500 with self._step_stream(se, |
| 501 name='child 1', |
| 502 step_nest_level=1) as step: |
| 503 step.write_line('I am child #1.') |
| 504 |
| 505 # parent."child 1"."grandchild" |
| 506 with self._step_stream(se, |
| 507 name='grandchild', |
| 508 step_nest_level=2) as step: |
| 509 step.write_line("I am child #1's child.") |
| 510 |
| 511 # parent."child 2". Mark this child as failed. This should not propagate |
| 512 # to the parent, since it has an explicit status. |
| 513 with self._step_stream(se, |
| 514 name='child 2', |
| 515 step_nest_level=1) as step: |
| 516 step.write_line('I am child #2.') |
| 517 |
| 518 # parent."child 2". Mark this child as failed. This should not propagate |
| 519 # to the parent, since it has an explicit status. |
| 520 with self._step_stream(se, name='friend') as step: |
| 521 step.write_line("I am the parent's friend.") |
| 522 |
| 523 self.assertEqual(self.client.all_streams(), { |
| 524 u'annotations': { |
| 525 u'name': u'steps', |
| 526 u'status': u'SUCCESS', |
| 527 u'started': u'2106-06-12T01:02:03Z', |
| 528 u'ended': u'2106-06-12T01:02:14Z', |
| 529 u'substep': [ |
| 530 |
| 531 {u'step': { |
| 532 u'name': u'parent', |
| 533 u'status': u'SUCCESS', |
| 534 u'started': u'2106-06-12T01:02:04Z', |
| 535 u'ended': u'2106-06-12T01:02:05Z', |
| 536 u'stdoutStream': { |
| 537 u'name': u'steps/parent/stdout', |
| 538 }, |
| 539 u'substep': [ |
| 540 |
| 541 {u'step': { |
| 542 u'name': u'child 1', |
| 543 u'status': u'SUCCESS', |
| 544 u'started': u'2106-06-12T01:02:06Z', |
| 545 u'ended': u'2106-06-12T01:02:07Z', |
| 546 u'stdoutStream': { |
| 547 u'name': u'steps/parent/steps/child_1/stdout', |
| 548 }, |
| 549 u'substep': [ |
| 550 |
| 551 {u'step': { |
| 552 u'name': u'grandchild', |
| 553 u'status': u'SUCCESS', |
| 554 u'started': u'2106-06-12T01:02:08Z', |
| 555 u'ended': u'2106-06-12T01:02:09Z', |
| 556 u'stdoutStream': { |
| 557 u'name': u'steps/parent/steps/child_1/' |
| 558 'steps/grandchild/stdout', |
| 559 }, |
| 560 }}, |
| 561 ], |
| 562 }}, |
| 563 |
| 564 {u'step': { |
| 565 u'name': u'child 2', |
| 566 u'status': u'SUCCESS', |
| 567 u'started': u'2106-06-12T01:02:10Z', |
| 568 u'ended': u'2106-06-12T01:02:11Z', |
| 569 u'stdoutStream': { |
| 570 u'name': u'steps/parent/steps/child_2/stdout', |
| 571 }, |
| 572 }}, |
| 573 ], |
| 574 }}, |
| 575 |
| 576 {u'step': { |
| 577 u'name': u'friend', |
| 578 u'status': u'SUCCESS', |
| 579 u'started': u'2106-06-12T01:02:12Z', |
| 580 u'ended': u'2106-06-12T01:02:13Z', |
| 581 u'stdoutStream': { |
| 582 u'name': u'steps/friend/stdout', |
| 583 }, |
| 584 }}, |
| 585 ], |
| 586 }, |
| 587 |
| 588 u'steps/parent/stdout': [u'I am the parent.'], |
| 589 u'steps/parent/steps/child_1/stdout': [u'I am child #1.'], |
| 590 u'steps/parent/steps/child_1/steps/grandchild/stdout': [ |
| 591 u"I am child #1's child."], |
| 592 u'steps/parent/steps/child_2/stdout': [u'I am child #2.'], |
| 593 u'steps/friend/stdout': [u"I am the parent's friend."], |
| 594 }) |
| 595 |
| 596 def testTriggersRaiseException(self): |
| 597 with self._new_stream_engine() as se: |
| 598 with self._step_stream(se, name='trigger') as step: |
| 599 with self.assertRaises(NotImplementedError): |
| 600 step.trigger('trigger spec') |
| 601 |
| 602 def testTriggersIgnored(self): |
| 603 with self._new_stream_engine(ignore_triggers=True) as se: |
| 604 with self._step_stream(se, name='trigger') as step: |
| 605 step.trigger('trigger spec') |
| 606 |
| 607 def testNoSubannotations(self): |
| 608 with self._new_stream_engine(ignore_triggers=True) as se: |
| 609 with self.assertRaises(NotImplementedError): |
| 610 se.new_step_stream(recipe_api.StepConfig.create( |
| 611 name='uses subannotations', |
| 612 allow_subannotations=True, |
| 613 )) |
| 614 |
| 615 def testInvalidStepStatusRaisesValueError(self): |
| 616 with self._new_stream_engine() as se: |
| 617 with self._step_stream(se, name='trigger') as step: |
| 618 with self.assertRaises(ValueError): |
| 619 step.set_step_status('OHAI') |
| 620 |
| 621 |
| 622 class AnnotationMonitorTest(unittest.TestCase): |
| 623 """Tests the stream_logdog._AnnotationMonitor directly.""" |
| 624 |
| 625 # A small timedelta, sufficient to block but fast enough to not make the |
| 626 # test slow. |
| 627 _SMALL_TIME_DELTA = datetime.timedelta(milliseconds=5) |
| 628 |
| 629 class _DatagramBuffer(object): |
| 630 |
| 631 def __init__(self): |
| 632 self.datagrams = [] |
| 633 self.data_event = threading.Event() |
| 634 |
| 635 def send(self, dg): |
| 636 self.datagrams.append(dg) |
| 637 self.data_event.set() |
| 638 |
| 639 def __len__(self): |
| 640 return len(self.datagrams) |
| 641 |
| 642 @property |
| 643 def latest(self): |
| 644 if self.datagrams: |
| 645 return self.datagrams[-1] |
| 646 return None |
| 647 |
| 648 def wait_for_data(self): |
| 649 self.data_event.wait() |
| 650 self.data_event.clear() |
| 651 return self.latest |
| 652 |
| 653 |
| 654 @contextlib.contextmanager |
| 655 def _annotation_monitor(self, **kwargs): |
| 656 # Default to a really high flush period. This should never naturally trigger |
| 657 # during a test case. |
| 658 kwargs.setdefault('flush_period', datetime.timedelta(hours=1)) |
| 659 |
| 660 am = stream_logdog._AnnotationMonitor(self.db, **kwargs) |
| 661 try: |
| 662 yield am |
| 663 finally: |
| 664 am.flush_and_join() |
| 665 |
| 666 with am._lock: |
| 667 # Assert that our timer has been shut down. |
| 668 self.assertIsNone(am._flush_timer) |
| 669 # Assert that there is no buffered data. |
| 670 self.assertIsNone(am._current_data) |
| 671 |
| 672 def setUp(self): |
| 673 self.db = self._DatagramBuffer() |
| 674 |
| 675 def testMonitorStartsAndJoinsWithNoData(self): |
| 676 with self._annotation_monitor() as am: |
| 677 pass |
| 678 |
| 679 # No datagrams should have been sent. |
| 680 self.assertIsNone(self.db.latest) |
| 681 self.assertEqual(len(self.db.datagrams), 0) |
| 682 |
| 683 def testMonitorBuffersAndSendsData(self): |
| 684 with self._annotation_monitor() as am: |
| 685 # The first piece of data should have been immediately sent. |
| 686 am.signal_update('initial') |
| 687 self.assertEqual(self.db.wait_for_data(), 'initial') |
| 688 |
| 689 # Subsequent iterations should not send data, but should start the flush |
| 690 # timer and buffer the latest data. |
| 691 with am._lock: |
| 692 self.assertIsNone(am._flush_timer) |
| 693 for i in xrange(10): |
| 694 am.signal_update('test%d' % (i,)) |
| 695 time.sleep(self._SMALL_TIME_DELTA.total_seconds()) |
| 696 with am._lock: |
| 697 self.assertEqual(am._current_data, 'test9') |
| 698 self.assertIsNotNone(am._flush_timer) |
| 699 |
| 700 # Pretend the timer triggered. We should receive the latest buffered data. |
| 701 am._flush_timer_expired() |
| 702 self.assertEqual(self.db.wait_for_data(), 'test9') |
| 703 with am._lock: |
| 704 # No more timer or buffered data. |
| 705 self.assertIsNone(am._flush_timer) |
| 706 self.assertIsNone(am._current_data) |
| 707 |
| 708 # Send one last chunk of data, but don't let the timer expire. This will |
| 709 # be sent on final flush. |
| 710 am.signal_update('final') |
| 711 with am._lock: |
| 712 self.assertIsNotNone(am._flush_timer) |
| 713 |
| 714 # Assert that the final chunk of data was sent. |
| 715 self.assertEqual(self.db.latest, 'final') |
| 716 |
| 717 # Only three datagrams should have been sent. |
| 718 self.assertEqual(len(self.db.datagrams), 3) |
| 719 |
| 720 def testMonitorIgnoresDuplicateData(self): |
| 721 with self._annotation_monitor() as am: |
| 722 # Get initial data out of the way. |
| 723 am.signal_update('initial') |
| 724 self.assertEqual(self.db.wait_for_data(), 'initial') |
| 725 |
| 726 # Send the same thing. It should not be buffered. |
| 727 am.signal_update('initial') |
| 728 with am._lock: |
| 729 self.assertIsNone(am._flush_timer) |
| 730 self.assertIsNone(am._current_data) |
| 731 |
| 732 # Only one datagrams should have been sent. |
| 733 self.assertEqual(len(self.db.datagrams), 1) |
| 734 |
| 735 def testStructuralUpdateSendsImmediately(self): |
| 736 with self._annotation_monitor() as am: |
| 737 # Get initial data out of the way. |
| 738 am.signal_update('initial') |
| 739 self.assertEqual(self.db.wait_for_data(), 'initial') |
| 740 |
| 741 # Send a structural update. It should send immediately. |
| 742 am.signal_update('test', structural=True) |
| 743 self.assertEqual(self.db.wait_for_data(), 'test') |
| 744 |
| 745 # Send a duplicate structural update. It should be ignored. |
| 746 am.signal_update('test', structural=True) |
| 747 with am._lock: |
| 748 self.assertIsNone(am._flush_timer) |
| 749 self.assertIsNone(am._current_data) |
| 750 |
| 751 # Only two datagrams should have been sent. |
| 752 self.assertEqual(len(self.db.datagrams), 2) |
| 753 |
| 754 def testFlushesPeriodically(self): |
| 755 with self._annotation_monitor(flush_period=self._SMALL_TIME_DELTA) as am: |
| 756 # Get initial data out of the way. |
| 757 am.signal_update('initial') |
| 758 self.assertEqual(self.db.wait_for_data(), 'initial') |
| 759 |
| 760 # Send a structural update. It should send immediately. |
| 761 am.signal_update('test') |
| 762 self.assertEqual(self.db.wait_for_data(), 'test') |
| 763 |
| 764 # Only two datagrams should have been sent. |
| 765 self.assertEqual(len(self.db.datagrams), 2) |
| 766 |
| 767 |
| 768 class AnnotationStateTest(unittest.TestCase): |
| 769 """Tests the stream_logdog._AnnotationState directly.""" |
| 770 |
| 771 def setUp(self): |
| 772 self.env = stream_logdog._Environment( |
| 773 None, |
| 774 argv=['command', 'arg0', 'arg1'], |
| 775 cwd='path/to/cwd', |
| 776 environ={ |
| 777 'foo': 'bar', |
| 778 'FOO': 'baz', |
| 779 }, |
| 780 ) |
| 781 self.astate = stream_logdog._AnnotationState.create( |
| 782 stream_logdog._StreamName('strean/name'), |
| 783 environment=self.env, |
| 784 properties={'foo': 'bar'}, |
| 785 ) |
| 786 |
| 787 def testFirstCheckReturnsData(self): |
| 788 # The first check should return data. |
| 789 self.assertIsNotNone(self.astate.check()) |
| 790 # The second will, since nothing has changed. |
| 791 self.assertIsNone(self.astate.check()) |
| 792 |
| 793 def testCanCreateAndGetStep(self): |
| 794 # Root step. |
| 795 base = self.astate.base |
| 796 self.astate.create_step(recipe_api.StepConfig.create(name='first')) |
| 797 self.assertEqual(len(base.substep), 1) |
| 798 self.assertEqual(base.substep[0].step.name, 'first') |
| 799 self.assertIsNotNone(self.astate.check()) |
| 800 |
| 801 # Child step. |
| 802 self.astate.create_step(recipe_api.StepConfig.create( |
| 803 name='first child', |
| 804 step_nest_level=1)) |
| 805 self.assertEqual(len(base.substep), 1) |
| 806 self.assertEqual(len(base.substep[0].step.substep), 1) |
| 807 self.assertEqual(base.substep[0].step.substep[0].step.name, 'first child') |
| 808 self.assertIsNotNone(self.astate.check()) |
| 809 |
| 810 # Sibling step to 'first'. |
| 811 self.astate.create_step(recipe_api.StepConfig.create(name='second')) |
| 812 self.assertEqual(len(base.substep), 2) |
| 813 self.assertEqual(base.substep[1].step.name, 'second') |
| 814 self.assertIsNotNone(self.astate.check()) |
| 815 |
| 816 def testCanUpdateProperties(self): |
| 817 self.astate.update_properties(foo='baz', qux='quux') |
| 818 self.assertEqual(list(self.astate.base.property), [ |
| 819 pb.Step.Property(name='foo', value='baz'), |
| 820 pb.Step.Property(name='qux', value='quux'), |
| 821 ]) |
| 822 |
| 823 |
| 824 class StreamNameTest(unittest.TestCase): |
| 825 """Tests the stream_logdog._StreamName directly.""" |
| 826 |
| 827 def testEmptyStreamNameRaisesValueError(self): |
| 828 sn = stream_logdog._StreamName(None) |
| 829 with self.assertRaises(ValueError): |
| 830 str(sn) |
| 831 |
| 832 def testInvalidBaseRaisesValueError(self): |
| 833 with self.assertRaises(ValueError): |
| 834 stream_logdog._StreamName('!!! invalid !!!') |
| 835 |
| 836 def testAppendComponents(self): |
| 837 sn = stream_logdog._StreamName('base') |
| 838 self.assertEqual(str(sn.append()), 'base') |
| 839 self.assertEqual(str(sn.append('foo')), 'base/foo') |
| 840 self.assertEqual(str(sn.append('foo', 'bar')), 'base/foo/bar') |
| 841 self.assertEqual(str(sn.append('foo', 'bar/baz')), 'base/foo/bar_baz') |
| 842 |
| 843 def testAugment(self): |
| 844 sn = stream_logdog._StreamName('base') |
| 845 self.assertEqual(str(sn.augment('')), 'base') |
| 846 self.assertEqual(str(sn.augment('foo')), 'basefoo') |
| 847 self.assertEqual(str(sn.augment('foo/bar baz')), 'basefoo_bar_baz') |
| 848 |
| 849 def testAppendInvalidStreamNameNormalizes(self): |
| 850 sn = stream_logdog._StreamName('base') |
| 851 sn = sn.append('#!!! stream name !!!') |
| 852 self.assertEqual(str(sn), 'base/s______stream_name____') |
| 853 |
| 854 def testAugmentInvalidStreamNameNormalizes(self): |
| 855 sn = stream_logdog._StreamName('base') |
| 856 self.assertEqual(str(sn.augment(' !!! other !!! ')), 'base_____other_____') |
| 857 |
| 858 |
| 859 if __name__ == '__main__': |
| 860 unittest.main() |
OLD | NEW |