python中使用openssl构建root证书,CSR,并验证

发布于:2025-03-26 ⋅ 阅读:(26) ⋅ 点赞:(0)

构建证书

# 生成证书
def create_certification(private_path,public_path,ca_path,password=None):
    if (not os.path.exists(private_path)) \
        or (not os.path.exists(public_path)) \
            or (not os.path.exists(ca_path)):
        print("秘钥文件不存在,创建秘钥")
        root_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=3072,
            backend=default_backend())
        print("生成秘钥对成功")
        with open(private_path,"wb") as fp:
            fp.write(root_key.private_bytes(
                serialization.Encoding.PEM,
                serialization.PrivateFormat.TraditionalOpenSSL,
                serialization.NoEncryption() if password == None 
                else serialization.BestAvailableEncryption(password)))
        print("保存私钥成功")
        with open(public_path,"wb") as fp:
            fp.write(root_key.public_key().public_bytes(
                serialization.Encoding.PEM,
                serialization.PublicFormat.SubjectPublicKeyInfo))
        print("保存公钥成功")
        
        # 创建CA证书
        root_public_key = root_key.public_key()
        builder = x509.CertificateBuilder()
        builder = builder.subject_name(x509.Name([
            x509.NameAttribute(NameOID.COUNTRY_NAME, "CN"),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "My Root CA"),
            x509.NameAttribute(NameOID.COMMON_NAME, "My Root CA"),
        ]))
        builder = builder.issuer_name(x509.Name([
            x509.NameAttribute(NameOID.COUNTRY_NAME, "CN"),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "My Root CA"),
            x509.NameAttribute(NameOID.COMMON_NAME, "My Root CA"),
        ]))
        builder = builder.not_valid_before(datetime.now(timezone.utc))
        builder = builder.not_valid_after(datetime.now(timezone.utc) + timedelta(days=3650))
        builder = builder.serial_number(x509.random_serial_number())
        builder = builder.public_key(root_public_key)
        builder = builder.add_extension(
            x509.BasicConstraints(ca=True, path_length=None),
            critical=True,
        )
        root_certificate = builder.sign(
            private_key=root_key,
            algorithm=hashes.SHA256(),
            backend=default_backend()
        )
        with open(ca_path, "wb") as f:
            f.write(root_certificate.public_bytes(serialization.Encoding.PEM))  
        print("保存CA证书成功")

加载证书

def load_certification(private_path,public_path,ca_path,password=None):
    private_key = None
    with open(private_path, "rb") as key_file:
        private_key = serialization.load_pem_private_key(
            key_file.read(),
            password=password,
            backend=default_backend()
        )
    print("加载私钥成功")
    public_key = None
    with open(public_path, "rb") as key_file:
        public_key = serialization.load_pem_public_key(
            key_file.read(),
            backend=default_backend()
        )
    print("加载公钥成功")
    ca_key = None
    with open(ca_path,"rb") as key_file:
        ca_key = x509.load_pem_x509_certificate(key_file.read(),default_backend())
    print("加载CA证书成功")
    return private_key,public_key,ca_key

创建CSR

def create_csr(private_key):
    csr = None
    if not os.path.exists(SUB_CSR_PATH):
        # 创建CSR
        csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name([
            x509.NameAttribute(NameOID.COUNTRY_NAME, "CN"),
            x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Beijing"),
            x509.NameAttribute(NameOID.LOCALITY_NAME, "Beijing"),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "My Company"),
            x509.NameAttribute(NameOID.COMMON_NAME, "mysite.com"),
        ])).sign(private_key, hashes.SHA256(), default_backend())
        print("创建CSR成功")
        with open(SUB_CSR_PATH,"wb") as fp:
            fp.write(csr.public_bytes(serialization.Encoding.PEM))
        print("保存CSR成功")
    else:
        with open(SUB_CSR_PATH,"rb") as fp:
            csr = x509.load_pem_x509_csr(fp.read(),default_backend())
        print("加载CSR成功")
    return csr

签名CSR

