Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(58)

Side by Side Diff: recipe_engine/unittests/stream_logdog_test.py

Issue 2265673002: Add LogDog / annotation protobuf support. (Closed) Base URL: https://github.com/luci/recipes-py@step-formal-struct
Patch Set: Stronger flush meta logic, moar test. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 #!/usr/bin/env python
2 # Copyright 2015 The LUCI Authors. All rights reserved.
3 # Use of this source code is governed under the Apache License, Version 2.0
4 # that can be found in the LICENSE file.
5
6 import collections
7 import contextlib
8 import datetime
9 import json
10 import threading
11 import time
12 import unittest
13 import StringIO
14
15 import test_env
16
17 import libs.logdog.stream
18 import libs.logdog.varint
19 from google.protobuf import json_format as jsonpb
20 from recipe_engine import recipe_api
21 from recipe_engine import stream
22 from recipe_engine import stream_logdog
23
24
25 import annotations_pb2 as pb
26
27
def _translate_annotation_datagram(dg):
  """Convert a serialized annotation datagram into its JSONPB dict form.

  The JSONPB projection is chosen because it permits simple, idiomatic
  equality assertions in test cases.

  Args:
    dg (str): The serialized annotation pb.Step datagram.
  """
  step = pb.Step()
  step.ParseFromString(dg)
  return json.loads(jsonpb.MessageToJson(step))
