OLD | NEW |
1 #!/usr/bin/python | 1 #!/usr/bin/python |
2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 | 5 |
6 """Base class for tests that need to update the policies enforced by Chrome. | 6 """Base class for tests that need to update the policies enforced by Chrome. |
7 | 7 |
8 Subclasses can call SetPolicies with a dictionary of policies to install. | 8 Subclasses can call SetUserPolicy (ChromeOS, Linux, Windows) and |
9 SetPolicies can also be used to set the device policies on ChromeOS. | 9 SetDevicePolicy (ChromeOS only) with a dictionary of the policies to install. |
10 | 10 |
11 The current implementation depends on the platform. The implementations might | 11 The current implementation depends on the platform. The implementations might |
12 change in the future, but tests relying on SetPolicies will keep working. | 12 change in the future, but tests relying on the above calls will keep working. |
13 """ | 13 """ |
14 | 14 |
15 # On ChromeOS this relies on the device_management.py part of the TestServer, | 15 # On ChromeOS, both user and device policies are supported. Chrome is set up to |
16 # and forces the policies by triggering a new policy fetch and refreshing the | 16 # fetch user policy from a TestServer. A call to SetUserPolicy triggers an |
17 # cloud policy providers. This requires preparing the system for policy | 17 # immediate policy refresh, allowing the effects of user policy changes on a |
18 # fetching, which currently means the DMTokens have to be in place. Without the | 18 # running session to be tested. |
19 # DMTokens, the cloud policy controller won't be able to proceed, because the | |
20 # Gaia tokens for the DMService aren't available during tests. | |
21 # In the future this setup might not be necessary anymore, and the policy | |
22 # might also be pushed through the session_manager. | |
23 # | 19 # |
24 # On other platforms it relies on the SetPolicies automation call, which is | 20 # Device policy is injected by stopping Chrome and the session manager, writing |
25 # only available on non-official builds. This automation call is meant to be | 21 # a new device policy blob and starting Chrome and the session manager again. |
26 # eventually removed when a replacement for every platform is available. | 22 # This means that setting device policy always logs out the current user. It is |
27 # This requires setting up the policies in the registry on Windows, and writing | 23 # *not* possible to test the effects on a running session. These limitations |
28 # the right files on Linux and Mac. | 24 # stem from the fact that Chrome will only fetch device policy from a TestServer |
| 25 # if the device is enterprise owned. Enterprise ownership, in turn, requires |
| 26 # ownership of the TPM which can only be undone by reboothing the device (and |
| 27 # hence is not possible in a client test). |
29 | 28 |
30 import json | 29 import json |
31 import logging | 30 import logging |
32 import os | 31 import os |
33 import subprocess | 32 import subprocess |
34 import tempfile | 33 import tempfile |
35 import urllib | |
36 import urllib2 | |
37 | 34 |
| 35 import asn1der |
38 import pyauto | 36 import pyauto |
39 import pyauto_paths | 37 import pyauto_paths |
40 import pyauto_utils | 38 import pyauto_utils |
41 | 39 |
42 | 40 |
43 if pyauto.PyUITest.IsChromeOS(): | 41 if pyauto.PyUITest.IsChromeOS(): |
44 import sys | 42 import sys |
| 43 import warnings |
| 44 # Ignore deprecation warnings, they make our output more cluttered. |
| 45 warnings.filterwarnings('ignore', category=DeprecationWarning) |
| 46 |
45 # Find the path to the pyproto and add it to sys.path. | 47 # Find the path to the pyproto and add it to sys.path. |
46 # Prepend it so that google.protobuf is loaded from here. | 48 # Prepend it so that google.protobuf is loaded from here. |
47 for path in pyauto_paths.GetBuildDirs(): | 49 for path in pyauto_paths.GetBuildDirs(): |
48 p = os.path.join(path, 'pyproto') | 50 p = os.path.join(path, 'pyproto') |
49 if os.path.isdir(p): | 51 if os.path.isdir(p): |
50 sys.path = [p, os.path.join(p, 'chrome', 'browser', 'policy', | 52 sys.path = [p, os.path.join(p, 'chrome', 'browser', 'policy', |
51 'proto')] + sys.path | 53 'proto')] + sys.path |
52 break | 54 break |
53 sys.path.append('/usr/local') # to import autotest libs. | 55 sys.path.append('/usr/local') # to import autotest libs. |
54 import device_management_local_pb2 as dml | 56 sys.path.append(os.path.join(pyauto_paths.GetThirdPartyDir(), 'tlslite')) |
55 import device_management_backend_pb2 as dmb | 57 |
| 58 import chrome_device_policy_pb2 as dp |
| 59 import device_management_backend_pb2 as dm |
| 60 import tlslite.api |
56 from autotest.cros import constants | 61 from autotest.cros import constants |
57 from autotest.cros import cros_ui | 62 from autotest.cros import cros_ui |
58 elif pyauto.PyUITest.IsWin(): | 63 elif pyauto.PyUITest.IsWin(): |
59 import _winreg as winreg | 64 import _winreg as winreg |
60 | 65 |
| 66 # ASN.1 object identifier for PKCS#1/RSA. |
| 67 PKCS1_RSA_OID = '\x2a\x86\x48\x86\xf7\x0d\x01\x01\x01' |
| 68 |
61 | 69 |
62 class PolicyTestBase(pyauto.PyUITest): | 70 class PolicyTestBase(pyauto.PyUITest): |
63 """A base class for tests that need to set up and modify policies. | 71 """A base class for tests that need to set up and modify policies. |
64 | 72 |
65 Subclasses can use the SetPolicies call to set the policies seen by Chrome. | 73 Subclasses can use the methods SetUserPolicy (ChromeOS, Linux, Windows) and |
| 74 SetDevicePolicy (ChromeOS only) to set the policies seen by Chrome. |
66 """ | 75 """ |
67 | 76 |
68 def _WriteFile(self, path, content): | 77 def _WriteFile(self, path, content): |
69 """Writes content to path, creating any intermediary directories.""" | 78 """Writes content to path, creating any intermediary directories.""" |
70 if not os.path.exists(os.path.dirname(path)): | 79 if not os.path.exists(os.path.dirname(path)): |
71 os.makedirs(os.path.dirname(path)) | 80 os.makedirs(os.path.dirname(path)) |
72 f = open(path, 'w') | 81 f = open(path, 'w') |
73 f.write(content) | 82 f.write(content) |
74 f.close() | 83 f.close() |
75 | 84 |
76 def _GetTestServerPoliciesFilePath(self): | 85 def _GetTestServerPoliciesFilePath(self): |
77 """Returns the path of the cloud policy configuration file.""" | 86 """Returns the path of the cloud policy configuration file.""" |
78 assert self.IsChromeOS() | 87 assert self.IsChromeOS() |
79 return os.path.join(self._temp_data_dir, 'device_management') | 88 return os.path.join(self._temp_data_dir, 'device_management') |
80 | 89 |
81 def _GetHttpURLForDeviceManagement(self): | 90 def _GetHttpURLForDeviceManagement(self): |
| 91 """Returns the URL at which the TestServer is serving user policy.""" |
82 assert self.IsChromeOS() | 92 assert self.IsChromeOS() |
83 return self._http_server.GetURL('device_management').spec() | 93 return self._http_server.GetURL('device_management').spec() |
84 | 94 |
85 def _WriteUserPolicyToken(self, token): | 95 def _WriteDevicePolicyWithSessionManagerStopped(self): |
86 """Writes the given token to the user device management cache.""" | 96 """Writes the device policy blob while the session manager is stopped. |
87 assert self.IsChromeOS() | |
88 blob = dml.DeviceCredentials() | |
89 blob.device_token = token | |
90 blob.device_id = '123' | |
91 self._WriteFile('/home/chronos/user/Device Management/Token', | |
92 blob.SerializeToString()) | |
93 | 97 |
94 def _WriteDevicePolicy(self, fetch_response): | 98 Updates the files holding the device policy blob and the public key need to |
95 """Writes the given signed fetch_response to the device policy cache. | 99 verify its signature. |
96 | |
97 Also writes the owner key, used to verify the signature. | |
98 """ | 100 """ |
99 assert self.IsChromeOS() | 101 assert self.IsChromeOS() |
100 self._WriteFile(constants.SIGNED_POLICY_FILE, | |
101 fetch_response.SerializeToString()) | |
102 self._WriteFile(constants.OWNER_KEY_FILE, | |
103 fetch_response.new_public_key) | |
104 | |
105 def _PostToDMServer(self, request_type, body, headers): | |
106 """Posts a request to the TestServer's Device Management interface. | |
107 | |
108 |request_type| is the value of the 'request' HTTP parameter. | |
109 Returns the response's body. | |
110 """ | |
111 assert self.IsChromeOS() | |
112 url = self._GetHttpURLForDeviceManagement() | |
113 url += '?' + urllib.urlencode({ | |
114 'deviceid': '123', | |
115 'oauth_token': '456', | |
116 'request': request_type, | |
117 'devicetype': 2, | |
118 'apptype': 'Chrome', | |
119 'agent': 'Chrome', | |
120 }) | |
121 return urllib2.urlopen(urllib2.Request(url, body, headers)).read() | |
122 | |
123 def _PostRegisterRequest(self, type): | |
124 """Sends a device register request to the TestServer, of the given type.""" | |
125 assert self.IsChromeOS() | |
126 request = dmb.DeviceManagementRequest() | |
127 register = request.register_request | |
128 register.machine_id = '789' | |
129 register.type = type | |
130 return self._PostToDMServer('register', request.SerializeToString(), {}) | |
131 | |
132 def _RegisterAndGetDMToken(self, device): | |
133 """Registers with the TestServer and returns the DMToken fetched. | |
134 | |
135 Registers for device policy if device is True. Otherwise registers for | |
136 user policy. | |
137 """ | |
138 assert self.IsChromeOS() | |
139 type = device and dmb.DeviceRegisterRequest.DEVICE \ | |
140 or dmb.DeviceRegisterRequest.USER | |
141 rstring = self._PostRegisterRequest(type) | |
142 response = dmb.DeviceManagementResponse() | |
143 response.ParseFromString(rstring) | |
144 return response.register_response.device_management_token | |
145 | |
146 def _PostPolicyRequest(self, token, type, want_signature=False): | |
147 """Fetches policy from the TestServer, using the given token. | |
148 | |
149 Policy is fetched for the given type. If want_signature is True, the | |
150 request will ask for a signed response. Returns the response body. | |
151 """ | |
152 assert self.IsChromeOS() | |
153 request = dmb.DeviceManagementRequest() | |
154 policy = request.policy_request | |
155 prequest = policy.request.add() | |
156 prequest.policy_type = type | |
157 if want_signature: | |
158 prequest.signature_type = dmb.PolicyFetchRequest.SHA1_RSA | |
159 headers = { | |
160 'Authorization': 'GoogleDMToken token=' + token, | |
161 } | |
162 return self._PostToDMServer('policy', request.SerializeToString(), headers) | |
163 | |
164 def _FetchPolicy(self, token, device): | |
165 """Fetches policy from the TestServer, using the given token. | |
166 | |
167 Token must be a valid token retrieved with _RegisterAndGetDMToken. If | |
168 device is True, fetches signed device policy. Otherwise fetches user policy. | |
169 This method also verifies the response, and returns the first policy fetch | |
170 response. | |
171 """ | |
172 assert self.IsChromeOS() | |
173 type = device and 'google/chromeos/device' or 'google/chromeos/user' | |
174 rstring = self._PostPolicyRequest(token=token, type=type, | |
175 want_signature=device) | |
176 response = dmb.DeviceManagementResponse() | |
177 response.ParseFromString(rstring) | |
178 fetch_response = response.policy_response.response[0] | |
179 assert fetch_response.policy_data | |
180 assert fetch_response.policy_data_signature | |
181 assert fetch_response.new_public_key | |
182 return fetch_response | |
183 | |
184 def _WriteDevicePolicyWithSessionManagerStopped(self, policy): | |
185 """Writes the device policy blob while the Session Manager is stopped.""" | |
186 assert self.IsChromeOS() | |
187 logging.debug('Stopping session manager') | 102 logging.debug('Stopping session manager') |
188 cros_ui.stop(allow_fail=True) | 103 cros_ui.stop(allow_fail=True) |
189 logging.debug('Writing device policy cache') | 104 logging.debug('Writing device policy blob') |
190 self._WriteDevicePolicy(policy) | 105 self._WriteFile(constants.SIGNED_POLICY_FILE, self._device_policy_blob) |
| 106 self._WriteFile(constants.OWNER_KEY_FILE, self._public_key) |
191 | 107 |
192 # Ugly hack: session manager won't spawn chrome if this file exists. That's | 108 # Ugly hack: session manager will not spawn Chrome if this file exists. That |
193 # usually a good thing (to keep the automation channel open), but in this | 109 # is usually a good thing (to keep the automation channel open), but in this |
194 # case we really want to restart chrome. PyUITest.setUp() will be called | 110 # case we really want to restart chrome. PyUITest.setUp() will be called |
195 # after session manager and chrome have restarted, and will setup the | 111 # after session manager and chrome have restarted, and will setup the |
196 # automation channel. | 112 # automation channel. |
197 restore_magic_file = False | 113 restore_magic_file = False |
198 if os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE): | 114 if os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE): |
199 logging.debug('DISABLE_BROWSER_RESTART_MAGIC_FILE found. ' | 115 logging.debug('DISABLE_BROWSER_RESTART_MAGIC_FILE found. ' |
200 'Removing temporarily for the next restart.') | 116 'Removing temporarily for the next restart.') |
201 restore_magic_file = True | 117 restore_magic_file = True |
202 os.remove(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) | 118 os.remove(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) |
203 assert not os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) | 119 assert not os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) |
204 | 120 |
205 logging.debug('Starting session manager again') | 121 logging.debug('Starting session manager again') |
206 cros_ui.start() | 122 cros_ui.start() |
207 | 123 |
208 # cros_ui.start() waits for the login prompt to be visible, so chrome has | 124 # cros_ui.start() waits for the login prompt to be visible, so Chrome has |
209 # already started once it returns. | 125 # already started once it returns. |
210 if restore_magic_file: | 126 if restore_magic_file: |
211 open(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE, 'w').close() | 127 open(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE, 'w').close() |
212 assert os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) | 128 assert os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) |
213 | 129 |
214 def ExtraChromeFlags(self): | 130 def ExtraChromeFlags(self): |
215 """Sets up Chrome to use cloud policies on ChromeOS.""" | 131 """Sets up Chrome to use cloud policies on ChromeOS.""" |
216 flags = pyauto.PyUITest.ExtraChromeFlags(self) | 132 flags = pyauto.PyUITest.ExtraChromeFlags(self) |
217 if self.IsChromeOS(): | 133 if self.IsChromeOS(): |
| 134 while '--skip-oauth-login' in flags: |
| 135 flags.remove('--skip-oauth-login') |
218 url = self._GetHttpURLForDeviceManagement() | 136 url = self._GetHttpURLForDeviceManagement() |
219 flag = '--device-management-url=' + url | 137 flags.append('--device-management-url=' + url) |
220 flags += [flag] | 138 flags.append('--disable-sync') |
221 return flags | 139 return flags |
222 | 140 |
223 def setUp(self): | 141 def setUp(self): |
224 """Sets up the platform for policy testing. | 142 """Sets up the platform for policy testing. |
225 | 143 |
226 On ChromeOS, part of the set up involves restarting the session_manager and | 144 On ChromeOS, part of the setup involves restarting the session manager to |
227 logging in with the $default account. | 145 inject a device policy blob. |
228 """ | 146 """ |
229 if self.IsChromeOS(): | 147 if self.IsChromeOS(): |
230 # Setup a temporary data dir and a TestServer serving files from there. | 148 # Set up a temporary data dir and a TestServer serving files from there. |
231 # The TestServer makes its document root relative to the src dir. | 149 # The TestServer makes its document root relative to the src dir. |
232 self._temp_data_dir = tempfile.mkdtemp(dir=pyauto_paths.GetSourceDir()) | 150 self._temp_data_dir = tempfile.mkdtemp(dir=pyauto_paths.GetSourceDir()) |
233 relative_temp_data_dir = os.path.basename(self._temp_data_dir) | 151 relative_temp_data_dir = os.path.basename(self._temp_data_dir) |
234 self._http_server = self.StartHTTPServer(relative_temp_data_dir) | 152 self._http_server = self.StartHTTPServer(relative_temp_data_dir) |
235 | 153 |
236 # Setup empty policies, so that the TestServer can start replying. | 154 # Set up an empty user policy so that the TestServer can start replying. |
237 self._SetCloudPolicies() | 155 self._SetUserPolicyChromeOS() |
238 | 156 |
239 device_dmtoken = self._RegisterAndGetDMToken(device=True) | 157 # Generate a key pair for signing device policy. |
240 policy = self._FetchPolicy(token=device_dmtoken, device=True) | 158 self._private_key = tlslite.api.generateRSAKey(1024) |
241 user_dmtoken = self._RegisterAndGetDMToken(device=False) | 159 algorithm = asn1der.Sequence( |
| 160 [asn1der.Data(asn1der.OBJECT_IDENTIFIER, PKCS1_RSA_OID), |
| 161 asn1der.Data(asn1der.NULL, '')]) |
| 162 rsa_pubkey = asn1der.Sequence([asn1der.Integer(self._private_key.n), |
| 163 asn1der.Integer(self._private_key.e)]) |
| 164 self._public_key = asn1der.Sequence( |
| 165 [algorithm, asn1der.Bitstring(rsa_pubkey)]) |
242 | 166 |
243 # The device policy blob is only picked up by the session manager on | 167 # Clear device policy. This also invokes pyauto.PyUITest.setUp(self). |
244 # startup, and is overwritten on shutdown. So the blob has to be written | 168 self.SetDevicePolicy() |
245 # while the session manager is stopped. | |
246 self.WaitForSessionManagerRestart( | |
247 lambda: self._WriteDevicePolicyWithSessionManagerStopped(policy)) | |
248 logging.debug('Session manager restarted with device policy ready') | |
249 | 169 |
250 pyauto.PyUITest.setUp(self) | 170 # Remove any existing vaults. |
251 | 171 self.RemoveAllCryptohomeVaultsOnChromeOS() |
252 if self.IsChromeOS(): | 172 else: |
253 logging.debug('Logging in') | 173 pyauto.PyUITest.setUp(self) |
254 credentials = self.GetPrivateInfo()['prod_enterprise_test_user'] | |
255 self.Login(credentials['username'], credentials['password']) | |
256 assert self.GetLoginInfo()['is_logged_in'] | |
257 | |
258 self._WriteUserPolicyToken(user_dmtoken) | |
259 # The browser has to be reloaded to make the user policy token cache | |
260 # reload the file just written. The file can also be written only after | |
261 # the cryptohome is mounted, after login. | |
262 self.RestartBrowser(clear_profile=False) | |
263 | 174 |
264 def tearDown(self): | 175 def tearDown(self): |
265 """Cleans up the files created by setUp and policies added in tests.""" | 176 """Cleans up the policies and related files created in tests.""" |
266 # Clear the policies. | |
267 self.SetPolicies() | |
268 | |
269 if self.IsChromeOS(): | 177 if self.IsChromeOS(): |
270 pyauto.PyUITest.Logout(self) | 178 # On ChromeOS, clearing device policy logs out the current user so that |
| 179 # no policy is in force. User policy is then permanently cleared at the |
| 180 # end of the method after stopping the TestServer. |
| 181 self.SetDevicePolicy() |
| 182 else: |
| 183 # On other platforms, there is only user policy to clear. |
| 184 self.SetUserPolicy() |
271 | 185 |
272 pyauto.PyUITest.tearDown(self) | 186 pyauto.PyUITest.tearDown(self) |
273 | 187 |
274 if self.IsChromeOS(): | 188 if self.IsChromeOS(): |
275 self.StopHTTPServer(self._http_server) | 189 self.StopHTTPServer(self._http_server) |
276 pyauto_utils.RemovePath(self._temp_data_dir) | 190 pyauto_utils.RemovePath(self._temp_data_dir) |
| 191 self.RemoveAllCryptohomeVaultsOnChromeOS() |
277 | 192 |
278 def _SetCloudPolicies(self, user_mandatory=None, user_recommended=None, | 193 def LoginWithTestAccount(self, account='prod_enterprise_test_user'): |
279 device=None): | 194 """Convenience method for logging in with one of the test accounts.""" |
280 """Exports the policies to the configuration file of the TestServer. | 195 assert self.IsChromeOS() |
| 196 credentials = self.GetPrivateInfo()[account] |
| 197 self.Login(credentials['username'], credentials['password']) |
| 198 assert self.GetLoginInfo()['is_logged_in'] |
281 | 199 |
282 The TestServer will serve these policies after this function returns. | 200 def _SetUserPolicyChromeOS(self, user_policy=None): |
283 | 201 """Writes the given user policy to the TestServer's input file.""" |
284 Args: | |
285 user_mandatory: user policies of mandatory level | |
286 user_recommended: user policies of recommended level | |
287 device: device policies | |
288 """ | |
289 assert self.IsChromeOS() | 202 assert self.IsChromeOS() |
290 policy_dict = { | 203 policy_dict = { |
291 'google/chromeos/device': device or {}, | 204 'google/chromeos/device': {}, |
292 'google/chromeos/user': { | 205 'google/chromeos/user': { |
293 'mandatory': user_mandatory or {}, | 206 'mandatory': user_policy or {}, |
294 'recommended': user_recommended or {}, | 207 'recommended': {}, |
295 }, | 208 }, |
296 'managed_users': ['*'], | 209 'managed_users': ['*'], |
297 } | 210 } |
298 self._WriteFile(self._GetTestServerPoliciesFilePath(), | 211 self._WriteFile(self._GetTestServerPoliciesFilePath(), |
299 json.dumps(policy_dict, sort_keys=True, indent=2) + '\n') | 212 json.dumps(policy_dict, sort_keys=True, indent=2) + '\n') |
300 | 213 |
301 def _SetPoliciesWin(self, user_policy=None): | 214 def _SetUserPolicyWin(self, user_policy=None): |
302 """Exports the policies as dictionary in the argument to Window registry. | 215 """Writes the given user policy to the Windows registry.""" |
303 | |
304 Removes the registry key and its subkeys if they exist. | |
305 | |
306 Args: | |
307 user_policy: A dictionary representing the user policies. Clear the | |
308 registry if None. | |
309 | |
310 Raises: | |
311 TypeError: If an unexpected value is found in the policy dictionary. | |
312 """ | |
313 | |
314 def SetValueEx(key, sub_key, value): | 216 def SetValueEx(key, sub_key, value): |
315 if isinstance(value, int): | 217 if isinstance(value, int): |
316 winreg.SetValueEx(key, sub_key, 0, winreg.REG_DWORD, int(value)) | 218 winreg.SetValueEx(key, sub_key, 0, winreg.REG_DWORD, int(value)) |
317 elif isinstance(value, basestring): | 219 elif isinstance(value, basestring): |
318 winreg.SetValueEx(key, sub_key, 0, winreg.REG_SZ, value.encode('ascii')) | 220 winreg.SetValueEx(key, sub_key, 0, winreg.REG_SZ, value.encode('ascii')) |
319 elif isinstance(value, list): | 221 elif isinstance(value, list): |
320 k = winreg.CreateKey(key, sub_key) | 222 k = winreg.CreateKey(key, sub_key) |
321 for index, v in list(enumerate(value)): | 223 for index, v in list(enumerate(value)): |
322 SetValueEx(k, str(index + 1), v) | 224 SetValueEx(k, str(index + 1), v) |
323 winreg.CloseKey(k) | 225 winreg.CloseKey(k) |
(...skipping 10 matching lines...) Expand all Loading... |
334 r'reg query HKEY_LOCAL_MACHINE\%s' % reg_base) == 0: | 236 r'reg query HKEY_LOCAL_MACHINE\%s' % reg_base) == 0: |
335 logging.debug(r'Removing %s' % reg_base) | 237 logging.debug(r'Removing %s' % reg_base) |
336 subprocess.call(r'reg delete HKLM\%s /f' % reg_base) | 238 subprocess.call(r'reg delete HKLM\%s /f' % reg_base) |
337 | 239 |
338 if user_policy is not None: | 240 if user_policy is not None: |
339 root_key = winreg.CreateKey(winreg.HKEY_LOCAL_MACHINE, reg_base) | 241 root_key = winreg.CreateKey(winreg.HKEY_LOCAL_MACHINE, reg_base) |
340 for k, v in user_policy.iteritems(): | 242 for k, v in user_policy.iteritems(): |
341 SetValueEx(root_key, k, v) | 243 SetValueEx(root_key, k, v) |
342 winreg.CloseKey(root_key) | 244 winreg.CloseKey(root_key) |
343 | 245 |
344 def _SetPoliciesLinux(self, user_policy=None): | 246 def _SetUserPolicyLinux(self, user_policy=None): |
345 """Exports the policies as dictionary in the argument to a JSON file. | 247 """Writes the given user policy to the JSON policy file read by Chrome.""" |
346 | |
347 Removes the JSON file if it exists. | |
348 | |
349 Args: | |
350 user_policy: A dictionary representing the user policies. Remove the | |
351 JSON file if None | |
352 """ | |
353 assert self.IsLinux() | 248 assert self.IsLinux() |
354 sudo_cmd_file = os.path.join(os.path.dirname(__file__), | 249 sudo_cmd_file = os.path.join(os.path.dirname(__file__), |
355 'policy_linux_util.py') | 250 'policy_linux_util.py') |
356 | 251 |
357 if self.GetBrowserInfo()['properties']['branding'] == 'Google Chrome': | 252 if self.GetBrowserInfo()['properties']['branding'] == 'Google Chrome': |
358 policies_location_base = '/etc/opt/chrome' | 253 policies_location_base = '/etc/opt/chrome' |
359 else: | 254 else: |
360 policies_location_base = '/etc/chromium' | 255 policies_location_base = '/etc/chromium' |
361 | 256 |
362 if os.path.isdir(policies_location_base): | 257 if os.path.isdir(policies_location_base): |
363 logging.debug('Removing directory %s' % policies_location_base) | 258 logging.debug('Removing directory %s' % policies_location_base) |
364 subprocess.call(['suid-python', sudo_cmd_file, | 259 subprocess.call(['suid-python', sudo_cmd_file, |
365 'remove_dir', policies_location_base]) | 260 'remove_dir', policies_location_base]) |
366 | 261 |
367 if user_policy is not None: | 262 if user_policy is not None: |
368 self._WriteFile('/tmp/chrome.json', | 263 self._WriteFile('/tmp/chrome.json', |
369 json.dumps(user_policy, sort_keys=True, indent=2) + '\n') | 264 json.dumps(user_policy, sort_keys=True, indent=2) + '\n') |
370 | 265 |
371 policies_location = '%s/policies/managed' % policies_location_base | 266 policies_location = '%s/policies/managed' % policies_location_base |
372 subprocess.call(['suid-python', sudo_cmd_file, | 267 subprocess.call(['suid-python', sudo_cmd_file, |
373 'setup_dir', policies_location]) | 268 'setup_dir', policies_location]) |
374 # Copy chrome.json file to the managed directory | 269 # Copy chrome.json file to the managed directory |
375 subprocess.call(['suid-python', sudo_cmd_file, | 270 subprocess.call(['suid-python', sudo_cmd_file, |
376 'copy', '/tmp/chrome.json', policies_location]) | 271 'copy', '/tmp/chrome.json', policies_location]) |
377 os.remove('/tmp/chrome.json') | 272 os.remove('/tmp/chrome.json') |
378 | 273 |
379 def SetPolicies(self, user_policy=None, device_policy=None): | 274 def SetUserPolicy(self, user_policy=None): |
380 """Enforces the policies given in the arguments as dictionaries. | 275 """Sets the user policy provided as a dict. |
381 | 276 |
382 These policies will have been installed after this call returns. | 277 Passing a value of None clears the user policy.""" |
383 | |
384 Args: | |
385 user_policy: A dictionary representing the user policies. | |
386 device_policy: A dictionary representing the device policies on Chrome OS. | |
387 | |
388 Raises: | |
389 NotImplementedError if the platform is not supported. | |
390 """ | |
391 if self.IsChromeOS(): | 278 if self.IsChromeOS(): |
392 self._SetCloudPolicies(user_mandatory=user_policy, device=device_policy) | 279 self._SetUserPolicyChromeOS(user_policy=user_policy) |
| 280 elif self.IsWin(): |
| 281 self._SetUserPolicyWin(user_policy=user_policy) |
| 282 elif self.IsLinux(): |
| 283 self._SetUserPolicyLinux(user_policy=user_policy) |
393 else: | 284 else: |
394 if device_policy is not None: | 285 raise NotImplementedError('Not available on this platform.') |
395 raise NotImplementedError('Device policy is only available on ChromeOS') | |
396 if self.IsWin(): | |
397 self._SetPoliciesWin(user_policy=user_policy) | |
398 elif self.IsLinux(): | |
399 self._SetPoliciesLinux(user_policy=user_policy) | |
400 else: | |
401 raise NotImplementedError('Not available on this platform.') | |
402 | 286 |
403 self.RefreshPolicies() | 287 self.RefreshPolicies() |
404 | 288 |
| 289 def _SetProtobufMessageField(self, group_message, field, field_value): |
| 290 """Sets the given field in a protobuf to the given value.""" |
| 291 if field.label == field.LABEL_REPEATED: |
| 292 assert type(field_value) == list |
| 293 entries = group_message.__getattribute__(field.name) |
| 294 for list_item in field_value: |
| 295 entries.append(list_item) |
| 296 return |
| 297 elif field.type == field.TYPE_BOOL: |
| 298 assert type(field_value) == bool |
| 299 elif field.type == field.TYPE_STRING: |
| 300 assert type(field_value) == str or type(field_value) == unicode |
| 301 elif field.type == field.TYPE_INT64: |
| 302 assert type(field_value) == int |
| 303 elif (field.type == field.TYPE_MESSAGE and |
| 304 field.message_type.name == 'StringList'): |
| 305 assert type(field_value) == list |
| 306 entries = group_message.__getattribute__(field.name).entries |
| 307 for list_item in field_value: |
| 308 entries.append(list_item) |
| 309 return |
| 310 else: |
| 311 raise Exception('Unknown field type %s' % field.type) |
| 312 group_message.__setattr__(field.name, field_value) |
| 313 |
| 314 def _GenerateDevicePolicyBlob(self, device_policy=None, owner=None): |
| 315 """Generates a signed device policy blob.""" |
| 316 |
| 317 # Fill in the device settings protobuf. |
| 318 device_policy = device_policy or {} |
| 319 owner = owner or constants.CREDENTIALS['$mockowner'][0] |
| 320 settings = dp.ChromeDeviceSettingsProto() |
| 321 for group in settings.DESCRIPTOR.fields: |
| 322 # Create protobuf message for group. |
| 323 group_message = eval('dp.' + group.message_type.name + '()') |
| 324 # Indicates if at least one field was set in |group_message|. |
| 325 got_fields = False |
| 326 # Iterate over fields of the message and feed them from the policy dict. |
| 327 for field in group_message.DESCRIPTOR.fields: |
| 328 field_value = None |
| 329 if field.name in device_policy: |
| 330 got_fields = True |
| 331 field_value = device_policy[field.name] |
| 332 self._SetProtobufMessageField(group_message, field, field_value) |
| 333 if got_fields: |
| 334 settings.__getattribute__(group.name).CopyFrom(group_message) |
| 335 |
| 336 # Fill in the policy data protobuf. |
| 337 policy_data = dm.PolicyData() |
| 338 policy_data.policy_type = 'google/chromeos/device' |
| 339 policy_data.policy_value = settings.SerializeToString() |
| 340 policy_data.username = owner |
| 341 serialized_policy_data = policy_data.SerializeToString() |
| 342 |
| 343 # Fill in the device management response protobuf. |
| 344 response = dm.DeviceManagementResponse() |
| 345 fetch_response = response.policy_response.response.add() |
| 346 fetch_response.policy_data = serialized_policy_data |
| 347 fetch_response.policy_data_signature = ( |
| 348 self._private_key.hashAndSign(serialized_policy_data).tostring()) |
| 349 |
| 350 self._device_policy_blob = fetch_response.SerializeToString() |
| 351 |
| 352 def _RefreshDevicePolicy(self): |
| 353 """Refreshes the device policy in force on ChromeOS.""" |
| 354 assert self.IsChromeOS() |
| 355 # The device policy blob is only picked up by the session manager on |
| 356 # startup, and is overwritten on shutdown. So the blob has to be written |
| 357 # while the session manager is stopped. |
| 358 self.WaitForSessionManagerRestart( |
| 359 self._WriteDevicePolicyWithSessionManagerStopped) |
| 360 logging.debug('Session manager restarted with device policy ready') |
| 361 pyauto.PyUITest.setUp(self) |
| 362 |
| 363 def SetDevicePolicy(self, device_policy=None, owner=None): |
| 364 """Sets the device policy provided as a dict and the owner on ChromeOS. |
| 365 |
| 366 Passing a value of None as the device policy clears it.""" |
| 367 if not self.IsChromeOS(): |
| 368 raise NotImplementedError('Device policy is only available on ChromeOS.') |
| 369 |
| 370 self._GenerateDevicePolicyBlob(device_policy, owner) |
| 371 self._RefreshDevicePolicy() |
OLD | NEW |