Skip to content
Snippets Groups Projects
pkcs12ToJwk.py 11.6 KiB
Newer Older
Pascal Osterwinter's avatar
Pascal Osterwinter committed
#!/usr/bin/env python3

# SPDX-FileCopyrightText: 2022 FIT-Connect contributors
#
# SPDX-License-Identifier: EUPL-1.2

import argparse
import json
import os.path
import pathlib
import random
import sys
import certificateValidation as verify
Pascal Osterwinter's avatar
Pascal Osterwinter committed
from OpenSSL import crypto as openssl_crypto
from cryptography.hazmat.primitives.serialization import pkcs12
from cryptography.x509.oid import NameOID
# Custom exceptions for pytest checks.
class KeyLengthError(Exception):
    pass


class CertificateChainError(Exception):
    pass


class KeyUsageError(Exception):
    pass


class CertificateAlgorithmError(Exception):
    pass


def read_password():
    pkcs12_pass_from_env = os.getenv("PKCS12_CONTAINER_PASS")
    if pkcs12_pass_from_env is not None:
        return pkcs12_pass_from_env
    return input("Please enter the password for PKCS12 keystore: ")
def read_pkcs12(path, environment):
Pascal Osterwinter's avatar
Pascal Osterwinter committed
    """Read p12 file"""

    # Print error in case the file type is incorrect or faulty
    extension = os.path.splitext(path)[1]
    allowed_extensions = [".p12", ".pfx"]
    if not extension or extension not in allowed_extensions:
        exit_msg = (
            "Error: Invalid PKCS12 file extension '{}'. Valid ones are: {}".format(
                extension, ", ".join(allowed_extensions)
            )
        )
        sys.exit(exit_msg)

    try:
        f = open(path, "rb")
    except FileNotFoundError:
        sys.exit("Error: No such file. Exiting program.")

    try:
        (
            private_key,
            certificate,
            additional_certificates,
        ) = pkcs12.load_key_and_certificates(f.read(), read_password().encode())
        # convert `cryptography` private key and certificate to `pyOpenSSL` objects
Pascal Osterwinter's avatar
Pascal Osterwinter committed
        private_key_pyopenssl = openssl_crypto.PKey.from_cryptography_key(private_key)
        certificate_pyopenssl = openssl_crypto.X509.from_cryptography(certificate)

        # check order of certificate chain
        if (
            len(additional_certificates)
            and additional_certificates[0].subject == additional_certificates[0].issuer
        ):
            # If subject and issuer matches, this is the root certificate.
            # The root certificate must be the last cert in the chain.
            # Hence, the certificate chain is sorted in the wrong order.
            print("Incorrectly ordered certificate chain. Reversing...")
            additional_certificates = reversed(additional_certificates)

        # convert `cryptography` certificate chain to `pyOpenSSL` object
Pascal Osterwinter's avatar
Pascal Osterwinter committed
        additional_certificates_pyopenssl = [
            openssl_crypto.X509.from_cryptography(cert)
            for cert in additional_certificates
        ]

        # validate certificate chain

        if not verify.verify_certificate_chain(
            certificate_pyopenssl, additional_certificates_pyopenssl, environment
        ):
            raise CertificateChainError(
                "Unable to verify certificate chain.\n"
                "Did you select the right Environment?"
            )

        # verify key usage
        if not verify.check_cert_key_usage(certificate_pyopenssl):
            raise KeyUsageError("Certificate keyUsage could not be verified!")

Pascal Osterwinter's avatar
Pascal Osterwinter committed
        return (
            private_key_pyopenssl,
            certificate_pyopenssl,
            additional_certificates_pyopenssl,
        )
    except ValueError as e:
        sys.exit("Error: " + str(e))
    except Exception as e:
Pascal Osterwinter's avatar
Pascal Osterwinter committed


def cert_to_x5c(cert):
    # export certificate as ASN1 (DER)
    cert_asn1 = openssl_crypto.dump_certificate(openssl_crypto.FILETYPE_PEM, cert)
    cert_asn1_base64 = (
        cert_asn1.replace(b"-----BEGIN CERTIFICATE-----\n", b"")
        .replace(b"\n-----END CERTIFICATE-----\n", b"")
        .replace(b"\n", b"")
    )
    cert_asn1_base64_str = cert_asn1_base64.decode("UTF-8")
    return cert_asn1_base64_str