41
42
class _TestStreamClient(libs.logdog.stream.StreamClient):
  """A testing StreamClient that retains all data written to it."""

  class Stream(object):
    """A file-like object that is explicitly aware of LogDog stream protocol."""

    def __init__(self, stream_client):
      self._client = stream_client
      self._buf = StringIO.StringIO()
      # Parsed JSON header dict; None until registration completes.
      self._header = None
      # Cached post-header data, populated on close().
      self._final_data = None
      # Byte offset of the first post-header data byte; None until registered.
      self._data_offset = None

    def write(self, data):
      self._buf.write(data)
      self._attempt_registration()

    def close(self):
      # If we never parsed our header, register that we are incomplete.
      if self._header is None:
        self._client._register_incomplete(self)

      # Cache data before closing, since 'data' cannot read a closed buffer.
      self._final_data = self.data
      self._buf.close()

    @contextlib.contextmanager
    def _read_from(self, offset):
      """Yields the underlying buffer positioned at 'offset' for reading."""
      # Seek to the specified offset.
      self._buf.seek(offset, mode=0)
      try:
        yield self._buf
      finally:
        # Seek back to the end of the stream so future writes will append.
        self._buf.seek(0, mode=2)

    def _attempt_registration(self):
      """Registers this stream with the client once a full header is buffered.

      Attempted after every write; a no-op until the complete stream prefix
      has been buffered, and again once registration has succeeded.
      """
      # Only need to register once.
      if self._header is not None:
        return

      # Can we parse a full LogDog stream header?
      #
      # This means pulling:
      # - The LogDog Butler header.
      # - The header size varint.
      # - The header JSON blob, which needs to be decoded.
      with self._read_from(0) as fd:
        # Read and validate the Butler magic number.
        magic_data = fd.read(len(libs.logdog.stream.BUTLER_MAGIC))
        if len(magic_data) != len(libs.logdog.stream.BUTLER_MAGIC):
          # Incomplete magic number, cannot complete registration.
          return
        count = len(magic_data)

        try:
          size, varint_count = libs.logdog.varint.read_uvarint(fd)
        except ValueError:
          # Incomplete varint, cannot complete registration.
          return
        count += varint_count

        header_data = fd.read(size)
        if len(header_data) != size:
          # Incomplete header, cannot complete registration.
          return
        count += size

        # Parse the header as JSON.
        self._header = json.loads(header_data)
        self._data_offset = count  # (magic + varint + header size)
        self._client._register_stream(self, self._header)

    @property
    def data(self):
      # If we have already cached our data (on close), return it directly.
      if self._final_data is not None:
        return self._final_data

      # Load our data from our live buffer.
      if self._data_offset is None:
        # No header has been read, so there is no data.
        return None
      with self._read_from(self._data_offset) as fd:
        return fd.read()


  _StreamEntry = collections.namedtuple('_StreamEntry', (
      's', 'type', 'content_type'))

  # Maps a recognized content type to a function that translates that type's
  # datagrams into an easily-comparable Python value.
  _DATAGRAM_CONTENT_TRANSLATE = {
      stream_logdog.ANNOTATION_CONTENT_TYPE: _translate_annotation_datagram,
  }


  def __init__(self):
    super(_TestStreamClient, self).__init__()
    self.streams = {}       # Registered stream name => _StreamEntry.
    self.incomplete = []    # Streams closed before a full header was written.
    self.unregistered = {}  # id(stream) => stream, prior to registration.

  @classmethod
  def _create(cls, value):
    raise NotImplementedError('Instances must be created manually.')

  def _connect_raw(self):
    s = self.Stream(self)
    self.unregistered[id(s)] = s
    return s

  def get(self, name):
    """Returns the retained content of the registered stream 'name'.

    Text streams yield a list of unicode lines. Datagram streams yield the
    latest datagram (which supersedes earlier ones), translated via
    _DATAGRAM_CONTENT_TRANSLATE when its content type is recognized, or
    None if no datagrams were written.

    Raises:
      KeyError: If no stream with that name is registered.
      NotImplementedError: For binary streams.
      ValueError: For an unknown stream type or a truncated datagram.
    """
    se = self.streams[name]
    data = se.s.data

    if se.type == libs.logdog.stream.StreamParams.TEXT:
      # Return text stream data as a list of lines. We use unicode because it
      # fits in with the JSON dump from 'all_streams'.
      return [unicode(l) for l in data.splitlines()]
    elif se.type == libs.logdog.stream.StreamParams.BINARY:
      raise NotImplementedError('No support for fetching binary stream data.')
    elif se.type == libs.logdog.stream.StreamParams.DATAGRAM:
      # The translator lookup is loop-invariant, so hoist it out of the loop.
      translator = self._DATAGRAM_CONTENT_TRANSLATE.get(se.content_type)

      # Decode each concatenated (uvarint size, payload) datagram record.
      sio = StringIO.StringIO(data)
      datagrams = []
      while sio.tell() < sio.len:
        size, _ = libs.logdog.varint.read_uvarint(sio)
        dg = sio.read(size)
        if len(dg) != size:
          raise ValueError('Incomplete datagram (%d != %d)' % (len(dg), size))

        # If this datagram is a known type (e.g., protobuf), transform it into
        # JSONPB.
        if translator is not None:
          dg = translator(dg)
        datagrams.append(dg)

      sio.close()
      # Return the latest datagram, which reflects the most recent state.
      # (Previously this returned the bare loop variable, which would raise
      # NameError for an empty datagram stream; make the intent explicit and
      # return None when nothing was written.)
      if not datagrams:
        return None
      return datagrams[-1]
    else:
      raise ValueError('Unknown stream type [%s]' % (se.type,))

  def all_streams(self):
    """Returns a dict mapping each registered stream name to get(name)."""
    return dict((name, self.get(name)) for name in self.streams.iterkeys())

  @property
  def stream_names(self):
    return set(self.streams.iterkeys())

  def _remove_from_unregistered(self, s):
    if id(s) not in self.unregistered:
      raise KeyError('Stream is not known to be unregistered.')
    del self.unregistered[id(s)]

  def _register_stream(self, s, header):
    name = header.get('name')
    if name in self.streams:
      raise KeyError('Duplicate stream [%s]' % (name,))

    self._remove_from_unregistered(s)
    self.streams[name] = self._StreamEntry(
        s=s,
        type=header['type'],
        content_type=header.get('contentType'),
    )

  def _register_incomplete(self, s):
    self._remove_from_unregistered(s)
    self.incomplete.append(s)
211
212
class EnvironmentTest(unittest.TestCase):
  """Sanity check that the real (non-stubbed) _Environment can be built.

  Every other test case in this file substitutes a stubbed _Environment, so
  exercise the real factory here.
  """

  def testRealEnvironment(self):
    # Constructing the real environment should not raise.
    stream_logdog._Environment.real()
