From ed5fb4ec5ca93cdfce2e1674e48058214cb9e0a1 Mon Sep 17 00:00:00 2001
From: posterwinter <pascal.osterwinter@gmail.com>
Date: Wed, 23 Feb 2022 17:20:29 +0100
Subject: [PATCH] Added cli_jwk_tool

---
 cli_jwk_tool.py | 299 ++++++++++++++++++++++++++++++++++++++++++++++++
 pyproject.toml  |  19 +++
 2 files changed, 318 insertions(+)
 create mode 100644 cli_jwk_tool.py
 create mode 100644 pyproject.toml

diff --git a/cli_jwk_tool.py b/cli_jwk_tool.py
new file mode 100644
index 0000000..3b98a1d
--- /dev/null
+++ b/cli_jwk_tool.py
@@ -0,0 +1,299 @@
+# Beantragte Zertifikate aus der DOI-CA liegen zunächst als p12-Keystore vor.
+# Um die darin enthaltenen Zertifikate für die Hinterlegung im Zustelldienst nutzbar zu machen,
+# sollen aus diesem Keystore entsprechende JSON Web Keys abgeleitet werden. 
+# Hierzu soll ein entsprechendes Tool entwickelt werden,
+# dass die Anforderungen aus [docs#18 (moved)] umsetzt.
+# Eine Konvertierung von Schlüsseln/Zertifikaten im Self-Service-Portal
+# oder im Zustelldienst ist damit nicht mehr nötig.
+import argparse
+import pathlib
+from OpenSSL import crypto
+import random
+import sys
+import os.path
+
+# p12 file as input
+# print error in case the file type is incorrect or faulty
+# ask for file password over commandline
+# print error in case the password is incorrect
+from jwcrypto import jwk
+
+
+def open_pkcs12(path):
+    fileType = str(path).split(".")
+    if fileType[-1] != "p12" and fileType[-1] != "pfx":
+        print('Incorrect file type. ".p12" or ".pfx" file required! - "' + fileType[-1] + '" given.')
+        return None
+    try:
+        f = open(path, "rb")
+    except FileNotFoundError:
+        print("No such file. Exiting program..")
+        sys.exit(1)
+    print("Enter password: ")
+    password = input()
+
+    password = "fqe0fka3nai1rfz4"
+
+    try:
+        pkcs12 = crypto.load_pkcs12(f.read(), password.encode())
+        return pkcs12
+    except crypto.Error:
+        print("Incorrect password! Exiting program..")
+        sys.exit(1)
+        return
+    except Exception as e:
+        print("File might be corrupt.")
+        print("Error " + e)
+
+    return None
+
+
+# verify that the p12 certificates are from the DOI CA
+# verify certificate chain
+# print error in case either test is invalid
+def verify_certificate_chain(cert, trusted_certs):
+    # Create a certificate store and add your trusted certs
+    try:
+        store = crypto.X509Store()
+
+        # Assuming the certificates are in PEM format in a trusted_certs list
+        for _cert in trusted_certs:
+            cert_file = open(_cert, 'r')
+            cert_data = cert_file.read()
+            client_certificate = crypto.load_certificate(crypto.FILETYPE_PEM, cert_data)
+            store.add_cert(client_certificate)
+
+        # Create a certificate context using the store and the downloaded certificate
+        store_ctx = crypto.X509StoreContext(store, cert)
+
+        # Verify the certificate, returns None if it can validate the certificate
+        store_ctx.verify_certificate()
+
+        return True
+
+    except Exception as e:
+        print(e)
+        return False
+
+
+def check_cert_key_usage(cert):
+    found_ext = False
+    allow_signing = False
+    allow_encryption = False
+    for i in range(0, cert.get_extension_count()):
+        if cert.get_extension(i).get_short_name() == b'keyUsage':
+            found_ext = True
+            usage = cert.get_extension(i).__str__()
+            print("KeyUsage: " + usage)
+            if "Digital Signature" in usage and "Key Encipherment" in usage:
+                allow_signing = True
+            if "Key Encipherment" in usage:
+                allow_encryption = True
+    if not found_ext:
+        print("KeyUsage Extension not found!")
+        return False
+    if not allow_signing:
+        print("KeyUsage Digital Signature is missing!")
+        return False
+    if not allow_encryption:
+        print("KeyUsage Key Encipherment is missing!")
+        return False
+    return True
+
+
+# create 4 JWK's from the p12 file
+# check the key usages from the certificates allow for this
+# - JWK with publicKey and key_ops "wrapKey" for encryption
+# - JWK with privateKey and key_ops "unwrapKey" for encryption
+# - JWK with publicKey and key_ops "verify" for signature creation
+# - JWK with publicKey and key_ops "sign" for signature validation
+# print error in case this is not permitted
+# TODO
+def create_jwk(output_dir, p12, force):
+    # check if files already exist
+    if (not force):
+        if os.path.isfile(str(output_dir) + "publicKey_encryption.json") \
+                or os.path.isfile(str(output_dir) + "publicKey_encryption.json") \
+                or os.path.isfile(str(output_dir) + "publicKey_signature_verification.json") \
+                or os.path.isfile(str(output_dir) + "privateKey_signing.json"):
+            print("File already exists. If this should be overwritten try the -f flag.")
+            print("Exiting program..")
+            sys.exit(1)
+
+    # create encryption JWK's
+
+    # derive public key
+    jwk_wrapKey = jwk.JWK.from_pem(crypto.dump_certificate(crypto.FILETYPE_PEM, p12.get_certificate()))
+    jwk_wrapKey.setdefault("alg", "RSA-OAEP-256")
+    jwk_wrapKey.setdefault("x5c", [cert_to_x5c(p12.get_certificate())])
+    jwk_wrapKey.setdefault("key_ops", ["wrapKey"])
+
+    # derive private key
+    jwk_unwrapKey = jwk.JWK.from_pem(crypto.dump_privatekey(crypto.FILETYPE_PEM, p12.get_privatekey()))
+    jwk_unwrapKey.setdefault("alg", "RSA-OAEP-256")
+    jwk_unwrapKey.setdefault("key_ops", ["unwrapKey"])
+
+    # create signature JWK's
+
+    # derive public key
+    jwk_verify = jwk.JWK.from_pem(crypto.dump_certificate(crypto.FILETYPE_PEM, p12.get_certificate()))
+    jwk_verify.setdefault("alg", "PS512")
+    jwk_verify.setdefault("x5c", [cert_to_x5c(p12.get_certificate())])
+    jwk_verify.setdefault("key_ops", ["verify"])
+
+    # derive private key
+    jwk_sign = jwk.JWK.from_pem(crypto.dump_privatekey(crypto.FILETYPE_PEM, p12.get_privatekey()))
+    jwk_sign.setdefault("alg", "PS512")
+    jwk_sign.setdefault("key_ops", ["sign"])
+
+    # create JWK'S of public keys
+    jwks = jwk.JWKSet()
+    jwks.add(jwk_wrapKey)
+    jwks.add(jwk_verify)
+
+    # define file paths
+    output_dir.mkdir(parents=True, exist_ok=True)
+
+    keySet_file = pathlib.Path(output_dir, "set-public-keys.json")
+    publicKey_wrapkey_file = pathlib.Path(output_dir, "publicKey_encryption.json")
+    publicKey_verify_file = pathlib.Path(output_dir, "publicKey_signature_verification.json")
+    privateKey_unwrapkey_file = pathlib.Path(output_dir, "privateKey_decryption.json")
+    privateKey_sign_file = pathlib.Path(output_dir, "privateKey_signing.json")
+
+    # write JWKS to file
+    with open(keySet_file, "wb") as f:
+        exp = jwks.export(private_keys=False)
+        f.write(exp.encode("UTF-8"))
+
+    # write public keys to file
+    with open(publicKey_wrapkey_file, "wb") as f:
+        exp = jwk_wrapKey.export(private_key=False)
+        f.write(exp.encode("UTF-8"))
+
+    with open(publicKey_verify_file, "wb") as f:
+        exp = jwk_verify.export(private_key=False)
+        f.write(exp.encode("UTF-8"))
+
+    # write private keys to file
+    with open(privateKey_unwrapkey_file, "wb") as f:
+        exp = jwk_unwrapKey.export(private_key=True)
+        f.write(exp.encode("UTF-8"))
+
+    with open(privateKey_sign_file, "wb") as f:
+        exp = jwk_sign.export(private_key=True)
+        f.write(exp.encode("UTF-8"))
+
+    print(f"🔒 Wrote JWK representation of encryption public key (key_use=wrapKey) to {publicKey_wrapkey_file}")
+    print(f"🔒 Wrote JWK representation of signature validation public key (key_use=verify) to {publicKey_verify_file}")
+    print("Please upload these keys when creating a destination in the self service portal.")
+    print()
+    print(f"🔒 Wrote JWKS of Public Keys to {keySet_file}")
+    print(
+        "This key set can be used to update (rotate) keys via the Submission-API (PUT /destinations/\{destinationID\})")
+    print()
+    print(f"🔒 Wrote JWK representation of decryption private key (key_use=unwrapKey) to {privateKey_unwrapkey_file}")
+    print(f"🔒 Wrote JWK representation of signing private key (key_use=sign) to {privateKey_sign_file}")
+    print("These keys can be used to sign and decrypt in your client application.")
+
+
+# all created keys must correspond to cryptographic requirements
+# -> they contain the entire certificate chain
+# TODO
+
+# The content of kid header parameter shall be the base64 (IETF RFC 4648) encoding
+# of one DER-encoded instance of type IssuerSerial type defined in IETF RFC 5035
+# TODO
+
+# dependencies are managed via python-poetry
+# TODO
+
+# tool usage is documented in this article:
+# https://docs.fitko.de/fit-connect/docs/receiving/certificate/#ableitung-eines-fit-connect-kompatiblen-json-web-keys-aus-einem-zertifikat
+# the following url can be used as a template
+# https://docs.fitko.de/fit-connect/docs/details/jwk-creation/
+# TODO
+
+
+def cert_to_x5c(cert):
+    # export certificate as ASN1 (DER)
+    cert_asn1 = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
+    cert_asn1_base64 = cert_asn1.replace(b'-----BEGIN CERTIFICATE-----\n', b'').replace(
+        b'\n-----END CERTIFICATE-----\n', b'').replace(b'\n', b'')
+    cert_asn1_base64_str = cert_asn1_base64.decode('UTF-8')
+    return cert_asn1_base64_str
+
+
+# TESTING
+# instead of passing a p12 keystore - create a self generated keypair for testing
+def create_self_signed_cert():
+    print("Creating Self Signed certificate..")
+    print("This should only be used for testing purposes!")
+    # create key pair
+    keypair = crypto.PKey()
+    keypair.generate_key(crypto.TYPE_RSA, 4096)
+
+    # create self-signed cert
+    cert = crypto.X509()
+    cert.get_subject().C = "DE"
+    cert.get_subject().O = "Testbehoerde"
+    cert.get_subject().CN = "FIT Connect Testzertifikat"
+    cert.set_serial_number(random.randint(50000000, 100000000))
+    cert.gmtime_adj_notBefore(0)
+    cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
+    cert.set_issuer(cert.get_subject())
+    cert.set_pubkey(keypair)
+    cert.sign(keypair, 'sha512')
+
+    p12 = crypto.PKCS12()
+    p12.set_certificate(cert)
+    p12.set_privatekey(keypair)
+
+    return p12
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Generate SET JWKS.")
+    parser.add_argument(
+        "-i",
+        "--input",
+        help="The pkcs12 file that will be used to generate JWK's.",
+        type=pathlib.Path,
+        required=True
+    )
+    # the tool must not save any privateKeys into /tmp
+    # optional input parameter specifying location for key storage
+    # default location is the working directory
+    parser.add_argument(
+        "-o",
+        "--output",
+        default="output/",
+        help="Directory to store the generated SET JWKS in. Default: working_directory/output",
+        type=pathlib.Path,
+    )
+    # the tool must not override any files.
+    # this can however be ignored with the input flag -f / --force
+    parser.add_argument(
+        "-f",
+        "--force",
+        action='store_true',
+        help="With this flag the program will overwrite existing JWKS. Default: disabled"
+    )
+    args = parser.parse_args()
+    if args.force:
+        print("existing certificates at location will be overwritten!")
+    if str(args.input) == "" or str(args.input) == ".":
+        p12_file = create_self_signed_cert()
+    else:
+        p12_file = open_pkcs12(args.input)
+        trusted_certificates = {"res/DOI Test-CA 10.pem", "res/TEST-PCA20.pem"}
+        if not verify_certificate_chain(p12_file.get_certificate(), trusted_certificates):
+            print("Invalid certificate! \nExiting program..")
+            sys.exit(1)
+        print("Valid certificate!")
+        if check_cert_key_usage(p12_file.get_certificate()):
+            print("KeyUsage verified!")
+        else:
+            print("Certificate keyUsage could not be verified! \nExiting program..")
+            sys.exit(1)
+
+    create_jwk(args.output, p12_file, args.force)
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..3272b8f
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,19 @@
+[tool.poetry]
+name = "cli_jwk_tool"
+version = "0.1.0"
+description = "A tool for generating JWKs from a .p12 file."
+authors = ["Pascal Osterwinter <pascal.osterwinter@spotgroup.de>"]
+
+[tool.poetry.dependencies]
+python = "^3.6"
+argparse = "^1.4.0"
+pathlib = "^1.0.1"
+pyOpenSSL = "^22.0.0"
+jwcrypto = "^1.0"
+
+[tool.poetry.dev-dependencies]
+cryptography = "^36.0.1"
+
+[build-system]
+requires = ["poetry-core>=1.0.0"]
+build-backend = "poetry.core.masonry.api"
-- 
GitLab