def write_jwk_files(private_key, certificate, chain, output_dir, overwrite_files):
    """create 4 JWK's from the p12 file
    - JWK with publicKey and key_ops "wrapKey" for encryption
    - JWK with privateKey and key_ops "unwrapKey" for encryption
    - JWK with publicKey and key_ops "verify" for signature creation
    - JWK with publicKey and key_ops "sign" for signature validation
    """

    # check that file path exists
    if not output_dir.is_dir():
        sys.exit("Error: Output directory does not exist")

    # check if files already exist
    publicKey_wrapkey_file = pathlib.Path(output_dir, "publicKey_encryption.jwk.json")
    publicKey_verify_file = pathlib.Path(
        output_dir, "publicKey_signature_verification.jwk.json"
    )
    privateKey_unwrapkey_file = pathlib.Path(
        output_dir, "privateKey_decryption.jwk.json"
    )
    privateKey_sign_file = pathlib.Path(output_dir, "privateKey_signing.jwk.json")

    if not overwrite_files and (
        os.path.isfile(publicKey_wrapkey_file)
        or os.path.isfile(publicKey_verify_file)
        or os.path.isfile(privateKey_unwrapkey_file)
        or os.path.isfile(privateKey_sign_file)
    ):
        sys.exit(
            "Error: File already exists. If this should be overwritten try the -f flag."
        )

posterwinter's avatar
posterwinter committed
    if not verify.check_key_length(certificate):
        raise KeyLengthError("Certificate key to weak. Should be at least 4096 bit!")

    # verify signature and hash algorithms
    if not verify.verify_certificate_algorithms(certificate):
        raise CertificateAlgorithmError(
            "Certificate Signature or Hash algorithm could not be verified!"
        )

Pascal Osterwinter's avatar
Pascal Osterwinter committed
    # export certificate and private key as PEM
    certificate_pem = openssl_crypto.dump_certificate(
        openssl_crypto.FILETYPE_PEM, certificate
    )
    private_key_pem = openssl_crypto.dump_privatekey(
        openssl_crypto.FILETYPE_PEM, private_key
    )

    # build x5c array
    # see https://datatracker.ietf.org/doc/html/rfc7517#section-4.7
    x5c = [cert_to_x5c(certificate)]
    for chain_cert in chain:
        x5c.append(cert_to_x5c(chain_cert))

    # create encryption JWK's

    # derive public key (encryption)
    jwk_wrapKey = jwk.JWK.from_pem(certificate_pem)
    jwk_wrapKey.setdefault("alg", "RSA-OAEP-256")
    jwk_wrapKey.setdefault("x5c", x5c)
    jwk_wrapKey.setdefault("key_ops", ["wrapKey"])

    # derive private key (decryption)
    jwk_unwrapKey = jwk.JWK.from_pem(private_key_pem)
    jwk_unwrapKey.setdefault("alg", "RSA-OAEP-256")
    jwk_unwrapKey.setdefault("x5c", x5c)
Pascal Osterwinter's avatar
Pascal Osterwinter committed
    jwk_unwrapKey.setdefault("key_ops", ["unwrapKey"])

    # create signature JWK's

    # derive public key (signature verification)
    jwk_verify = jwk.JWK.from_pem(certificate_pem)
    jwk_verify.setdefault("alg", "PS512")
    jwk_verify.setdefault("x5c", x5c)
    jwk_verify.setdefault("key_ops", ["verify"])

    # derive private key (signing)
    jwk_sign = jwk.JWK.from_pem(private_key_pem)
    jwk_sign.setdefault("alg", "PS512")
    jwk_sign.setdefault("x5c", x5c)
Pascal Osterwinter's avatar
Pascal Osterwinter committed
    jwk_sign.setdefault("key_ops", ["sign"])

    if not verify.check_jwk_key_length(
        [jwk_wrapKey, jwk_verify, jwk_unwrapKey, jwk_sign]
    ):
        sys.exit("Jwk key length could not be verified!")

    # validate that the jwk contains the entire chain of certificates
    if not verify.validate_jwk_x5c_chain(
        {jwk_wrapKey, jwk_verify, jwk_unwrapKey, jwk_sign},
        certificate.to_cryptography(),
    ):
        sys.exit("Jwk contain incorrect x5c certificate chain.")