219
220
class StreamEngineTest(unittest.TestCase):
  """End-to-end tests of stream_logdog.StreamEngine against _TestStreamClient.

  Each test drives the StreamEngine's public API, then asserts on the full
  set of streams written to the test client. The stubbed _Environment
  supplies a deterministic clock which _advance_time() moves forward one
  second per event, so the timestamps in the expected fixtures are exact.
  """

  def setUp(self):
    self.client = _TestStreamClient()
    # Deterministic "now"; advanced manually via _advance_time().
    self.now = datetime.datetime(2106, 6, 12, 1, 2, 3)
    self.env = stream_logdog._Environment(
        now_fn=lambda: self.now,
        argv=[],
        environ={},
        cwd=None,
    )
    # Expected fixtures are large; show full diffs on mismatch.
    self.maxDiff = 1024*1024


  @contextlib.contextmanager
  def _new_stream_engine(self, **kwargs):
    """Yields an opened StreamEngine; closes it (after a clock tick) on exit."""
    kwargs.setdefault('client', self.client)
    kwargs.setdefault('environment', self.env)

    # Initialize and open a StreamEngine.
    se = stream_logdog.StreamEngine(**kwargs)
    se.open()
    yield se

    # Close the StreamEngine after we're done with it.
    self._advance_time()
    se.close()

  @contextlib.contextmanager
  def _step_stream(self, se, **kwargs):
    """Yields a new step stream on 'se'; ticks the clock around open/close."""
    # Initialize and yield a new step stream.
    self._advance_time()
    step_stream = se.new_step_stream(recipe_api.StepConfig.create(**kwargs))
    yield step_stream

    # Close the step stream when we're done with it.
    self._advance_time()
    step_stream.close()

  @contextlib.contextmanager
  def _log_stream(self, step_stream, name):
    """Yields a new log stream named 'name' attached to 'step_stream'."""
    # Initialize and yield a new log stream.
    log_stream = step_stream.new_log_stream(name)
    yield log_stream

    # Close the log stream when we're done with it.
    log_stream.close()

  def _advance_time(self):
    # One second per event keeps expected timestamps easy to compute.
    self.now += datetime.timedelta(seconds=1)

  def testEmptyStreamEngine(self):
    self.env.argv = ['fake_program', 'arg0', 'arg1']
    self.env.environ['foo'] = 'bar'
    self.env.cwd = 'CWD'

    with self._new_stream_engine() as se:
      pass

    self.assertEqual(self.client.all_streams(), {
        u'annotations': {
          u'name': u'steps',
          u'status': u'SUCCESS',
          u'started': u'2106-06-12T01:02:03Z',
          u'ended': u'2106-06-12T01:02:04Z',
          u'command': {
            u'commandLine': [u'fake_program', u'arg0', u'arg1'],
            u'cwd': u'CWD',
            u'environ': {u'foo': u'bar'},
          },
        },
    })

  def testIncrementalUpdates(self):
    self.env.argv = ['fake_program', 'arg0', 'arg1']
    self.env.environ['foo'] = 'bar'
    self.env.cwd = 'CWD'

    # Create a StreamEngine with an update interval that will trigger each time
    # _advance_time is called.
    with self._new_stream_engine(
        update_interval=datetime.timedelta(seconds=1)) as se:
      # Initial stream state (no steps).
      self.assertEqual(self.client.all_streams(), {
          u'annotations': {
            u'name': u'steps',
            u'started': u'2106-06-12T01:02:03Z',
            u'command': {
              u'commandLine': [u'fake_program', u'arg0', u'arg1'],
              u'cwd': u'CWD',
              u'environ': {u'foo': u'bar'},
            },
          },
      })

      with self._step_stream(se, name='foo') as st:
        pass

      # Stream state (foo).
      self.assertEqual(self.client.all_streams(), {
          u'annotations': {
            u'name': u'steps',
            u'started': u'2106-06-12T01:02:03Z',
            u'command': {
              u'commandLine': [u'fake_program', u'arg0', u'arg1'],
              u'cwd': u'CWD',
              u'environ': {u'foo': u'bar'},
            },

            u'substep': [

              {u'step': {
                u'name': u'foo',
                u'status': u'SUCCESS',
                u'started': u'2106-06-12T01:02:04Z',
                u'ended': u'2106-06-12T01:02:05Z',
              }},
            ],
          },
      })

      with self._step_stream(se, name='bar') as st:
        pass

      # Stream state (bar).
      self.assertEqual(self.client.all_streams(), {
          u'annotations': {
            u'name': u'steps',
            u'started': u'2106-06-12T01:02:03Z',
            u'command': {
              u'commandLine': [u'fake_program', u'arg0', u'arg1'],
              u'cwd': u'CWD',
              u'environ': {u'foo': u'bar'},
            },

            u'substep': [

              {u'step': {
                u'name': u'foo',
                u'status': u'SUCCESS',
                u'started': u'2106-06-12T01:02:04Z',
                u'ended': u'2106-06-12T01:02:05Z',
              }},

              {u'step': {
                u'name': u'bar',
                u'status': u'SUCCESS',
                u'started': u'2106-06-12T01:02:06Z',
                u'ended': u'2106-06-12T01:02:07Z',
              }},
            ],
          },
      })

    # Final stream state.
    self.assertEqual(self.client.all_streams(), {
        u'annotations': {
          u'name': u'steps',
          u'status': u'SUCCESS',
          u'started': u'2106-06-12T01:02:03Z',
          u'ended': u'2106-06-12T01:02:08Z',
          u'command': {
            u'commandLine': [u'fake_program', u'arg0', u'arg1'],
            u'cwd': u'CWD',
            u'environ': {u'foo': u'bar'},
          },

          u'substep': [

            {u'step': {
              u'name': u'foo',
              u'status': u'SUCCESS',
              u'started': u'2106-06-12T01:02:04Z',
              u'ended': u'2106-06-12T01:02:05Z',
            }},

            {u'step': {
              u'name': u'bar',
              u'status': u'SUCCESS',
              u'started': u'2106-06-12T01:02:06Z',
              u'ended': u'2106-06-12T01:02:07Z',
            }},
          ],
        },
    })

  def testBasicStream(self):
    self.env.argv = ['fake_program', 'arg0', 'arg1']
    self.env.environ['foo'] = 'bar'
    self.env.cwd = 'CWD'

    with self._new_stream_engine(name_base='test/base') as se:
      with self._step_stream(se,
                             name='first step',
                             cmd=['first', 'step'],
                             cwd='FIRST_CWD') as step:
        step.add_step_text('Sup')
        step.add_step_text('Dawg?')
        step.write_line('STDOUT for first step.')
        step.write_line('(Another line)')
        step.add_step_summary_text('Everything is great.')
        step.add_step_link('example 1', 'http://example.com/1')
        step.add_step_link('example 2', 'http://example.com/2')
        step.set_step_status('SUCCESS')

      with self._step_stream(se, name='second step') as step:
        step.set_step_status('SUCCESS')
        step.write_split('multiple\nlines\nof\ntext')

        # Create two log streams with the same name to test indexing.
        #
        # Note that "log stream" is an invalid LogDog stream name, so this
        # will also test normalization.
        with self._log_stream(step, 'log stream') as ls:
          ls.write_split('foo\nbar\nbaz\n')
        with self._log_stream(step, 'log stream') as ls:
          ls.write_split('qux\nquux\n')

      # This is a different step name, but will normalize to the same log
      # stream name as 'second step', so this will test that we disambiguate
      # the log stream names.
      with self._step_stream(se, name='second/step') as step:
        pass

    self.assertEqual(self.client.all_streams(), {
        u'test/base/annotations': {
          u'name': u'steps',
          u'status': u'SUCCESS',
          u'started': u'2106-06-12T01:02:03Z',
          u'ended': u'2106-06-12T01:02:10Z',
          u'command': {
            u'commandLine': [u'fake_program', u'arg0', u'arg1'],
            u'cwd': u'CWD',
            u'environ': {u'foo': u'bar'},
          },
          u'substep': [

            {u'step': {
              u'name': u'first step',
              u'status': u'SUCCESS',
              u'started': u'2106-06-12T01:02:04Z',
              u'ended': u'2106-06-12T01:02:05Z',
              u'command': {
                u'commandLine': [u'first', u'step'],
                u'cwd': u'FIRST_CWD',
              },
              u'stdoutStream': {
                u'name': u'test/base/steps/first_step/stdout',
              },
              u'text': [u'Everything is great.', u'Sup', u'Dawg?'],
              u'otherLinks': [
                {
                  u'label': u'example 1',
                  u'url': u'http://example.com/1',
                },
                {
                  u'label': u'example 2',
                  u'url': u'http://example.com/2',
                },
              ],
            }},

            {u'step': {
              u'name': u'second step',
              u'status': u'SUCCESS',
              u'started': u'2106-06-12T01:02:06Z',
              u'ended': u'2106-06-12T01:02:07Z',
              u'stdoutStream': {
                u'name': u'test/base/steps/second_step/stdout',
              },
              u'otherLinks': [
                {
                  u'label': u'log stream',
                  u'logdogStream': {
                    u'name': u'test/base/steps/second_step/logs/log_stream/0',
                  },
                },
                {
                  u'label': u'log stream',
                  u'logdogStream': {
                    u'name': u'test/base/steps/second_step/logs/log_stream/1',
                  },
                },
              ],
            }},

            {u'step': {
              u'name': u'second/step',
              u'status': u'SUCCESS',
              u'started': u'2106-06-12T01:02:08Z',
              u'ended': u'2106-06-12T01:02:09Z',
            }},
          ],
        },

        u'test/base/steps/first_step/stdout': [
          u'STDOUT for first step.',
          u'(Another line)',
        ],

        u'test/base/steps/second_step/stdout': [
          u'multiple',
          u'lines',
          u'of',
          u'text',
        ],

        u'test/base/steps/second_step/logs/log_stream/0': [
          u'foo',
          u'bar',
          u'baz',
        ],

        u'test/base/steps/second_step/logs/log_stream/1': [
          u'qux',
          u'quux',
        ],
    })

  def testWarningBasicStream(self):
    with self._new_stream_engine() as se:
      with self._step_stream(se, name='isuck') as step:
        step.add_step_summary_text('Something went wrong.')
        step.set_step_status('WARNING')

    self.assertEqual(self.client.all_streams(), {
        u'annotations': {
          u'name': u'steps',
          u'status': u'SUCCESS',
          u'started': u'2106-06-12T01:02:03Z',
          u'ended': u'2106-06-12T01:02:06Z',
          u'substep': [

            {u'step': {
              u'name': u'isuck',
              u'status': u'SUCCESS',
              u'failureDetails': {
                u'text': u'Something went wrong.',
              },
              u'started': u'2106-06-12T01:02:04Z',
              u'ended': u'2106-06-12T01:02:05Z',
              u'text': [u'Something went wrong.'],
            }},
          ],
        },
    })

  def testFailedBasicStream(self):
    with self._new_stream_engine() as se:
      with self._step_stream(se, name='isuck') as step:
        step.add_step_summary_text('Oops I failed.')
        step.set_step_status('FAILURE')

      with self._step_stream(se, name='irock') as step:
        pass

    self.assertEqual(self.client.all_streams(), {
        u'annotations': {
          u'name': u'steps',
          u'status': u'FAILURE',
          u'started': u'2106-06-12T01:02:03Z',
          u'ended': u'2106-06-12T01:02:08Z',
          u'substep': [

            {u'step': {
              u'name': u'isuck',
              u'status': u'FAILURE',
              u'failureDetails': {
                u'text': u'Oops I failed.',
              },
              u'started': u'2106-06-12T01:02:04Z',
              u'ended': u'2106-06-12T01:02:05Z',
              u'text': [u'Oops I failed.'],
            }},

            {u'step': {
              u'name': u'irock',
              u'status': u'SUCCESS',
              u'started': u'2106-06-12T01:02:06Z',
              u'ended': u'2106-06-12T01:02:07Z',
            }},
          ],
        },
    })

  def testNestedStream(self):
    with self._new_stream_engine() as se:
      # parent
      with self._step_stream(se, name='parent') as step:
        step.write_line('I am the parent.')

      # parent."child 1"
      with self._step_stream(se,
                             name='child 1',
                             step_nest_level=1) as step:
        step.write_line('I am child #1.')

      # parent."child 1"."grandchild"
      with self._step_stream(se,
                             name='grandchild',
                             step_nest_level=2) as step:
        step.write_line("I am child #1's child.")

      # parent."child 2", a sibling of "child 1" under "parent".
      with self._step_stream(se,
                             name='child 2',
                             step_nest_level=1) as step:
        step.write_line('I am child #2.')

      # "friend", a sibling of "parent" back at the top nesting level.
      with self._step_stream(se, name='friend') as step:
        step.write_line("I am the parent's friend.")

    self.assertEqual(self.client.all_streams(), {
        u'annotations': {
          u'name': u'steps',
          u'status': u'SUCCESS',
          u'started': u'2106-06-12T01:02:03Z',
          u'ended': u'2106-06-12T01:02:14Z',
          u'substep': [

            {u'step': {
              u'name': u'parent',
              u'status': u'SUCCESS',
              u'started': u'2106-06-12T01:02:04Z',
              u'ended': u'2106-06-12T01:02:05Z',
              u'stdoutStream': {
                u'name': u'steps/parent/stdout',
              },
              u'substep': [

                {u'step': {
                  u'name': u'child 1',
                  u'status': u'SUCCESS',
                  u'started': u'2106-06-12T01:02:06Z',
                  u'ended': u'2106-06-12T01:02:07Z',
                  u'stdoutStream': {
                    u'name': u'steps/parent/steps/child_1/stdout',
                  },
                  u'substep': [

                    {u'step': {
                      u'name': u'grandchild',
                      u'status': u'SUCCESS',
                      u'started': u'2106-06-12T01:02:08Z',
                      u'ended': u'2106-06-12T01:02:09Z',
                      u'stdoutStream': {
                        u'name': u'steps/parent/steps/child_1/'
                                 'steps/grandchild/stdout',
                      },
                    }},
                  ],
                }},

                {u'step': {
                  u'name': u'child 2',
                  u'status': u'SUCCESS',
                  u'started': u'2106-06-12T01:02:10Z',
                  u'ended': u'2106-06-12T01:02:11Z',
                  u'stdoutStream': {
                    u'name': u'steps/parent/steps/child_2/stdout',
                  },
                }},
              ],
            }},

            {u'step': {
              u'name': u'friend',
              u'status': u'SUCCESS',
              u'started': u'2106-06-12T01:02:12Z',
              u'ended': u'2106-06-12T01:02:13Z',
              u'stdoutStream': {
                u'name': u'steps/friend/stdout',
              },
            }},
          ],
        },

        u'steps/parent/stdout': [u'I am the parent.'],
        u'steps/parent/steps/child_1/stdout': [u'I am child #1.'],
        u'steps/parent/steps/child_1/steps/grandchild/stdout': [
            u"I am child #1's child."],
        u'steps/parent/steps/child_2/stdout': [u'I am child #2.'],
        u'steps/friend/stdout': [u"I am the parent's friend."],
    })

  def testTriggersRaiseException(self):
    with self._new_stream_engine() as se:
      with self._step_stream(se, name='trigger') as step:
        with self.assertRaises(NotImplementedError):
          step.trigger('trigger spec')

  def testTriggersIgnored(self):
    with self._new_stream_engine(ignore_triggers=True) as se:
      with self._step_stream(se, name='trigger') as step:
        step.trigger('trigger spec')

  def testNoSubannotations(self):
    with self._new_stream_engine(ignore_triggers=True) as se:
      with self.assertRaises(NotImplementedError):
        se.new_step_stream(recipe_api.StepConfig.create(
            name='uses subannotations',
            allow_subannotations=True,
        ))

  def testInvalidStepStatusRaisesValueError(self):
    with self._new_stream_engine() as se:
      with self._step_stream(se, name='trigger') as step:
        with self.assertRaises(ValueError):
          step.set_step_status('OHAI')
