# SPDX-FileCopyrightText: 2022 FIT-Connect contributors # # SPDX-License-Identifier: EUPL-1.2 import os from OpenSSL import crypto as openssl_crypto from cryptography.hazmat.primitives import hashes from cryptography import x509 # list of directories which contain trusted root CA certificates TRUSTED_ROOT_CAS = { "prod": "trusted-root-certificates/v-pki-prod", "test": "trusted-root-certificates/v-pki-test", "unit-test": "trusted-root-certificates/unit-tests", } REQUIRED_KEY_LENGTH = 4096 def check_key_length(certificate): if certificate.get_pubkey().bits() < REQUIRED_KEY_LENGTH: return False return True def check_cert_key_usage(cert): """check the key usages from the certificates allow for this""" found_ext = False allow_signing = False allow_encryption = False for i in range(0, cert.get_extension_count()): if cert.get_extension(i).get_short_name() == b"keyUsage": found_ext = True usage = str(cert.get_extension(i)) print("Info: KeyUsage:", usage) if "Digital Signature" in usage: allow_signing = True if "Key Encipherment" in usage: allow_encryption = True if not found_ext: print("Error: KeyUsage Extension not found!") return False if not allow_signing: print("Error: KeyUsage Digital Signature is missing!") return False if not allow_encryption: print("Error: KeyUsage Key Encipherment is missing!") return False return True def x5c_to_cert(x5c): # RFC coded certificates # check that the delimiters: "------BEGIN CERTIFICATE------" exist in the string. if "-----BEGIN CERTIFICATE-----" not in x5c: x5c = "-----BEGIN CERTIFICATE-----\n" + x5c + "\n-----END CERTIFICATE-----" return x509.load_pem_x509_certificate(x5c.encode()) def validate_jwk_x5c_chain(jwks, base_cert): # the generated jwks have to contain the entire certificate chain # also, the leaf certificate in the x5c field has to be the deposited certificate! for tmp_jwk in jwks: is_leaf_certificate = True for x5c_cert in tmp_jwk.get("x5c"): cert = x5c_to_cert(x5c_cert) if is_leaf_certificate: # the leaf certificate in the x5c chain has to be the certificate with the key! is_leaf_certificate = False issuer = cert.issuer if cert != base_cert: print('Error! Leaf certificate in jwk("x5c") field is incorrect.') return False else: # each subsequent certificate has to be the one used to certify the previous if issuer != cert.subject: print( "Error, each subsequent certificate in jwk x5c chain " "has to be the one issuing the previous certificate." ) print( f"Issuer {issuer.rfc4514_string()} and subject {cert.subject.rfc4514_string()} don't match!" ) return False else: issuer = cert.issuer return True def verify_certificate_algorithms(certificate): x509_certificate = certificate.to_cryptography() # TODO - VPKI hat falsche algorithmen return True if not x509_certificate.signature_hash_algorithm.__class__ == hashes.SHA512: print( "Certificate is using hash algorithm {} but needs {}".format( x509_certificate.signature_hash_algorithm, hashes.SHA512 ) ) return False # base_certificate = openssl_crypto.X509.from_cryptography(base_certificate) if certificate.get_signature_algorithm() != b"sha512WithRSAEncryption": print( "Certificate is using signature algorithm {} but needs {}".format( str(certificate.get_signature_algorithm()), "RSASSA-PSS" ) ) return False return True def check_jwk_key_length(jwks): for tmp_jwk in jwks: b64encoded_string = tmp_jwk.get("n") # fill padding if necessary b64encoded_string += "=" * ((4 - len(b64encoded_string) % 4) % 4) key_length = len(b64encoded_string) * 8 if key_length < 4096: print( "JWK with id {} has wrong key length. Is {} but should be 4096 at least.".format( tmp_jwk.get("kid"), len(b64encoded_string) ) ) return False return True def verify_certificate_chain(cert, certificate_chain, environment): """Verify that certificates are from DOI CA and verify certificate chain""" # create list of file paths to trusted CA certificates in PEM format trusted_root_certs = [ os.path.join(TRUSTED_ROOT_CAS[environment], filename) for filename in os.listdir(TRUSTED_ROOT_CAS[environment]) ] try: # Create a certificate store and add trusted certs store = openssl_crypto.X509Store() # Add root certs from trusted_root_certs to certificate store for root_cert_path in trusted_root_certs: # skip all files other than pem files (e.g. .license files) extension = os.path.splitext(root_cert_path)[1] if not extension or extension != ".pem": continue # read certificate root_cert_file = open(root_cert_path, "r") root_cert_data = root_cert_file.read() root_cert = openssl_crypto.load_certificate( openssl_crypto.FILETYPE_PEM, root_cert_data ) # Add root certificate to X509Store # WARNING: only add root certificates here! # `add_cert()` will treat the certificate as ultimately trusted! # see https://duo.com/labs/research/chain-of-fools#section2 store.add_cert(root_cert) # Create a certificate context using the store, the certificate and the certificate chain to verify store_ctx = openssl_crypto.X509StoreContext( store, cert, chain=certificate_chain ) # Verify the certificate, returns None if the certificate cannot be # validated. # WARNING: `verify_certificate()` will succeed if any of the # certificates added to the X509Store signed the leaf certificate, even # if the root didn't sign the intermediate! if store_ctx.verify_certificate() is None: return True except Exception as e: print("Error", e) print("Certificate chain: ") for cert in certificate_chain: print(get_certificate_issuer_and_subject_string(cert)) return False # `store_ctx.verify_certificate()` will either return None or raise an exception raise RuntimeError("Unexpected Error: This code should never be reached") def get_certificate_issuer_and_subject_string(cert): subject = str(cert.get_subject()).replace("<X509Name object '", "Subject: ") subject = "Subject: " + subject.replace("'>", " ") issuer = str(cert.get_issuer()).replace("<X509Name object '", "Subject: ") issuer = "Issuer: " + issuer.replace("'>", "") return subject + issuer