OLD | NEW |
(Empty) | |
| 1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. |
| 4 |
| 5 import hashlib |
| 6 import os |
| 7 import types |
| 8 |
| 9 # This file implements very minimal certificate and OCSP generation. It's |
| 10 # designed to test revocation checking. |
| 11 |
| 12 def RandomNumber(length_in_bytes): |
| 13 '''RandomNumber returns a random number of length 8*|length_in_bytes| bits''' |
| 14 rand = os.urandom(length_in_bytes) |
| 15 n = 0 |
| 16 for x in rand: |
| 17 n <<= 8 |
| 18 n |= ord(x) |
| 19 return n |
| 20 |
| 21 def ModExp(n, e, p): |
| 22 '''ModExp returns n^e mod p''' |
| 23 r = 1 |
| 24 while e != 0: |
| 25 if e & 1: |
| 26 r = (r*n) % p |
| 27 e >>= 1 |
| 28 n = (n*n) % p |
| 29 return r |
| 30 |
| 31 class RSA(object): |
| 32 def __init__(self, modulus, e, d): |
| 33 self.m = modulus |
| 34 self.e = e |
| 35 self.d = d |
| 36 |
| 37 self.modlen = 0 |
| 38 m = modulus |
| 39 while m != 0: |
| 40 self.modlen += 1 |
| 41 m >>= 8 |
| 42 |
| 43 def Sign(self, message): |
| 44 digest = hashlib.sha1(message).digest() |
| 45 # This is the ASN.1 prefix for a SHA1 signature. |
| 46 prefix = '3021300906052b0e03021a05000414'.decode('hex') |
| 47 |
| 48 em = ['\xff'] * (self.modlen - 1 - len(prefix) - len(digest)) |
| 49 em[0] = '\x00' |
| 50 em[1] = '\x01' |
| 51 em += "\x00" + prefix + digest |
| 52 |
| 53 n = 0 |
| 54 for x in em: |
| 55 n <<= 8 |
| 56 n |= ord(x) |
| 57 |
| 58 s = ModExp(n, self.d, self.m) |
| 59 out = [] |
| 60 while s != 0: |
| 61 out.append(s & 0xff) |
| 62 s >>= 8 |
| 63 out.reverse() |
| 64 return '\x00' * (self.modlen - len(out)) + ToBytes(out) |
| 65 |
| 66 def ToDER(self): |
| 67 return ToDER(SEQUENCE([self.m, self.e])) |
| 68 |
| 69 # The following functions and classes implement a trivial ASN.1 DER encoder. |
| 70 |
| 71 def ToDER(obj): |
| 72 '''ToDER converts the given object into DER encoding''' |
| 73 if type(obj) == types.NoneType: |
| 74 # None turns into NULL |
| 75 return TagAndLength(5, 0) |
| 76 if type(obj) == types.StringType: |
| 77 # Strings are PRINTABLESTRING |
| 78 return TagAndLength(19, len(obj)) + obj |
| 79 if type(obj) == types.BooleanType: |
| 80 val = "\x00" |
| 81 if obj: |
| 82 val = "\xff" |
| 83 return TagAndLength(1, 1) + val |
| 84 if type(obj) == types.IntType or type(obj) == types.LongType: |
| 85 big_endian = [] |
| 86 val = obj |
| 87 while val != 0: |
| 88 big_endian.append(val & 0xff) |
| 89 val >>= 8 |
| 90 |
| 91 if len(big_endian) == 0 or big_endian[-1] >= 128: |
| 92 big_endian.append(0) |
| 93 |
| 94 big_endian.reverse() |
| 95 return TagAndLength(2, len(big_endian)) + ToBytes(big_endian) |
| 96 |
| 97 return obj.ToDER() |
| 98 |
| 99 def ToBytes(array_of_bytes): |
| 100 '''ToBytes converts the array of byte values into a binary string''' |
| 101 return ''.join([chr(x) for x in array_of_bytes]) |
| 102 |
| 103 def TagAndLength(tag, length): |
| 104 der = [tag] |
| 105 if length < 128: |
| 106 der.append(length) |
| 107 elif length < 256: |
| 108 der.append(0x81) |
| 109 der.append(length) |
| 110 elif length < 65535: |
| 111 der.append(0x82) |
| 112 der.append(length >> 8) |
| 113 der.append(length & 0xff) |
| 114 else: |
| 115 raise "too long" |
| 116 |
| 117 return ToBytes(der) |
| 118 |
| 119 class Raw(object): |
| 120 '''Raw contains raw DER encoded bytes that are used verbatim''' |
| 121 def __init__(self, der): |
| 122 self.der = der |
| 123 |
| 124 def ToDER(self): |
| 125 return self.der |
| 126 |
| 127 class Explicit(object): |
| 128 '''Explicit prepends an explicit tag''' |
| 129 def __init__(self, tag, child): |
| 130 self.tag = tag |
| 131 self.child = child |
| 132 |
| 133 def ToDER(self): |
| 134 der = ToDER(self.child) |
| 135 tag = self.tag |
| 136 tag |= 0x80 # content specific |
| 137 tag |= 0x20 # complex |
| 138 return TagAndLength(tag, len(der)) + der |
| 139 |
| 140 class ENUMERATED(object): |
| 141 def __init__(self, value): |
| 142 self.value = value |
| 143 |
| 144 def ToDER(self): |
| 145 return TagAndLength(10, 1) + chr(self.value) |
| 146 |
| 147 class SEQUENCE(object): |
| 148 def __init__(self, children): |
| 149 self.children = children |
| 150 |
| 151 def ToDER(self): |
| 152 der = ''.join([ToDER(x) for x in self.children]) |
| 153 return TagAndLength(0x30, len(der)) + der |
| 154 |
| 155 class SET(object): |
| 156 def __init__(self, children): |
| 157 self.children = children |
| 158 |
| 159 def ToDER(self): |
| 160 der = ''.join([ToDER(x) for x in self.children]) |
| 161 return TagAndLength(0x31, len(der)) + der |
| 162 |
| 163 class OCTETSTRING(object): |
| 164 def __init__(self, val): |
| 165 self.val = val |
| 166 |
| 167 def ToDER(self): |
| 168 return TagAndLength(4, len(self.val)) + self.val |
| 169 |
| 170 class OID(object): |
| 171 def __init__(self, parts): |
| 172 self.parts = parts |
| 173 |
| 174 def ToDER(self): |
| 175 if len(self.parts) < 2 or self.parts[0] > 6 or self.parts[1] >= 40: |
| 176 raise "invalid OID" |
| 177 |
| 178 der = [self.parts[0]*40 + self.parts[1]] |
| 179 for x in self.parts[2:]: |
| 180 if x == 0: |
| 181 der.append(0) |
| 182 else: |
| 183 octets = [] |
| 184 while x != 0: |
| 185 v = x & 0x7f |
| 186 if len(octets) > 0: |
| 187 v |= 0x80 |
| 188 octets.append(v) |
| 189 x >>= 7 |
| 190 octets.reverse() |
| 191 der = der + octets |
| 192 |
| 193 return TagAndLength(6, len(der)) + ToBytes(der) |
| 194 |
| 195 class UTCTime(object): |
| 196 def __init__(self, time_str): |
| 197 self.time_str = time_str |
| 198 |
| 199 def ToDER(self): |
| 200 return TagAndLength(23, len(self.time_str)) + self.time_str |
| 201 |
| 202 class GeneralizedTime(object): |
| 203 def __init__(self, time_str): |
| 204 self.time_str = time_str |
| 205 |
| 206 def ToDER(self): |
| 207 return TagAndLength(24, len(self.time_str)) + self.time_str |
| 208 |
| 209 class BitString(object): |
| 210 def __init__(self, bits): |
| 211 self.bits = bits |
| 212 |
| 213 def ToDER(self): |
| 214 return TagAndLength(3, 1 + len(self.bits)) + "\x00" + self.bits |
| 215 |
| 216 def Name(cn = None, c = None, o = None): |
| 217 names = SEQUENCE([]) |
| 218 |
| 219 if cn is not None: |
| 220 names.children.append( |
| 221 SET([ |
| 222 SEQUENCE([ |
| 223 common_name, cn, |
| 224 ]) |
| 225 ]) |
| 226 ) |
| 227 |
| 228 if c is not None: |
| 229 names.children.append( |
| 230 SET([ |
| 231 SEQUENCE([ |
| 232 country, c, |
| 233 ]) |
| 234 ]) |
| 235 ) |
| 236 |
| 237 if o is not None: |
| 238 names.children.append( |
| 239 SET([ |
| 240 SEQUENCE([ |
| 241 organization, o, |
| 242 ]) |
| 243 ]) |
| 244 ) |
| 245 |
| 246 return names |
| 247 |
| 248 # The private key and root certificate name are hard coded here: |
| 249 |
| 250 # This is the private key |
| 251 key = RSA(0x00a71998f2930bfe73d031a87f133d2f378eeeeed52a77e44d0fc9ff6f07ff32cbf3
da999de4ed65832afcb0807f98787506539d258a0ce3c2c77967653099a9034a9b115a876c39a8c4
e4ed4acd0c64095946fb39eeeb47a0704dbb018acf48c3a1c4b895fc409fb4a340a986b1afc45519
ab9eca47c30185c771c64aa5ecf07d, |
| 252 3, |
| 253 0x6f6665f70cb2a9a28acbc5aa0cd374cfb49f49e371a542de0a86aa4a0554cc87f7e7
1113edf399021ca875aaffbafaf8aee268c3b15ded2c84fb9a4375bbc6011d841e57833bc6f998d2
5daf6fa7f166b233e3e54a4bae7a5aaaba21431324967d5ff3e1d4f413827994262115ca54396e70
68d0afa7af787a5782bc7040e6d3) |
| 254 |
| 255 # And the same thing in PEM format |
| 256 key_pem = '''-----BEGIN RSA PRIVATE KEY----- |
| 257 MIICXAIBAAKBgQCnGZjykwv+c9AxqH8TPS83ju7u1Sp35E0Pyf9vB/8yy/PamZ3k |
| 258 7WWDKvywgH+YeHUGU50ligzjwsd5Z2UwmakDSpsRWodsOajE5O1KzQxkCVlG+znu |
| 259 60egcE27AYrPSMOhxLiV/ECftKNAqYaxr8RVGaueykfDAYXHccZKpezwfQIBAwKB |
| 260 gG9mZfcMsqmiisvFqgzTdM+0n0njcaVC3gqGqkoFVMyH9+cRE+3zmQIcqHWq/7r6 |
| 261 +K7iaMOxXe0shPuaQ3W7xgEdhB5XgzvG+ZjSXa9vp/FmsjPj5UpLrnpaqrohQxMk |
| 262 ln1f8+HU9BOCeZQmIRXKVDlucGjQr6eveHpXgrxwQObTAkEA2wBAfuduw5G0/VfN |
| 263 Wx66D5fbPccfYFqLM5LuTimLmNqzK2gIKXckB2sm44gJZ6wVlumaB1CSNug2LNYx |
| 264 3cAjUwJBAMNUo1hbI8ugqqwI9kpxv9+2Heea4BlnXbS6tYF8pvkHMoliuxNbXmmB |
| 265 u4zNB5iZ6V0ZZ4nvtUNo2cGr/h/Lcu8CQQCSACr/RPSCYSNTj948vya1D+d+hL+V |
| 266 kbIiYfQ0G7Jl5yIc8AVw+hgE8hntBVuacrkPRmaviwwkms7IjsvpKsI3AkEAgjhs |
| 267 5ZIX3RXHHVtO3EvVP86+mmdAEO+TzdHOVlMZ+1ohsOx8t5I+8QEnszNaZbvw6Lua |
| 268 W/UjgkXmgR1UFTJMnwJBAKErmAw21/g3SST0a4wlyaGT/MbXL8Ouwnb5IOKQVe55 |
| 269 CZdeVeSh6cJ4hAcQKfr2s1JaZTJFIBPGKAif5HqpydA= |
| 270 -----END RSA PRIVATE KEY----- |
| 271 ''' |
| 272 |
| 273 # Root certificate CN |
| 274 issuer_cn = "Testing CA" |
| 275 |
| 276 # All certificates are issued under this policy OID, in the Google arc: |
| 277 certPolicyOID = OID([1, 3, 6, 1, 4, 1, 11129, 2, 4, 1]) |
| 278 |
| 279 # These result in the following root certificate: |
| 280 # -----BEGIN CERTIFICATE----- |
| 281 # MIIB0TCCATqgAwIBAgIBATANBgkqhkiG9w0BAQUFADAVMRMwEQYDVQQDEwpUZXN0aW5nIENBMB4X |
| 282 # DTEwMDEwMTA2MDAwMFoXDTMyMTIwMTA2MDAwMFowFTETMBEGA1UEAxMKVGVzdGluZyBDQTCBnTAN |
| 283 # BgkqhkiG9w0BAQEFAAOBiwAwgYcCgYEApxmY8pML/nPQMah/Ez0vN47u7tUqd+RND8n/bwf/Msvz |
| 284 # 2pmd5O1lgyr8sIB/mHh1BlOdJYoM48LHeWdlMJmpA0qbEVqHbDmoxOTtSs0MZAlZRvs57utHoHBN |
| 285 # uwGKz0jDocS4lfxAn7SjQKmGsa/EVRmrnspHwwGFx3HGSqXs8H0CAQOjMzAxMBIGA1UdEwEB/wQI |
| 286 # MAYBAf8CAQAwGwYDVR0gAQEABBEwDzANBgsrBgEEAdZ5AgHODzANBgkqhkiG9w0BAQUFAAOBgQA/ |
| 287 # STb40A6D+93jMfLGQzXc997IsaJZdoPt7tYa8PqGJBL62EiTj+erd/H5pDZx/2/bcpOG4m9J56yg |
| 288 # wOohbllw2TM+oeEd8syzV6X+1SIPnGI56JRrm3UXcHYx1Rq5loM9WKAiz/WmIWmskljsEQ7+542p |
| 289 # q0pkHjs8nuXovSkUYA== |
| 290 # -----END CERTIFICATE----- |
| 291 |
| 292 # If you update any of the above, you can generate a new root with the |
| 293 # following line: |
| 294 # print DERToPEM(MakeCertificate(issuer_cn, issuer_cn, 1, key, key, None)) |
| 295 |
| 296 # Various OIDs |
| 297 |
| 298 aia_ocsp = OID([1, 3, 6, 1, 5, 5, 7, 48, 1]) |
| 299 authority_information_access = OID([1, 3, 6, 1, 5, 5, 7, 1, 1]) |
| 300 basic_constraints = OID([2, 5, 29, 19]) |
| 301 cert_policies = OID([2, 5, 29, 32]) |
| 302 common_name = OID([2, 5, 4, 3]) |
| 303 country = OID([2, 5, 4, 6]) |
| 304 hash_sha1 = OID([1, 3, 14, 3, 2, 26]) |
| 305 ocsp_type_basic = OID([1, 3, 6, 1, 5, 5, 7, 48, 1, 1]) |
| 306 organization = OID([2, 5, 4, 10]) |
| 307 public_key_rsa = OID([1, 2, 840, 113549, 1, 1, 1]) |
| 308 sha1_with_rsa_encryption = OID([1, 2, 840, 113549, 1, 1, 5]) |
| 309 |
| 310 def MakeCertificate( |
| 311 issuer_cn, subject_cn, serial, pubkey, privkey, ocsp_url = None): |
| 312 '''MakeCertificate returns a DER encoded certificate, signed by privkey.''' |
| 313 extensions = SEQUENCE([]) |
| 314 |
| 315 # Default subject name fields |
| 316 c = "XX" |
| 317 o = "Testing Org" |
| 318 |
| 319 if issuer_cn == subject_cn: |
| 320 # Root certificate. |
| 321 c = None |
| 322 o = None |
| 323 extensions.children.append( |
| 324 SEQUENCE([ |
| 325 basic_constraints, |
| 326 True, |
| 327 OCTETSTRING(ToDER(SEQUENCE([ |
| 328 True, # IsCA |
| 329 0, # Path len |
| 330 ]))), |
| 331 ])) |
| 332 |
| 333 if ocsp_url is not None: |
| 334 extensions.children.append( |
| 335 SEQUENCE([ |
| 336 authority_information_access, |
| 337 False, |
| 338 OCTETSTRING(ToDER(SEQUENCE([ |
| 339 SEQUENCE([ |
| 340 aia_ocsp, |
| 341 Raw(TagAndLength(0x86, len(ocsp_url)) + ocsp_url), |
| 342 ]), |
| 343 ]))), |
| 344 ])) |
| 345 |
| 346 extensions.children.append( |
| 347 SEQUENCE([ |
| 348 cert_policies, |
| 349 False, |
| 350 OCTETSTRING(ToDER(SEQUENCE([ |
| 351 SEQUENCE([ # PolicyInformation |
| 352 certPolicyOID, |
| 353 ]), |
| 354 ]))), |
| 355 ]) |
| 356 ) |
| 357 |
| 358 tbsCert = ToDER(SEQUENCE([ |
| 359 Explicit(0, 2), # Version |
| 360 serial, |
| 361 SEQUENCE([sha1_with_rsa_encryption, None]), # SignatureAlgorithm |
| 362 Name(cn = issuer_cn), # Issuer |
| 363 SEQUENCE([ # Validity |
| 364 UTCTime("100101060000Z"), # NotBefore |
| 365 UTCTime("321201060000Z"), # NotAfter |
| 366 ]), |
| 367 Name(cn = subject_cn, c = c, o = o), # Subject |
| 368 SEQUENCE([ # SubjectPublicKeyInfo |
| 369 SEQUENCE([ # Algorithm |
| 370 public_key_rsa, |
| 371 None, |
| 372 ]), |
| 373 BitString(ToDER(key)), |
| 374 ]), |
| 375 Explicit(3, extensions), |
| 376 ])) |
| 377 |
| 378 return ToDER(SEQUENCE([ |
| 379 Raw(tbsCert), |
| 380 SEQUENCE([ |
| 381 sha1_with_rsa_encryption, |
| 382 None, |
| 383 ]), |
| 384 BitString(key.Sign(tbsCert)), |
| 385 ])) |
| 386 |
| 387 def MakeOCSPResponse(issuer_cn, issuer_key, serial, revoked): |
| 388 # https://tools.ietf.org/html/rfc2560 |
| 389 issuer_name_hash = OCTETSTRING( |
| 390 hashlib.sha1(ToDER(Name(cn = issuer_cn))).digest()) |
| 391 |
| 392 issuer_key_hash = OCTETSTRING(hashlib.sha1(ToDER(issuer_key)).digest()) |
| 393 |
| 394 cert_status = None |
| 395 if revoked: |
| 396 cert_status = Explicit(1, GeneralizedTime("20100101060000Z")) |
| 397 else: |
| 398 cert_status = Raw(TagAndLength(0x80 | 0, 0)) |
| 399 |
| 400 basic_resp_data_der = ToDER(SEQUENCE([ |
| 401 Explicit(2, issuer_key_hash), |
| 402 GeneralizedTime("20100101060000Z"), # producedAt |
| 403 SEQUENCE([ |
| 404 SEQUENCE([ # SingleResponse |
| 405 SEQUENCE([ # CertID |
| 406 SEQUENCE([ # hashAlgorithm |
| 407 hash_sha1, |
| 408 None, |
| 409 ]), |
| 410 issuer_name_hash, |
| 411 issuer_key_hash, |
| 412 serial, |
| 413 ]), |
| 414 cert_status, |
| 415 GeneralizedTime("20100101060000Z"), # thisUpdate |
| 416 Explicit(0, GeneralizedTime("20300101060000Z")), # nextUpdate |
| 417 ]), |
| 418 ]), |
| 419 ])) |
| 420 |
| 421 basic_resp = SEQUENCE([ |
| 422 Raw(basic_resp_data_der), |
| 423 SEQUENCE([ |
| 424 sha1_with_rsa_encryption, |
| 425 None, |
| 426 ]), |
| 427 BitString(key.Sign(basic_resp_data_der)), |
| 428 ]) |
| 429 |
| 430 resp = SEQUENCE([ |
| 431 ENUMERATED(0), |
| 432 Explicit(0, SEQUENCE([ |
| 433 ocsp_type_basic, |
| 434 OCTETSTRING(ToDER(basic_resp)), |
| 435 ])) |
| 436 ]) |
| 437 |
| 438 return ToDER(resp) |
| 439 |
| 440 def DERToPEM(der): |
| 441 pem = '-----BEGIN CERTIFICATE-----\n' |
| 442 pem += der.encode('base64') |
| 443 pem += '-----END CERTIFICATE-----\n' |
| 444 return pem |
| 445 |
| 446 def GenerateCertKeyAndOCSP(subject = "127.0.0.1", |
| 447 ocsp_url = "http://127.0.0.1", |
| 448 ocsp_revoked = False): |
| 449 '''GenerateCertKeyAndOCSP returns a (cert_and_key_pem, ocsp_der) where: |
| 450 * cert_and_key_pem contains a certificate and private key in PEM format |
| 451 with the given subject common name and OCSP URL. |
| 452 * ocsp_der contains a DER encoded OCSP response or None if ocsp_url is |
| 453 None''' |
| 454 |
| 455 serial = RandomNumber(16) |
| 456 cert_der = MakeCertificate(issuer_cn, subject, serial, key, key, ocsp_url) |
| 457 cert_pem = DERToPEM(cert_der) |
| 458 |
| 459 ocsp_der = None |
| 460 if ocsp_url is not None: |
| 461 ocsp_der = MakeOCSPResponse(issuer_cn, key, serial, ocsp_revoked) |
| 462 |
| 463 return (cert_pem + key_pem, ocsp_der) |
OLD | NEW |