Chromium Code Reviews| Index: appengine/chrome_infra_mon_proxy/test_module.py |
| diff --git a/appengine/chrome_infra_mon_proxy/test_module.py b/appengine/chrome_infra_mon_proxy/test_module.py |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..21b023c35cbb9c21262007ed8045d29b5576f302 |
| --- /dev/null |
| +++ b/appengine/chrome_infra_mon_proxy/test_module.py |
| @@ -0,0 +1,87 @@ |
| +# Copyright 2015 The Chromium Authors. All rights reserved. |
| +# Use of this source code is governed by a BSD-style license that can be |
| +# found in the LICENSE file. |
| + |
| +import json |
| +import logging |
| +import os |
| +import sys |
| +import webapp2 |
| + |
| +from google.appengine.ext import ndb |
| +from google.appengine.api import users |
| + |
| +import common |
| +import main |
| + |
| +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) |
| +sys.path.insert(0, os.path.join(BASE_DIR, 'components', 'third_party')) |
| + |
| +from components import auth |
| + |
| + |
| +TEST_DATA_KEY = 'test_data_key' |
| + |
| +class LoadTestData(ndb.Model): |
| + """Store load test data. |
| + |
| + To start a load test, set qps to a positive value. |
|
ghost stip (do not use)
2015/04/14 00:36:43
I'm not sure exactly how this tests works. Does it
Sergey Berezin (google)
2015/04/16 04:39:07
The /loadtest endpoint does one simple thing - sto
|
| + """ |
| + qps = ndb.IntegerProperty(default=0) |
| + payload_size = ndb.IntegerProperty(default=1024) |
| + |
| + |
| +class TestHandler(auth.AuthenticatingHandler): |
| + """A /dev/null endpoint to test the entire pipeline. |
| + |
| + This endpoint can be used in place of the MonAcq official endpoint |
| + for testing the proxy. |
| + """ |
| + # Disable XSRF in local dev appserver; otherwise requests will fail. |
| + if common.is_development_server(): |
| + xsrf_token_enforce_on = [] # pragma: no cover |
| + @main.is_group_member('service-account-monitoring-proxy-test') |
| + def get(self): |
| + msg = 'Received GET request.' |
| + self.response.headers['Content-Type'] = 'text/plain' |
| + self.response.out.write(msg) |
| + logging.debug('Received GET at /test') |
| + |
| + @main.is_group_member('service-account-monitoring-proxy-test') |
| + def post(self): |
| + logging.info('Received POST /test: %s', |
| + common.payload_stats(self.request.body)) |
| + |
| + |
| +class LoadTestHandler(webapp2.RequestHandler): |
| + """Store and publish load test parameters. |
| + |
| + Admins can set QPS, and anyone can retrieve it. This endpoint controls |
| + distributed machines sending load test traffic to the proxy. |
| + """ |
| + def get(self, qps): |
| + data = LoadTestData.get_or_insert(TEST_DATA_KEY) |
| + if qps: # Only admins can set a qps value. |
| + if not users.is_current_user_admin(): |
| + self.redirect(users.create_login_url(self.request.url)) |
| + return |
| + qps = int(qps) |
| + if qps < 0: |
| + self.response.write('Error: cannot set QPS to negative value.') |
| + return |
| + data.qps = qps |
| + payload_size = self.request.get('size') |
| + if payload_size: |
| + data.payload_size = int(payload_size) |
| + data.put() |
| + # Return JSON either way. |
| + self.response.write(json.dumps(data.to_dict())) |
| + |
| +logging.basicConfig(level=logging.DEBUG) |
| + |
| +test_handlers = [ |
| + (r'/test', TestHandler), |
| + (r'/loadtest/(.*)', LoadTestHandler), |
| +] |
| + |
| +test = webapp2.WSGIApplication(test_handlers, debug=True) |