Index: net/tools/testserver/minica.py |
diff --git a/net/tools/testserver/minica.py b/net/tools/testserver/minica.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..5e7ec1ab580a947b0d958edbdeaded0c34557bae |
--- /dev/null |
+++ b/net/tools/testserver/minica.py |
@@ -0,0 +1,463 @@ |
+# Copyright (c) 2012 The Chromium Authors. All rights reserved. |
+# Use of this source code is governed by a BSD-style license that can be |
+# found in the LICENSE file. |
+ |
+import hashlib |
+import os |
+import types |
+ |
+# This file implements very minimal certificate and OCSP generation. It's |
+# designed to test revocation checking. |
+ |
+def RandomNumber(length_in_bytes): |
+ '''RandomNumber returns a random number of length 8*|length_in_bytes| bits''' |
+ rand = os.urandom(length_in_bytes) |
+ n = 0 |
+ for x in rand: |
+ n <<= 8 |
+ n |= ord(x) |
+ return n |
+ |
+def ModExp(n, e, p): |
+ '''ModExp returns n^e mod p''' |
+ r = 1 |
+ while e != 0: |
+ if e & 1: |
+ r = (r*n) % p |
+ e >>= 1 |
+ n = (n*n) % p |
+ return r |
+ |
+class RSA(object): |
+ def __init__(self, modulus, e, d): |
+ self.m = modulus |
+ self.e = e |
+ self.d = d |
+ |
+ self.modlen = 0 |
+ m = modulus |
+ while m != 0: |
+ self.modlen += 1 |
+ m >>= 8 |
+ |
+ def Sign(self, message): |
+ digest = hashlib.sha1(message).digest() |
+ # This is the ASN.1 prefix for a SHA1 signature. |
+ prefix = '3021300906052b0e03021a05000414'.decode('hex') |
+ |
+ em = ['\xff'] * (self.modlen - 1 - len(prefix) - len(digest)) |
+ em[0] = '\x00' |
+ em[1] = '\x01' |
+ em += "\x00" + prefix + digest |
+ |
+ n = 0 |
+ for x in em: |
+ n <<= 8 |
+ n |= ord(x) |
+ |
+ s = ModExp(n, self.d, self.m) |
+ out = [] |
+ while s != 0: |
+ out.append(s & 0xff) |
+ s >>= 8 |
+ out.reverse() |
+ return '\x00' * (self.modlen - len(out)) + ToBytes(out) |
+ |
+ def ToDER(self): |
+ return ToDER(SEQUENCE([self.m, self.e])) |
+ |
+# The following functions and classes implement a trivial ASN.1 DER encoder. |
+ |
+def ToDER(obj): |
+ '''ToDER converts the given object into DER encoding''' |
+ if type(obj) == types.NoneType: |
+ # None turns into NULL |
+ return TagAndLength(5, 0) |
+ if type(obj) == types.StringType: |
+ # Strings are PRINTABLESTRING |
+ return TagAndLength(19, len(obj)) + obj |
+ if type(obj) == types.BooleanType: |
+ val = "\x00" |
+ if obj: |
+ val = "\xff" |
+ return TagAndLength(1, 1) + val |
+ if type(obj) == types.IntType or type(obj) == types.LongType: |
+ big_endian = [] |
+ val = obj |
+ while val != 0: |
+ big_endian.append(val & 0xff) |
+ val >>= 8 |
+ |
+ if len(big_endian) == 0 or big_endian[-1] >= 128: |
+ big_endian.append(0) |
+ |
+ big_endian.reverse() |
+ return TagAndLength(2, len(big_endian)) + ToBytes(big_endian) |
+ |
+ return obj.ToDER() |
+ |
+def ToBytes(array_of_bytes): |
+ '''ToBytes converts the array of byte values into a binary string''' |
+ return ''.join([chr(x) for x in array_of_bytes]) |
+ |
+def TagAndLength(tag, length): |
+ der = [tag] |
+ if length < 128: |
+ der.append(length) |
+ elif length < 256: |
+ der.append(0x81) |
+ der.append(length) |
+ elif length < 65535: |
+ der.append(0x82) |
+ der.append(length >> 8) |
+ der.append(length & 0xff) |
+ else: |
+ raise "too long" |
+ |
+ return ToBytes(der) |
+ |
+class Raw(object): |
+ '''Raw contains raw DER encoded bytes that are used verbatim''' |
+ def __init__(self, der): |
+ self.der = der |
+ |
+ def ToDER(self): |
+ return self.der |
+ |
+class Explicit(object): |
+ '''Explicit prepends an explicit tag''' |
+ def __init__(self, tag, child): |
+ self.tag = tag |
+ self.child = child |
+ |
+ def ToDER(self): |
+ der = ToDER(self.child) |
+ tag = self.tag |
+ tag |= 0x80 # content specific |
+ tag |= 0x20 # complex |
+ return TagAndLength(tag, len(der)) + der |
+ |
+class ENUMERATED(object): |
+ def __init__(self, value): |
+ self.value = value |
+ |
+ def ToDER(self): |
+ return TagAndLength(10, 1) + chr(self.value) |
+ |
+class SEQUENCE(object): |
+ def __init__(self, children): |
+ self.children = children |
+ |
+ def ToDER(self): |
+ der = ''.join([ToDER(x) for x in self.children]) |
+ return TagAndLength(0x30, len(der)) + der |
+ |
+class SET(object): |
+ def __init__(self, children): |
+ self.children = children |
+ |
+ def ToDER(self): |
+ der = ''.join([ToDER(x) for x in self.children]) |
+ return TagAndLength(0x31, len(der)) + der |
+ |
+class OCTETSTRING(object): |
+ def __init__(self, val): |
+ self.val = val |
+ |
+ def ToDER(self): |
+ return TagAndLength(4, len(self.val)) + self.val |
+ |
+class OID(object): |
+ def __init__(self, parts): |
+ self.parts = parts |
+ |
+ def ToDER(self): |
+ if len(self.parts) < 2 or self.parts[0] > 6 or self.parts[1] >= 40: |
+ raise "invalid OID" |
+ |
+ der = [self.parts[0]*40 + self.parts[1]] |
+ for x in self.parts[2:]: |
+ if x == 0: |
+ der.append(0) |
+ else: |
+ octets = [] |
+ while x != 0: |
+ v = x & 0x7f |
+ if len(octets) > 0: |
+ v |= 0x80 |
+ octets.append(v) |
+ x >>= 7 |
+ octets.reverse() |
+ der = der + octets |
+ |
+ return TagAndLength(6, len(der)) + ToBytes(der) |
+ |
+class UTCTime(object): |
+ def __init__(self, time_str): |
+ self.time_str = time_str |
+ |
+ def ToDER(self): |
+ return TagAndLength(23, len(self.time_str)) + self.time_str |
+ |
+class GeneralizedTime(object): |
+ def __init__(self, time_str): |
+ self.time_str = time_str |
+ |
+ def ToDER(self): |
+ return TagAndLength(24, len(self.time_str)) + self.time_str |
+ |
+class BitString(object): |
+ def __init__(self, bits): |
+ self.bits = bits |
+ |
+ def ToDER(self): |
+ return TagAndLength(3, 1 + len(self.bits)) + "\x00" + self.bits |
+ |
+def Name(cn = None, c = None, o = None): |
+ names = SEQUENCE([]) |
+ |
+ if cn is not None: |
+ names.children.append( |
+ SET([ |
+ SEQUENCE([ |
+ common_name, cn, |
+ ]) |
+ ]) |
+ ) |
+ |
+ if c is not None: |
+ names.children.append( |
+ SET([ |
+ SEQUENCE([ |
+ country, c, |
+ ]) |
+ ]) |
+ ) |
+ |
+ if o is not None: |
+ names.children.append( |
+ SET([ |
+ SEQUENCE([ |
+ organization, o, |
+ ]) |
+ ]) |
+ ) |
+ |
+ return names |
+ |
+# The private key and root certificate name are hard coded here: |
+ |
+# This is the private key |
+key = RSA(0x00a71998f2930bfe73d031a87f133d2f378eeeeed52a77e44d0fc9ff6f07ff32cbf3da999de4ed65832afcb0807f98787506539d258a0ce3c2c77967653099a9034a9b115a876c39a8c4e4ed4acd0c64095946fb39eeeb47a0704dbb018acf48c3a1c4b895fc409fb4a340a986b1afc45519ab9eca47c30185c771c64aa5ecf07d, |
+ 3, |
+ 0x6f6665f70cb2a9a28acbc5aa0cd374cfb49f49e371a542de0a86aa4a0554cc87f7e71113edf399021ca875aaffbafaf8aee268c3b15ded2c84fb9a4375bbc6011d841e57833bc6f998d25daf6fa7f166b233e3e54a4bae7a5aaaba21431324967d5ff3e1d4f413827994262115ca54396e7068d0afa7af787a5782bc7040e6d3) |
+ |
+# And the same thing in PEM format |
+key_pem = '''-----BEGIN RSA PRIVATE KEY----- |
+MIICXAIBAAKBgQCnGZjykwv+c9AxqH8TPS83ju7u1Sp35E0Pyf9vB/8yy/PamZ3k |
+7WWDKvywgH+YeHUGU50ligzjwsd5Z2UwmakDSpsRWodsOajE5O1KzQxkCVlG+znu |
+60egcE27AYrPSMOhxLiV/ECftKNAqYaxr8RVGaueykfDAYXHccZKpezwfQIBAwKB |
+gG9mZfcMsqmiisvFqgzTdM+0n0njcaVC3gqGqkoFVMyH9+cRE+3zmQIcqHWq/7r6 |
++K7iaMOxXe0shPuaQ3W7xgEdhB5XgzvG+ZjSXa9vp/FmsjPj5UpLrnpaqrohQxMk |
+ln1f8+HU9BOCeZQmIRXKVDlucGjQr6eveHpXgrxwQObTAkEA2wBAfuduw5G0/VfN |
+Wx66D5fbPccfYFqLM5LuTimLmNqzK2gIKXckB2sm44gJZ6wVlumaB1CSNug2LNYx |
+3cAjUwJBAMNUo1hbI8ugqqwI9kpxv9+2Heea4BlnXbS6tYF8pvkHMoliuxNbXmmB |
+u4zNB5iZ6V0ZZ4nvtUNo2cGr/h/Lcu8CQQCSACr/RPSCYSNTj948vya1D+d+hL+V |
+kbIiYfQ0G7Jl5yIc8AVw+hgE8hntBVuacrkPRmaviwwkms7IjsvpKsI3AkEAgjhs |
+5ZIX3RXHHVtO3EvVP86+mmdAEO+TzdHOVlMZ+1ohsOx8t5I+8QEnszNaZbvw6Lua |
+W/UjgkXmgR1UFTJMnwJBAKErmAw21/g3SST0a4wlyaGT/MbXL8Ouwnb5IOKQVe55 |
+CZdeVeSh6cJ4hAcQKfr2s1JaZTJFIBPGKAif5HqpydA= |
+-----END RSA PRIVATE KEY----- |
+''' |
+ |
+# Root certificate CN |
+issuer_cn = "Testing CA" |
+ |
+# All certificates are issued under this policy OID, in the Google arc: |
+certPolicyOID = OID([1, 3, 6, 1, 4, 1, 11129, 2, 4, 1]) |
+ |
+# These result in the following root certificate: |
+# -----BEGIN CERTIFICATE----- |
+# MIIB0TCCATqgAwIBAgIBATANBgkqhkiG9w0BAQUFADAVMRMwEQYDVQQDEwpUZXN0aW5nIENBMB4X |
+# DTEwMDEwMTA2MDAwMFoXDTMyMTIwMTA2MDAwMFowFTETMBEGA1UEAxMKVGVzdGluZyBDQTCBnTAN |
+# BgkqhkiG9w0BAQEFAAOBiwAwgYcCgYEApxmY8pML/nPQMah/Ez0vN47u7tUqd+RND8n/bwf/Msvz |
+# 2pmd5O1lgyr8sIB/mHh1BlOdJYoM48LHeWdlMJmpA0qbEVqHbDmoxOTtSs0MZAlZRvs57utHoHBN |
+# uwGKz0jDocS4lfxAn7SjQKmGsa/EVRmrnspHwwGFx3HGSqXs8H0CAQOjMzAxMBIGA1UdEwEB/wQI |
+# MAYBAf8CAQAwGwYDVR0gAQEABBEwDzANBgsrBgEEAdZ5AgHODzANBgkqhkiG9w0BAQUFAAOBgQA/ |
+# STb40A6D+93jMfLGQzXc997IsaJZdoPt7tYa8PqGJBL62EiTj+erd/H5pDZx/2/bcpOG4m9J56yg |
+# wOohbllw2TM+oeEd8syzV6X+1SIPnGI56JRrm3UXcHYx1Rq5loM9WKAiz/WmIWmskljsEQ7+542p |
+# q0pkHjs8nuXovSkUYA== |
+# -----END CERTIFICATE----- |
+ |
+# If you update any of the above, you can generate a new root with the |
+# following line: |
+# print DERToPEM(MakeCertificate(issuer_cn, issuer_cn, 1, key, key, None)) |
+ |
+# Various OIDs |
+ |
+aia_ocsp = OID([1, 3, 6, 1, 5, 5, 7, 48, 1]) |
+authority_information_access = OID([1, 3, 6, 1, 5, 5, 7, 1, 1]) |
+basic_constraints = OID([2, 5, 29, 19]) |
+cert_policies = OID([2, 5, 29, 32]) |
+common_name = OID([2, 5, 4, 3]) |
+country = OID([2, 5, 4, 6]) |
+hash_sha1 = OID([1, 3, 14, 3, 2, 26]) |
+ocsp_type_basic = OID([1, 3, 6, 1, 5, 5, 7, 48, 1, 1]) |
+organization = OID([2, 5, 4, 10]) |
+public_key_rsa = OID([1, 2, 840, 113549, 1, 1, 1]) |
+sha1_with_rsa_encryption = OID([1, 2, 840, 113549, 1, 1, 5]) |
+ |
+def MakeCertificate( |
+ issuer_cn, subject_cn, serial, pubkey, privkey, ocsp_url = None): |
+ '''MakeCertificate returns a DER encoded certificate, signed by privkey.''' |
+ extensions = SEQUENCE([]) |
+ |
+ # Default subject name fields |
+ c = "XX" |
+ o = "Testing Org" |
+ |
+ if issuer_cn == subject_cn: |
+ # Root certificate. |
+ c = None |
+ o = None |
+ extensions.children.append( |
+ SEQUENCE([ |
+ basic_constraints, |
+ True, |
+ OCTETSTRING(ToDER(SEQUENCE([ |
+ True, # IsCA |
+ 0, # Path len |
+ ]))), |
+ ])) |
+ |
+ if ocsp_url is not None: |
+ extensions.children.append( |
+ SEQUENCE([ |
+ authority_information_access, |
+ False, |
+ OCTETSTRING(ToDER(SEQUENCE([ |
+ SEQUENCE([ |
+ aia_ocsp, |
+ Raw(TagAndLength(0x86, len(ocsp_url)) + ocsp_url), |
+ ]), |
+ ]))), |
+ ])) |
+ |
+ extensions.children.append( |
+ SEQUENCE([ |
+ cert_policies, |
+ False, |
+ OCTETSTRING(ToDER(SEQUENCE([ |
+ SEQUENCE([ # PolicyInformation |
+ certPolicyOID, |
+ ]), |
+ ]))), |
+ ]) |
+ ) |
+ |
+ tbsCert = ToDER(SEQUENCE([ |
+ Explicit(0, 2), # Version |
+ serial, |
+ SEQUENCE([sha1_with_rsa_encryption, None]), # SignatureAlgorithm |
+ Name(cn = issuer_cn), # Issuer |
+ SEQUENCE([ # Validity |
+ UTCTime("100101060000Z"), # NotBefore |
+ UTCTime("321201060000Z"), # NotAfter |
+ ]), |
+ Name(cn = subject_cn, c = c, o = o), # Subject |
+ SEQUENCE([ # SubjectPublicKeyInfo |
+ SEQUENCE([ # Algorithm |
+ public_key_rsa, |
+ None, |
+ ]), |
+ BitString(ToDER(key)), |
+ ]), |
+ Explicit(3, extensions), |
+ ])) |
+ |
+ return ToDER(SEQUENCE([ |
+ Raw(tbsCert), |
+ SEQUENCE([ |
+ sha1_with_rsa_encryption, |
+ None, |
+ ]), |
+ BitString(key.Sign(tbsCert)), |
+ ])) |
+ |
+def MakeOCSPResponse(issuer_cn, issuer_key, serial, revoked): |
+ # https://tools.ietf.org/html/rfc2560 |
+ issuer_name_hash = OCTETSTRING( |
+ hashlib.sha1(ToDER(Name(cn = issuer_cn))).digest()) |
+ |
+ issuer_key_hash = OCTETSTRING(hashlib.sha1(ToDER(issuer_key)).digest()) |
+ |
+ cert_status = None |
+ if revoked: |
+ cert_status = Explicit(1, GeneralizedTime("20100101060000Z")) |
+ else: |
+ cert_status = Raw(TagAndLength(0x80 | 0, 0)) |
+ |
+ basic_resp_data_der = ToDER(SEQUENCE([ |
+ Explicit(2, issuer_key_hash), |
+ GeneralizedTime("20100101060000Z"), # producedAt |
+ SEQUENCE([ |
+ SEQUENCE([ # SingleResponse |
+ SEQUENCE([ # CertID |
+ SEQUENCE([ # hashAlgorithm |
+ hash_sha1, |
+ None, |
+ ]), |
+ issuer_name_hash, |
+ issuer_key_hash, |
+ serial, |
+ ]), |
+ cert_status, |
+ GeneralizedTime("20100101060000Z"), # thisUpdate |
+ Explicit(0, GeneralizedTime("20300101060000Z")), # nextUpdate |
+ ]), |
+ ]), |
+ ])) |
+ |
+ basic_resp = SEQUENCE([ |
+ Raw(basic_resp_data_der), |
+ SEQUENCE([ |
+ sha1_with_rsa_encryption, |
+ None, |
+ ]), |
+ BitString(key.Sign(basic_resp_data_der)), |
+ ]) |
+ |
+ resp = SEQUENCE([ |
+ ENUMERATED(0), |
+ Explicit(0, SEQUENCE([ |
+ ocsp_type_basic, |
+ OCTETSTRING(ToDER(basic_resp)), |
+ ])) |
+ ]) |
+ |
+ return ToDER(resp) |
+ |
+def DERToPEM(der): |
+ pem = '-----BEGIN CERTIFICATE-----\n' |
+ pem += der.encode('base64') |
+ pem += '-----END CERTIFICATE-----\n' |
+ return pem |
+ |
+def GenerateCertKeyAndOCSP(subject = "127.0.0.1", |
+ ocsp_url = "http://127.0.0.1", |
+ ocsp_revoked = False): |
+ '''GenerateCertKeyAndOCSP returns a (cert_and_key_pem, ocsp_der) where: |
+ * cert_and_key_pem contains a certificate and private key in PEM format |
+ with the given subject common name and OCSP URL. |
+ * ocsp_der contains a DER encoded OCSP response or None if ocsp_url is |
+ None''' |
+ |
+ serial = RandomNumber(16) |
+ cert_der = MakeCertificate(issuer_cn, subject, serial, key, key, ocsp_url) |
+ cert_pem = DERToPEM(cert_der) |
+ |
+ ocsp_der = None |
+ if ocsp_url is not None: |
+ ocsp_der = MakeOCSPResponse(issuer_cn, key, serial, ocsp_revoked) |
+ |
+ return (cert_pem + key_pem, ocsp_der) |