733
734
class AnnotationMonitorTest(unittest.TestCase):
  """Tests the stream_logdog._AnnotationMonitor directly."""

  # A small timedelta, sufficient to block but fast enough to not make the
  # test slow.
  _SMALL_TIME_DELTA = datetime.timedelta(milliseconds=5)

  class _DatagramBuffer(object):
    """In-memory datagram sink that records sends and signals their arrival."""

    def __init__(self):
      self.datagrams = []
      self.data_event = threading.Event()

    def send(self, dg):
      self.datagrams.append(dg)
      self.data_event.set()

    def __len__(self):
      return len(self.datagrams)

    @property
    def latest(self):
      # Most recently sent datagram, or None if nothing has been sent.
      if self.datagrams:
        return self.datagrams[-1]
      return None

    def wait_for_data(self):
      # Blocks until at least one datagram has arrived since the last clear,
      # then returns the latest one.
      self.data_event.wait()
      self.data_event.clear()
      return self.latest


  def setUp(self):
    self.db = self._DatagramBuffer()
    self.now = datetime.datetime(2106, 6, 12, 1, 2, 3)
    self.env = stream_logdog._Environment(
        now_fn=lambda: self.now,
        argv=[],
        environ={},
        cwd=None,
    )

  @contextlib.contextmanager
  def _annotation_monitor(self, flush_period=None):
    # Use a really high default flush period. This should never naturally
    # trigger during a test case.
    flush_period = flush_period or datetime.timedelta(hours=1)

    am = stream_logdog._AnnotationMonitor(self.env, self.db, flush_period)
    try:
      yield am
    finally:
      am.flush_and_join()

    with am._lock:
      # Assert that our timer has been shut down.
      self.assertIsNone(am._flush_timer)
      # Assert that there is no buffered data.
      self.assertIsNone(am._current_data)

  def testMonitorStartsAndJoinsWithNoData(self):
    with self._annotation_monitor() as am:
      pass

    # No datagrams should have been sent.
    self.assertIsNone(self.db.latest)
    self.assertEqual(len(self.db.datagrams), 0)

  def testMonitorBuffersAndSendsData(self):
    with self._annotation_monitor() as am:
      # The first piece of data should have been immediately sent.
      am.signal_update('initial')
      self.assertEqual(self.db.wait_for_data(), 'initial')

      # Subsequent iterations should not send data, but should start the flush
      # timer and buffer the latest data.
      with am._lock:
        self.assertIsNone(am._flush_timer)
      for i in xrange(10):
        am.signal_update('test%d' % (i,))
        time.sleep(self._SMALL_TIME_DELTA.total_seconds())
      with am._lock:
        self.assertEqual(am._current_data, 'test9')
        self.assertIsNotNone(am._flush_timer)

      # Pretend the timer triggered. We should receive the latest buffered data.
      am._flush_timer_expired()
      self.assertEqual(self.db.wait_for_data(), 'test9')
      with am._lock:
        # No more timer or buffered data.
        self.assertIsNone(am._flush_timer)
        self.assertIsNone(am._current_data)

      # Send one last chunk of data, but don't let the timer expire. This will
      # be sent on final flush.
      am.signal_update('final')
      with am._lock:
        self.assertIsNotNone(am._flush_timer)

    # Assert that the final chunk of data was sent.
    self.assertEqual(self.db.latest, 'final')

    # Only three datagrams should have been sent.
    self.assertEqual(len(self.db.datagrams), 3)

  def testMonitorIgnoresDuplicateData(self):
    with self._annotation_monitor() as am:
      # Get initial data out of the way.
      am.signal_update('initial')
      self.assertEqual(self.db.wait_for_data(), 'initial')

      # Send the same thing. It should not be buffered.
      am.signal_update('initial')
      with am._lock:
        self.assertIsNone(am._flush_timer)
        self.assertIsNone(am._current_data)

    # Only one datagram should have been sent.
    self.assertEqual(len(self.db.datagrams), 1)

  def testStructuralUpdateSendsImmediately(self):
    with self._annotation_monitor() as am:
      # Get initial data out of the way.
      am.signal_update('initial')
      self.assertEqual(self.db.wait_for_data(), 'initial')

      # Send a structural update. It should send immediately.
      am.signal_update('test', structural=True)
      self.assertEqual(self.db.wait_for_data(), 'test')

      # Send a duplicate structural update. It should be ignored.
      am.signal_update('test', structural=True)
      with am._lock:
        self.assertIsNone(am._flush_timer)
        self.assertIsNone(am._current_data)

    # Only two datagrams should have been sent.
    self.assertEqual(len(self.db.datagrams), 2)

  def testFlushesPeriodically(self):
    with self._annotation_monitor(flush_period=self._SMALL_TIME_DELTA) as am:
      # Get initial data out of the way.
      am.signal_update('initial')
      self.assertEqual(self.db.wait_for_data(), 'initial')

      # Send a non-structural update; the short flush period should deliver
      # it without any explicit trigger.
      am.signal_update('test')
      self.assertEqual(self.db.wait_for_data(), 'test')

    # Only two datagrams should have been sent.
    self.assertEqual(len(self.db.datagrams), 2)
