Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(417)

Side by Side Diff: chrome/test/pyautolib/policy_base.py

Issue 9791023: Allow setting of user and device policies in functional tests (Closed) Base URL: http://git.chromium.org/chromium/src.git@master
Patch Set: Comments addressed. Created 8 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « chrome/test/pyautolib/asn1der.py ('k') | chrome/test/pyautolib/pyauto.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 #!/usr/bin/python 1 #!/usr/bin/python
2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be 3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file. 4 # found in the LICENSE file.
5 5
6 """Base class for tests that need to update the policies enforced by Chrome. 6 """Base class for tests that need to update the policies enforced by Chrome.
7 7
8 Subclasses can call SetPolicies with a dictionary of policies to install. 8 Subclasses can call SetUserPolicy (ChromeOS, Linux, Windows) and
9 SetPolicies can also be used to set the device policies on ChromeOS. 9 SetDevicePolicy (ChromeOS only) with a dictionary of the policies to install.
10 10
11 The current implementation depends on the platform. The implementations might 11 The current implementation depends on the platform. The implementations might
12 change in the future, but tests relying on SetPolicies will keep working. 12 change in the future, but tests relying on the above calls will keep working.
13 """ 13 """
14 14
15 # On ChromeOS this relies on the device_management.py part of the TestServer, 15 # On ChromeOS, both user and device policies are supported. Chrome is set up to
16 # and forces the policies by triggering a new policy fetch and refreshing the 16 # fetch user policy from a TestServer. A call to SetUserPolicy triggers an
17 # cloud policy providers. This requires preparing the system for policy 17 # immediate policy refresh, allowing the effects of user policy changes on a
18 # fetching, which currently means the DMTokens have to be in place. Without the 18 # running session to be tested.
19 # DMTokens, the cloud policy controller won't be able to proceed, because the
20 # Gaia tokens for the DMService aren't available during tests.
21 # In the future this setup might not be necessary anymore, and the policy
22 # might also be pushed through the session_manager.
23 # 19 #
24 # On other platforms it relies on the SetPolicies automation call, which is 20 # Device policy is injected by stopping Chrome and the session manager, writing
25 # only available on non-official builds. This automation call is meant to be 21 # a new device policy blob and starting Chrome and the session manager again.
26 # eventually removed when a replacement for every platform is available. 22 # This means that setting device policy always logs out the current user. It is
27 # This requires setting up the policies in the registry on Windows, and writing 23 # *not* possible to test the effects on a running session. These limitations
28 # the right files on Linux and Mac. 24 # stem from the fact that Chrome will only fetch device policy from a TestServer
25 # if the device is enterprise owned. Enterprise ownership, in turn, requires
26 # ownership of the TPM which can only be undone by reboothing the device (and
27 # hence is not possible in a client test).
29 28
30 import json 29 import json
31 import logging 30 import logging
32 import os 31 import os
33 import subprocess 32 import subprocess
34 import tempfile 33 import tempfile
35 import urllib
36 import urllib2
37 34
35 import asn1der
38 import pyauto 36 import pyauto
39 import pyauto_paths 37 import pyauto_paths
40 import pyauto_utils 38 import pyauto_utils
41 39
42 40
43 if pyauto.PyUITest.IsChromeOS(): 41 if pyauto.PyUITest.IsChromeOS():
44 import sys 42 import sys
43 import warnings
44 # Ignore deprecation warnings, they make our output more cluttered.
45 warnings.filterwarnings('ignore', category=DeprecationWarning)
46
45 # Find the path to the pyproto and add it to sys.path. 47 # Find the path to the pyproto and add it to sys.path.
46 # Prepend it so that google.protobuf is loaded from here. 48 # Prepend it so that google.protobuf is loaded from here.
47 for path in pyauto_paths.GetBuildDirs(): 49 for path in pyauto_paths.GetBuildDirs():
48 p = os.path.join(path, 'pyproto') 50 p = os.path.join(path, 'pyproto')
49 if os.path.isdir(p): 51 if os.path.isdir(p):
50 sys.path = [p, os.path.join(p, 'chrome', 'browser', 'policy', 52 sys.path = [p, os.path.join(p, 'chrome', 'browser', 'policy',
51 'proto')] + sys.path 53 'proto')] + sys.path
52 break 54 break
53 sys.path.append('/usr/local') # to import autotest libs. 55 sys.path.append('/usr/local') # to import autotest libs.
54 import device_management_local_pb2 as dml 56 sys.path.append(os.path.join(pyauto_paths.GetThirdPartyDir(), 'tlslite'))
55 import device_management_backend_pb2 as dmb 57
58 import chrome_device_policy_pb2 as dp
59 import device_management_backend_pb2 as dm
60 import tlslite.api
56 from autotest.cros import constants 61 from autotest.cros import constants
57 from autotest.cros import cros_ui 62 from autotest.cros import cros_ui
58 elif pyauto.PyUITest.IsWin(): 63 elif pyauto.PyUITest.IsWin():
59 import _winreg as winreg 64 import _winreg as winreg
60 65
66 # ASN.1 object identifier for PKCS#1/RSA.
67 PKCS1_RSA_OID = '\x2a\x86\x48\x86\xf7\x0d\x01\x01\x01'
68
61 69
62 class PolicyTestBase(pyauto.PyUITest): 70 class PolicyTestBase(pyauto.PyUITest):
63 """A base class for tests that need to set up and modify policies. 71 """A base class for tests that need to set up and modify policies.
64 72
65 Subclasses can use the SetPolicies call to set the policies seen by Chrome. 73 Subclasses can use the methods SetUserPolicy (ChromeOS, Linux, Windows) and
74 SetDevicePolicy (ChromeOS only) to set the policies seen by Chrome.
66 """ 75 """
67 76
68 def _WriteFile(self, path, content): 77 def _WriteFile(self, path, content):
69 """Writes content to path, creating any intermediary directories.""" 78 """Writes content to path, creating any intermediary directories."""
70 if not os.path.exists(os.path.dirname(path)): 79 if not os.path.exists(os.path.dirname(path)):
71 os.makedirs(os.path.dirname(path)) 80 os.makedirs(os.path.dirname(path))
72 f = open(path, 'w') 81 f = open(path, 'w')
73 f.write(content) 82 f.write(content)
74 f.close() 83 f.close()
75 84
76 def _GetTestServerPoliciesFilePath(self): 85 def _GetTestServerPoliciesFilePath(self):
77 """Returns the path of the cloud policy configuration file.""" 86 """Returns the path of the cloud policy configuration file."""
78 assert self.IsChromeOS() 87 assert self.IsChromeOS()
79 return os.path.join(self._temp_data_dir, 'device_management') 88 return os.path.join(self._temp_data_dir, 'device_management')
80 89
81 def _GetHttpURLForDeviceManagement(self): 90 def _GetHttpURLForDeviceManagement(self):
91 """Returns the URL at which the TestServer is serving user policy."""
82 assert self.IsChromeOS() 92 assert self.IsChromeOS()
83 return self._http_server.GetURL('device_management').spec() 93 return self._http_server.GetURL('device_management').spec()
84 94
85 def _WriteUserPolicyToken(self, token): 95 def _WriteDevicePolicyWithSessionManagerStopped(self):
86 """Writes the given token to the user device management cache.""" 96 """Writes the device policy blob while the session manager is stopped.
87 assert self.IsChromeOS()
88 blob = dml.DeviceCredentials()
89 blob.device_token = token
90 blob.device_id = '123'
91 self._WriteFile('/home/chronos/user/Device Management/Token',
92 blob.SerializeToString())
93 97
94 def _WriteDevicePolicy(self, fetch_response): 98 Updates the files holding the device policy blob and the public key need to
95 """Writes the given signed fetch_response to the device policy cache. 99 verify its signature.
96
97 Also writes the owner key, used to verify the signature.
98 """ 100 """
99 assert self.IsChromeOS() 101 assert self.IsChromeOS()
100 self._WriteFile(constants.SIGNED_POLICY_FILE,
101 fetch_response.SerializeToString())
102 self._WriteFile(constants.OWNER_KEY_FILE,
103 fetch_response.new_public_key)
104
105 def _PostToDMServer(self, request_type, body, headers):
106 """Posts a request to the TestServer's Device Management interface.
107
108 |request_type| is the value of the 'request' HTTP parameter.
109 Returns the response's body.
110 """
111 assert self.IsChromeOS()
112 url = self._GetHttpURLForDeviceManagement()
113 url += '?' + urllib.urlencode({
114 'deviceid': '123',
115 'oauth_token': '456',
116 'request': request_type,
117 'devicetype': 2,
118 'apptype': 'Chrome',
119 'agent': 'Chrome',
120 })
121 return urllib2.urlopen(urllib2.Request(url, body, headers)).read()
122
123 def _PostRegisterRequest(self, type):
124 """Sends a device register request to the TestServer, of the given type."""
125 assert self.IsChromeOS()
126 request = dmb.DeviceManagementRequest()
127 register = request.register_request
128 register.machine_id = '789'
129 register.type = type
130 return self._PostToDMServer('register', request.SerializeToString(), {})
131
132 def _RegisterAndGetDMToken(self, device):
133 """Registers with the TestServer and returns the DMToken fetched.
134
135 Registers for device policy if device is True. Otherwise registers for
136 user policy.
137 """
138 assert self.IsChromeOS()
139 type = device and dmb.DeviceRegisterRequest.DEVICE \
140 or dmb.DeviceRegisterRequest.USER
141 rstring = self._PostRegisterRequest(type)
142 response = dmb.DeviceManagementResponse()
143 response.ParseFromString(rstring)
144 return response.register_response.device_management_token
145
146 def _PostPolicyRequest(self, token, type, want_signature=False):
147 """Fetches policy from the TestServer, using the given token.
148
149 Policy is fetched for the given type. If want_signature is True, the
150 request will ask for a signed response. Returns the response body.
151 """
152 assert self.IsChromeOS()
153 request = dmb.DeviceManagementRequest()
154 policy = request.policy_request
155 prequest = policy.request.add()
156 prequest.policy_type = type
157 if want_signature:
158 prequest.signature_type = dmb.PolicyFetchRequest.SHA1_RSA
159 headers = {
160 'Authorization': 'GoogleDMToken token=' + token,
161 }
162 return self._PostToDMServer('policy', request.SerializeToString(), headers)
163
164 def _FetchPolicy(self, token, device):
165 """Fetches policy from the TestServer, using the given token.
166
167 Token must be a valid token retrieved with _RegisterAndGetDMToken. If
168 device is True, fetches signed device policy. Otherwise fetches user policy.
169 This method also verifies the response, and returns the first policy fetch
170 response.
171 """
172 assert self.IsChromeOS()
173 type = device and 'google/chromeos/device' or 'google/chromeos/user'
174 rstring = self._PostPolicyRequest(token=token, type=type,
175 want_signature=device)
176 response = dmb.DeviceManagementResponse()
177 response.ParseFromString(rstring)
178 fetch_response = response.policy_response.response[0]
179 assert fetch_response.policy_data
180 assert fetch_response.policy_data_signature
181 assert fetch_response.new_public_key
182 return fetch_response
183
184 def _WriteDevicePolicyWithSessionManagerStopped(self, policy):
185 """Writes the device policy blob while the Session Manager is stopped."""
186 assert self.IsChromeOS()
187 logging.debug('Stopping session manager') 102 logging.debug('Stopping session manager')
188 cros_ui.stop(allow_fail=True) 103 cros_ui.stop(allow_fail=True)
189 logging.debug('Writing device policy cache') 104 logging.debug('Writing device policy blob')
190 self._WriteDevicePolicy(policy) 105 self._WriteFile(constants.SIGNED_POLICY_FILE, self._device_policy_blob)
106 self._WriteFile(constants.OWNER_KEY_FILE, self._public_key)
191 107
192 # Ugly hack: session manager won't spawn chrome if this file exists. That's 108 # Ugly hack: session manager will not spawn Chrome if this file exists. That
193 # usually a good thing (to keep the automation channel open), but in this 109 # is usually a good thing (to keep the automation channel open), but in this
194 # case we really want to restart chrome. PyUITest.setUp() will be called 110 # case we really want to restart chrome. PyUITest.setUp() will be called
195 # after session manager and chrome have restarted, and will setup the 111 # after session manager and chrome have restarted, and will setup the
196 # automation channel. 112 # automation channel.
197 restore_magic_file = False 113 restore_magic_file = False
198 if os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE): 114 if os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE):
199 logging.debug('DISABLE_BROWSER_RESTART_MAGIC_FILE found. ' 115 logging.debug('DISABLE_BROWSER_RESTART_MAGIC_FILE found. '
200 'Removing temporarily for the next restart.') 116 'Removing temporarily for the next restart.')
201 restore_magic_file = True 117 restore_magic_file = True
202 os.remove(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) 118 os.remove(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE)
203 assert not os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) 119 assert not os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE)
204 120
205 logging.debug('Starting session manager again') 121 logging.debug('Starting session manager again')
206 cros_ui.start() 122 cros_ui.start()
207 123
208 # cros_ui.start() waits for the login prompt to be visible, so chrome has 124 # cros_ui.start() waits for the login prompt to be visible, so Chrome has
209 # already started once it returns. 125 # already started once it returns.
210 if restore_magic_file: 126 if restore_magic_file:
211 open(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE, 'w').close() 127 open(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE, 'w').close()
212 assert os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE) 128 assert os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE)
213 129
214 def ExtraChromeFlags(self): 130 def ExtraChromeFlags(self):
215 """Sets up Chrome to use cloud policies on ChromeOS.""" 131 """Sets up Chrome to use cloud policies on ChromeOS."""
216 flags = pyauto.PyUITest.ExtraChromeFlags(self) 132 flags = pyauto.PyUITest.ExtraChromeFlags(self)
217 if self.IsChromeOS(): 133 if self.IsChromeOS():
134 while '--skip-oauth-login' in flags:
135 flags.remove('--skip-oauth-login')
218 url = self._GetHttpURLForDeviceManagement() 136 url = self._GetHttpURLForDeviceManagement()
219 flag = '--device-management-url=' + url 137 flags.append('--device-management-url=' + url)
220 flags += [flag] 138 flags.append('--disable-sync')
221 return flags 139 return flags
222 140
223 def setUp(self): 141 def setUp(self):
224 """Sets up the platform for policy testing. 142 """Sets up the platform for policy testing.
225 143
226 On ChromeOS, part of the set up involves restarting the session_manager and 144 On ChromeOS, part of the setup involves restarting the session manager to
227 logging in with the $default account. 145 inject a device policy blob.
228 """ 146 """
229 if self.IsChromeOS(): 147 if self.IsChromeOS():
230 # Setup a temporary data dir and a TestServer serving files from there. 148 # Set up a temporary data dir and a TestServer serving files from there.
231 # The TestServer makes its document root relative to the src dir. 149 # The TestServer makes its document root relative to the src dir.
232 self._temp_data_dir = tempfile.mkdtemp(dir=pyauto_paths.GetSourceDir()) 150 self._temp_data_dir = tempfile.mkdtemp(dir=pyauto_paths.GetSourceDir())
233 relative_temp_data_dir = os.path.basename(self._temp_data_dir) 151 relative_temp_data_dir = os.path.basename(self._temp_data_dir)
234 self._http_server = self.StartHTTPServer(relative_temp_data_dir) 152 self._http_server = self.StartHTTPServer(relative_temp_data_dir)
235 153
236 # Setup empty policies, so that the TestServer can start replying. 154 # Set up an empty user policy so that the TestServer can start replying.
237 self._SetCloudPolicies() 155 self._SetUserPolicyChromeOS()
238 156
239 device_dmtoken = self._RegisterAndGetDMToken(device=True) 157 # Generate a key pair for signing device policy.
240 policy = self._FetchPolicy(token=device_dmtoken, device=True) 158 self._private_key = tlslite.api.generateRSAKey(1024)
241 user_dmtoken = self._RegisterAndGetDMToken(device=False) 159 algorithm = asn1der.Sequence(
160 [asn1der.Data(asn1der.OBJECT_IDENTIFIER, PKCS1_RSA_OID),
161 asn1der.Data(asn1der.NULL, '')])
162 rsa_pubkey = asn1der.Sequence([asn1der.Integer(self._private_key.n),
163 asn1der.Integer(self._private_key.e)])
164 self._public_key = asn1der.Sequence(
165 [algorithm, asn1der.Bitstring(rsa_pubkey)])
242 166
243 # The device policy blob is only picked up by the session manager on 167 # Clear device policy. This also invokes pyauto.PyUITest.setUp(self).
244 # startup, and is overwritten on shutdown. So the blob has to be written 168 self.SetDevicePolicy()
245 # while the session manager is stopped.
246 self.WaitForSessionManagerRestart(
247 lambda: self._WriteDevicePolicyWithSessionManagerStopped(policy))
248 logging.debug('Session manager restarted with device policy ready')
249 169
250 pyauto.PyUITest.setUp(self) 170 # Remove any existing vaults.
251 171 self.RemoveAllCryptohomeVaultsOnChromeOS()
252 if self.IsChromeOS(): 172 else:
253 logging.debug('Logging in') 173 pyauto.PyUITest.setUp(self)
254 credentials = self.GetPrivateInfo()['prod_enterprise_test_user']
255 self.Login(credentials['username'], credentials['password'])
256 assert self.GetLoginInfo()['is_logged_in']
257
258 self._WriteUserPolicyToken(user_dmtoken)
259 # The browser has to be reloaded to make the user policy token cache
260 # reload the file just written. The file can also be written only after
261 # the cryptohome is mounted, after login.
262 self.RestartBrowser(clear_profile=False)
263 174
264 def tearDown(self): 175 def tearDown(self):
265 """Cleans up the files created by setUp and policies added in tests.""" 176 """Cleans up the policies and related files created in tests."""
266 # Clear the policies.
267 self.SetPolicies()
268
269 if self.IsChromeOS(): 177 if self.IsChromeOS():
270 pyauto.PyUITest.Logout(self) 178 # On ChromeOS, clearing device policy logs out the current user so that
179 # no policy is in force. User policy is then permanently cleared at the
180 # end of the method after stopping the TestServer.
181 self.SetDevicePolicy()
182 else:
183 # On other platforms, there is only user policy to clear.
184 self.SetUserPolicy()
271 185
272 pyauto.PyUITest.tearDown(self) 186 pyauto.PyUITest.tearDown(self)
273 187
274 if self.IsChromeOS(): 188 if self.IsChromeOS():
275 self.StopHTTPServer(self._http_server) 189 self.StopHTTPServer(self._http_server)
276 pyauto_utils.RemovePath(self._temp_data_dir) 190 pyauto_utils.RemovePath(self._temp_data_dir)
191 self.RemoveAllCryptohomeVaultsOnChromeOS()
277 192
278 def _SetCloudPolicies(self, user_mandatory=None, user_recommended=None, 193 def LoginWithTestAccount(self, account='prod_enterprise_test_user'):
279 device=None): 194 """Convenience method for logging in with one of the test accounts."""
280 """Exports the policies to the configuration file of the TestServer. 195 assert self.IsChromeOS()
196 credentials = self.GetPrivateInfo()[account]
197 self.Login(credentials['username'], credentials['password'])
198 assert self.GetLoginInfo()['is_logged_in']
281 199
282 The TestServer will serve these policies after this function returns. 200 def _SetUserPolicyChromeOS(self, user_policy=None):
283 201 """Writes the given user policy to the TestServer's input file."""
284 Args:
285 user_mandatory: user policies of mandatory level
286 user_recommended: user policies of recommended level
287 device: device policies
288 """
289 assert self.IsChromeOS() 202 assert self.IsChromeOS()
290 policy_dict = { 203 policy_dict = {
291 'google/chromeos/device': device or {}, 204 'google/chromeos/device': {},
292 'google/chromeos/user': { 205 'google/chromeos/user': {
293 'mandatory': user_mandatory or {}, 206 'mandatory': user_policy or {},
294 'recommended': user_recommended or {}, 207 'recommended': {},
295 }, 208 },
296 'managed_users': ['*'], 209 'managed_users': ['*'],
297 } 210 }
298 self._WriteFile(self._GetTestServerPoliciesFilePath(), 211 self._WriteFile(self._GetTestServerPoliciesFilePath(),
299 json.dumps(policy_dict, sort_keys=True, indent=2) + '\n') 212 json.dumps(policy_dict, sort_keys=True, indent=2) + '\n')
300 213
301 def _SetPoliciesWin(self, user_policy=None): 214 def _SetUserPolicyWin(self, user_policy=None):
302 """Exports the policies as dictionary in the argument to Window registry. 215 """Writes the given user policy to the Windows registry."""
303
304 Removes the registry key and its subkeys if they exist.
305
306 Args:
307 user_policy: A dictionary representing the user policies. Clear the
308 registry if None.
309
310 Raises:
311 TypeError: If an unexpected value is found in the policy dictionary.
312 """
313
314 def SetValueEx(key, sub_key, value): 216 def SetValueEx(key, sub_key, value):
315 if isinstance(value, int): 217 if isinstance(value, int):
316 winreg.SetValueEx(key, sub_key, 0, winreg.REG_DWORD, int(value)) 218 winreg.SetValueEx(key, sub_key, 0, winreg.REG_DWORD, int(value))
317 elif isinstance(value, basestring): 219 elif isinstance(value, basestring):
318 winreg.SetValueEx(key, sub_key, 0, winreg.REG_SZ, value.encode('ascii')) 220 winreg.SetValueEx(key, sub_key, 0, winreg.REG_SZ, value.encode('ascii'))
319 elif isinstance(value, list): 221 elif isinstance(value, list):
320 k = winreg.CreateKey(key, sub_key) 222 k = winreg.CreateKey(key, sub_key)
321 for index, v in list(enumerate(value)): 223 for index, v in list(enumerate(value)):
322 SetValueEx(k, str(index + 1), v) 224 SetValueEx(k, str(index + 1), v)
323 winreg.CloseKey(k) 225 winreg.CloseKey(k)
(...skipping 10 matching lines...) Expand all
334 r'reg query HKEY_LOCAL_MACHINE\%s' % reg_base) == 0: 236 r'reg query HKEY_LOCAL_MACHINE\%s' % reg_base) == 0:
335 logging.debug(r'Removing %s' % reg_base) 237 logging.debug(r'Removing %s' % reg_base)
336 subprocess.call(r'reg delete HKLM\%s /f' % reg_base) 238 subprocess.call(r'reg delete HKLM\%s /f' % reg_base)
337 239
338 if user_policy is not None: 240 if user_policy is not None:
339 root_key = winreg.CreateKey(winreg.HKEY_LOCAL_MACHINE, reg_base) 241 root_key = winreg.CreateKey(winreg.HKEY_LOCAL_MACHINE, reg_base)
340 for k, v in user_policy.iteritems(): 242 for k, v in user_policy.iteritems():
341 SetValueEx(root_key, k, v) 243 SetValueEx(root_key, k, v)
342 winreg.CloseKey(root_key) 244 winreg.CloseKey(root_key)
343 245
344 def _SetPoliciesLinux(self, user_policy=None): 246 def _SetUserPolicyLinux(self, user_policy=None):
345 """Exports the policies as dictionary in the argument to a JSON file. 247 """Writes the given user policy to the JSON policy file read by Chrome."""
346
347 Removes the JSON file if it exists.
348
349 Args:
350 user_policy: A dictionary representing the user policies. Remove the
351 JSON file if None
352 """
353 assert self.IsLinux() 248 assert self.IsLinux()
354 sudo_cmd_file = os.path.join(os.path.dirname(__file__), 249 sudo_cmd_file = os.path.join(os.path.dirname(__file__),
355 'policy_linux_util.py') 250 'policy_linux_util.py')
356 251
357 if self.GetBrowserInfo()['properties']['branding'] == 'Google Chrome': 252 if self.GetBrowserInfo()['properties']['branding'] == 'Google Chrome':
358 policies_location_base = '/etc/opt/chrome' 253 policies_location_base = '/etc/opt/chrome'
359 else: 254 else:
360 policies_location_base = '/etc/chromium' 255 policies_location_base = '/etc/chromium'
361 256
362 if os.path.isdir(policies_location_base): 257 if os.path.isdir(policies_location_base):
363 logging.debug('Removing directory %s' % policies_location_base) 258 logging.debug('Removing directory %s' % policies_location_base)
364 subprocess.call(['suid-python', sudo_cmd_file, 259 subprocess.call(['suid-python', sudo_cmd_file,
365 'remove_dir', policies_location_base]) 260 'remove_dir', policies_location_base])
366 261
367 if user_policy is not None: 262 if user_policy is not None:
368 self._WriteFile('/tmp/chrome.json', 263 self._WriteFile('/tmp/chrome.json',
369 json.dumps(user_policy, sort_keys=True, indent=2) + '\n') 264 json.dumps(user_policy, sort_keys=True, indent=2) + '\n')
370 265
371 policies_location = '%s/policies/managed' % policies_location_base 266 policies_location = '%s/policies/managed' % policies_location_base
372 subprocess.call(['suid-python', sudo_cmd_file, 267 subprocess.call(['suid-python', sudo_cmd_file,
373 'setup_dir', policies_location]) 268 'setup_dir', policies_location])
374 # Copy chrome.json file to the managed directory 269 # Copy chrome.json file to the managed directory
375 subprocess.call(['suid-python', sudo_cmd_file, 270 subprocess.call(['suid-python', sudo_cmd_file,
376 'copy', '/tmp/chrome.json', policies_location]) 271 'copy', '/tmp/chrome.json', policies_location])
377 os.remove('/tmp/chrome.json') 272 os.remove('/tmp/chrome.json')
378 273
379 def SetPolicies(self, user_policy=None, device_policy=None): 274 def SetUserPolicy(self, user_policy=None):
380 """Enforces the policies given in the arguments as dictionaries. 275 """Sets the user policy provided as a dict.
381 276
382 These policies will have been installed after this call returns. 277 Passing a value of None clears the user policy."""
383
384 Args:
385 user_policy: A dictionary representing the user policies.
386 device_policy: A dictionary representing the device policies on Chrome OS.
387
388 Raises:
389 NotImplementedError if the platform is not supported.
390 """
391 if self.IsChromeOS(): 278 if self.IsChromeOS():
392 self._SetCloudPolicies(user_mandatory=user_policy, device=device_policy) 279 self._SetUserPolicyChromeOS(user_policy=user_policy)
280 elif self.IsWin():
281 self._SetUserPolicyWin(user_policy=user_policy)
282 elif self.IsLinux():
283 self._SetUserPolicyLinux(user_policy=user_policy)
393 else: 284 else:
394 if device_policy is not None: 285 raise NotImplementedError('Not available on this platform.')
395 raise NotImplementedError('Device policy is only available on ChromeOS')
396 if self.IsWin():
397 self._SetPoliciesWin(user_policy=user_policy)
398 elif self.IsLinux():
399 self._SetPoliciesLinux(user_policy=user_policy)
400 else:
401 raise NotImplementedError('Not available on this platform.')
402 286
403 self.RefreshPolicies() 287 self.RefreshPolicies()
404 288
289 def _SetProtobufMessageField(self, group_message, field, field_value):
290 """Sets the given field in a protobuf to the given value."""
291 if field.label == field.LABEL_REPEATED:
292 assert type(field_value) == list
293 entries = group_message.__getattribute__(field.name)
294 for list_item in field_value:
295 entries.append(list_item)
296 return
297 elif field.type == field.TYPE_BOOL:
298 assert type(field_value) == bool
299 elif field.type == field.TYPE_STRING:
300 assert type(field_value) == str or type(field_value) == unicode
301 elif field.type == field.TYPE_INT64:
302 assert type(field_value) == int
303 elif (field.type == field.TYPE_MESSAGE and
304 field.message_type.name == 'StringList'):
305 assert type(field_value) == list
306 entries = group_message.__getattribute__(field.name).entries
307 for list_item in field_value:
308 entries.append(list_item)
309 return
310 else:
311 raise Exception('Unknown field type %s' % field.type)
312 group_message.__setattr__(field.name, field_value)
313
314 def _GenerateDevicePolicyBlob(self, device_policy=None, owner=None):
315 """Generates a signed device policy blob."""
316
317 # Fill in the device settings protobuf.
318 device_policy = device_policy or {}
319 owner = owner or constants.CREDENTIALS['$mockowner'][0]
320 settings = dp.ChromeDeviceSettingsProto()
321 for group in settings.DESCRIPTOR.fields:
322 # Create protobuf message for group.
323 group_message = eval('dp.' + group.message_type.name + '()')
324 # Indicates if at least one field was set in |group_message|.
325 got_fields = False
326 # Iterate over fields of the message and feed them from the policy dict.
327 for field in group_message.DESCRIPTOR.fields:
328 field_value = None
329 if field.name in device_policy:
330 got_fields = True
331 field_value = device_policy[field.name]
332 self._SetProtobufMessageField(group_message, field, field_value)
333 if got_fields:
334 settings.__getattribute__(group.name).CopyFrom(group_message)
335
336 # Fill in the policy data protobuf.
337 policy_data = dm.PolicyData()
338 policy_data.policy_type = 'google/chromeos/device'
339 policy_data.policy_value = settings.SerializeToString()
340 policy_data.username = owner
341 serialized_policy_data = policy_data.SerializeToString()
342
343 # Fill in the device management response protobuf.
344 response = dm.DeviceManagementResponse()
345 fetch_response = response.policy_response.response.add()
346 fetch_response.policy_data = serialized_policy_data
347 fetch_response.policy_data_signature = (
348 self._private_key.hashAndSign(serialized_policy_data).tostring())
349
350 self._device_policy_blob = fetch_response.SerializeToString()
351
352 def _RefreshDevicePolicy(self):
353 """Refreshes the device policy in force on ChromeOS."""
354 assert self.IsChromeOS()
355 # The device policy blob is only picked up by the session manager on
356 # startup, and is overwritten on shutdown. So the blob has to be written
357 # while the session manager is stopped.
358 self.WaitForSessionManagerRestart(
359 self._WriteDevicePolicyWithSessionManagerStopped)
360 logging.debug('Session manager restarted with device policy ready')
361 pyauto.PyUITest.setUp(self)
362
363 def SetDevicePolicy(self, device_policy=None, owner=None):
364 """Sets the device policy provided as a dict and the owner on ChromeOS.
365
366 Passing a value of None as the device policy clears it."""
367 if not self.IsChromeOS():
368 raise NotImplementedError('Device policy is only available on ChromeOS.')
369
370 self._GenerateDevicePolicyBlob(device_policy, owner)
371 self._RefreshDevicePolicy()
OLDNEW
« no previous file with comments | « chrome/test/pyautolib/asn1der.py ('k') | chrome/test/pyautolib/pyauto.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698