Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(794)

Unified Diff: chrome/test/pyautolib/policy_base.py

Issue 9791023: Allow setting of user and device policies in functional tests (Closed) Base URL: http://git.chromium.org/chromium/src.git@master
Patch Set: Comments addressed. Created 8 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: chrome/test/pyautolib/policy_base.py
diff --git a/chrome/test/pyautolib/policy_base.py b/chrome/test/pyautolib/policy_base.py
index db66807245e19cb20e9924c4c966b1ff63c32f71..51a1ee16e2f454d33a7b2d3c565b62cea2fed63d 100644
--- a/chrome/test/pyautolib/policy_base.py
+++ b/chrome/test/pyautolib/policy_base.py
@@ -5,36 +5,34 @@
"""Base class for tests that need to update the policies enforced by Chrome.
-Subclasses can call SetPolicies with a dictionary of policies to install.
-SetPolicies can also be used to set the device policies on ChromeOS.
+Subclasses can call SetUserPolicy (ChromeOS, Linux, Windows) and
+SetDevicePolicy (ChromeOS only) with a dictionary of the policies to install.
The current implementation depends on the platform. The implementations might
-change in the future, but tests relying on SetPolicies will keep working.
+change in the future, but tests relying on the above calls will keep working.
"""
-# On ChromeOS this relies on the device_management.py part of the TestServer,
-# and forces the policies by triggering a new policy fetch and refreshing the
-# cloud policy providers. This requires preparing the system for policy
-# fetching, which currently means the DMTokens have to be in place. Without the
-# DMTokens, the cloud policy controller won't be able to proceed, because the
-# Gaia tokens for the DMService aren't available during tests.
-# In the future this setup might not be necessary anymore, and the policy
-# might also be pushed through the session_manager.
+# On ChromeOS, both user and device policies are supported. Chrome is set up to
+# fetch user policy from a TestServer. A call to SetUserPolicy triggers an
+# immediate policy refresh, allowing the effects of user policy changes on a
+# running session to be tested.
#
-# On other platforms it relies on the SetPolicies automation call, which is
-# only available on non-official builds. This automation call is meant to be
-# eventually removed when a replacement for every platform is available.
-# This requires setting up the policies in the registry on Windows, and writing
-# the right files on Linux and Mac.
+# Device policy is injected by stopping Chrome and the session manager, writing
+# a new device policy blob and starting Chrome and the session manager again.
+# This means that setting device policy always logs out the current user. It is
+# *not* possible to test the effects on a running session. These limitations
+# stem from the fact that Chrome will only fetch device policy from a TestServer
+# if the device is enterprise owned. Enterprise ownership, in turn, requires
+# ownership of the TPM which can only be undone by reboothing the device (and
+# hence is not possible in a client test).
import json
import logging
import os
import subprocess
import tempfile
-import urllib
-import urllib2
+import asn1der
import pyauto
import pyauto_paths
import pyauto_utils
@@ -42,6 +40,10 @@ import pyauto_utils
if pyauto.PyUITest.IsChromeOS():
import sys
+ import warnings
+ # Ignore deprecation warnings, they make our output more cluttered.
+ warnings.filterwarnings("ignore", category=DeprecationWarning)
Nirnimesh 2012/04/05 19:05:30 nit: use ' instead of " for consistency
bartfab (slow) 2012/04/10 15:27:20 Done.
+
# Find the path to the pyproto and add it to sys.path.
# Prepend it so that google.protobuf is loaded from here.
for path in pyauto_paths.GetBuildDirs():
@@ -51,18 +53,25 @@ if pyauto.PyUITest.IsChromeOS():
'proto')] + sys.path
break
sys.path.append('/usr/local') # to import autotest libs.
- import device_management_local_pb2 as dml
- import device_management_backend_pb2 as dmb
+ sys.path.append(os.path.join(pyauto_paths.GetThirdPartyDir(), 'tlslite'))
+
+ import chrome_device_policy_pb2 as dp
+ import device_management_backend_pb2 as dm
+ import tlslite.api
from autotest.cros import constants
from autotest.cros import cros_ui
elif pyauto.PyUITest.IsWin():
import _winreg as winreg
+# ASN.1 object identifier for PKCS#1/RSA.
+PKCS1_RSA_OID = '\x2a\x86\x48\x86\xf7\x0d\x01\x01\x01'
+
class PolicyTestBase(pyauto.PyUITest):
"""A base class for tests that need to set up and modify policies.
- Subclasses can use the SetPolicies call to set the policies seen by Chrome.
+ Subclasses can use the methods SetUserPolicy (ChromeOS, Linux, Windows) and
+ SetDevicePolicy (ChromeOS only) to set the policies seen by Chrome.
"""
def _WriteFile(self, path, content):
@@ -79,118 +88,25 @@ class PolicyTestBase(pyauto.PyUITest):
return os.path.join(self._temp_data_dir, 'device_management')
def _GetHttpURLForDeviceManagement(self):
+ """Returns the URL at which the TestServer is serving user policy."""
assert self.IsChromeOS()
return self._http_server.GetURL('device_management').spec()
- def _WriteUserPolicyToken(self, token):
- """Writes the given token to the user device management cache."""
- assert self.IsChromeOS()
- blob = dml.DeviceCredentials()
- blob.device_token = token
- blob.device_id = '123'
- self._WriteFile('/home/chronos/user/Device Management/Token',
- blob.SerializeToString())
+ def _WriteDevicePolicyWithSessionManagerStopped(self):
+ """Writes the device policy blob while the session manager is stopped.
- def _WriteDevicePolicy(self, fetch_response):
- """Writes the given signed fetch_response to the device policy cache.
-
- Also writes the owner key, used to verify the signature.
+ Updates the files holding the device policy blob and the public key need to
+ verify its signature.
"""
assert self.IsChromeOS()
- self._WriteFile(constants.SIGNED_POLICY_FILE,
- fetch_response.SerializeToString())
- self._WriteFile(constants.OWNER_KEY_FILE,
- fetch_response.new_public_key)
-
- def _PostToDMServer(self, request_type, body, headers):
- """Posts a request to the TestServer's Device Management interface.
-
- |request_type| is the value of the 'request' HTTP parameter.
- Returns the response's body.
- """
- assert self.IsChromeOS()
- url = self._GetHttpURLForDeviceManagement()
- url += '?' + urllib.urlencode({
- 'deviceid': '123',
- 'oauth_token': '456',
- 'request': request_type,
- 'devicetype': 2,
- 'apptype': 'Chrome',
- 'agent': 'Chrome',
- })
- return urllib2.urlopen(urllib2.Request(url, body, headers)).read()
-
- def _PostRegisterRequest(self, type):
- """Sends a device register request to the TestServer, of the given type."""
- assert self.IsChromeOS()
- request = dmb.DeviceManagementRequest()
- register = request.register_request
- register.machine_id = '789'
- register.type = type
- return self._PostToDMServer('register', request.SerializeToString(), {})
-
- def _RegisterAndGetDMToken(self, device):
- """Registers with the TestServer and returns the DMToken fetched.
-
- Registers for device policy if device is True. Otherwise registers for
- user policy.
- """
- assert self.IsChromeOS()
- type = device and dmb.DeviceRegisterRequest.DEVICE \
- or dmb.DeviceRegisterRequest.USER
- rstring = self._PostRegisterRequest(type)
- response = dmb.DeviceManagementResponse()
- response.ParseFromString(rstring)
- return response.register_response.device_management_token
-
- def _PostPolicyRequest(self, token, type, want_signature=False):
- """Fetches policy from the TestServer, using the given token.
-
- Policy is fetched for the given type. If want_signature is True, the
- request will ask for a signed response. Returns the response body.
- """
- assert self.IsChromeOS()
- request = dmb.DeviceManagementRequest()
- policy = request.policy_request
- prequest = policy.request.add()
- prequest.policy_type = type
- if want_signature:
- prequest.signature_type = dmb.PolicyFetchRequest.SHA1_RSA
- headers = {
- 'Authorization': 'GoogleDMToken token=' + token,
- }
- return self._PostToDMServer('policy', request.SerializeToString(), headers)
-
- def _FetchPolicy(self, token, device):
- """Fetches policy from the TestServer, using the given token.
-
- Token must be a valid token retrieved with _RegisterAndGetDMToken. If
- device is True, fetches signed device policy. Otherwise fetches user policy.
- This method also verifies the response, and returns the first policy fetch
- response.
- """
- assert self.IsChromeOS()
- type = device and 'google/chromeos/device' or 'google/chromeos/user'
- rstring = self._PostPolicyRequest(token=token, type=type,
- want_signature=device)
- response = dmb.DeviceManagementResponse()
- response.ParseFromString(rstring)
- fetch_response = response.policy_response.response[0]
- assert fetch_response.policy_data
- assert fetch_response.policy_data_signature
- assert fetch_response.new_public_key
- return fetch_response
-
- def _WriteDevicePolicyWithSessionManagerStopped(self, policy):
- """Writes the device policy blob while the Session Manager is stopped."""
- assert self.IsChromeOS()
logging.debug('Stopping session manager')
cros_ui.stop(allow_fail=True)
- logging.debug('Writing device policy cache')
- self._WriteDevicePolicy(policy)
+ logging.debug('Writing device policy blob')
+ self._WriteFile(constants.SIGNED_POLICY_FILE, self._device_policy_blob)
+ self._WriteFile(constants.OWNER_KEY_FILE, self._public_key)
- # Ugly hack: session manager won't spawn chrome if this file exists. That's
- # usually a good thing (to keep the automation channel open), but in this
+ # Ugly hack: session manager will not spawn Chrome if this file exists. That
+ # is usually a good thing (to keep the automation channel open), but in this
# case we really want to restart chrome. PyUITest.setUp() will be called
# after session manager and chrome have restarted, and will setup the
# automation channel.
@@ -205,7 +121,7 @@ class PolicyTestBase(pyauto.PyUITest):
logging.debug('Starting session manager again')
cros_ui.start()
- # cros_ui.start() waits for the login prompt to be visible, so chrome has
+ # cros_ui.start() waits for the login prompt to be visible, so Chrome has
# already started once it returns.
if restore_magic_file:
open(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE, 'w').close()
@@ -215,102 +131,93 @@ class PolicyTestBase(pyauto.PyUITest):
"""Sets up Chrome to use cloud policies on ChromeOS."""
flags = pyauto.PyUITest.ExtraChromeFlags(self)
if self.IsChromeOS():
+ while '--skip-oauth-login' in flags:
+ flags.remove('--skip-oauth-login')
url = self._GetHttpURLForDeviceManagement()
- flag = '--device-management-url=' + url
- flags += [flag]
+ flags.append('--device-management-url=' + url)
+ flags.append('--disable-sync')
return flags
def setUp(self):
"""Sets up the platform for policy testing.
- On ChromeOS, part of the set up involves restarting the session_manager and
- logging in with the $default account.
+ On ChromeOS, part of the setup involves restarting the session manager to
+ inject a device policy blob.
"""
if self.IsChromeOS():
- # Setup a temporary data dir and a TestServer serving files from there.
+ # Set up a temporary data dir and a TestServer serving files from there.
# The TestServer makes its document root relative to the src dir.
self._temp_data_dir = tempfile.mkdtemp(dir=pyauto_paths.GetSourceDir())
relative_temp_data_dir = os.path.basename(self._temp_data_dir)
self._http_server = self.StartHTTPServer(relative_temp_data_dir)
- # Setup empty policies, so that the TestServer can start replying.
- self._SetCloudPolicies()
+ # Set up an empty user policy so that the TestServer can start replying.
+ self._SetUserPolicyChromeOS()
- device_dmtoken = self._RegisterAndGetDMToken(device=True)
- policy = self._FetchPolicy(token=device_dmtoken, device=True)
- user_dmtoken = self._RegisterAndGetDMToken(device=False)
+ # Generate a key pair for signing device policy.
+ self._private_key = tlslite.api.generateRSAKey(1024)
+ algorithm = asn1der.Sequence(
+ [asn1der.Data(asn1der.OBJECT_IDENTIFIER, PKCS1_RSA_OID),
+ asn1der.Data(asn1der.NULL, '')])
+ rsa_pubkey = asn1der.Sequence([asn1der.Integer(self._private_key.n),
+ asn1der.Integer(self._private_key.e)])
+ self._public_key = asn1der.Sequence(
+ [algorithm, asn1der.Bitstring(rsa_pubkey)])
- # The device policy blob is only picked up by the session manager on
- # startup, and is overwritten on shutdown. So the blob has to be written
- # while the session manager is stopped.
- self.WaitForSessionManagerRestart(
- lambda: self._WriteDevicePolicyWithSessionManagerStopped(policy))
- logging.debug('Session manager restarted with device policy ready')
+ # Clear device policy. This also invokes pyauto.PyUITest.setUp(self).
+ self.SetDevicePolicy()
- pyauto.PyUITest.setUp(self)
-
- if self.IsChromeOS():
- logging.debug('Logging in')
- credentials = self.GetPrivateInfo()['prod_enterprise_test_user']
- self.Login(credentials['username'], credentials['password'])
- assert self.GetLoginInfo()['is_logged_in']
-
- self._WriteUserPolicyToken(user_dmtoken)
- # The browser has to be reloaded to make the user policy token cache
- # reload the file just written. The file can also be written only after
- # the cryptohome is mounted, after login.
- self.RestartBrowser(clear_profile=False)
+ # Remove any existing vaults.
+ self.RemoveAllCryptohomeVaultsOnChromeOS()
+ else:
+ pyauto.PyUITest.setUp(self)
def tearDown(self):
- """Cleans up the files created by setUp and policies added in tests."""
- # Clear the policies.
- self.SetPolicies()
-
+ """Cleans up the policies and related files created in tests."""
if self.IsChromeOS():
- pyauto.PyUITest.Logout(self)
+ # On ChromeOS, clearing device policy logs out the current user so that
+ # no policy is in force. User policy is then permanently cleared at the
+ # end of the method after stopping the TestServer.
+ self.SetDevicePolicy()
+ else:
+ # On other platforms, there is only user policy to clear.
+ self.SetUserPolicy()
pyauto.PyUITest.tearDown(self)
if self.IsChromeOS():
self.StopHTTPServer(self._http_server)
pyauto_utils.RemovePath(self._temp_data_dir)
+ self.RemoveAllCryptohomeVaultsOnChromeOS()
- def _SetCloudPolicies(self, user_mandatory=None, user_recommended=None,
- device=None):
- """Exports the policies to the configuration file of the TestServer.
+ def LoginWithTestAccount(self, account='prod_enterprise_test_user'):
+ """Convenience method for logging in with one of the test accounts."""
+ assert self.IsChromeOS()
+ credentials = self.GetPrivateInfo()[account]
+ self.Login(credentials['username'], credentials['password'])
+ assert self.GetLoginInfo()['is_logged_in']
- The TestServer will serve these policies after this function returns.
+ def SetPolicy(self):
+ raise NotImplementedError('This class supports user and device policies. '
+ 'Instead of SetPolicies, use SetUserPolicy or '
+ 'SetDevicePolicy.')
- Args:
- user_mandatory: user policies of mandatory level
- user_recommended: user policies of recommended level
- device: device policies
- """
+ def _SetUserPolicyChromeOS(self, user_policy=None):
+ """Writes the given user policy to the TestServer's input file."""
assert self.IsChromeOS()
policy_dict = {
- 'google/chromeos/device': device or {},
+ 'google/chromeos/device': {},
'google/chromeos/user': {
- 'mandatory': user_mandatory or {},
- 'recommended': user_recommended or {},
+ 'mandatory': user_policy or {},
+ 'recommended': {},
},
'managed_users': ['*'],
}
self._WriteFile(self._GetTestServerPoliciesFilePath(),
json.dumps(policy_dict, sort_keys=True, indent=2) + '\n')
- def _SetPoliciesWin(self, user_policy=None):
- """Exports the policies as dictionary in the argument to Window registry.
-
- Removes the registry key and its subkeys if they exist.
-
- Args:
- user_policy: A dictionary representing the user policies. Clear the
- registry if None.
-
- Raises:
- TypeError: If an unexpected value is found in the policy dictionary.
- """
-
+ def _SetUserPolicyWin(self, user_policy=None):
+ """Writes the given user policy to the Windows registry."""
def SetValueEx(key, sub_key, value):
if isinstance(value, int):
winreg.SetValueEx(key, sub_key, 0, winreg.REG_DWORD, int(value))
@@ -341,15 +248,8 @@ class PolicyTestBase(pyauto.PyUITest):
SetValueEx(root_key, k, v)
winreg.CloseKey(root_key)
- def _SetPoliciesLinux(self, user_policy=None):
- """Exports the policies as dictionary in the argument to a JSON file.
-
- Removes the JSON file if it exists.
-
- Args:
- user_policy: A dictionary representing the user policies. Remove the
- JSON file if None
- """
+ def _SetUserPolicyLinux(self, user_policy=None):
+ """Writes the given user policy to the JSON policy file read by Chrome."""
assert self.IsLinux()
sudo_cmd_file = os.path.join(os.path.dirname(__file__),
'policy_linux_util.py')
@@ -376,29 +276,101 @@ class PolicyTestBase(pyauto.PyUITest):
'copy', '/tmp/chrome.json', policies_location])
os.remove('/tmp/chrome.json')
- def SetPolicies(self, user_policy=None, device_policy=None):
- """Enforces the policies given in the arguments as dictionaries.
-
- These policies will have been installed after this call returns.
+ def SetUserPolicy(self, user_policy=None):
+ """Sets the user policy provided as a dict.
- Args:
- user_policy: A dictionary representing the user policies.
- device_policy: A dictionary representing the device policies on Chrome OS.
-
- Raises:
- NotImplementedError if the platform is not supported.
- """
+ Passing a value of None clears the user policy."""
if self.IsChromeOS():
- self._SetCloudPolicies(user_mandatory=user_policy, device=device_policy)
+ self._SetUserPolicyChromeOS(user_policy=user_policy)
+ elif self.IsWin():
+ self._SetUserPolicyWin(user_policy=user_policy)
+ elif self.IsLinux():
+ self._SetUserPolicyLinux(user_policy=user_policy)
else:
- if device_policy is not None:
- raise NotImplementedError('Device policy is only available on ChromeOS')
- if self.IsWin():
- self._SetPoliciesWin(user_policy=user_policy)
- elif self.IsLinux():
- self._SetPoliciesLinux(user_policy=user_policy)
- else:
- raise NotImplementedError('Not available on this platform.')
+ raise NotImplementedError('Not available on this platform.')
self.RefreshPolicies()
+ def _SetProtobufMessageField(self, group_message, field, field_value):
+ """Sets the given field in a protobuf to the given value."""
+ if field.label == field.LABEL_REPEATED:
+ assert type(field_value) == list
+ entries = group_message.__getattribute__(field.name)
+ for list_item in field_value:
+ entries.append(list_item)
+ return
+ elif field.type == field.TYPE_BOOL:
+ assert type(field_value) == bool
+ elif field.type == field.TYPE_STRING:
+ assert type(field_value) == str or type(field_value) == unicode
+ elif field.type == field.TYPE_INT64:
+ assert type(field_value) == int
+ elif (field.type == field.TYPE_MESSAGE and
+ field.message_type.name == 'StringList'):
+ assert type(field_value) == list
+ entries = group_message.__getattribute__(field.name).entries
+ for list_item in field_value:
+ entries.append(list_item)
+ return
+ else:
+ raise Exception('Unknown field type %s' % field.type)
+ group_message.__setattr__(field.name, field_value)
+
+ def _GenerateDevicePolicyBlob(self, device_policy=None, owner=None):
+ """Generates a signed device policy blob."""
+
+ # Fill in the device settings protobuf.
+ device_policy = device_policy or {}
+ owner = owner or constants.CREDENTIALS['$mockowner'][0]
+ settings = dp.ChromeDeviceSettingsProto()
+ for group in settings.DESCRIPTOR.fields:
+ # Create protobuf message for group.
+ group_message = eval('dp.' + group.message_type.name + '()')
+ # Indicates if at least one field was set in |group_message|.
+ got_fields = False
+ # Iterate over fields of the message and feed them from the policy dict.
+ for field in group_message.DESCRIPTOR.fields:
+ field_value = None
+ if field.name in device_policy:
+ got_fields = True
+ field_value = device_policy[field.name]
+ self._SetProtobufMessageField(group_message, field, field_value)
+ if got_fields:
+ settings.__getattribute__(group.name).CopyFrom(group_message)
+
+ # Fill in the policy data protobuf.
+ policy_data = dm.PolicyData()
+ policy_data.policy_type = 'google/chromeos/device'
+ policy_data.policy_value = settings.SerializeToString()
+ policy_data.username = owner
+ serialized_policy_data = policy_data.SerializeToString()
+
+ # Fill in the device management response protobuf.
+ response = dm.DeviceManagementResponse()
+ fetch_response = response.policy_response.response.add()
+ fetch_response.policy_data = serialized_policy_data
+ fetch_response.policy_data_signature = (
+ self._private_key.hashAndSign(serialized_policy_data).tostring())
+
+ self._device_policy_blob = fetch_response.SerializeToString()
+
+ def _RefreshDevicePolicy(self):
+ """Refreshes the device policy in force on ChromeOS."""
+ assert self.IsChromeOS()
+ # The device policy blob is only picked up by the session manager on
+ # startup, and is overwritten on shutdown. So the blob has to be written
+ # while the session manager is stopped.
+ self.WaitForSessionManagerRestart(
+ lambda: self._WriteDevicePolicyWithSessionManagerStopped())
Nirnimesh 2012/04/05 19:05:30 lambda is redundant. self.WaitForSessionManagerRe
bartfab (slow) 2012/04/10 15:27:20 Done.
+ logging.debug('Session manager restarted with device policy ready')
+ pyauto.PyUITest.setUp(self)
+
+ def SetDevicePolicy(self, device_policy=None, owner=None):
+ """Sets the device policy provided as a dict and the owner on ChromeOS.
+
+ Passing a value of None as the device policy clears it."""
+ if not self.IsChromeOS():
+ raise NotImplementedError('Device policy is only available on ChromeOS.')
+
+ self._GenerateDevicePolicyBlob(device_policy, owner)
+ self._RefreshDevicePolicy()

Powered by Google App Engine
This is Rietveld 408576698