887
class AnnotationStateTest(unittest.TestCase):
  """Tests the stream_logdog._AnnotationState directly."""

  def setUp(self):
    self.env = stream_logdog._Environment(
        None,
        argv=['command', 'arg0', 'arg1'],
        cwd='path/to/cwd',
        environ={
          'foo': 'bar',
          'FOO': 'baz',
        },
    )
    # NOTE(review): 'strean/name' looks like a typo for 'stream/name'. It is
    # harmless here since the name is arbitrary, but confirm intent.
    self.astate = stream_logdog._AnnotationState.create(
        stream_logdog._StreamName('strean/name'),
        environment=self.env,
        properties={'foo': 'bar'},
    )

  def testFirstCheckReturnsData(self):
    # The first check should return data.
    self.assertIsNotNone(self.astate.check())
    # The second will not, since nothing has changed since the last check.
    self.assertIsNone(self.astate.check())

  def testCanCreateAndGetStep(self):
    # Root step.
    base = self.astate.base
    self.astate.create_step(recipe_api.StepConfig.create(name='first'))
    self.assertEqual(len(base.substep), 1)
    self.assertEqual(base.substep[0].step.name, 'first')
    self.assertIsNotNone(self.astate.check())

    # Child step.
    self.astate.create_step(recipe_api.StepConfig.create(
        name='first child',
        step_nest_level=1))
    self.assertEqual(len(base.substep), 1)
    self.assertEqual(len(base.substep[0].step.substep), 1)
    self.assertEqual(base.substep[0].step.substep[0].step.name, 'first child')
    self.assertIsNotNone(self.astate.check())

    # Sibling step to 'first'.
    self.astate.create_step(recipe_api.StepConfig.create(name='second'))
    self.assertEqual(len(base.substep), 2)
    self.assertEqual(base.substep[1].step.name, 'second')
    self.assertIsNotNone(self.astate.check())

  def testCanUpdateProperties(self):
    self.astate.update_properties(foo='baz', qux='quux')
    self.assertEqual(list(self.astate.base.property), [
        pb.Step.Property(name='foo', value='baz'),
        pb.Step.Property(name='qux', value='quux'),
    ])
