Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(371)

Side by Side Diff: recipe_engine/unittests/stream_logdog_test.py

Issue 2265673002: Add LogDog / annotation protobuf support. (Closed) Base URL: https://github.com/luci/recipes-py@step-formal-struct
Patch Set: Code review comments. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 #!/usr/bin/env python
2 # Copyright 2015 The LUCI Authors. All rights reserved.
3 # Use of this source code is governed under the Apache License, Version 2.0
4 # that can be found in the LICENSE file.
5
6 import collections
7 import contextlib
8 import datetime
9 import json
10 import threading
11 import time
12 import unittest
13 import StringIO
14
15 import test_env
16
17 import libs.logdog.stream
18 import libs.logdog.varint
19 from google.protobuf import json_format as jsonpb
20 from recipe_engine import recipe_api
21 from recipe_engine import stream
22 from recipe_engine import stream_logdog
23
24
25 import annotations_pb2 as pb
26
27
28 def _translate_annotation_datagram(dg):
29 """Translate annotation datagram binary data into a Python dict modeled after
30 the JSONPB projection of the datagram.
31
32 This is chosen because it allows for easy idiomatic equality assertions in
33 test cases.
34
35 Args:
36 dg (str): The serialized annotation pb.Step datagram.
37 """
38 msg = pb.Step()
39 msg.ParseFromString(dg)
40 return json.loads(jsonpb.MessageToJson(msg))
41
42
43 class _TestStreamClient(libs.logdog.stream.StreamClient):
44 """A testing StreamClient that retains all data written to it."""
45
46 class Stream(object):
47 """A file-like object that is explicitly aware of LogDog stream protocol."""
48
49 def __init__(self, stream_client):
50 self._client = stream_client
51 self._buf = StringIO.StringIO()
52 self._header = None
53 self._final_data = None
54 self._data_offset = None
55
56 def write(self, data):
57 self._buf.write(data)
58 self._attempt_registration()
59
60 def close(self):
61 # If we never parsed our header, register that we are incomplete.
62 if self._header is None:
63 self._client._register_incomplete(self)
64
65 self._final_data = self.data
66 self._buf.close()
67
68 @contextlib.contextmanager
69 def _read_from(self, offset):
70 # Seek to the specified offset.
71 self._buf.seek(offset, mode=0)
72 try:
73 yield self._buf
74 finally:
75 # Seek back to he end of the stream so future writes will append.
76 self._buf.seek(0, mode=2)
77
78 def _attempt_registration(self):
79 # Only need to register once.
80 if self._header is not None:
81 return
82
83 # Can we parse a full LogDog stream header?
84 #
85 # This means pulling:
86 # - The LogDog Butler header.
87 # - The header size varint.
88 # - The header JSON blob, which needs to be decoded.
89 with self._read_from(0) as fd:
90 # Read 'result' bytes.
91 magic_data = fd.read(len(libs.logdog.stream.BUTLER_MAGIC))
92 if len(magic_data) != len(libs.logdog.stream.BUTLER_MAGIC):
93 # Incomplete magic number, cannot complete registration.
94 return
95 count = len(magic_data)
96
97 try:
98 size, varint_count = libs.logdog.varint.read_uvarint(fd)
99 except ValueError:
100 # Incomplete varint, cannot complete registration.
101 return
102 count += varint_count
103
104 header_data = fd.read(size)
105 if len(header_data) != size:
106 # Incomplete header, cannot complete registration.
107 return
108 count += size
109
110 # Parse the header as JSON.
111 self._header = json.loads(header_data)
112 self._data_offset = count # (varint + header size)
113 self._client._register_stream(self, self._header)
114
115 @property
116 def data(self):
117 # If we have already cached our data (on close), return it directly.
118 if self._final_data is not None:
119 return self._final_data
120
121 # Load our data from our live buffer.
122 if self._data_offset is None:
123 # No header has been read, so there is no data.
124 return None
125 with self._read_from(self._data_offset) as fd:
126 return fd.read()
127
128
129 _StreamEntry = collections.namedtuple('_StreamEntry', (
130 's', 'type', 'content_type'))
131
132 _DATAGRAM_CONTENT_TRANSLATE = {
133 stream_logdog.ANNOTATION_CONTENT_TYPE: _translate_annotation_datagram,
134 }
135
136
137 def __init__(self):
138 super(_TestStreamClient, self).__init__()
139 self.streams = {}
140 self.incomplete = []
141 self.unregistered = {}
142
143 @classmethod
144 def _create(cls, value):
145 raise NotImplementedError('Instances must be created manually.')
146
147 def _connect_raw(self):
148 s = self.Stream(self)
149 self.unregistered[id(s)] = s
150 return s
151
152 def get(self, name):
153 se = self.streams[name]
154 data = se.s.data
155
156 if se.type == libs.logdog.stream.StreamParams.TEXT:
157 # Return text stream data as a list of lines. We use unicode because it
158 # fits in with the JSON dump from 'all_streams'.
159 return [unicode(l) for l in data.splitlines()]
160 elif se.type == libs.logdog.stream.StreamParams.BINARY:
161 raise NotImplementedError('No support for fetching binary stream data.')
162 elif se.type == libs.logdog.stream.StreamParams.DATAGRAM:
163 # Return datagram stream data as a list of datagrams.
164 sio = StringIO.StringIO(data)
165 datagrams = []
166 while sio.tell() < sio.len:
167 size, _ = libs.logdog.varint.read_uvarint(sio)
168 dg = sio.read(size)
169 if len(dg) != size:
170 raise ValueError('Incomplete datagram (%d != %d)' % (len(dg), size))
171
172 # If this datagram is a known type (e.g., protobuf), transform it into
173 # JSONPB.
174 translator = self._DATAGRAM_CONTENT_TRANSLATE.get(se.content_type)
175 if translator is not None:
176 dg = translator(dg)
177 datagrams.append(dg)
178
179 sio.close()
180 return dg
181 else:
182 raise ValueError('Unknown stream type [%s]' % (se.type,))
183
184 def all_streams(self):
185 return dict((name, self.get(name)) for name in self.streams.iterkeys())
186
187 @property
188 def stream_names(self):
189 return set(self.streams.iterkeys())
190
191 def _remove_from_unregistered(self, s):
192 if id(s) not in self.unregistered:
193 raise KeyError('Stream is not known to be unregistered.')
194 del(self.unregistered[id(s)])
195
196 def _register_stream(self, s, header):
197 name = header.get('name')
198 if name in self.streams:
199 raise KeyError('Duplicate stream [%s]' % (name,))
200
201 self._remove_from_unregistered(s)
202 self.streams[name] = self._StreamEntry(
203 s=s,
204 type=header['type'],
205 content_type=header.get('contentType'),
206 )
207
208 def _register_incomplete(self, s):
209 self._remove_from_unregistered(s)
210 self.incomplete.append(s)
211
212
213 class EnvironmentTest(unittest.TestCase):
214 """Simple test to assert that _Environment, which is stubbed during our tests,
215 actually works."""
216
217 def testEnvironmentProbes(self):
218 stream_logdog._Environment.probe()
219
220
221 class StreamEngineTest(unittest.TestCase):
222
223 def setUp(self):
224 self.client = _TestStreamClient()
225 self.now = datetime.datetime(2106, 6, 12, 1, 2, 3)
226 self.env = stream_logdog._Environment(
227 now_fn=lambda: self.now,
228 argv=[],
229 environ={},
230 cwd=None,
231 )
232 self.maxDiff = 1024*1024
233
234
235 @contextlib.contextmanager
236 def _new_stream_engine(self, **kwargs):
237 kwargs.setdefault('client', self.client)
238 kwargs.setdefault('environment', self.env)
239
240 # Initialize and open a StreamEngine.
241 se = stream_logdog.StreamEngine(**kwargs)
242 se.open()
243 yield se
244
245 # Close the StreamEngine after we're done with it.
246 self._advance_time()
247 se.close()
248
249 @contextlib.contextmanager
250 def _step_stream(self, se, **kwargs):
251 # Initialize and yield a new step stream.
252 self._advance_time()
253 step_stream = se.new_step_stream(recipe_api.StepConfig.create(**kwargs))
254 yield step_stream
255
256 # Close the step stream when we're done with it.
257 self._advance_time()
258 step_stream.close()
259
260 @contextlib.contextmanager
261 def _log_stream(self, step_stream, name):
262 # Initialize and yield a new log stream.
263 log_stream = step_stream.new_log_stream(name)
264 yield log_stream
265
266 # Close the log stream when we're done with it.
267 log_stream.close()
268
269 def _advance_time(self):
270 self.now += datetime.timedelta(seconds=1)
271
272 def testEmptyStreamEngine(self):
273 self.env.argv = ['fake_program', 'arg0', 'arg1']
274 self.env.environ['foo'] = 'bar'
275 self.env.cwd = 'CWD'
276
277 with self._new_stream_engine() as se:
278 pass
279
280 self.assertEqual(self.client.all_streams(), {
281 u'annotations': {
282 u'name': u'steps',
283 u'status': u'SUCCESS',
284 u'started': u'2106-06-12T01:02:03Z',
285 u'ended': u'2106-06-12T01:02:04Z',
286 u'command': {
287 u'commandLine': [u'fake_program', u'arg0', u'arg1'],
288 u'cwd': u'CWD',
289 u'environ': {u'foo': u'bar'},
290 },
291 },
292 })
293
294 def testBasicStream(self):
295 self.env.argv = ['fake_program', 'arg0', 'arg1']
296 self.env.environ['foo'] = 'bar'
297 self.env.cwd = 'CWD'
298
299 with self._new_stream_engine(name_base='test/base') as se:
300 with self._step_stream(se,
301 name='first step',
302 cmd=['first', 'step'],
303 cwd='FIRST_CWD') as step:
304 step.add_step_text('Sup')
305 step.add_step_text('Dawg?')
306 step.write_line('STDOUT for first step.')
307 step.write_line('(Another line)')
308 step.add_step_summary_text('Everything is great.')
309 step.add_step_link('example 1', 'http://example.com/1')
310 step.add_step_link('example 2', 'http://example.com/2')
311 step.set_step_status('SUCCESS')
312
313 with self._step_stream(se, name='second step') as step:
314 step.set_step_status('SUCCESS')
315 step.write_split('multiple\nlines\nof\ntext')
316
317 # Create two log streams with the same name to test indexing.
318 #
319 # Note that "log stream" is an invalid LogDog stream name, so this
320 # will also test normalization.
321 with self._log_stream(step, 'log stream') as ls:
322 ls.write_split('foo\nbar\nbaz\n')
323 with self._log_stream(step, 'log stream') as ls:
324 ls.write_split('qux\nquux\n')
325
326 # This is a different stream name, but will normalize to the same log
327 # stream name as 'second/step', so this will test that we disambiguate
328 # the log stream names.
329 with self._step_stream(se, name='second/step') as step:
330 pass
331
332 self.assertEqual(self.client.all_streams(), {
333 u'test/base/annotations': {
334 u'name': u'steps',
335 u'status': u'SUCCESS',
336 u'started': u'2106-06-12T01:02:03Z',
337 u'ended': u'2106-06-12T01:02:10Z',
338 u'command': {
339 u'commandLine': [u'fake_program', u'arg0', u'arg1'],
340 u'cwd': u'CWD',
341 u'environ': {u'foo': u'bar'},
342 },
343 u'substep': [
344
345 {u'step': {
346 u'name': u'first step',
347 u'status': u'SUCCESS',
348 u'started': u'2106-06-12T01:02:04Z',
349 u'ended': u'2106-06-12T01:02:05Z',
350 u'command': {
351 u'commandLine': [u'first', u'step'],
352 u'cwd': u'FIRST_CWD',
353 },
354 u'stdoutStream': {
355 u'name': u'test/base/steps/first_step/stdout',
356 },
357 u'text': [u'Everything is great.', u'Sup', u'Dawg?'],
358 u'otherLinks': [
359 {
360 u'label': u'example 1',
361 u'url': u'http://example.com/1',
362 },
363 {
364 u'label': u'example 2',
365 u'url': u'http://example.com/2',
366 },
367 ],
368 }},
369
370 {u'step': {
371 u'name': u'second step',
372 u'status': u'SUCCESS',
373 u'started': u'2106-06-12T01:02:06Z',
374 u'ended': u'2106-06-12T01:02:07Z',
375 u'stdoutStream': {
376 u'name': u'test/base/steps/second_step/stdout',
377 },
378 u'otherLinks': [
379 {
380 u'label': u'log stream',
381 u'logdogStream': {
382 u'name': u'test/base/steps/second_step/logs/log_stream/0',
383 },
384 },
385 {
386 u'label': u'log stream',
387 u'logdogStream': {
388 u'name': u'test/base/steps/second_step/logs/log_stream/1',
389 },
390 },
391 ],
392 }},
393
394 {u'step': {
395 u'name': u'second/step',
396 u'status': u'SUCCESS',
397 u'started': u'2106-06-12T01:02:08Z',
398 u'ended': u'2106-06-12T01:02:09Z',
399 }},
400 ],
401 },
402
403 u'test/base/steps/first_step/stdout': [
404 u'STDOUT for first step.',
405 u'(Another line)',
406 ],
407
408 u'test/base/steps/second_step/stdout': [
409 u'multiple',
410 u'lines',
411 u'of',
412 u'text',
413 ],
414
415 u'test/base/steps/second_step/logs/log_stream/0': [
416 u'foo',
417 u'bar',
418 u'baz',
419 ],
420
421 u'test/base/steps/second_step/logs/log_stream/1': [
422 u'qux',
423 u'quux',
424 ],
425 })
426
427 def testWarningBasicStream(self):
428 with self._new_stream_engine() as se:
429 with self._step_stream(se, name='isuck') as step:
430 step.add_step_summary_text('Something went wrong.')
431 step.set_step_status('WARNING')
432
433 self.assertEqual(self.client.all_streams(), {
434 u'annotations': {
435 u'name': u'steps',
436 u'status': u'SUCCESS',
437 u'started': u'2106-06-12T01:02:03Z',
438 u'ended': u'2106-06-12T01:02:06Z',
439 u'substep': [
440
441 {u'step': {
442 u'name': u'isuck',
443 u'status': u'SUCCESS',
444 u'failureDetails': {
445 u'text': u'Something went wrong.',
446 },
447 u'started': u'2106-06-12T01:02:04Z',
448 u'ended': u'2106-06-12T01:02:05Z',
449 u'text': [u'Something went wrong.'],
450 }},
451 ],
452 },
453 })
454
455 def testFailedBasicStream(self):
456 with self._new_stream_engine() as se:
457 with self._step_stream(se, name='isuck') as step:
458 step.add_step_summary_text('Oops I failed.')
459 step.set_step_status('FAILURE')
460
461 with self._step_stream(se, name='irock') as step:
462 pass
463
464 self.assertEqual(self.client.all_streams(), {
465 u'annotations': {
466 u'name': u'steps',
467 u'status': u'FAILURE',
468 u'started': u'2106-06-12T01:02:03Z',
469 u'ended': u'2106-06-12T01:02:08Z',
470 u'substep': [
471
472 {u'step': {
473 u'name': u'isuck',
474 u'status': u'FAILURE',
475 u'failureDetails': {
476 u'text': u'Oops I failed.',
477 },
478 u'started': u'2106-06-12T01:02:04Z',
479 u'ended': u'2106-06-12T01:02:05Z',
480 u'text': [u'Oops I failed.'],
481 }},
482
483 {u'step': {
484 u'name': u'irock',
485 u'status': u'SUCCESS',
486 u'started': u'2106-06-12T01:02:06Z',
487 u'ended': u'2106-06-12T01:02:07Z',
488 }},
489 ],
490 },
491 })
492
493 def testNestedStream(self):
494 with self._new_stream_engine() as se:
495 # parent
496 with self._step_stream(se, name='parent') as step:
497 step.write_line('I am the parent.')
498
499 # parent."child 1"
500 with self._step_stream(se,
501 name='child 1',
502 step_nest_level=1) as step:
503 step.write_line('I am child #1.')
504
505 # parent."child 1"."grandchild"
506 with self._step_stream(se,
507 name='grandchild',
508 step_nest_level=2) as step:
509 step.write_line("I am child #1's child.")
510
511 # parent."child 2". Mark this child as failed. This should not propagate
512 # to the parent, since it has an explicit status.
513 with self._step_stream(se,
514 name='child 2',
515 step_nest_level=1) as step:
516 step.write_line('I am child #2.')
517
518 # parent."child 2". Mark this child as failed. This should not propagate
519 # to the parent, since it has an explicit status.
520 with self._step_stream(se, name='friend') as step:
521 step.write_line("I am the parent's friend.")
522
523 self.assertEqual(self.client.all_streams(), {
524 u'annotations': {
525 u'name': u'steps',
526 u'status': u'SUCCESS',
527 u'started': u'2106-06-12T01:02:03Z',
528 u'ended': u'2106-06-12T01:02:14Z',
529 u'substep': [
530
531 {u'step': {
532 u'name': u'parent',
533 u'status': u'SUCCESS',
534 u'started': u'2106-06-12T01:02:04Z',
535 u'ended': u'2106-06-12T01:02:05Z',
536 u'stdoutStream': {
537 u'name': u'steps/parent/stdout',
538 },
539 u'substep': [
540
541 {u'step': {
542 u'name': u'child 1',
543 u'status': u'SUCCESS',
544 u'started': u'2106-06-12T01:02:06Z',
545 u'ended': u'2106-06-12T01:02:07Z',
546 u'stdoutStream': {
547 u'name': u'steps/parent/steps/child_1/stdout',
548 },
549 u'substep': [
550
551 {u'step': {
552 u'name': u'grandchild',
553 u'status': u'SUCCESS',
554 u'started': u'2106-06-12T01:02:08Z',
555 u'ended': u'2106-06-12T01:02:09Z',
556 u'stdoutStream': {
557 u'name': u'steps/parent/steps/child_1/'
558 'steps/grandchild/stdout',
559 },
560 }},
561 ],
562 }},
563
564 {u'step': {
565 u'name': u'child 2',
566 u'status': u'SUCCESS',
567 u'started': u'2106-06-12T01:02:10Z',
568 u'ended': u'2106-06-12T01:02:11Z',
569 u'stdoutStream': {
570 u'name': u'steps/parent/steps/child_2/stdout',
571 },
572 }},
573 ],
574 }},
575
576 {u'step': {
577 u'name': u'friend',
578 u'status': u'SUCCESS',
579 u'started': u'2106-06-12T01:02:12Z',
580 u'ended': u'2106-06-12T01:02:13Z',
581 u'stdoutStream': {
582 u'name': u'steps/friend/stdout',
583 },
584 }},
585 ],
586 },
587
588 u'steps/parent/stdout': [u'I am the parent.'],
589 u'steps/parent/steps/child_1/stdout': [u'I am child #1.'],
590 u'steps/parent/steps/child_1/steps/grandchild/stdout': [
591 u"I am child #1's child."],
592 u'steps/parent/steps/child_2/stdout': [u'I am child #2.'],
593 u'steps/friend/stdout': [u"I am the parent's friend."],
594 })
595
596 def testTriggersRaiseException(self):
597 with self._new_stream_engine() as se:
598 with self._step_stream(se, name='trigger') as step:
599 with self.assertRaises(NotImplementedError):
600 step.trigger('trigger spec')
601
602 def testTriggersIgnored(self):
603 with self._new_stream_engine(ignore_triggers=True) as se:
604 with self._step_stream(se, name='trigger') as step:
605 step.trigger('trigger spec')
606
607 def testNoSubannotations(self):
608 with self._new_stream_engine(ignore_triggers=True) as se:
609 with self.assertRaises(NotImplementedError):
610 se.new_step_stream(recipe_api.StepConfig.create(
611 name='uses subannotations',
612 allow_subannotations=True,
613 ))
614
615 def testInvalidStepStatusRaisesValueError(self):
616 with self._new_stream_engine() as se:
617 with self._step_stream(se, name='trigger') as step:
618 with self.assertRaises(ValueError):
619 step.set_step_status('OHAI')
620
621
622 class AnnotationMonitorTest(unittest.TestCase):
623 """Tests the stream_logdog._AnnotationMonitor directly."""
624
625 # A small timedelta, sufficient to block but fast enough to not make the
626 # test slow.
627 _SMALL_TIME_DELTA = datetime.timedelta(milliseconds=5)
628
629 class _DatagramBuffer(object):
630
631 def __init__(self):
632 self.datagrams = []
633 self.data_event = threading.Event()
634
635 def send(self, dg):
636 self.datagrams.append(dg)
637 self.data_event.set()
638
639 def __len__(self):
640 return len(self.datagrams)
641
642 @property
643 def latest(self):
644 if self.datagrams:
645 return self.datagrams[-1]
646 return None
647
648 def wait_for_data(self):
649 self.data_event.wait()
650 self.data_event.clear()
651 return self.latest
652
653
654 @contextlib.contextmanager
655 def _annotation_monitor(self, **kwargs):
656 # Default to a really high flush period. This should never naturally trigger
657 # during a test case.
658 kwargs.setdefault('flush_period', datetime.timedelta(hours=1))
659
660 am = stream_logdog._AnnotationMonitor(self.db, **kwargs)
661 try:
662 yield am
663 finally:
664 am.flush_and_join()
665
666 with am._lock:
667 # Assert that our timer has been shut down.
668 self.assertIsNone(am._flush_timer)
669 # Assert that there is no buffered data.
670 self.assertIsNone(am._current_data)
671
672 def setUp(self):
673 self.db = self._DatagramBuffer()
674
675 def testMonitorStartsAndJoinsWithNoData(self):
676 with self._annotation_monitor() as am:
677 pass
678
679 # No datagrams should have been sent.
680 self.assertIsNone(self.db.latest)
681 self.assertEqual(len(self.db.datagrams), 0)
682
683 def testMonitorBuffersAndSendsData(self):
684 with self._annotation_monitor() as am:
685 # The first piece of data should have been immediately sent.
686 am.signal_update('initial')
687 self.assertEqual(self.db.wait_for_data(), 'initial')
688
689 # Subsequent iterations should not send data, but should start the flush
690 # timer and buffer the latest data.
691 with am._lock:
692 self.assertIsNone(am._flush_timer)
693 for i in xrange(10):
694 am.signal_update('test%d' % (i,))
695 time.sleep(self._SMALL_TIME_DELTA.total_seconds())
696 with am._lock:
697 self.assertEqual(am._current_data, 'test9')
698 self.assertIsNotNone(am._flush_timer)
699
700 # Pretend the timer triggered. We should receive the latest buffered data.
701 am._flush_timer_expired()
702 self.assertEqual(self.db.wait_for_data(), 'test9')
703 with am._lock:
704 # No more timer or buffered data.
705 self.assertIsNone(am._flush_timer)
706 self.assertIsNone(am._current_data)
707
708 # Send one last chunk of data, but don't let the timer expire. This will
709 # be sent on final flush.
710 am.signal_update('final')
711 with am._lock:
712 self.assertIsNotNone(am._flush_timer)
713
714 # Assert that the final chunk of data was sent.
715 self.assertEqual(self.db.latest, 'final')
716
717 # Only three datagrams should have been sent.
718 self.assertEqual(len(self.db.datagrams), 3)
719
720 def testMonitorIgnoresDuplicateData(self):
721 with self._annotation_monitor() as am:
722 # Get initial data out of the way.
723 am.signal_update('initial')
724 self.assertEqual(self.db.wait_for_data(), 'initial')
725
726 # Send the same thing. It should not be buffered.
727 am.signal_update('initial')
728 with am._lock:
729 self.assertIsNone(am._flush_timer)
730 self.assertIsNone(am._current_data)
731
732 # Only one datagrams should have been sent.
733 self.assertEqual(len(self.db.datagrams), 1)
734
735 def testStructuralUpdateSendsImmediately(self):
736 with self._annotation_monitor() as am:
737 # Get initial data out of the way.
738 am.signal_update('initial')
739 self.assertEqual(self.db.wait_for_data(), 'initial')
740
741 # Send a structural update. It should send immediately.
742 am.signal_update('test', structural=True)
743 self.assertEqual(self.db.wait_for_data(), 'test')
744
745 # Send a duplicate structural update. It should be ignored.
746 am.signal_update('test', structural=True)
747 with am._lock:
748 self.assertIsNone(am._flush_timer)
749 self.assertIsNone(am._current_data)
750
751 # Only two datagrams should have been sent.
752 self.assertEqual(len(self.db.datagrams), 2)
753
754 def testFlushesPeriodically(self):
755 with self._annotation_monitor(flush_period=self._SMALL_TIME_DELTA) as am:
756 # Get initial data out of the way.
757 am.signal_update('initial')
758 self.assertEqual(self.db.wait_for_data(), 'initial')
759
760 # Send a structural update. It should send immediately.
761 am.signal_update('test')
762 self.assertEqual(self.db.wait_for_data(), 'test')
763
764 # Only two datagrams should have been sent.
765 self.assertEqual(len(self.db.datagrams), 2)
766
767
768 class AnnotationStateTest(unittest.TestCase):
769 """Tests the stream_logdog._AnnotationState directly."""
770
771 def setUp(self):
772 self.env = stream_logdog._Environment(
773 None,
774 argv=['command', 'arg0', 'arg1'],
775 cwd='path/to/cwd',
776 environ={
777 'foo': 'bar',
778 'FOO': 'baz',
779 },
780 )
781 self.astate = stream_logdog._AnnotationState.create(
782 stream_logdog._StreamName('strean/name'),
783 environment=self.env,
784 properties={'foo': 'bar'},
785 )
786
787 def testFirstCheckReturnsData(self):
788 # The first check should return data.
789 self.assertIsNotNone(self.astate.check())
790 # The second will, since nothing has changed.
791 self.assertIsNone(self.astate.check())
792
793 def testCanCreateAndGetStep(self):
794 # Root step.
795 base = self.astate.base
796 self.astate.create_step(recipe_api.StepConfig.create(name='first'))
797 self.assertEqual(len(base.substep), 1)
798 self.assertEqual(base.substep[0].step.name, 'first')
799 self.assertIsNotNone(self.astate.check())
800
801 # Child step.
802 self.astate.create_step(recipe_api.StepConfig.create(
803 name='first child',
804 step_nest_level=1))
805 self.assertEqual(len(base.substep), 1)
806 self.assertEqual(len(base.substep[0].step.substep), 1)
807 self.assertEqual(base.substep[0].step.substep[0].step.name, 'first child')
808 self.assertIsNotNone(self.astate.check())
809
810 # Sibling step to 'first'.
811 self.astate.create_step(recipe_api.StepConfig.create(name='second'))
812 self.assertEqual(len(base.substep), 2)
813 self.assertEqual(base.substep[1].step.name, 'second')
814 self.assertIsNotNone(self.astate.check())
815
816 def testCanUpdateProperties(self):
817 self.astate.update_properties(foo='baz', qux='quux')
818 self.assertEqual(list(self.astate.base.property), [
819 pb.Step.Property(name='foo', value='baz'),
820 pb.Step.Property(name='qux', value='quux'),
821 ])
822
823
824 class StreamNameTest(unittest.TestCase):
825 """Tests the stream_logdog._StreamName directly."""
826
827 def testEmptyStreamNameRaisesValueError(self):
828 sn = stream_logdog._StreamName(None)
829 with self.assertRaises(ValueError):
830 str(sn)
831
832 def testInvalidBaseRaisesValueError(self):
833 with self.assertRaises(ValueError):
834 stream_logdog._StreamName('!!! invalid !!!')
835
836 def testAppendComponents(self):
837 sn = stream_logdog._StreamName('base')
838 self.assertEqual(str(sn.append()), 'base')
839 self.assertEqual(str(sn.append('foo')), 'base/foo')
840 self.assertEqual(str(sn.append('foo', 'bar')), 'base/foo/bar')
841 self.assertEqual(str(sn.append('foo', 'bar/baz')), 'base/foo/bar_baz')
842
843 def testAugment(self):
844 sn = stream_logdog._StreamName('base')
845 self.assertEqual(str(sn.augment('')), 'base')
846 self.assertEqual(str(sn.augment('foo')), 'basefoo')
847 self.assertEqual(str(sn.augment('foo/bar baz')), 'basefoo_bar_baz')
848
849 def testAppendInvalidStreamNameNormalizes(self):
850 sn = stream_logdog._StreamName('base')
851 sn = sn.append('#!!! stream name !!!')
852 self.assertEqual(str(sn), 'base/s______stream_name____')
853
854 def testAugmentInvalidStreamNameNormalizes(self):
855 sn = stream_logdog._StreamName('base')
856 self.assertEqual(str(sn.augment(' !!! other !!! ')), 'base_____other_____')
857
858
859 if __name__ == '__main__':
860 unittest.main()
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698