OLD | NEW |
---|---|
1 #!/usr/bin/python | 1 #!/usr/bin/python |
2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
5 | 5 |
6 """Base class for tests that need to update the policies enforced by Chrome. | 6 """Base class for tests that need to update the policies enforced by Chrome. |
7 | 7 |
8 Subclasses can call SetPolicies with a dictionary of policies to install. | 8 Subclasses can call SetUserPolicy (ChromeOS, Linux, Windows) and |
9 SetPolicies can also be used to set the device policies on ChromeOS. | 9 SetDevicePolicy (ChromeOS only) with a dictionary of the policies to install. |
10 | 10 |
11 The current implementation depends on the platform. The implementations might | 11 The current implementation depends on the platform. The implementations might |
12 change in the future, but tests relying on SetPolicies will keep working. | 12 change in the future, but tests relying on the above calls will keep working. |
13 """ | 13 """ |
14 | 14 |
15 # On ChromeOS this relies on the device_management.py part of the TestServer, | 15 # On ChromeOS, both user and device policies are supported. Chrome is set up to |
16 # and forces the policies by triggering a new policy fetch and refreshing the | 16 # fetch user policy from a TestServer. A call to SetUserPolicy triggers an |
17 # cloud policy providers. This requires preparing the system for policy | 17 # immediate policy refresh, allowing the effects of user policy changes on a |
18 # fetching, which currently means the DMTokens have to be in place. Without the | 18 # running session to be tested. |
19 # DMTokens, the cloud policy controller won't be able to proceed, because the | |
20 # Gaia tokens for the DMService aren't available during tests. | |
21 # In the future this setup might not be necessary anymore, and the policy | |
22 # might also be pushed through the session_manager. | |
23 # | 19 # |
24 # On other platforms it relies on the SetPolicies automation call, which is | 20 # Device policy is injected by stopping Chrome and the session manager, writing |
25 # only available on non-official builds. This automation call is meant to be | 21 # a new device policy blob and starting Chrome and the session manager again. |
26 # eventually removed when a replacement for every platform is available. | 22 # This means that setting device policy always logs out the current user. It is |
27 # This requires setting up the policies in the registry on Windows, and writing | 23 # *not* possible to test the effects on a running session. These limitations |
28 # the right files on Linux and Mac. | 24 # stem from the fact that Chrome will only fetch device policy from a TestServer |
25 # if the device is enterprise owned. Enterprise ownership, in turn, requires | |
26 # ownership of the TPM which can only be undone by reboothing the device (and | |
27 # hence is not possible in a client test). | |
29 | 28 |
30 import json | 29 import json |
31 import logging | 30 import logging |
32 import os | 31 import os |
33 import subprocess | 32 import subprocess |
34 import tempfile | 33 import tempfile |
35 import urllib | |
36 import urllib2 | |
37 | 34 |
35 import asn1der | |
38 import pyauto | 36 import pyauto |
39 import pyauto_paths | 37 import pyauto_paths |
40 import pyauto_utils | 38 import pyauto_utils |
41 | 39 |
42 | 40 |
43 if pyauto.PyUITest.IsChromeOS(): | 41 if pyauto.PyUITest.IsChromeOS(): |
44 import sys | 42 import sys |
43 import warnings | |
44 # Ignore deprecation warnings, they make our output more cluttered. | |
45 warnings.filterwarnings("ignore", category=DeprecationWarning) | |
Nirnimesh
2012/04/05 19:05:30
nit: use ' instead of " for consistency
bartfab (slow)
2012/04/10 15:27:20
Done.
| |
46 | |
45 # Find the path to the pyproto and add it to sys.path. | 47 # Find the path to the pyproto and add it to sys.path. |
46 # Prepend it so that google.protobuf is loaded from here. | 48 # Prepend it so that google.protobuf is loaded from here. |
47 for path in pyauto_paths.GetBuildDirs(): | 49 for path in pyauto_paths.GetBuildDirs(): |
48 p = os.path.join(path, 'pyproto') | 50 p = os.path.join(path, 'pyproto') |
49 if os.path.isdir(p): | 51 if os.path.isdir(p): |
50 sys.path = [p, os.path.join(p, 'chrome', 'browser', 'policy', | 52 sys.path = [p, os.path.join(p, 'chrome', 'browser', 'policy', |
51 'proto')] + sys.path | 53 'proto')] + sys.path |
52 break | 54 break |
53 sys.path.append('/usr/local') # to import autotest libs. | 55 sys.path.append('/usr/local') # to import autotest libs. |
54 import device_management_local_pb2 as dml | 56 sys.path.append(os.path.join(pyauto_paths.GetThirdPartyDir(), 'tlslite')) |
55 import device_management_backend_pb2 as dmb | 57 |
58 import chrome_device_policy_pb2 as dp | |
59 import device_management_backend_pb2 as dm | |
60 import tlslite.api | |
56 from autotest.cros import constants | 61 from autotest.cros import constants |
57 from autotest.cros import cros_ui | 62 from autotest.cros import cros_ui |
58 elif pyauto.PyUITest.IsWin(): | 63 elif pyauto.PyUITest.IsWin(): |
59 import _winreg as winreg | 64 import _winreg as winreg |
60 | 65 |
66 # ASN.1 object identifier for PKCS#1/RSA. | |
67 PKCS1_RSA_OID = '\x2a\x86\x48\x86\xf7\x0d\x01\x01\x01' | |
68 | |
61 | 69 |
62 class PolicyTestBase(pyauto.PyUITest): | 70 class PolicyTestBase(pyauto.PyUITest): |
63 """A base class for tests that need to set up and modify policies. | 71 """A base class for tests that need to set up and modify policies. |
64 | 72 |
65 Subclasses can use the SetPolicies call to set the policies seen by Chrome. | 73 Subclasses can use the methods SetUserPolicy (ChromeOS, Linux, Windows) and |
74 SetDevicePolicy (ChromeOS only) to set the policies seen by Chrome. | |
66 """ | 75 """ |
67 | 76 |
68 def _WriteFile(self, path, content): | 77 def _WriteFile(self, path, content): |
69 """Writes content to path, creating any intermediary directories.""" | 78 """Writes content to path, creating any intermediary directories.""" |
70 if not os.path.exists(os.path.dirname(path)): | 79 if not os.path.exists(os.path.dirname(path)): |
71 os.makedirs(os.path.dirname(path)) | 80 os.makedirs(os.path.dirname(path)) |
72 f = open(path, 'w') | 81 f = open(path, 'w') |
73 f.write(content) | 82 f.write(content) |
74 f.close() | 83 f.close() |
75 | 84 |
76 def _GetTestServerPoliciesFilePath(self): | 85 def _GetTestServerPoliciesFilePath(self): |
77 """Returns the path of the cloud policy configuration file.""" | 86 """Returns the path of the cloud policy configuration file.""" |
78 assert self.IsChromeOS() | 87 assert self.IsChromeOS() |
79 return os.path.join(self._temp_data_dir, 'device_management') | 88 return os.path.join(self._temp_data_dir, 'device_management') |
80 | 89 |
81 def _GetHttpURLForDeviceManagement(self): | 90 def _GetHttpURLForDeviceManagement(self): |
91 """Returns the URL at which the TestServer is serving user policy.""" | |
82 assert self.IsChromeOS() | 92 assert self.IsChromeOS() |
83 return self._http_server.GetURL('device_management').spec() | 93 return self._http_server.GetURL('device_management').spec() |
84 | 94 |
85 def _WriteUserPolicyToken(self, token): | 95 def _WriteDevicePolicyWithSessionManagerStopped(self): |
86 """Writes the given token to the user device management cache.""" | 96 """Writes the device policy blob while the session manager is stopped. |
87 assert self.IsChromeOS() | |
88 blob = dml.DeviceCredentials() | |
89 blob.device_token = token | |
90 blob.device_id = '123' | |
91 self._WriteFile('/home/chronos/user/Device Management/Token', | |
92 blob.SerializeToString()) | |
93 | 97 |
94 def _WriteDevicePolicy(self, fetch_response): | 98 Updates the files holding the device policy blob and the public key need to |
95 """Writes the given signed fetch_response to the device policy cache. | 99 verify its signature. |
96 | |
97 Also writes the owner key, used to verify the signature. | |
98 """ | 100 """ |
99 assert self.IsChromeOS() | 101 assert self.IsChromeOS() |
100 self._WriteFile(constants.SIGNED_POLICY_FILE, | |
101 fetch_response.SerializeToString()) | |
102 self._WriteFile(constants.OWNER_KEY_FILE, | |
103 fetch_response.new_public_key) | |
104 | |
105 def _PostToDMServer(self, request_type, body, headers): | |
106 """Posts a request to the TestServer's Device Management interface. | |
107 | |
108 |request_type| is the value of the 'request' HTTP parameter. | |
109 Returns the response's body. | |
110 """ | |
111 assert self.IsChromeOS() | |
112 url = self._GetHttpURLForDeviceManagement() | |
113 url += '?' + urllib.urlencode({ | |
114 'deviceid': '123', | |
115 'oauth_token': '456', | |
116 'request': request_type, | |
117 'devicetype': 2, | |
118 'apptype': 'Chrome', | |
119 'agent': 'Chrome', | |
120 }) | |
121 return urllib2.urlopen(urllib2.Request(url, body, headers)).read() | |
122 | |
123 def _PostRegisterRequest(self, type): | |
124 """Sends a device register request to the TestServer, of the given type.""" | |
125 assert self.IsChromeOS() | |
126 request = dmb.DeviceManagementRequest() | |
127 register = request.register_request | |
128 register.machine_id = '789' | |
129 register.type = type | |
130 return self._PostToDMServer('register', request.SerializeToString(), {}) | |
131 | |
132 def _RegisterAndGetDMToken(self, device): | |
133 """Registers with the TestServer and returns the DMToken fetched. | |
134 | |
135 Registers for device policy if device is True. Otherwise registers for | |
136 user policy. | |
137 """ | |
138 assert self.IsChromeOS() | |
139 type = device and dmb.DeviceRegisterRequest.DEVICE \ | |
140 or dmb.DeviceRegisterRequest.USER | |
141 rstring = self._PostRegisterRequest(type) | |
142 response = dmb.DeviceManagementResponse() | |
143 response.ParseFromString(rstring) | |
144 return response.register_response.device_management_token | |
145 | |
146 def _PostPolicyRequest(self, token, type, want_signature=False): | |
147 """Fetches policy from the TestServer, using the given token. | |
148 | |
149 Policy is fetched for the given type. If want_signature is True, the | |
150 request will ask for a signed response. Returns the response body. | |
151 """ | |
152 assert self.IsChromeOS() | |
153 request = dmb.DeviceManagementRequest() | |
154 policy = request.policy_request | |
155 prequest = policy.request.add() | |
156 prequest.policy_type = type | |
157 if want_signature: | |
158 prequest.signature_type = dmb.PolicyFetchRequest.SHA1_RSA | |
159 headers = { | |
160 'Authorization': 'GoogleDMToken token=' + token, | |
161 } | |
162 return self._PostToDMServer('policy', request.SerializeToString(), headers) | |
163 | |
164 def _FetchPolicy(self, token, device): | |
165 """Fetches policy from the TestServer, using the given token. | |
166 | |
167 Token must be a valid token retrieved with _RegisterAndGetDMToken. If | |
168 device is True, fetches signed device policy. Otherwise fetches user policy. | |
169 This method also verifies the response, and returns the first policy fetch | |
170 response. | |
171 """ | |
172 assert self.IsChromeOS() | |
173 type = device and 'google/chromeos/device' or 'google/chromeos/user' | |
174 rstring = self._PostPolicyRequest(token=token, type=type, | |
175 want_signature=device) | |
176 response = dmb.DeviceManagementResponse() | |
177 response.ParseFromString(rstring) | |
178 fetch_response = response.policy_response.response[0] | |
179 assert fetch_response.policy_data | |
180 assert fetch_response.policy_data_signature | |
181 assert fetch_response.new_public_key | |
182 return fetch_response | |
183 | |
184 def _WriteDevicePolicyWithSessionManagerStopped(self, policy): | |
185 """Writes the device policy blob while the Session Manager is stopped.""" | |
186 assert self.IsChromeOS() | |
187 logging.debug('Stopping session manager') | 102 logging.debug('Stopping session manager') |
188 cros_ui.stop(allow_fail=True) | 103 cros_ui.stop(allow_fail=True) |
189 logging.debug('Writing device policy cache') | 104 logging.debug('Writing device policy blob') |
190 self._WriteDevicePolicy(policy) | 105 self._WriteFile(constants.SIGNED_POLICY_FILE, self._device_policy_blob) |
106 self._WriteFile(constants.OWNER_KEY_FILE, self._public_key) | |
191 | 107 |
192 # Ugly hack: session manager won't spawn chrome if this file exists. That's | 108 # Ugly hack: session manager will not spawn Chrome if this file exists. That |
193 # usually a good thing (to keep the automation channel open), but in this | 109 # is usually a good thing (to keep the automation channel open), but in this |
194 # case we really want to restart chrome. PyUITest.setUp() will be called | 110 # case we really want to restart chrome. PyUITest.setUp() will be called |
195 # after session manager and chrome have restarted, and will setup the | 111 # after session manager and chrome have restarted, and will setup the |
196 # automation channel. | 112 # automation channel. |
197 restore_magic_file = False | 113 restore_magic_file = False |
198 if os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE): | 114 if os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE): |
199 logging.debug('DISABLE_BROWSER_RESTART_MAGIC_FILE found. ' | 115 logging.debug('DISABLE_BROWSER_RESTART_MAGIC_FILE found. ' |
200 'Removing temporarily for the next restart.') | 116 'Removing temporarily for the next restart.') |
201 restore_magic_file = True | 117 restore_magic_file = True |
202 os.remove(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) | 118 os.remove(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) |
203 assert not os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) | 119 assert not os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) |
204 | 120 |
205 logging.debug('Starting session manager again') | 121 logging.debug('Starting session manager again') |
206 cros_ui.start() | 122 cros_ui.start() |
207 | 123 |
208 # cros_ui.start() waits for the login prompt to be visible, so chrome has | 124 # cros_ui.start() waits for the login prompt to be visible, so Chrome has |
209 # already started once it returns. | 125 # already started once it returns. |
210 if restore_magic_file: | 126 if restore_magic_file: |
211 open(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE, 'w').close() | 127 open(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE, 'w').close() |
212 assert os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) | 128 assert os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) |
213 | 129 |
214 def ExtraChromeFlags(self): | 130 def ExtraChromeFlags(self): |
215 """Sets up Chrome to use cloud policies on ChromeOS.""" | 131 """Sets up Chrome to use cloud policies on ChromeOS.""" |
216 flags = pyauto.PyUITest.ExtraChromeFlags(self) | 132 flags = pyauto.PyUITest.ExtraChromeFlags(self) |
217 if self.IsChromeOS(): | 133 if self.IsChromeOS(): |
134 while '--skip-oauth-login' in flags: | |
135 flags.remove('--skip-oauth-login') | |
218 url = self._GetHttpURLForDeviceManagement() | 136 url = self._GetHttpURLForDeviceManagement() |
219 flag = '--device-management-url=' + url | 137 flags.append('--device-management-url=' + url) |
220 flags += [flag] | 138 flags.append('--disable-sync') |
221 return flags | 139 return flags |
222 | 140 |
223 def setUp(self): | 141 def setUp(self): |
224 """Sets up the platform for policy testing. | 142 """Sets up the platform for policy testing. |
225 | 143 |
226 On ChromeOS, part of the set up involves restarting the session_manager and | 144 On ChromeOS, part of the setup involves restarting the session manager to |
227 logging in with the $default account. | 145 inject a device policy blob. |
228 """ | 146 """ |
229 if self.IsChromeOS(): | 147 if self.IsChromeOS(): |
230 # Setup a temporary data dir and a TestServer serving files from there. | 148 # Set up a temporary data dir and a TestServer serving files from there. |
231 # The TestServer makes its document root relative to the src dir. | 149 # The TestServer makes its document root relative to the src dir. |
232 self._temp_data_dir = tempfile.mkdtemp(dir=pyauto_paths.GetSourceDir()) | 150 self._temp_data_dir = tempfile.mkdtemp(dir=pyauto_paths.GetSourceDir()) |
233 relative_temp_data_dir = os.path.basename(self._temp_data_dir) | 151 relative_temp_data_dir = os.path.basename(self._temp_data_dir) |
234 self._http_server = self.StartHTTPServer(relative_temp_data_dir) | 152 self._http_server = self.StartHTTPServer(relative_temp_data_dir) |
235 | 153 |
236 # Setup empty policies, so that the TestServer can start replying. | 154 # Set up an empty user policy so that the TestServer can start replying. |
237 self._SetCloudPolicies() | 155 self._SetUserPolicyChromeOS() |
238 | 156 |
239 device_dmtoken = self._RegisterAndGetDMToken(device=True) | 157 # Generate a key pair for signing device policy. |
240 policy = self._FetchPolicy(token=device_dmtoken, device=True) | 158 self._private_key = tlslite.api.generateRSAKey(1024) |
241 user_dmtoken = self._RegisterAndGetDMToken(device=False) | 159 algorithm = asn1der.Sequence( |
160 [asn1der.Data(asn1der.OBJECT_IDENTIFIER, PKCS1_RSA_OID), | |
161 asn1der.Data(asn1der.NULL, '')]) | |
162 rsa_pubkey = asn1der.Sequence([asn1der.Integer(self._private_key.n), | |
163 asn1der.Integer(self._private_key.e)]) | |
164 self._public_key = asn1der.Sequence( | |
165 [algorithm, asn1der.Bitstring(rsa_pubkey)]) | |
242 | 166 |
243 # The device policy blob is only picked up by the session manager on | 167 # Clear device policy. This also invokes pyauto.PyUITest.setUp(self). |
244 # startup, and is overwritten on shutdown. So the blob has to be written | 168 self.SetDevicePolicy() |
245 # while the session manager is stopped. | |
246 self.WaitForSessionManagerRestart( | |
247 lambda: self._WriteDevicePolicyWithSessionManagerStopped(policy)) | |
248 logging.debug('Session manager restarted with device policy ready') | |
249 | 169 |
250 pyauto.PyUITest.setUp(self) | 170 # Remove any existing vaults. |
251 | 171 self.RemoveAllCryptohomeVaultsOnChromeOS() |
252 if self.IsChromeOS(): | 172 else: |
253 logging.debug('Logging in') | 173 pyauto.PyUITest.setUp(self) |
254 credentials = self.GetPrivateInfo()['prod_enterprise_test_user'] | |
255 self.Login(credentials['username'], credentials['password']) | |
256 assert self.GetLoginInfo()['is_logged_in'] | |
257 | |
258 self._WriteUserPolicyToken(user_dmtoken) | |
259 # The browser has to be reloaded to make the user policy token cache | |
260 # reload the file just written. The file can also be written only after | |
261 # the cryptohome is mounted, after login. | |
262 self.RestartBrowser(clear_profile=False) | |
263 | 174 |
264 def tearDown(self): | 175 def tearDown(self): |
265 """Cleans up the files created by setUp and policies added in tests.""" | 176 """Cleans up the policies and related files created in tests.""" |
266 # Clear the policies. | |
267 self.SetPolicies() | |
268 | |
269 if self.IsChromeOS(): | 177 if self.IsChromeOS(): |
270 pyauto.PyUITest.Logout(self) | 178 # On ChromeOS, clearing device policy logs out the current user so that |
179 # no policy is in force. User policy is then permanently cleared at the | |
180 # end of the method after stopping the TestServer. | |
181 self.SetDevicePolicy() | |
182 else: | |
183 # On other platforms, there is only user policy to clear. | |
184 self.SetUserPolicy() | |
271 | 185 |
272 pyauto.PyUITest.tearDown(self) | 186 pyauto.PyUITest.tearDown(self) |
273 | 187 |
274 if self.IsChromeOS(): | 188 if self.IsChromeOS(): |
275 self.StopHTTPServer(self._http_server) | 189 self.StopHTTPServer(self._http_server) |
276 pyauto_utils.RemovePath(self._temp_data_dir) | 190 pyauto_utils.RemovePath(self._temp_data_dir) |
191 self.RemoveAllCryptohomeVaultsOnChromeOS() | |
277 | 192 |
278 def _SetCloudPolicies(self, user_mandatory=None, user_recommended=None, | 193 def LoginWithTestAccount(self, account='prod_enterprise_test_user'): |
279 device=None): | 194 """Convenience method for logging in with one of the test accounts.""" |
280 """Exports the policies to the configuration file of the TestServer. | 195 assert self.IsChromeOS() |
196 credentials = self.GetPrivateInfo()[account] | |
197 self.Login(credentials['username'], credentials['password']) | |
198 assert self.GetLoginInfo()['is_logged_in'] | |
281 | 199 |
282 The TestServer will serve these policies after this function returns. | 200 def SetPolicy(self): |
201 raise NotImplementedError('This class supports user and device policies. ' | |
202 'Instead of SetPolicies, use SetUserPolicy or ' | |
203 'SetDevicePolicy.') | |
283 | 204 |
284 Args: | 205 def _SetUserPolicyChromeOS(self, user_policy=None): |
285 user_mandatory: user policies of mandatory level | 206 """Writes the given user policy to the TestServer's input file.""" |
286 user_recommended: user policies of recommended level | |
287 device: device policies | |
288 """ | |
289 assert self.IsChromeOS() | 207 assert self.IsChromeOS() |
290 policy_dict = { | 208 policy_dict = { |
291 'google/chromeos/device': device or {}, | 209 'google/chromeos/device': {}, |
292 'google/chromeos/user': { | 210 'google/chromeos/user': { |
293 'mandatory': user_mandatory or {}, | 211 'mandatory': user_policy or {}, |
294 'recommended': user_recommended or {}, | 212 'recommended': {}, |
295 }, | 213 }, |
296 'managed_users': ['*'], | 214 'managed_users': ['*'], |
297 } | 215 } |
298 self._WriteFile(self._GetTestServerPoliciesFilePath(), | 216 self._WriteFile(self._GetTestServerPoliciesFilePath(), |
299 json.dumps(policy_dict, sort_keys=True, indent=2) + '\n') | 217 json.dumps(policy_dict, sort_keys=True, indent=2) + '\n') |
300 | 218 |
301 def _SetPoliciesWin(self, user_policy=None): | 219 def _SetUserPolicyWin(self, user_policy=None): |
302 """Exports the policies as dictionary in the argument to Window registry. | 220 """Writes the given user policy to the Windows registry.""" |
303 | |
304 Removes the registry key and its subkeys if they exist. | |
305 | |
306 Args: | |
307 user_policy: A dictionary representing the user policies. Clear the | |
308 registry if None. | |
309 | |
310 Raises: | |
311 TypeError: If an unexpected value is found in the policy dictionary. | |
312 """ | |
313 | |
314 def SetValueEx(key, sub_key, value): | 221 def SetValueEx(key, sub_key, value): |
315 if isinstance(value, int): | 222 if isinstance(value, int): |
316 winreg.SetValueEx(key, sub_key, 0, winreg.REG_DWORD, int(value)) | 223 winreg.SetValueEx(key, sub_key, 0, winreg.REG_DWORD, int(value)) |
317 elif isinstance(value, basestring): | 224 elif isinstance(value, basestring): |
318 winreg.SetValueEx(key, sub_key, 0, winreg.REG_SZ, value.encode('ascii')) | 225 winreg.SetValueEx(key, sub_key, 0, winreg.REG_SZ, value.encode('ascii')) |
319 elif isinstance(value, list): | 226 elif isinstance(value, list): |
320 k = winreg.CreateKey(key, sub_key) | 227 k = winreg.CreateKey(key, sub_key) |
321 for index, v in list(enumerate(value)): | 228 for index, v in list(enumerate(value)): |
322 SetValueEx(k, str(index + 1), v) | 229 SetValueEx(k, str(index + 1), v) |
323 winreg.CloseKey(k) | 230 winreg.CloseKey(k) |
(...skipping 10 matching lines...) Expand all Loading... | |
334 r'reg query HKEY_LOCAL_MACHINE\%s' % reg_base) == 0: | 241 r'reg query HKEY_LOCAL_MACHINE\%s' % reg_base) == 0: |
335 logging.debug(r'Removing %s' % reg_base) | 242 logging.debug(r'Removing %s' % reg_base) |
336 subprocess.call(r'reg delete HKLM\%s /f' % reg_base) | 243 subprocess.call(r'reg delete HKLM\%s /f' % reg_base) |
337 | 244 |
338 if user_policy is not None: | 245 if user_policy is not None: |
339 root_key = winreg.CreateKey(winreg.HKEY_LOCAL_MACHINE, reg_base) | 246 root_key = winreg.CreateKey(winreg.HKEY_LOCAL_MACHINE, reg_base) |
340 for k, v in user_policy.iteritems(): | 247 for k, v in user_policy.iteritems(): |
341 SetValueEx(root_key, k, v) | 248 SetValueEx(root_key, k, v) |
342 winreg.CloseKey(root_key) | 249 winreg.CloseKey(root_key) |
343 | 250 |
344 def _SetPoliciesLinux(self, user_policy=None): | 251 def _SetUserPolicyLinux(self, user_policy=None): |
345 """Exports the policies as dictionary in the argument to a JSON file. | 252 """Writes the given user policy to the JSON policy file read by Chrome.""" |
346 | |
347 Removes the JSON file if it exists. | |
348 | |
349 Args: | |
350 user_policy: A dictionary representing the user policies. Remove the | |
351 JSON file if None | |
352 """ | |
353 assert self.IsLinux() | 253 assert self.IsLinux() |
354 sudo_cmd_file = os.path.join(os.path.dirname(__file__), | 254 sudo_cmd_file = os.path.join(os.path.dirname(__file__), |
355 'policy_linux_util.py') | 255 'policy_linux_util.py') |
356 | 256 |
357 if self.GetBrowserInfo()['properties']['branding'] == 'Google Chrome': | 257 if self.GetBrowserInfo()['properties']['branding'] == 'Google Chrome': |
358 policies_location_base = '/etc/opt/chrome' | 258 policies_location_base = '/etc/opt/chrome' |
359 else: | 259 else: |
360 policies_location_base = '/etc/chromium' | 260 policies_location_base = '/etc/chromium' |
361 | 261 |
362 if os.path.isdir(policies_location_base): | 262 if os.path.isdir(policies_location_base): |
363 logging.debug('Removing directory %s' % policies_location_base) | 263 logging.debug('Removing directory %s' % policies_location_base) |
364 subprocess.call(['suid-python', sudo_cmd_file, | 264 subprocess.call(['suid-python', sudo_cmd_file, |
365 'remove_dir', policies_location_base]) | 265 'remove_dir', policies_location_base]) |
366 | 266 |
367 if user_policy is not None: | 267 if user_policy is not None: |
368 self._WriteFile('/tmp/chrome.json', | 268 self._WriteFile('/tmp/chrome.json', |
369 json.dumps(user_policy, sort_keys=True, indent=2) + '\n') | 269 json.dumps(user_policy, sort_keys=True, indent=2) + '\n') |
370 | 270 |
371 policies_location = '%s/policies/managed' % policies_location_base | 271 policies_location = '%s/policies/managed' % policies_location_base |
372 subprocess.call(['suid-python', sudo_cmd_file, | 272 subprocess.call(['suid-python', sudo_cmd_file, |
373 'setup_dir', policies_location]) | 273 'setup_dir', policies_location]) |
374 # Copy chrome.json file to the managed directory | 274 # Copy chrome.json file to the managed directory |
375 subprocess.call(['suid-python', sudo_cmd_file, | 275 subprocess.call(['suid-python', sudo_cmd_file, |
376 'copy', '/tmp/chrome.json', policies_location]) | 276 'copy', '/tmp/chrome.json', policies_location]) |
377 os.remove('/tmp/chrome.json') | 277 os.remove('/tmp/chrome.json') |
378 | 278 |
379 def SetPolicies(self, user_policy=None, device_policy=None): | 279 def SetUserPolicy(self, user_policy=None): |
380 """Enforces the policies given in the arguments as dictionaries. | 280 """Sets the user policy provided as a dict. |
381 | 281 |
382 These policies will have been installed after this call returns. | 282 Passing a value of None clears the user policy.""" |
383 | |
384 Args: | |
385 user_policy: A dictionary representing the user policies. | |
386 device_policy: A dictionary representing the device policies on Chrome OS. | |
387 | |
388 Raises: | |
389 NotImplementedError if the platform is not supported. | |
390 """ | |
391 if self.IsChromeOS(): | 283 if self.IsChromeOS(): |
392 self._SetCloudPolicies(user_mandatory=user_policy, device=device_policy) | 284 self._SetUserPolicyChromeOS(user_policy=user_policy) |
285 elif self.IsWin(): | |
286 self._SetUserPolicyWin(user_policy=user_policy) | |
287 elif self.IsLinux(): | |
288 self._SetUserPolicyLinux(user_policy=user_policy) | |
393 else: | 289 else: |
394 if device_policy is not None: | 290 raise NotImplementedError('Not available on this platform.') |
395 raise NotImplementedError('Device policy is only available on ChromeOS') | |
396 if self.IsWin(): | |
397 self._SetPoliciesWin(user_policy=user_policy) | |
398 elif self.IsLinux(): | |
399 self._SetPoliciesLinux(user_policy=user_policy) | |
400 else: | |
401 raise NotImplementedError('Not available on this platform.') | |
402 | 291 |
403 self.RefreshPolicies() | 292 self.RefreshPolicies() |
404 | 293 |
294 def _SetProtobufMessageField(self, group_message, field, field_value): | |
295 """Sets the given field in a protobuf to the given value.""" | |
296 if field.label == field.LABEL_REPEATED: | |
297 assert type(field_value) == list | |
298 entries = group_message.__getattribute__(field.name) | |
299 for list_item in field_value: | |
300 entries.append(list_item) | |
301 return | |
302 elif field.type == field.TYPE_BOOL: | |
303 assert type(field_value) == bool | |
304 elif field.type == field.TYPE_STRING: | |
305 assert type(field_value) == str or type(field_value) == unicode | |
306 elif field.type == field.TYPE_INT64: | |
307 assert type(field_value) == int | |
308 elif (field.type == field.TYPE_MESSAGE and | |
309 field.message_type.name == 'StringList'): | |
310 assert type(field_value) == list | |
311 entries = group_message.__getattribute__(field.name).entries | |
312 for list_item in field_value: | |
313 entries.append(list_item) | |
314 return | |
315 else: | |
316 raise Exception('Unknown field type %s' % field.type) | |
317 group_message.__setattr__(field.name, field_value) | |
318 | |
319 def _GenerateDevicePolicyBlob(self, device_policy=None, owner=None): | |
320 """Generates a signed device policy blob.""" | |
321 | |
322 # Fill in the device settings protobuf. | |
323 device_policy = device_policy or {} | |
324 owner = owner or constants.CREDENTIALS['$mockowner'][0] | |
325 settings = dp.ChromeDeviceSettingsProto() | |
326 for group in settings.DESCRIPTOR.fields: | |
327 # Create protobuf message for group. | |
328 group_message = eval('dp.' + group.message_type.name + '()') | |
329 # Indicates if at least one field was set in |group_message|. | |
330 got_fields = False | |
331 # Iterate over fields of the message and feed them from the policy dict. | |
332 for field in group_message.DESCRIPTOR.fields: | |
333 field_value = None | |
334 if field.name in device_policy: | |
335 got_fields = True | |
336 field_value = device_policy[field.name] | |
337 self._SetProtobufMessageField(group_message, field, field_value) | |
338 if got_fields: | |
339 settings.__getattribute__(group.name).CopyFrom(group_message) | |
340 | |
341 # Fill in the policy data protobuf. | |
342 policy_data = dm.PolicyData() | |
343 policy_data.policy_type = 'google/chromeos/device' | |
344 policy_data.policy_value = settings.SerializeToString() | |
345 policy_data.username = owner | |
346 serialized_policy_data = policy_data.SerializeToString() | |
347 | |
348 # Fill in the device management response protobuf. | |
349 response = dm.DeviceManagementResponse() | |
350 fetch_response = response.policy_response.response.add() | |
351 fetch_response.policy_data = serialized_policy_data | |
352 fetch_response.policy_data_signature = ( | |
353 self._private_key.hashAndSign(serialized_policy_data).tostring()) | |
354 | |
355 self._device_policy_blob = fetch_response.SerializeToString() | |
356 | |
357 def _RefreshDevicePolicy(self): | |
358 """Refreshes the device policy in force on ChromeOS.""" | |
359 assert self.IsChromeOS() | |
360 # The device policy blob is only picked up by the session manager on | |
361 # startup, and is overwritten on shutdown. So the blob has to be written | |
362 # while the session manager is stopped. | |
363 self.WaitForSessionManagerRestart( | |
364 lambda: self._WriteDevicePolicyWithSessionManagerStopped()) | |
Nirnimesh
2012/04/05 19:05:30
lambda is redundant.
self.WaitForSessionManagerRe
bartfab (slow)
2012/04/10 15:27:20
Done.
| |
365 logging.debug('Session manager restarted with device policy ready') | |
366 pyauto.PyUITest.setUp(self) | |
367 | |
368 def SetDevicePolicy(self, device_policy=None, owner=None): | |
369 """Sets the device policy provided as a dict and the owner on ChromeOS. | |
370 | |
371 Passing a value of None as the device policy clears it.""" | |
372 if not self.IsChromeOS(): | |
373 raise NotImplementedError('Device policy is only available on ChromeOS.') | |
374 | |
375 self._GenerateDevicePolicyBlob(device_policy, owner) | |
376 self._RefreshDevicePolicy() | |
OLD | NEW |