Index: chrome/test/pyautolib/policy_base.py |
diff --git a/chrome/test/pyautolib/policy_base.py b/chrome/test/pyautolib/policy_base.py |
index db66807245e19cb20e9924c4c966b1ff63c32f71..51a1ee16e2f454d33a7b2d3c565b62cea2fed63d 100644 |
--- a/chrome/test/pyautolib/policy_base.py |
+++ b/chrome/test/pyautolib/policy_base.py |
@@ -5,36 +5,34 @@ |
"""Base class for tests that need to update the policies enforced by Chrome. |
-Subclasses can call SetPolicies with a dictionary of policies to install. |
-SetPolicies can also be used to set the device policies on ChromeOS. |
+Subclasses can call SetUserPolicy (ChromeOS, Linux, Windows) and |
+SetDevicePolicy (ChromeOS only) with a dictionary of the policies to install. |
The current implementation depends on the platform. The implementations might |
-change in the future, but tests relying on SetPolicies will keep working. |
+change in the future, but tests relying on the above calls will keep working. |
""" |
-# On ChromeOS this relies on the device_management.py part of the TestServer, |
-# and forces the policies by triggering a new policy fetch and refreshing the |
-# cloud policy providers. This requires preparing the system for policy |
-# fetching, which currently means the DMTokens have to be in place. Without the |
-# DMTokens, the cloud policy controller won't be able to proceed, because the |
-# Gaia tokens for the DMService aren't available during tests. |
-# In the future this setup might not be necessary anymore, and the policy |
-# might also be pushed through the session_manager. |
+# On ChromeOS, both user and device policies are supported. Chrome is set up to |
+# fetch user policy from a TestServer. A call to SetUserPolicy triggers an |
+# immediate policy refresh, allowing the effects of user policy changes on a |
+# running session to be tested. |
# |
-# On other platforms it relies on the SetPolicies automation call, which is |
-# only available on non-official builds. This automation call is meant to be |
-# eventually removed when a replacement for every platform is available. |
-# This requires setting up the policies in the registry on Windows, and writing |
-# the right files on Linux and Mac. |
+# Device policy is injected by stopping Chrome and the session manager, writing |
+# a new device policy blob and starting Chrome and the session manager again. |
+# This means that setting device policy always logs out the current user. It is |
+# *not* possible to test the effects on a running session. These limitations |
+# stem from the fact that Chrome will only fetch device policy from a TestServer |
+# if the device is enterprise owned. Enterprise ownership, in turn, requires |
+# ownership of the TPM which can only be undone by reboothing the device (and |
+# hence is not possible in a client test). |
import json |
import logging |
import os |
import subprocess |
import tempfile |
-import urllib |
-import urllib2 |
+import asn1der |
import pyauto |
import pyauto_paths |
import pyauto_utils |
@@ -42,6 +40,10 @@ import pyauto_utils |
if pyauto.PyUITest.IsChromeOS(): |
import sys |
+ import warnings |
+ # Ignore deprecation warnings, they make our output more cluttered. |
+ warnings.filterwarnings("ignore", category=DeprecationWarning) |
Nirnimesh
2012/04/05 19:05:30
nit: use ' instead of " for consistency
bartfab (slow)
2012/04/10 15:27:20
Done.
|
+ |
# Find the path to the pyproto and add it to sys.path. |
# Prepend it so that google.protobuf is loaded from here. |
for path in pyauto_paths.GetBuildDirs(): |
@@ -51,18 +53,25 @@ if pyauto.PyUITest.IsChromeOS(): |
'proto')] + sys.path |
break |
sys.path.append('/usr/local') # to import autotest libs. |
- import device_management_local_pb2 as dml |
- import device_management_backend_pb2 as dmb |
+ sys.path.append(os.path.join(pyauto_paths.GetThirdPartyDir(), 'tlslite')) |
+ |
+ import chrome_device_policy_pb2 as dp |
+ import device_management_backend_pb2 as dm |
+ import tlslite.api |
from autotest.cros import constants |
from autotest.cros import cros_ui |
elif pyauto.PyUITest.IsWin(): |
import _winreg as winreg |
+# ASN.1 object identifier for PKCS#1/RSA. |
+PKCS1_RSA_OID = '\x2a\x86\x48\x86\xf7\x0d\x01\x01\x01' |
+ |
class PolicyTestBase(pyauto.PyUITest): |
"""A base class for tests that need to set up and modify policies. |
- Subclasses can use the SetPolicies call to set the policies seen by Chrome. |
+ Subclasses can use the methods SetUserPolicy (ChromeOS, Linux, Windows) and |
+ SetDevicePolicy (ChromeOS only) to set the policies seen by Chrome. |
""" |
def _WriteFile(self, path, content): |
@@ -79,118 +88,25 @@ class PolicyTestBase(pyauto.PyUITest): |
return os.path.join(self._temp_data_dir, 'device_management') |
def _GetHttpURLForDeviceManagement(self): |
+ """Returns the URL at which the TestServer is serving user policy.""" |
assert self.IsChromeOS() |
return self._http_server.GetURL('device_management').spec() |
- def _WriteUserPolicyToken(self, token): |
- """Writes the given token to the user device management cache.""" |
- assert self.IsChromeOS() |
- blob = dml.DeviceCredentials() |
- blob.device_token = token |
- blob.device_id = '123' |
- self._WriteFile('/home/chronos/user/Device Management/Token', |
- blob.SerializeToString()) |
+ def _WriteDevicePolicyWithSessionManagerStopped(self): |
+ """Writes the device policy blob while the session manager is stopped. |
- def _WriteDevicePolicy(self, fetch_response): |
- """Writes the given signed fetch_response to the device policy cache. |
- |
- Also writes the owner key, used to verify the signature. |
+ Updates the files holding the device policy blob and the public key need to |
+ verify its signature. |
""" |
assert self.IsChromeOS() |
- self._WriteFile(constants.SIGNED_POLICY_FILE, |
- fetch_response.SerializeToString()) |
- self._WriteFile(constants.OWNER_KEY_FILE, |
- fetch_response.new_public_key) |
- |
- def _PostToDMServer(self, request_type, body, headers): |
- """Posts a request to the TestServer's Device Management interface. |
- |
- |request_type| is the value of the 'request' HTTP parameter. |
- Returns the response's body. |
- """ |
- assert self.IsChromeOS() |
- url = self._GetHttpURLForDeviceManagement() |
- url += '?' + urllib.urlencode({ |
- 'deviceid': '123', |
- 'oauth_token': '456', |
- 'request': request_type, |
- 'devicetype': 2, |
- 'apptype': 'Chrome', |
- 'agent': 'Chrome', |
- }) |
- return urllib2.urlopen(urllib2.Request(url, body, headers)).read() |
- |
- def _PostRegisterRequest(self, type): |
- """Sends a device register request to the TestServer, of the given type.""" |
- assert self.IsChromeOS() |
- request = dmb.DeviceManagementRequest() |
- register = request.register_request |
- register.machine_id = '789' |
- register.type = type |
- return self._PostToDMServer('register', request.SerializeToString(), {}) |
- |
- def _RegisterAndGetDMToken(self, device): |
- """Registers with the TestServer and returns the DMToken fetched. |
- |
- Registers for device policy if device is True. Otherwise registers for |
- user policy. |
- """ |
- assert self.IsChromeOS() |
- type = device and dmb.DeviceRegisterRequest.DEVICE \ |
- or dmb.DeviceRegisterRequest.USER |
- rstring = self._PostRegisterRequest(type) |
- response = dmb.DeviceManagementResponse() |
- response.ParseFromString(rstring) |
- return response.register_response.device_management_token |
- |
- def _PostPolicyRequest(self, token, type, want_signature=False): |
- """Fetches policy from the TestServer, using the given token. |
- |
- Policy is fetched for the given type. If want_signature is True, the |
- request will ask for a signed response. Returns the response body. |
- """ |
- assert self.IsChromeOS() |
- request = dmb.DeviceManagementRequest() |
- policy = request.policy_request |
- prequest = policy.request.add() |
- prequest.policy_type = type |
- if want_signature: |
- prequest.signature_type = dmb.PolicyFetchRequest.SHA1_RSA |
- headers = { |
- 'Authorization': 'GoogleDMToken token=' + token, |
- } |
- return self._PostToDMServer('policy', request.SerializeToString(), headers) |
- |
- def _FetchPolicy(self, token, device): |
- """Fetches policy from the TestServer, using the given token. |
- |
- Token must be a valid token retrieved with _RegisterAndGetDMToken. If |
- device is True, fetches signed device policy. Otherwise fetches user policy. |
- This method also verifies the response, and returns the first policy fetch |
- response. |
- """ |
- assert self.IsChromeOS() |
- type = device and 'google/chromeos/device' or 'google/chromeos/user' |
- rstring = self._PostPolicyRequest(token=token, type=type, |
- want_signature=device) |
- response = dmb.DeviceManagementResponse() |
- response.ParseFromString(rstring) |
- fetch_response = response.policy_response.response[0] |
- assert fetch_response.policy_data |
- assert fetch_response.policy_data_signature |
- assert fetch_response.new_public_key |
- return fetch_response |
- |
- def _WriteDevicePolicyWithSessionManagerStopped(self, policy): |
- """Writes the device policy blob while the Session Manager is stopped.""" |
- assert self.IsChromeOS() |
logging.debug('Stopping session manager') |
cros_ui.stop(allow_fail=True) |
- logging.debug('Writing device policy cache') |
- self._WriteDevicePolicy(policy) |
+ logging.debug('Writing device policy blob') |
+ self._WriteFile(constants.SIGNED_POLICY_FILE, self._device_policy_blob) |
+ self._WriteFile(constants.OWNER_KEY_FILE, self._public_key) |
- # Ugly hack: session manager won't spawn chrome if this file exists. That's |
- # usually a good thing (to keep the automation channel open), but in this |
+ # Ugly hack: session manager will not spawn Chrome if this file exists. That |
+ # is usually a good thing (to keep the automation channel open), but in this |
# case we really want to restart chrome. PyUITest.setUp() will be called |
# after session manager and chrome have restarted, and will setup the |
# automation channel. |
@@ -205,7 +121,7 @@ class PolicyTestBase(pyauto.PyUITest): |
logging.debug('Starting session manager again') |
cros_ui.start() |
- # cros_ui.start() waits for the login prompt to be visible, so chrome has |
+ # cros_ui.start() waits for the login prompt to be visible, so Chrome has |
# already started once it returns. |
if restore_magic_file: |
open(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE, 'w').close() |
@@ -215,102 +131,93 @@ class PolicyTestBase(pyauto.PyUITest): |
"""Sets up Chrome to use cloud policies on ChromeOS.""" |
flags = pyauto.PyUITest.ExtraChromeFlags(self) |
if self.IsChromeOS(): |
+ while '--skip-oauth-login' in flags: |
+ flags.remove('--skip-oauth-login') |
url = self._GetHttpURLForDeviceManagement() |
- flag = '--device-management-url=' + url |
- flags += [flag] |
+ flags.append('--device-management-url=' + url) |
+ flags.append('--disable-sync') |
return flags |
def setUp(self): |
"""Sets up the platform for policy testing. |
- On ChromeOS, part of the set up involves restarting the session_manager and |
- logging in with the $default account. |
+ On ChromeOS, part of the setup involves restarting the session manager to |
+ inject a device policy blob. |
""" |
if self.IsChromeOS(): |
- # Setup a temporary data dir and a TestServer serving files from there. |
+ # Set up a temporary data dir and a TestServer serving files from there. |
# The TestServer makes its document root relative to the src dir. |
self._temp_data_dir = tempfile.mkdtemp(dir=pyauto_paths.GetSourceDir()) |
relative_temp_data_dir = os.path.basename(self._temp_data_dir) |
self._http_server = self.StartHTTPServer(relative_temp_data_dir) |
- # Setup empty policies, so that the TestServer can start replying. |
- self._SetCloudPolicies() |
+ # Set up an empty user policy so that the TestServer can start replying. |
+ self._SetUserPolicyChromeOS() |
- device_dmtoken = self._RegisterAndGetDMToken(device=True) |
- policy = self._FetchPolicy(token=device_dmtoken, device=True) |
- user_dmtoken = self._RegisterAndGetDMToken(device=False) |
+ # Generate a key pair for signing device policy. |
+ self._private_key = tlslite.api.generateRSAKey(1024) |
+ algorithm = asn1der.Sequence( |
+ [asn1der.Data(asn1der.OBJECT_IDENTIFIER, PKCS1_RSA_OID), |
+ asn1der.Data(asn1der.NULL, '')]) |
+ rsa_pubkey = asn1der.Sequence([asn1der.Integer(self._private_key.n), |
+ asn1der.Integer(self._private_key.e)]) |
+ self._public_key = asn1der.Sequence( |
+ [algorithm, asn1der.Bitstring(rsa_pubkey)]) |
- # The device policy blob is only picked up by the session manager on |
- # startup, and is overwritten on shutdown. So the blob has to be written |
- # while the session manager is stopped. |
- self.WaitForSessionManagerRestart( |
- lambda: self._WriteDevicePolicyWithSessionManagerStopped(policy)) |
- logging.debug('Session manager restarted with device policy ready') |
+ # Clear device policy. This also invokes pyauto.PyUITest.setUp(self). |
+ self.SetDevicePolicy() |
- pyauto.PyUITest.setUp(self) |
- |
- if self.IsChromeOS(): |
- logging.debug('Logging in') |
- credentials = self.GetPrivateInfo()['prod_enterprise_test_user'] |
- self.Login(credentials['username'], credentials['password']) |
- assert self.GetLoginInfo()['is_logged_in'] |
- |
- self._WriteUserPolicyToken(user_dmtoken) |
- # The browser has to be reloaded to make the user policy token cache |
- # reload the file just written. The file can also be written only after |
- # the cryptohome is mounted, after login. |
- self.RestartBrowser(clear_profile=False) |
+ # Remove any existing vaults. |
+ self.RemoveAllCryptohomeVaultsOnChromeOS() |
+ else: |
+ pyauto.PyUITest.setUp(self) |
def tearDown(self): |
- """Cleans up the files created by setUp and policies added in tests.""" |
- # Clear the policies. |
- self.SetPolicies() |
- |
+ """Cleans up the policies and related files created in tests.""" |
if self.IsChromeOS(): |
- pyauto.PyUITest.Logout(self) |
+ # On ChromeOS, clearing device policy logs out the current user so that |
+ # no policy is in force. User policy is then permanently cleared at the |
+ # end of the method after stopping the TestServer. |
+ self.SetDevicePolicy() |
+ else: |
+ # On other platforms, there is only user policy to clear. |
+ self.SetUserPolicy() |
pyauto.PyUITest.tearDown(self) |
if self.IsChromeOS(): |
self.StopHTTPServer(self._http_server) |
pyauto_utils.RemovePath(self._temp_data_dir) |
+ self.RemoveAllCryptohomeVaultsOnChromeOS() |
- def _SetCloudPolicies(self, user_mandatory=None, user_recommended=None, |
- device=None): |
- """Exports the policies to the configuration file of the TestServer. |
+ def LoginWithTestAccount(self, account='prod_enterprise_test_user'): |
+ """Convenience method for logging in with one of the test accounts.""" |
+ assert self.IsChromeOS() |
+ credentials = self.GetPrivateInfo()[account] |
+ self.Login(credentials['username'], credentials['password']) |
+ assert self.GetLoginInfo()['is_logged_in'] |
- The TestServer will serve these policies after this function returns. |
+ def SetPolicy(self): |
+ raise NotImplementedError('This class supports user and device policies. ' |
+ 'Instead of SetPolicies, use SetUserPolicy or ' |
+ 'SetDevicePolicy.') |
- Args: |
- user_mandatory: user policies of mandatory level |
- user_recommended: user policies of recommended level |
- device: device policies |
- """ |
+ def _SetUserPolicyChromeOS(self, user_policy=None): |
+ """Writes the given user policy to the TestServer's input file.""" |
assert self.IsChromeOS() |
policy_dict = { |
- 'google/chromeos/device': device or {}, |
+ 'google/chromeos/device': {}, |
'google/chromeos/user': { |
- 'mandatory': user_mandatory or {}, |
- 'recommended': user_recommended or {}, |
+ 'mandatory': user_policy or {}, |
+ 'recommended': {}, |
}, |
'managed_users': ['*'], |
} |
self._WriteFile(self._GetTestServerPoliciesFilePath(), |
json.dumps(policy_dict, sort_keys=True, indent=2) + '\n') |
- def _SetPoliciesWin(self, user_policy=None): |
- """Exports the policies as dictionary in the argument to Window registry. |
- |
- Removes the registry key and its subkeys if they exist. |
- |
- Args: |
- user_policy: A dictionary representing the user policies. Clear the |
- registry if None. |
- |
- Raises: |
- TypeError: If an unexpected value is found in the policy dictionary. |
- """ |
- |
+ def _SetUserPolicyWin(self, user_policy=None): |
+ """Writes the given user policy to the Windows registry.""" |
def SetValueEx(key, sub_key, value): |
if isinstance(value, int): |
winreg.SetValueEx(key, sub_key, 0, winreg.REG_DWORD, int(value)) |
@@ -341,15 +248,8 @@ class PolicyTestBase(pyauto.PyUITest): |
SetValueEx(root_key, k, v) |
winreg.CloseKey(root_key) |
- def _SetPoliciesLinux(self, user_policy=None): |
- """Exports the policies as dictionary in the argument to a JSON file. |
- |
- Removes the JSON file if it exists. |
- |
- Args: |
- user_policy: A dictionary representing the user policies. Remove the |
- JSON file if None |
- """ |
+ def _SetUserPolicyLinux(self, user_policy=None): |
+ """Writes the given user policy to the JSON policy file read by Chrome.""" |
assert self.IsLinux() |
sudo_cmd_file = os.path.join(os.path.dirname(__file__), |
'policy_linux_util.py') |
@@ -376,29 +276,101 @@ class PolicyTestBase(pyauto.PyUITest): |
'copy', '/tmp/chrome.json', policies_location]) |
os.remove('/tmp/chrome.json') |
- def SetPolicies(self, user_policy=None, device_policy=None): |
- """Enforces the policies given in the arguments as dictionaries. |
- |
- These policies will have been installed after this call returns. |
+ def SetUserPolicy(self, user_policy=None): |
+ """Sets the user policy provided as a dict. |
- Args: |
- user_policy: A dictionary representing the user policies. |
- device_policy: A dictionary representing the device policies on Chrome OS. |
- |
- Raises: |
- NotImplementedError if the platform is not supported. |
- """ |
+ Passing a value of None clears the user policy.""" |
if self.IsChromeOS(): |
- self._SetCloudPolicies(user_mandatory=user_policy, device=device_policy) |
+ self._SetUserPolicyChromeOS(user_policy=user_policy) |
+ elif self.IsWin(): |
+ self._SetUserPolicyWin(user_policy=user_policy) |
+ elif self.IsLinux(): |
+ self._SetUserPolicyLinux(user_policy=user_policy) |
else: |
- if device_policy is not None: |
- raise NotImplementedError('Device policy is only available on ChromeOS') |
- if self.IsWin(): |
- self._SetPoliciesWin(user_policy=user_policy) |
- elif self.IsLinux(): |
- self._SetPoliciesLinux(user_policy=user_policy) |
- else: |
- raise NotImplementedError('Not available on this platform.') |
+ raise NotImplementedError('Not available on this platform.') |
self.RefreshPolicies() |
+ def _SetProtobufMessageField(self, group_message, field, field_value): |
+ """Sets the given field in a protobuf to the given value.""" |
+ if field.label == field.LABEL_REPEATED: |
+ assert type(field_value) == list |
+ entries = group_message.__getattribute__(field.name) |
+ for list_item in field_value: |
+ entries.append(list_item) |
+ return |
+ elif field.type == field.TYPE_BOOL: |
+ assert type(field_value) == bool |
+ elif field.type == field.TYPE_STRING: |
+ assert type(field_value) == str or type(field_value) == unicode |
+ elif field.type == field.TYPE_INT64: |
+ assert type(field_value) == int |
+ elif (field.type == field.TYPE_MESSAGE and |
+ field.message_type.name == 'StringList'): |
+ assert type(field_value) == list |
+ entries = group_message.__getattribute__(field.name).entries |
+ for list_item in field_value: |
+ entries.append(list_item) |
+ return |
+ else: |
+ raise Exception('Unknown field type %s' % field.type) |
+ group_message.__setattr__(field.name, field_value) |
+ |
+ def _GenerateDevicePolicyBlob(self, device_policy=None, owner=None): |
+ """Generates a signed device policy blob.""" |
+ |
+ # Fill in the device settings protobuf. |
+ device_policy = device_policy or {} |
+ owner = owner or constants.CREDENTIALS['$mockowner'][0] |
+ settings = dp.ChromeDeviceSettingsProto() |
+ for group in settings.DESCRIPTOR.fields: |
+ # Create protobuf message for group. |
+ group_message = eval('dp.' + group.message_type.name + '()') |
+ # Indicates if at least one field was set in |group_message|. |
+ got_fields = False |
+ # Iterate over fields of the message and feed them from the policy dict. |
+ for field in group_message.DESCRIPTOR.fields: |
+ field_value = None |
+ if field.name in device_policy: |
+ got_fields = True |
+ field_value = device_policy[field.name] |
+ self._SetProtobufMessageField(group_message, field, field_value) |
+ if got_fields: |
+ settings.__getattribute__(group.name).CopyFrom(group_message) |
+ |
+ # Fill in the policy data protobuf. |
+ policy_data = dm.PolicyData() |
+ policy_data.policy_type = 'google/chromeos/device' |
+ policy_data.policy_value = settings.SerializeToString() |
+ policy_data.username = owner |
+ serialized_policy_data = policy_data.SerializeToString() |
+ |
+ # Fill in the device management response protobuf. |
+ response = dm.DeviceManagementResponse() |
+ fetch_response = response.policy_response.response.add() |
+ fetch_response.policy_data = serialized_policy_data |
+ fetch_response.policy_data_signature = ( |
+ self._private_key.hashAndSign(serialized_policy_data).tostring()) |
+ |
+ self._device_policy_blob = fetch_response.SerializeToString() |
+ |
+ def _RefreshDevicePolicy(self): |
+ """Refreshes the device policy in force on ChromeOS.""" |
+ assert self.IsChromeOS() |
+ # The device policy blob is only picked up by the session manager on |
+ # startup, and is overwritten on shutdown. So the blob has to be written |
+ # while the session manager is stopped. |
+ self.WaitForSessionManagerRestart( |
+ lambda: self._WriteDevicePolicyWithSessionManagerStopped()) |
Nirnimesh
2012/04/05 19:05:30
lambda is redundant.
self.WaitForSessionManagerRe
bartfab (slow)
2012/04/10 15:27:20
Done.
|
+ logging.debug('Session manager restarted with device policy ready') |
+ pyauto.PyUITest.setUp(self) |
+ |
+ def SetDevicePolicy(self, device_policy=None, owner=None): |
+ """Sets the device policy provided as a dict and the owner on ChromeOS. |
+ |
+ Passing a value of None as the device policy clears it.""" |
+ if not self.IsChromeOS(): |
+ raise NotImplementedError('Device policy is only available on ChromeOS.') |
+ |
+ self._GenerateDevicePolicyBlob(device_policy, owner) |
+ self._RefreshDevicePolicy() |