Skip to content
Snippets Groups Projects
certificateValidation.py 7.18 KiB
Newer Older
# SPDX-FileCopyrightText: 2022 FIT-Connect contributors
#
# SPDX-License-Identifier: EUPL-1.2
from OpenSSL import crypto as openssl_crypto
from cryptography.hazmat.primitives import hashes
from cryptography import x509

# list of directories which contain trusted root CA certificates
TRUSTED_ROOT_CAS = {
    "prod": "trusted-root-certificates/v-pki-prod",
    "test": "trusted-root-certificates/v-pki-test",
    "unit-test": "trusted-root-certificates/unit-tests",
posterwinter's avatar
posterwinter committed
REQUIRED_KEY_LENGTH = 4096


def check_key_length(certificate):
    if certificate.get_pubkey().bits() < REQUIRED_KEY_LENGTH:
        return False
    return True


def check_cert_key_usage(cert):
    """check the key usages from the certificates allow for this"""
    found_ext = False
    allow_signing = False
    allow_encryption = False

    for i in range(0, cert.get_extension_count()):
        if cert.get_extension(i).get_short_name() == b"keyUsage":
            found_ext = True
            usage = str(cert.get_extension(i))
            print("Info: KeyUsage:", usage)
            if "Digital Signature" in usage:
                allow_signing = True
            if "Key Encipherment" in usage:
                allow_encryption = True

    if not found_ext:
        print("Error: KeyUsage Extension not found!")
        return False

    if not allow_signing:
        print("Error: KeyUsage Digital Signature is missing!")
        return False

    if not allow_encryption:
        print("Error: KeyUsage Key Encipherment is missing!")
        return False

    return True


def x5c_to_cert(x5c):
    # RFC coded certificates
    # check that the delimiters: "------BEGIN CERTIFICATE------" exist in the string.
    if "-----BEGIN CERTIFICATE-----" not in x5c:
        x5c = "-----BEGIN CERTIFICATE-----\n" + x5c + "\n-----END CERTIFICATE-----"
    return x509.load_pem_x509_certificate(x5c.encode())


def validate_jwk_x5c_chain(jwks, base_cert):
    # the generated jwks have to contain the entire certificate chain
    # also, the leaf certificate in the x5c field has to be the deposited certificate!
    for tmp_jwk in jwks:
        is_leaf_certificate = True
        for x5c_cert in tmp_jwk.get("x5c"):
            cert = x5c_to_cert(x5c_cert)
            if is_leaf_certificate:
                # the leaf certificate in the x5c chain has to be the certificate with the key!
                is_leaf_certificate = False
                issuer = cert.issuer
                if cert != base_cert:
                    print('Error! Leaf certificate in jwk("x5c") field is incorrect.')
                    return False
            else:
                # each subsequent certificate has to be the one used to certify the previous
                if issuer != cert.subject:
                    print(
                        "Error, each subsequent certificate in jwk x5c chain "
                        "has to be the one issuing the previous certificate."
                    )
                    print(
                        f"Issuer {issuer.rfc4514_string()} and subject {cert.subject.rfc4514_string()} don't match!"
                    )
                    return False
                else:
                    issuer = cert.issuer
    return True


def verify_certificate_algorithms(certificate):
    x509_certificate = certificate.to_cryptography()
    # TODO - VPKI hat falsche algorithmen
    if not x509_certificate.signature_hash_algorithm.__class__ == hashes.SHA512:
        print(
            "Certificate is using hash algorithm {} but needs {}".format(
                x509_certificate.signature_hash_algorithm, hashes.SHA512
    # base_certificate = openssl_crypto.X509.from_cryptography(base_certificate)
    if certificate.get_signature_algorithm() != b"sha512WithRSAEncryption":
        print(
            "Certificate is using signature algorithm {} but needs {}".format(
                str(certificate.get_signature_algorithm()), "RSASSA-PSS"
            )
        )
        return False

    return True


def check_jwk_key_length(jwks):
    for tmp_jwk in jwks:
        b64encoded_string = tmp_jwk.get("n")
        # fill padding if necessary
        b64encoded_string += "=" * ((4 - len(b64encoded_string) % 4) % 4)
        key_length = len(b64encoded_string) * 8
        if key_length < 4096:
            print(
                "JWK with id {} has wrong key length. Is {} but should be 4096 at least.".format(
                    tmp_jwk.get("kid"), len(b64encoded_string)
                )
            )
            return False
    return True


def verify_certificate_chain(cert, certificate_chain, environment):
    """Verify that certificates are from DOI CA and verify certificate chain"""

    # create list of file paths to trusted CA certificates in PEM format
    trusted_root_certs = [
        os.path.join(TRUSTED_ROOT_CAS[environment], filename)
        for filename in os.listdir(TRUSTED_ROOT_CAS[environment])
    ]

    try:
        # Create a certificate store and add trusted certs
        store = openssl_crypto.X509Store()

        # Add root certs from trusted_root_certs to certificate store
        for root_cert_path in trusted_root_certs:

            # skip all files other than pem files (e.g. .license files)
            extension = os.path.splitext(root_cert_path)[1]
            if not extension or extension != ".pem":
                continue

            # read certificate
            root_cert_file = open(root_cert_path, "r")
            root_cert_data = root_cert_file.read()
            root_cert = openssl_crypto.load_certificate(
                openssl_crypto.FILETYPE_PEM, root_cert_data
            )

            # Add root certificate to X509Store
            # WARNING: only add root certificates here!
            # `add_cert()` will treat the certificate as ultimately trusted!
            # see https://duo.com/labs/research/chain-of-fools#section2
            store.add_cert(root_cert)

        # Create a certificate context using the store, the certificate and the certificate chain to verify
        store_ctx = openssl_crypto.X509StoreContext(
            store, cert, chain=certificate_chain
        )

        # Verify the certificate, returns None if the certificate cannot be
        # validated.
        # WARNING: `verify_certificate()` will succeed if any of the
        # certificates added to the X509Store signed the leaf certificate, even
        # if the root didn't sign the intermediate!
        if store_ctx.verify_certificate() is None:
            return True

    except Exception as e:
        print("Error", e)
        print("Certificate chain: ")
        for cert in certificate_chain:
            print(get_certificate_issuer_and_subject_string(cert))
        return False

    # `store_ctx.verify_certificate()` will either return None or raise an exception
    raise RuntimeError("Unexpected Error: This code should never be reached")


def get_certificate_issuer_and_subject_string(cert):
    subject = str(cert.get_subject()).replace("<X509Name object '", "Subject: ")
    subject = "Subject: " + subject.replace("'>", " ")
    issuer = str(cert.get_issuer()).replace("<X509Name object '", "Subject: ")
    issuer = "Issuer: " + issuer.replace("'>", "")
    return subject + issuer