#!/usr/bin/env python3 # SPDX-FileCopyrightText: 2022 FIT-Connect contributors # # SPDX-License-Identifier: EUPL-1.2 import argparse import json import os.path import pathlib import random import sys import os import certificateValidation as verify from OpenSSL import crypto as openssl_crypto from cryptography.hazmat.primitives.serialization import pkcs12 from jwcrypto import jwk # Custom exceptions for pytest checks. class KeyLengthError(Exception): pass class CertificateChainError(Exception): pass class KeyUsageError(Exception): pass class CertificateAlgorithmError(Exception): pass def read_password(): pkcs12_pass_from_env = os.getenv("PKCS12_CONTAINER_PASS") if pkcs12_pass_from_env is not None: return pkcs12_pass_from_env return input("Please enter the password for PKCS12 keystore: ") def read_pkcs12(path, environment): """Read p12 file""" # Print error in case the file type is incorrect or faulty extension = os.path.splitext(path)[1] allowed_extensions = [".p12", ".pfx"] if not extension or extension not in allowed_extensions: exit_msg = ( "Error: Invalid PKCS12 file extension '{}'. Valid ones are: {}".format( extension, ", ".join(allowed_extensions) ) ) sys.exit(exit_msg) try: f = open(path, "rb") except FileNotFoundError: sys.exit("Error: No such file. Exiting program.") try: ( private_key, certificate, additional_certificates, ) = pkcs12.load_key_and_certificates(f.read(), read_password().encode()) # convert `cryptography` private key and certificates to `pyOpenSSL` objects private_key_pyopenssl = openssl_crypto.PKey.from_cryptography_key(private_key) certificate_pyopenssl = openssl_crypto.X509.from_cryptography(certificate) additional_certificates_pyopenssl = [ openssl_crypto.X509.from_cryptography(cert) for cert in additional_certificates ] # validate certificate chain if not verify.verify_certificate_chain( certificate_pyopenssl, additional_certificates_pyopenssl, environment ): raise CertificateChainError( "Unable to verify certificate chain.\n" "Did you select the right Environment?" ) # verify key usage if not verify.check_cert_key_usage(certificate_pyopenssl): raise KeyUsageError("Certificate keyUsage could not be verified!") return ( private_key_pyopenssl, certificate_pyopenssl, additional_certificates_pyopenssl, ) except ValueError as e: sys.exit("Error: " + str(e)) except Exception as e: sys.exit("Error:" + str(e)) def cert_to_x5c(cert): # export certificate as ASN1 (DER) cert_asn1 = openssl_crypto.dump_certificate(openssl_crypto.FILETYPE_PEM, cert) cert_asn1_base64 = ( cert_asn1.replace(b"-----BEGIN CERTIFICATE-----\n", b"") .replace(b"\n-----END CERTIFICATE-----\n", b"") .replace(b"\n", b"") ) cert_asn1_base64_str = cert_asn1_base64.decode("UTF-8") return cert_asn1_base64_str def write_jwk_files(private_key, certificate, chain, output_dir, overwrite_files): """create 4 JWK's from the p12 file - JWK with publicKey and key_ops "wrapKey" for encryption - JWK with privateKey and key_ops "unwrapKey" for encryption - JWK with publicKey and key_ops "verify" for signature creation - JWK with publicKey and key_ops "sign" for signature validation """ # check that file path exists if not output_dir.is_dir(): sys.exit("Error: Output directory does not exist") # check if files already exist publicKey_wrapkey_file = pathlib.Path(output_dir, "publicKey_encryption.jwk.json") publicKey_verify_file = pathlib.Path( output_dir, "publicKey_signature_verification.jwk.json" ) privateKey_unwrapkey_file = pathlib.Path( output_dir, "privateKey_decryption.jwk.json" ) privateKey_sign_file = pathlib.Path(output_dir, "privateKey_signing.jwk.json") if not overwrite_files and ( os.path.isfile(publicKey_wrapkey_file) or os.path.isfile(publicKey_verify_file) or os.path.isfile(privateKey_unwrapkey_file) or os.path.isfile(privateKey_sign_file) ): sys.exit( "Error: File already exists. If this should be overwritten try the -f flag." ) # verify key length if not verify.check_key_length(certificate): raise KeyLengthError("Certificate key to weak. Should be at least 4096 bit!") # verify signature and hash algorithms if not verify.verify_certificate_algorithms(certificate): raise CertificateAlgorithmError( "Certificate Signature or Hash algorithm could not be verified!" ) # export certificate and private key as PEM certificate_pem = openssl_crypto.dump_certificate( openssl_crypto.FILETYPE_PEM, certificate ) private_key_pem = openssl_crypto.dump_privatekey( openssl_crypto.FILETYPE_PEM, private_key ) # build x5c array # see https://datatracker.ietf.org/doc/html/rfc7517#section-4.7 x5c = [cert_to_x5c(certificate)] for chain_cert in chain: x5c.append(cert_to_x5c(chain_cert)) # create encryption JWK's # derive public key (encryption) jwk_wrapKey = jwk.JWK.from_pem(certificate_pem) jwk_wrapKey.setdefault("alg", "RSA-OAEP-256") jwk_wrapKey.setdefault("x5c", x5c) jwk_wrapKey.setdefault("key_ops", ["wrapKey"]) # derive private key (decryption) jwk_unwrapKey = jwk.JWK.from_pem(private_key_pem) jwk_unwrapKey.setdefault("alg", "RSA-OAEP-256") jwk_unwrapKey.setdefault("x5c", x5c) jwk_unwrapKey.setdefault("key_ops", ["unwrapKey"]) # create signature JWK's # derive public key (signature verification) jwk_verify = jwk.JWK.from_pem(certificate_pem) jwk_verify.setdefault("alg", "PS512") jwk_verify.setdefault("x5c", x5c) jwk_verify.setdefault("key_ops", ["verify"]) # derive private key (signing) jwk_sign = jwk.JWK.from_pem(private_key_pem) jwk_sign.setdefault("alg", "PS512") jwk_sign.setdefault("x5c", x5c) jwk_sign.setdefault("key_ops", ["sign"]) # validate key length if not verify.check_jwk_key_length( [jwk_wrapKey, jwk_verify, jwk_unwrapKey, jwk_sign] ): sys.exit("Jwk key length could not be verified!") # validate that the jwk contains the entire chain of certificates if not verify.validate_jwk_x5c_chain( {jwk_wrapKey, jwk_verify, jwk_unwrapKey, jwk_sign}, certificate.to_cryptography(), ): sys.exit("Jwk contain incorrect x5c certificate chain.") # write public keys to file with open(publicKey_wrapkey_file, "wb") as f: exp = jwk_wrapKey.export(private_key=False) tmp = json.loads(exp) kid = tmp["kid"] + "-wrapKey" tmp["kid"] = kid f.write(json.dumps(tmp).encode("UTF-8")) with open(publicKey_verify_file, "wb") as f: exp = jwk_verify.export(private_key=False) tmp = json.loads(exp) kid = tmp["kid"] + "-verify" tmp["kid"] = kid f.write(json.dumps(tmp).encode("UTF-8")) # write private keys to file with open(privateKey_unwrapkey_file, "wb") as f: exp = jwk_unwrapKey.export(private_key=True) tmp = json.loads(exp) kid = tmp["kid"] + "-wrapKey" tmp["kid"] = kid f.write(json.dumps(tmp).encode("UTF-8")) with open(privateKey_sign_file, "wb") as f: exp = jwk_sign.export(private_key=True) tmp = json.loads(exp) kid = tmp["kid"] + "-verify" tmp["kid"] = kid f.write(json.dumps(tmp).encode("UTF-8")) print( f"🔒 Wrote JWK representation of encryption public key (key_use=wrapKey) to {publicKey_wrapkey_file}" ) print( f"🔒 Wrote JWK representation of signature validation public key (key_use=verify) to {publicKey_verify_file}" ) print( "Please upload these keys when creating a destination in the self service portal." ) print() print( f"🔒 Wrote JWK representation of decryption private key (key_use=unwrapKey) to {privateKey_unwrapkey_file}" ) print( f"🔒 Wrote JWK representation of signing private key (key_use=sign) to {privateKey_sign_file}" ) print("These keys can be used to sign and decrypt in your client application.") # TESTING # instead of passing a p12 keystore - create a self generated keypair for testing def create_self_signed_cert(): print("Creating Self Signed certificate..") print("This should only be used for testing purposes!") # create key pair keypair = openssl_crypto.PKey() keypair.generate_key(openssl_crypto.TYPE_RSA, 4096) # create self-signed cert cert = openssl_crypto.X509() cert.get_subject().C = "DE" cert.get_subject().O = "Testbehoerde" cert.get_subject().CN = "FIT Connect Testzertifikat" cert.set_serial_number(random.randint(50000000, 100000000)) cert.gmtime_adj_notBefore(0) cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60) cert.set_issuer(cert.get_subject()) cert.set_pubkey(keypair) cert.sign(keypair, "sha512") return keypair, cert if __name__ == "__main__": parser = argparse.ArgumentParser( description="Generate JWKs from PKCS12 (p12) file." ) parser.add_argument( "-i", "--input", help="The pkcs12 file that will be used to generate JWKs.", required=True, type=pathlib.Path, ) # optional input parameter specifying location for key storage # default location is the working directory parser.add_argument( "-o", "--output", default="", help="Directory to store the generated JWKs. Defaults to the working directory", type=pathlib.Path, ) # the tool does not override any files. # this behavior can be overwritten with the -f / --force input flag parser.add_argument( "-f", "--force", action="store_true", help="Overwrite existing JWK files. Default: disabled", ) parser.add_argument( "-e", "--environment", choices=verify.TRUSTED_ROOT_CAS.keys(), help="Select the PKI environment from which the provided p12 file originates (test or production)", required=True, ) args = parser.parse_args() if args.force: print("Warning: Existing JWK files at location will be overwritten!") if str(args.input) == "debug": private_key, certificate = create_self_signed_cert() certificate_chain = [] else: ( private_key, certificate, certificate_chain, ) = read_pkcs12(args.input, args.environment) # TODO: check hash and signature algorithms of all certificates in certificate chain write_jwk_files( private_key, certificate, certificate_chain, args.output, args.force )