def sign_certification(root_certificate,csr):
    signed_cert = None
    if not os.path.exists(SUB_SIGN_PATH):
        # 使用根证书签名CSR
        cert_builder = x509.CertificateBuilder()
        cert_builder = cert_builder.subject_name(csr.subject)
        cert_builder = cert_builder.issuer_name(root_certificate.subject)
        cert_builder = cert_builder.public_key(csr.public_key())
        cert_builder = cert_builder.serial_number(x509.random_serial_number())
        cert_builder = cert_builder.not_valid_before(datetime.now(timezone.utc))
        cert_builder = cert_builder.not_valid_after(datetime.now(timezone.utc) + timedelta(days=365))
        cert_builder = cert_builder.add_extension(
            x509.BasicConstraints(ca=False, path_length=None),
            critical=True,
        )
        # 签名证书
        signed_cert = cert_builder.sign(
            private_key=root_private_key,
            algorithm=hashes.SHA256(),
            backend=default_backend()
        )
        print("根证书签名CSR成功")
        # 保存签名后的证书
        with open(SUB_SIGN_PATH, "wb") as f:
            f.write(signed_cert.public_bytes(serialization.Encoding.PEM))
        print("签名CSR保存成功")  
    else:
        with open(SUB_SIGN_PATH,"rb") as fp:
            signed_cert = x509.load_pem_x509_certificate(fp.read(),default_backend())
        print("加载签名CSR成功") 
    return signed_cert

验证签名

def verify_cerfication(root_ca_key,signed_cert):
        # 验证证书链
    try:
        # 检查证书是否由根证书签名
        public_key = root_ca_key.public_key()
        public_key.verify(
            signed_cert.signature,
            signed_cert.tbs_certificate_bytes,
            padding.PKCS1v15(),
            signed_cert.signature_hash_algorithm
        )
        print("证书签名验证成功")
        
        # 检查证书有效期
        now = datetime.now(timezone.utc)
        not_before = signed_cert.not_valid_before_utc
        not_after = signed_cert.not_valid_after_utc
        print(f"证书有效期: {not_before}{not_after}")
        if not_before <= now <= not_after:
            print("证书在有效期内")
        else:
            print("证书已过期或尚未生效")
            
        # 检查主题名称
        common_name = signed_cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
        print(f"证书主题: {common_name}")
        
    except Exception as e:
        print(f"验证失败: {str(e)}")

流程代码

from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization, hashes
import os
from cryptography import x509
from cryptography.x509.oid import NameOID
from datetime import datetime, timedelta,timezone
from cryptography.hazmat.primitives.asymmetric import padding

ROOT_PRIVATE_KEY_PATH = "./certifications/root_private.pem"
ROOT_PUBLIC_KEY_PATH = "./certifications/root_public.pem"
ROOT_CA_PATH = "./certifications/root_ca.pem"
ROOT_PASSWORD = b"123456"

SUB_PRIVATE_KEY_PATH = "./certifications/sub_private.pem"
SUB_PUBLIC_KEY_PATH = "./certifications/sub_public.pem"
SUB_CA_PATH = "./certifications/sub_ca.pem"

SUB_CSR_PATH = "./certifications/sub_csr.pem"
SUB_SIGN_PATH = "./certifications/sign_csr.pem"

if __name__ == "__main__":
    print("hello certification")
    print("--- root ---")
    create_certification(ROOT_PRIVATE_KEY_PATH,ROOT_PUBLIC_KEY_PATH,
                         ROOT_CA_PATH,ROOT_PASSWORD)
    root_private_key,root_public_key,root_ca_key = load_certification(
        ROOT_PRIVATE_KEY_PATH,ROOT_PUBLIC_KEY_PATH,ROOT_CA_PATH,ROOT_PASSWORD)
    print("--- sub ---")
    create_certification(SUB_PRIVATE_KEY_PATH,SUB_PUBLIC_KEY_PATH,SUB_CA_PATH)
    sub_private_key,sub_public_key,sub_ca_key = load_certification(
        SUB_PRIVATE_KEY_PATH,SUB_PUBLIC_KEY_PATH,SUB_CA_PATH)
    print("--- create csr ---")
    csr = create_csr(sub_private_key)
    print("--- sign csr ---")
    signed_cert = sign_certification(root_ca_key,csr)
    print("--- verify csr ---")
    verify_cerfication(root_ca_key,signed_cert)