Pascal Osterwinter's avatar
Pascal Osterwinter committed
    # write public keys to file
    with open(publicKey_wrapkey_file, "wb") as f:
        exp = jwk_wrapKey.export(private_key=False)
        tmp = json.loads(exp)
        kid = tmp["kid"] + "-wrapKey"
        tmp["kid"] = kid
        f.write(json.dumps(tmp).encode("UTF-8"))

    with open(publicKey_verify_file, "wb") as f:
        exp = jwk_verify.export(private_key=False)
        tmp = json.loads(exp)
        kid = tmp["kid"] + "-verify"
        tmp["kid"] = kid
        f.write(json.dumps(tmp).encode("UTF-8"))

    # write private keys to file
    with open(privateKey_unwrapkey_file, "wb") as f:
        exp = jwk_unwrapKey.export(private_key=True)
        tmp = json.loads(exp)
        kid = tmp["kid"] + "-wrapKey"
        tmp["kid"] = kid
        f.write(json.dumps(tmp).encode("UTF-8"))

    with open(privateKey_sign_file, "wb") as f:
        exp = jwk_sign.export(private_key=True)
        tmp = json.loads(exp)
        kid = tmp["kid"] + "-verify"
        tmp["kid"] = kid
        f.write(json.dumps(tmp).encode("UTF-8"))

    print(
        f"🔒 Wrote JWK representation of encryption public key (key_use=wrapKey) to {publicKey_wrapkey_file}"
    )
    print(
        f"🔒 Wrote JWK representation of signature validation public key (key_use=verify) to {publicKey_verify_file}"
    )
    print(
        "Please upload these keys when creating a destination in the self service portal."
    )
    print()
    print(
        f"🔒 Wrote JWK representation of decryption private key (key_use=unwrapKey) to {privateKey_unwrapkey_file}"
    )
    print(
        f"🔒 Wrote JWK representation of signing private key (key_use=sign) to {privateKey_sign_file}"
    )
    print("These keys can be used to sign and decrypt in your client application.")


# TESTING
# instead of passing a p12 keystore - create a self generated keypair for testing
def create_self_signed_cert():
    print("Creating Self Signed certificate..")
    print("This should only be used for testing purposes!")
    # create key pair
    keypair = openssl_crypto.PKey()
    keypair.generate_key(openssl_crypto.TYPE_RSA, 4096)

    # create self-signed cert
    cert = openssl_crypto.X509()
    cert.get_subject().C = "DE"
    cert.get_subject().O = "Testbehoerde"
    cert.get_subject().CN = "FIT Connect Testzertifikat"
    cert.set_serial_number(random.randint(50000000, 100000000))
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
    cert.set_issuer(cert.get_subject())
    cert.set_pubkey(keypair)
    cert.sign(keypair, "sha512")

    return keypair, cert


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Generate JWKs from PKCS12 (p12) file."
    )
    parser.add_argument(
        "-i",
        "--input",
        help="The pkcs12 file that will be used to generate JWKs.",
        required=True,
        type=pathlib.Path,
    )

    # optional input parameter specifying location for key storage
    # default location is the working directory
    parser.add_argument(
        "-o",
        "--output",
        default="",
        help="Directory to store the generated JWKs. Defaults to the working directory",
        type=pathlib.Path,
    )

    # the tool does not override any files.
    # this behavior can be overwritten with the -f / --force input flag
    parser.add_argument(
        "-f",
        "--force",
        action="store_true",
        help="Overwrite existing JWK files. Default: disabled",
    )

    parser.add_argument(
        "-e",
        "--environment",
        choices=verify.TRUSTED_ROOT_CAS.keys(),
Pascal Osterwinter's avatar
Pascal Osterwinter committed
        help="Select the PKI environment from which the provided p12 file originates (test or production)",
        required=True,
    )

    args = parser.parse_args()

    if args.force:
        print("Warning: Existing JWK files at location will be overwritten!")

    if str(args.input) == "debug":
        private_key, certificate = create_self_signed_cert()
        certificate_chain = []
    else:
        ) = read_pkcs12(args.input, args.environment)
Pascal Osterwinter's avatar
Pascal Osterwinter committed

        # TODO: check hash and signature algorithms of all certificates in certificate chain

    write_jwk_files(
        private_key, certificate, certificate_chain, args.output, args.force
    )