Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(13)

Side by Side Diff: infra/libs/event_mon/router.py

Issue 1157383006: Removed infra/libs/event_mon (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « infra/libs/event_mon/monitoring.py ('k') | infra/libs/event_mon/test/__init__.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 import Queue
6 import logging
7 import threading
8 import time
9
10 import httplib2
11 import oauth2client
12
13 from infra.libs.authentication import get_authenticated_http
14 from infra.libs.event_mon.log_request_lite_pb2 import LogRequestLite
15 from infra.libs.event_mon.chrome_infra_log_pb2 import ChromeInfraEvent
16
17 def time_ms():
18 """Return current timestamp in milliseconds."""
19 return int(1000 * time.time())
20
21
22 class _Router(object):
23 """Route events to the right destination.
24
25 This object is meant to be a singleton, and is not part of the API.
26
27 Usage:
28 router = _Router()
29 event = ChromeInfraEvent.LogEventLite(...)
30 ... fill in event ...
31 router.push_event(event)
32 """
33 def __init__(self, cache, endpoint=None):
34 # cache is defined in config.py. Passed as a parameter to avoid
35 # a circular import.
36
37 # endpoint == None means 'dry run'. No data is sent.
38 self.endpoint = endpoint
39 self.http = httplib2.Http()
40 self.cache = cache
41
42 if self.endpoint and self.cache['service_account_creds']:
43 logging.debug('Activating OAuth2 authentication.')
44 self.http = get_authenticated_http(
45 self.cache['service_account_creds'],
46 service_accounts_creds_root=self.cache['service_accounts_creds_root'],
47 scope='https://www.googleapis.com/auth/cclog'
48 )
49
50 self.event_queue = Queue.Queue()
51 self._thread = threading.Thread(target=self._router)
52 self._thread.daemon = True
53 logging.debug('event_mon: starting router thread')
54 self._thread.start()
55
56 def _router(self):
57 while(True): # pragma: no branch
58 events = self.event_queue.get()
59 if events is None:
60 break
61
62 # Set this time at the very last moment
63 events.request_time_ms = time_ms()
64 if self.endpoint: # pragma: no cover
65 logging.info('event_mon: POSTing events to %s', self.endpoint)
66 response, _ = self.http.request(
67 uri=self.endpoint,
68 method='POST',
69 headers={'Content-Type': 'application/octet-stream'},
70 body=events.SerializeToString()
71 )
72
73 if response.status != 200:
74 # TODO(pgervais): implement retry / local storage when this
75 # happens.
76 logging.error('failed to POST data to %s', self.endpoint)
77 logging.error('data: %s', str(events)[:1000])
78 logging.error(response)
79 else:
80 infra_events = [str(ChromeInfraEvent.FromString(
81 ev.source_extension)) for ev in events.log_event]
82 logging.info('Fake post request. Sending:\n%s',
83 '\n'.join(infra_events))
84
85 def close(self, timeout=None):
86 """
87 Returns:
88 success (bool): True if everything went well. Otherwise, there is no
89 guarantee that all events have been properly sent to the remote.
90 """
91 timeout = timeout or 5
92 logging.debug('event_mon: trying to close')
93 self.event_queue.put(None)
94 self._thread.join(timeout)
95 # If the thread is still alive at this point, we can't but wait for a call
96 # to sys.exit. Since we expect this function to be called at the end of the
97 # program, it should come soon.
98 success = not self._thread.is_alive()
99 if success:
100 logging.debug('event_mon: successfully closed.')
101 else: # pragma: no cover
102 logging.debug('event_mon: timeout waiting for thread to finish.')
103 return success
104
105 def push_event(self, event):
106 """Enqueue event to push to the collection service.
107
108 This method offers no guarantee on return that the event have been pushed
109 externally, as some buffering can take place.
110
111 Args:
112 event (LogRequestLite.LogEventLite): one single event.
113 Returns:
114 success (bool): False if an error happened. True means 'event accepted',
115 but NOT 'event successfully pushed to the remote'.
116 """
117 if not isinstance(event, LogRequestLite.LogEventLite):
118 logging.error('Invalid type for "event": %s (should be LogEventLite)'
119 % str(type(event)))
120 return False
121
122 # TODO(pgervais): implement batching.
123 request_p = LogRequestLite()
124 request_p.log_source_name = 'CHROME_INFRA'
125 request_p.log_event.extend((event,)) # copies the protobuf
126 self.event_queue.put(request_p)
127 return True
OLDNEW
« no previous file with comments | « infra/libs/event_mon/monitoring.py ('k') | infra/libs/event_mon/test/__init__.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698