942
943
class StreamNameTest(unittest.TestCase):
  """Direct unit tests for stream_logdog._StreamName."""

  def testEmptyStreamNameRaisesValueError(self):
    # Rendering a name with no base component is an error.
    empty = stream_logdog._StreamName(None)
    with self.assertRaises(ValueError):
      str(empty)

  def testInvalidBaseRaisesValueError(self):
    # An unnormalizable base is rejected at construction time.
    with self.assertRaises(ValueError):
      stream_logdog._StreamName('!!! invalid !!!')

  def testAppendComponents(self):
    base = stream_logdog._StreamName('base')
    for components, expected in (
        ((), 'base'),
        (('foo',), 'base/foo'),
        (('foo', 'bar'), 'base/foo/bar'),
        (('foo', 'bar/baz'), 'base/foo/bar_baz'),
    ):
      self.assertEqual(str(base.append(*components)), expected)

  def testAugment(self):
    base = stream_logdog._StreamName('base')
    for suffix, expected in (
        ('', 'base'),
        ('foo', 'basefoo'),
        ('foo/bar baz', 'basefoo_bar_baz'),
    ):
      self.assertEqual(str(base.augment(suffix)), expected)

  def testAppendInvalidStreamNameNormalizes(self):
    appended = stream_logdog._StreamName('base').append('#!!! stream name !!!')
    self.assertEqual(str(appended), 'base/s______stream_name____')

  def testAugmentInvalidStreamNameNormalizes(self):
    base = stream_logdog._StreamName('base')
    self.assertEqual(str(base.augment(' !!! other !!! ')), 'base_____other_____')
978
if __name__ == '__main__':
  # Allow running this test module directly.
  unittest.main()
OLDNEW
« no previous file with comments | « recipe_engine/third_party/six/six-1.10.0.dist-info/INSTALLER ('k') | recipe_engine/unittests/stream_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698