OLD | NEW |
---|---|
(Empty) | |
1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 import asn1 | |
6 import hashlib | |
7 import os | |
8 | |
9 | |
10 # This file implements very minimal certificate and OCSP generation. It's | |
11 # designed to test revocation checking. | |
12 | |
13 def RandomNumber(length_in_bytes): | |
14 '''RandomNumber returns a random number of length 8*|length_in_bytes| bits''' | |
15 rand = os.urandom(length_in_bytes) | |
16 n = 0 | |
17 for x in rand: | |
18 n <<= 8 | |
19 n |= ord(x) | |
20 return n | |
21 | |
22 | |
23 def ModExp(n, e, p): | |
24 '''ModExp returns n^e mod p''' | |
25 r = 1 | |
26 while e != 0: | |
27 if e & 1: | |
28 r = (r*n) % p | |
29 e >>= 1 | |
30 n = (n*n) % p | |
31 return r | |
32 | |
33 | |
34 class RSA(object): | |
35 def __init__(self, modulus, e, d): | |
36 self.m = modulus | |
37 self.e = e | |
38 self.d = d | |
39 | |
40 self.modlen = 0 | |
41 m = modulus | |
42 while m != 0: | |
43 self.modlen += 1 | |
44 m >>= 8 | |
45 | |
46 def Sign(self, message): | |
47 digest = hashlib.sha1(message).digest() | |
48 # This is the ASN.1 prefix for a SHA1 signature. | |
49 prefix = '3021300906052b0e03021a05000414'.decode('hex') | |
Ryan Sleevi
2012/03/13 23:06:39
nit: This should be some form of constant.
agl
2012/03/13 23:44:03
Done.
| |
50 | |
51 em = ['\xff'] * (self.modlen - 1 - len(prefix) - len(digest)) | |
52 em[0] = '\x00' | |
53 em[1] = '\x01' | |
54 em += "\x00" + prefix + digest | |
55 | |
56 n = 0 | |
57 for x in em: | |
58 n <<= 8 | |
59 n |= ord(x) | |
60 | |
61 s = ModExp(n, self.d, self.m) | |
62 out = [] | |
63 while s != 0: | |
64 out.append(s & 0xff) | |
65 s >>= 8 | |
66 out.reverse() | |
67 return '\x00' * (self.modlen - len(out)) + asn1.ToBytes(out) | |
68 | |
69 def ToDER(self): | |
70 return asn1.ToDER(asn1.SEQUENCE([self.m, self.e])) | |
71 | |
72 | |
73 def Name(cn = None, c = None, o = None): | |
74 names = asn1.SEQUENCE([]) | |
75 | |
76 if cn is not None: | |
77 names.children.append( | |
78 asn1.SET([ | |
79 asn1.SEQUENCE([ | |
80 common_name, cn, | |
81 ]) | |
82 ]) | |
83 ) | |
84 | |
85 if c is not None: | |
86 names.children.append( | |
87 asn1.SET([ | |
88 asn1.SEQUENCE([ | |
89 country, c, | |
90 ]) | |
91 ]) | |
92 ) | |
93 | |
94 if o is not None: | |
95 names.children.append( | |
96 asn1.SET([ | |
97 asn1.SEQUENCE([ | |
98 organization, o, | |
99 ]) | |
100 ]) | |
101 ) | |
102 | |
103 return names | |
104 | |
105 | |
106 # The private key and root certificate name are hard coded here: | |
107 | |
108 # This is the private key | |
109 key = RSA(0x00a71998f2930bfe73d031a87f133d2f378eeeeed52a77e44d0fc9ff6f07ff32cbf3 da999de4ed65832afcb0807f98787506539d258a0ce3c2c77967653099a9034a9b115a876c39a8c4 e4ed4acd0c64095946fb39eeeb47a0704dbb018acf48c3a1c4b895fc409fb4a340a986b1afc45519 ab9eca47c30185c771c64aa5ecf07d, | |
110 3, | |
111 0x6f6665f70cb2a9a28acbc5aa0cd374cfb49f49e371a542de0a86aa4a0554cc87f7e7 1113edf399021ca875aaffbafaf8aee268c3b15ded2c84fb9a4375bbc6011d841e57833bc6f998d2 5daf6fa7f166b233e3e54a4bae7a5aaaba21431324967d5ff3e1d4f413827994262115ca54396e70 68d0afa7af787a5782bc7040e6d3) | |
112 | |
113 # And the same thing in PEM format | |
114 key_pem = '''-----BEGIN RSA PRIVATE KEY----- | |
115 MIICXAIBAAKBgQCnGZjykwv+c9AxqH8TPS83ju7u1Sp35E0Pyf9vB/8yy/PamZ3k | |
116 7WWDKvywgH+YeHUGU50ligzjwsd5Z2UwmakDSpsRWodsOajE5O1KzQxkCVlG+znu | |
117 60egcE27AYrPSMOhxLiV/ECftKNAqYaxr8RVGaueykfDAYXHccZKpezwfQIBAwKB | |
118 gG9mZfcMsqmiisvFqgzTdM+0n0njcaVC3gqGqkoFVMyH9+cRE+3zmQIcqHWq/7r6 | |
119 +K7iaMOxXe0shPuaQ3W7xgEdhB5XgzvG+ZjSXa9vp/FmsjPj5UpLrnpaqrohQxMk | |
120 ln1f8+HU9BOCeZQmIRXKVDlucGjQr6eveHpXgrxwQObTAkEA2wBAfuduw5G0/VfN | |
121 Wx66D5fbPccfYFqLM5LuTimLmNqzK2gIKXckB2sm44gJZ6wVlumaB1CSNug2LNYx | |
122 3cAjUwJBAMNUo1hbI8ugqqwI9kpxv9+2Heea4BlnXbS6tYF8pvkHMoliuxNbXmmB | |
123 u4zNB5iZ6V0ZZ4nvtUNo2cGr/h/Lcu8CQQCSACr/RPSCYSNTj948vya1D+d+hL+V | |
124 kbIiYfQ0G7Jl5yIc8AVw+hgE8hntBVuacrkPRmaviwwkms7IjsvpKsI3AkEAgjhs | |
125 5ZIX3RXHHVtO3EvVP86+mmdAEO+TzdHOVlMZ+1ohsOx8t5I+8QEnszNaZbvw6Lua | |
126 W/UjgkXmgR1UFTJMnwJBAKErmAw21/g3SST0a4wlyaGT/MbXL8Ouwnb5IOKQVe55 | |
127 CZdeVeSh6cJ4hAcQKfr2s1JaZTJFIBPGKAif5HqpydA= | |
128 -----END RSA PRIVATE KEY----- | |
129 ''' | |
130 | |
131 # Root certificate CN | |
132 issuer_cn = "Testing CA" | |
133 | |
134 # All certificates are issued under this policy OID, in the Google arc: | |
135 certPolicyOID = asn1.OID([1, 3, 6, 1, 4, 1, 11129, 2, 4, 1]) | |
136 | |
137 # These result in the following root certificate: | |
138 # -----BEGIN CERTIFICATE----- | |
139 # MIIB0TCCATqgAwIBAgIBATANBgkqhkiG9w0BAQUFADAVMRMwEQYDVQQDEwpUZXN0aW5nIENBMB4X | |
140 # DTEwMDEwMTA2MDAwMFoXDTMyMTIwMTA2MDAwMFowFTETMBEGA1UEAxMKVGVzdGluZyBDQTCBnTAN | |
141 # BgkqhkiG9w0BAQEFAAOBiwAwgYcCgYEApxmY8pML/nPQMah/Ez0vN47u7tUqd+RND8n/bwf/Msvz | |
142 # 2pmd5O1lgyr8sIB/mHh1BlOdJYoM48LHeWdlMJmpA0qbEVqHbDmoxOTtSs0MZAlZRvs57utHoHBN | |
143 # uwGKz0jDocS4lfxAn7SjQKmGsa/EVRmrnspHwwGFx3HGSqXs8H0CAQOjMzAxMBIGA1UdEwEB/wQI | |
144 # MAYBAf8CAQAwGwYDVR0gAQEABBEwDzANBgsrBgEEAdZ5AgHODzANBgkqhkiG9w0BAQUFAAOBgQA/ | |
145 # STb40A6D+93jMfLGQzXc997IsaJZdoPt7tYa8PqGJBL62EiTj+erd/H5pDZx/2/bcpOG4m9J56yg | |
146 # wOohbllw2TM+oeEd8syzV6X+1SIPnGI56JRrm3UXcHYx1Rq5loM9WKAiz/WmIWmskljsEQ7+542p | |
147 # q0pkHjs8nuXovSkUYA== | |
148 # -----END CERTIFICATE----- | |
149 | |
Ryan Sleevi
2012/03/13 23:06:39
As should all of these
agl
2012/03/13 23:44:03
They are constants, but I've renamed them ALL_CAPS
| |
150 # If you update any of the above, you can generate a new root with the | |
151 # following line: | |
152 # print DERToPEM(MakeCertificate(issuer_cn, issuer_cn, 1, key, key, None)) | |
153 | |
154 | |
155 # Various OIDs | |
156 | |
157 aia_ocsp = asn1.OID([1, 3, 6, 1, 5, 5, 7, 48, 1]) | |
158 authority_information_access = asn1.OID([1, 3, 6, 1, 5, 5, 7, 1, 1]) | |
159 basic_constraints = asn1.OID([2, 5, 29, 19]) | |
160 cert_policies = asn1.OID([2, 5, 29, 32]) | |
161 common_name = asn1.OID([2, 5, 4, 3]) | |
162 country = asn1.OID([2, 5, 4, 6]) | |
163 hash_sha1 = asn1.OID([1, 3, 14, 3, 2, 26]) | |
164 ocsp_type_basic = asn1.OID([1, 3, 6, 1, 5, 5, 7, 48, 1, 1]) | |
165 organization = asn1.OID([2, 5, 4, 10]) | |
166 public_key_rsa = asn1.OID([1, 2, 840, 113549, 1, 1, 1]) | |
167 sha1_with_rsa_encryption = asn1.OID([1, 2, 840, 113549, 1, 1, 5]) | |
Ryan Sleevi
2012/03/13 23:06:39
And these, renamed in ALL_CAPS per the style.
agl
2012/03/13 23:44:03
Done.
| |
168 | |
169 | |
170 def MakeCertificate( | |
171 issuer_cn, subject_cn, serial, pubkey, privkey, ocsp_url = None): | |
172 '''MakeCertificate returns a DER encoded certificate, signed by privkey.''' | |
173 extensions = asn1.SEQUENCE([]) | |
174 | |
175 # Default subject name fields | |
176 c = "XX" | |
177 o = "Testing Org" | |
178 | |
179 if issuer_cn == subject_cn: | |
180 # Root certificate. | |
181 c = None | |
182 o = None | |
183 extensions.children.append( | |
184 asn1.SEQUENCE([ | |
185 basic_constraints, | |
186 True, | |
187 asn1.OCTETSTRING(asn1.ToDER(asn1.SEQUENCE([ | |
188 True, # IsCA | |
189 0, # Path len | |
190 ]))), | |
191 ])) | |
192 | |
193 if ocsp_url is not None: | |
194 extensions.children.append( | |
195 asn1.SEQUENCE([ | |
196 authority_information_access, | |
197 False, | |
198 asn1.OCTETSTRING(asn1.ToDER(asn1.SEQUENCE([ | |
199 asn1.SEQUENCE([ | |
200 aia_ocsp, | |
201 asn1.Raw(asn1.TagAndLength(0x86, len(ocsp_url)) + ocsp_url), | |
202 ]), | |
203 ]))), | |
204 ])) | |
205 | |
206 extensions.children.append( | |
207 asn1.SEQUENCE([ | |
208 cert_policies, | |
209 False, | |
210 asn1.OCTETSTRING(asn1.ToDER(asn1.SEQUENCE([ | |
211 asn1.SEQUENCE([ # PolicyInformation | |
212 certPolicyOID, | |
213 ]), | |
214 ]))), | |
215 ]) | |
216 ) | |
217 | |
218 tbsCert = asn1.ToDER(asn1.SEQUENCE([ | |
219 asn1.Explicit(0, 2), # Version | |
220 serial, | |
221 asn1.SEQUENCE([sha1_with_rsa_encryption, None]), # SignatureAlgorithm | |
222 Name(cn = issuer_cn), # Issuer | |
223 asn1.SEQUENCE([ # Validity | |
224 asn1.UTCTime("100101060000Z"), # NotBefore | |
225 asn1.UTCTime("321201060000Z"), # NotAfter | |
226 ]), | |
227 Name(cn = subject_cn, c = c, o = o), # Subject | |
228 asn1.SEQUENCE([ # SubjectPublicKeyInfo | |
229 asn1.SEQUENCE([ # Algorithm | |
230 public_key_rsa, | |
231 None, | |
232 ]), | |
233 asn1.BitString(asn1.ToDER(key)), | |
234 ]), | |
235 asn1.Explicit(3, extensions), | |
236 ])) | |
237 | |
238 return asn1.ToDER(asn1.SEQUENCE([ | |
239 asn1.Raw(tbsCert), | |
240 asn1.SEQUENCE([ | |
241 sha1_with_rsa_encryption, | |
242 None, | |
243 ]), | |
244 asn1.BitString(key.Sign(tbsCert)), | |
245 ])) | |
246 | |
247 | |
248 def MakeOCSPResponse(issuer_cn, issuer_key, serial, revoked): | |
249 # https://tools.ietf.org/html/rfc2560 | |
250 issuer_name_hash = asn1.OCTETSTRING( | |
251 hashlib.sha1(asn1.ToDER(Name(cn = issuer_cn))).digest()) | |
252 | |
253 issuer_key_hash = asn1.OCTETSTRING( | |
254 hashlib.sha1(asn1.ToDER(issuer_key)).digest()) | |
255 | |
256 cert_status = None | |
257 if revoked: | |
258 cert_status = asn1.Explicit(1, asn1.GeneralizedTime("20100101060000Z")) | |
259 else: | |
260 cert_status = asn1.Raw(asn1.TagAndLength(0x80 | 0, 0)) | |
261 | |
262 basic_resp_data_der = asn1.ToDER(asn1.SEQUENCE([ | |
263 asn1.Explicit(2, issuer_key_hash), | |
264 asn1.GeneralizedTime("20100101060000Z"), # producedAt | |
265 asn1.SEQUENCE([ | |
266 asn1.SEQUENCE([ # SingleResponse | |
267 asn1.SEQUENCE([ # CertID | |
268 asn1.SEQUENCE([ # hashAlgorithm | |
269 hash_sha1, | |
270 None, | |
271 ]), | |
272 issuer_name_hash, | |
273 issuer_key_hash, | |
274 serial, | |
275 ]), | |
276 cert_status, | |
277 asn1.GeneralizedTime("20100101060000Z"), # thisUpdate | |
278 asn1.Explicit(0, asn1.GeneralizedTime("20300101060000Z")), # nextUpdate | |
279 ]), | |
280 ]), | |
281 ])) | |
282 | |
283 basic_resp = asn1.SEQUENCE([ | |
284 asn1.Raw(basic_resp_data_der), | |
285 asn1.SEQUENCE([ | |
286 sha1_with_rsa_encryption, | |
287 None, | |
288 ]), | |
289 asn1.BitString(key.Sign(basic_resp_data_der)), | |
290 ]) | |
291 | |
292 resp = asn1.SEQUENCE([ | |
293 asn1.ENUMERATED(0), | |
294 asn1.Explicit(0, asn1.SEQUENCE([ | |
295 ocsp_type_basic, | |
296 asn1.OCTETSTRING(asn1.ToDER(basic_resp)), | |
297 ])) | |
298 ]) | |
299 | |
300 return asn1.ToDER(resp) | |
301 | |
302 | |
303 def DERToPEM(der): | |
304 pem = '-----BEGIN CERTIFICATE-----\n' | |
305 pem += der.encode('base64') | |
306 pem += '-----END CERTIFICATE-----\n' | |
307 return pem | |
308 | |
309 | |
310 def GenerateCertKeyAndOCSP(subject = "127.0.0.1", | |
311 ocsp_url = "http://127.0.0.1", | |
312 ocsp_revoked = False): | |
313 '''GenerateCertKeyAndOCSP returns a (cert_and_key_pem, ocsp_der) where: | |
314 * cert_and_key_pem contains a certificate and private key in PEM format | |
315 with the given subject common name and OCSP URL. | |
316 * ocsp_der contains a DER encoded OCSP response or None if ocsp_url is | |
317 None''' | |
318 | |
319 serial = RandomNumber(16) | |
320 cert_der = MakeCertificate(issuer_cn, subject, serial, key, key, ocsp_url) | |
321 cert_pem = DERToPEM(cert_der) | |
322 | |
323 ocsp_der = None | |
324 if ocsp_url is not None: | |
325 ocsp_der = MakeOCSPResponse(issuer_cn, key, serial, ocsp_revoked) | |
326 | |
327 return (cert_pem + key_pem, ocsp_der) | |
OLD | NEW |