| OLD | NEW |
| (Empty) |
| 1 # Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 # Use of this source code is governed by a BSD-style license that can be | |
| 3 # found in the LICENSE file. | |
| 4 | |
| 5 import Queue | |
| 6 import logging | |
| 7 import threading | |
| 8 import time | |
| 9 | |
| 10 import httplib2 | |
| 11 import oauth2client | |
| 12 | |
| 13 from infra.libs.authentication import get_authenticated_http | |
| 14 from infra.libs.event_mon.log_request_lite_pb2 import LogRequestLite | |
| 15 from infra.libs.event_mon.chrome_infra_log_pb2 import ChromeInfraEvent | |
| 16 | |
| 17 def time_ms(): | |
| 18 """Return current timestamp in milliseconds.""" | |
| 19 return int(1000 * time.time()) | |
| 20 | |
| 21 | |
| 22 class _Router(object): | |
| 23 """Route events to the right destination. | |
| 24 | |
| 25 This object is meant to be a singleton, and is not part of the API. | |
| 26 | |
| 27 Usage: | |
| 28 router = _Router() | |
| 29 event = ChromeInfraEvent.LogEventLite(...) | |
| 30 ... fill in event ... | |
| 31 router.push_event(event) | |
| 32 """ | |
| 33 def __init__(self, cache, endpoint=None): | |
| 34 # cache is defined in config.py. Passed as a parameter to avoid | |
| 35 # a circular import. | |
| 36 | |
| 37 # endpoint == None means 'dry run'. No data is sent. | |
| 38 self.endpoint = endpoint | |
| 39 self.http = httplib2.Http() | |
| 40 self.cache = cache | |
| 41 | |
| 42 if self.endpoint and self.cache['service_account_creds']: | |
| 43 logging.debug('Activating OAuth2 authentication.') | |
| 44 self.http = get_authenticated_http( | |
| 45 self.cache['service_account_creds'], | |
| 46 service_accounts_creds_root=self.cache['service_accounts_creds_root'], | |
| 47 scope='https://www.googleapis.com/auth/cclog' | |
| 48 ) | |
| 49 | |
| 50 self.event_queue = Queue.Queue() | |
| 51 self._thread = threading.Thread(target=self._router) | |
| 52 self._thread.daemon = True | |
| 53 logging.debug('event_mon: starting router thread') | |
| 54 self._thread.start() | |
| 55 | |
| 56 def _router(self): | |
| 57 while(True): # pragma: no branch | |
| 58 events = self.event_queue.get() | |
| 59 if events is None: | |
| 60 break | |
| 61 | |
| 62 # Set this time at the very last moment | |
| 63 events.request_time_ms = time_ms() | |
| 64 if self.endpoint: # pragma: no cover | |
| 65 logging.info('event_mon: POSTing events to %s', self.endpoint) | |
| 66 response, _ = self.http.request( | |
| 67 uri=self.endpoint, | |
| 68 method='POST', | |
| 69 headers={'Content-Type': 'application/octet-stream'}, | |
| 70 body=events.SerializeToString() | |
| 71 ) | |
| 72 | |
| 73 if response.status != 200: | |
| 74 # TODO(pgervais): implement retry / local storage when this | |
| 75 # happens. | |
| 76 logging.error('failed to POST data to %s', self.endpoint) | |
| 77 logging.error('data: %s', str(events)[:1000]) | |
| 78 logging.error(response) | |
| 79 else: | |
| 80 infra_events = [str(ChromeInfraEvent.FromString( | |
| 81 ev.source_extension)) for ev in events.log_event] | |
| 82 logging.info('Fake post request. Sending:\n%s', | |
| 83 '\n'.join(infra_events)) | |
| 84 | |
| 85 def close(self, timeout=None): | |
| 86 """ | |
| 87 Returns: | |
| 88 success (bool): True if everything went well. Otherwise, there is no | |
| 89 guarantee that all events have been properly sent to the remote. | |
| 90 """ | |
| 91 timeout = timeout or 5 | |
| 92 logging.debug('event_mon: trying to close') | |
| 93 self.event_queue.put(None) | |
| 94 self._thread.join(timeout) | |
| 95 # If the thread is still alive at this point, we can't but wait for a call | |
| 96 # to sys.exit. Since we expect this function to be called at the end of the | |
| 97 # program, it should come soon. | |
| 98 success = not self._thread.is_alive() | |
| 99 if success: | |
| 100 logging.debug('event_mon: successfully closed.') | |
| 101 else: # pragma: no cover | |
| 102 logging.debug('event_mon: timeout waiting for thread to finish.') | |
| 103 return success | |
| 104 | |
| 105 def push_event(self, event): | |
| 106 """Enqueue event to push to the collection service. | |
| 107 | |
| 108 This method offers no guarantee on return that the event have been pushed | |
| 109 externally, as some buffering can take place. | |
| 110 | |
| 111 Args: | |
| 112 event (LogRequestLite.LogEventLite): one single event. | |
| 113 Returns: | |
| 114 success (bool): False if an error happened. True means 'event accepted', | |
| 115 but NOT 'event successfully pushed to the remote'. | |
| 116 """ | |
| 117 if not isinstance(event, LogRequestLite.LogEventLite): | |
| 118 logging.error('Invalid type for "event": %s (should be LogEventLite)' | |
| 119 % str(type(event))) | |
| 120 return False | |
| 121 | |
| 122 # TODO(pgervais): implement batching. | |
| 123 request_p = LogRequestLite() | |
| 124 request_p.log_source_name = 'CHROME_INFRA' | |
| 125 request_p.log_event.extend((event,)) # copies the protobuf | |
| 126 self.event_queue.put(request_p) | |
| 127 return True | |
| OLD | NEW |