Newer
Older
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2022 FIT-Connect contributors
#
# SPDX-License-Identifier: EUPL-1.2
import argparse
import json
import os.path
import pathlib
import random
import sys
import os
import certificateValidation as verify
from OpenSSL import crypto as openssl_crypto
from cryptography.hazmat.primitives.serialization import pkcs12
from jwcrypto import jwk
# Custom exceptions for pytest checks.
class KeyLengthError(Exception):
    """Raised when a certificate's public key is shorter than required."""
class CertificateChainError(Exception):
    """Raised when the certificate chain cannot be verified."""
class KeyUsageError(Exception):
    """Raised when the certificate's keyUsage cannot be verified."""
class CertificateAlgorithmError(Exception):
    """Raised when the certificate's signature or hash algorithm cannot be verified."""
def read_password():
    """Return the password for the PKCS12 keystore.

    The ``PKCS12_CONTAINER_PASS`` environment variable takes precedence
    whenever it is set (even to an empty string). Otherwise the user is
    prompted interactively.

    Returns:
        str: The keystore password.
    """
    pkcs12_pass_from_env = os.getenv("PKCS12_CONTAINER_PASS")
    if pkcs12_pass_from_env is not None:
        return pkcs12_pass_from_env

    # Imported locally so this function stays self-contained; getpass keeps
    # the secret from being echoed to the terminal while it is typed.
    import getpass

    return getpass.getpass("Please enter the password for PKCS12 keystore: ")
def read_pkcs12(path, environment):
    """Read a PKCS12 keystore and validate the certificate it contains.

    Args:
        path: Location of the keystore; must end in ``.p12`` or ``.pfx``
            (compared case-insensitively).
        environment: Passed through to ``verify.verify_certificate_chain``
            to select the PKI environment to validate against.

    Returns:
        tuple: ``(private_key, certificate, additional_certificates)`` as
        pyOpenSSL objects (``PKey``, ``X509`` and a list of ``X509``).

    Raises:
        SystemExit: On an invalid file extension, a missing file, or any
            error while loading or validating the keystore. Note that
            ``CertificateChainError`` and ``KeyUsageError`` raised during
            validation are caught by the generic handler below and are
            therefore reported as ``SystemExit`` as well.
    """
    allowed_extensions = [".p12", ".pfx"]
    extension = os.path.splitext(path)[1]
    # An empty extension is not in the allowed list either, so one membership
    # test covers both the "missing" and the "wrong" case.
    if extension.lower() not in allowed_extensions:
        exit_msg = (
            "Error: Invalid PKCS12 file extension '{}'. Valid ones are: {}".format(
                extension, ", ".join(allowed_extensions)
            )
        )
        sys.exit(exit_msg)

    # Read the whole container up front so the file handle is always closed.
    try:
        with open(path, "rb") as f:
            pkcs12_data = f.read()
    except FileNotFoundError:
        sys.exit("Error: No such file. Exiting program.")

    try:
        (
            private_key,
            certificate,
            additional_certificates,
        ) = pkcs12.load_key_and_certificates(pkcs12_data, read_password().encode())

        # convert `cryptography` private key and certificates to `pyOpenSSL` objects
        private_key_pyopenssl = openssl_crypto.PKey.from_cryptography_key(private_key)
        certificate_pyopenssl = openssl_crypto.X509.from_cryptography(certificate)
        additional_certificates_pyopenssl = [
            openssl_crypto.X509.from_cryptography(cert)
            for cert in additional_certificates
        ]

        # validate certificate chain
        if not verify.verify_certificate_chain(
            certificate_pyopenssl, additional_certificates_pyopenssl, environment
        ):
            raise CertificateChainError(
                "Unable to verify certificate chain.\n"
                "Did you select the right Environment?"
            )

        # verify key usage
        if not verify.check_cert_key_usage(certificate_pyopenssl):
            raise KeyUsageError("Certificate keyUsage could not be verified!")

        return (
            private_key_pyopenssl,
            certificate_pyopenssl,
            additional_certificates_pyopenssl,
        )
    except ValueError as e:
        sys.exit("Error: " + str(e))
    except Exception as e:
        # Top-level boundary of the CLI: turn any remaining failure into a
        # clean exit message instead of a traceback.
        sys.exit("Error:" + str(e))
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def cert_to_x5c(cert):
    """Return *cert* encoded as an entry for a JWK ``x5c`` array.

    The certificate is exported as DER (ASN.1) and encoded with standard
    (not URL-safe) base64, without line breaks.
    See https://datatracker.ietf.org/doc/html/rfc7517#section-4.7

    Args:
        cert: A pyOpenSSL ``X509`` certificate.

    Returns:
        str: The base64-encoded DER certificate.
    """
    # Imported locally so this function stays self-contained.
    import base64

    # Dump DER directly instead of stripping the armor from a PEM dump; the
    # base64 payload is the same, without depending on the PEM line layout.
    cert_der = openssl_crypto.dump_certificate(openssl_crypto.FILETYPE_ASN1, cert)
    return base64.b64encode(cert_der).decode("UTF-8")
def _write_jwk_file(path, key, private_key, kid_suffix):
    """Export *key* as JSON, append *kid_suffix* to its ``kid``, write it to *path*."""
    jwk_dict = json.loads(key.export(private_key=private_key))
    jwk_dict["kid"] = jwk_dict["kid"] + kid_suffix
    with open(path, "wb") as f:
        f.write(json.dumps(jwk_dict).encode("UTF-8"))


def write_jwk_files(private_key, certificate, chain, output_dir, overwrite_files):
    """Create four JWK files from a private key and its certificate.

    - JWK with public key and key_ops "wrapKey" for encryption
    - JWK with private key and key_ops "unwrapKey" for decryption
    - JWK with public key and key_ops "verify" for signature validation
    - JWK with private key and key_ops "sign" for signature creation

    Args:
        private_key: pyOpenSSL ``PKey`` holding the private key.
        certificate: pyOpenSSL ``X509`` certificate for that key.
        chain: Iterable of further pyOpenSSL ``X509`` certificates that are
            appended to the ``x5c`` array after *certificate*.
        output_dir: ``pathlib.Path`` of an existing directory to write to.
        overwrite_files: When false, abort if any target file already exists.

    Raises:
        SystemExit: If *output_dir* is not a directory, a target file exists
            and *overwrite_files* is false, or the generated JWKs fail the
            key-length / x5c-chain validation.
        KeyLengthError: If the certificate's public key has fewer than 4096 bits.
        CertificateAlgorithmError: If ``verify.verify_certificate_algorithms``
            rejects the certificate.
    """
    # check that file path exists
    if not output_dir.is_dir():
        sys.exit("Error: Output directory does not exist")

    # check if files already exist
    publicKey_wrapkey_file = pathlib.Path(output_dir, "publicKey_encryption.jwk.json")
    publicKey_verify_file = pathlib.Path(
        output_dir, "publicKey_signature_verification.jwk.json"
    )
    privateKey_unwrapkey_file = pathlib.Path(
        output_dir, "privateKey_decryption.jwk.json"
    )
    privateKey_sign_file = pathlib.Path(output_dir, "privateKey_signing.jwk.json")
    target_files = (
        publicKey_wrapkey_file,
        publicKey_verify_file,
        privateKey_unwrapkey_file,
        privateKey_sign_file,
    )
    if not overwrite_files and any(target.is_file() for target in target_files):
        sys.exit(
            "Error: File already exists. If this should be overwritten try the -f flag."
        )

    # verify key length
    if certificate.get_pubkey().bits() < 4096:
        raise KeyLengthError("Certificate key to weak. Should be at least 4096 bit!")

    # verify signature and hash algorithms
    if not verify.verify_certificate_algorithms(certificate):
        raise CertificateAlgorithmError(
            "Certificate Signature or Hash algorithm could not be verified!"
        )

    # export certificate and private key as PEM
    certificate_pem = openssl_crypto.dump_certificate(
        openssl_crypto.FILETYPE_PEM, certificate
    )
    private_key_pem = openssl_crypto.dump_privatekey(
        openssl_crypto.FILETYPE_PEM, private_key
    )

    # build x5c array: leaf certificate first, then the rest of the chain
    # see https://datatracker.ietf.org/doc/html/rfc7517#section-4.7
    x5c = [cert_to_x5c(certificate)]
    x5c.extend(cert_to_x5c(chain_cert) for chain_cert in chain)

    # create encryption JWK's
    # derive public key (encryption)
    jwk_wrapKey = jwk.JWK.from_pem(certificate_pem)
    jwk_wrapKey.setdefault("alg", "RSA-OAEP-256")
    jwk_wrapKey.setdefault("x5c", x5c)
    jwk_wrapKey.setdefault("key_ops", ["wrapKey"])

    # derive private key (decryption)
    jwk_unwrapKey = jwk.JWK.from_pem(private_key_pem)
    jwk_unwrapKey.setdefault("alg", "RSA-OAEP-256")
    jwk_unwrapKey.setdefault("x5c", x5c)
    jwk_unwrapKey.setdefault("key_ops", ["unwrapKey"])

    # create signature JWK's
    # derive public key (signature verification)
    jwk_verify = jwk.JWK.from_pem(certificate_pem)
    jwk_verify.setdefault("alg", "PS512")
    jwk_verify.setdefault("x5c", x5c)
    jwk_verify.setdefault("key_ops", ["verify"])

    # derive private key (signing)
    jwk_sign = jwk.JWK.from_pem(private_key_pem)
    jwk_sign.setdefault("alg", "PS512")
    jwk_sign.setdefault("x5c", x5c)
    # The signing key needs its key_ops just like the other three JWKs; the
    # success message below announces key_use=sign.
    jwk_sign.setdefault("key_ops", ["sign"])

    # validate key length
    if not verify.check_jwk_key_length(
        [jwk_wrapKey, jwk_verify, jwk_unwrapKey, jwk_sign]
    ):
        sys.exit("Jwk key length could not be verified!")

    # validate that the jwk contains the entire chain of certificates
    # NOTE(review): the JWKs are passed as a set here but as a list above; if
    # JWK equality/hashing is thumbprint-based the set may collapse to fewer
    # entries -- confirm whether a list is intended.
    if not verify.validate_jwk_x5c_chain(
        {jwk_wrapKey, jwk_verify, jwk_unwrapKey, jwk_sign},
        certificate.to_cryptography(),
    ):
        sys.exit("Jwk contain incorrect x5c certificate chain.")

    # write public keys to file
    _write_jwk_file(publicKey_wrapkey_file, jwk_wrapKey, False, "-wrapKey")
    _write_jwk_file(publicKey_verify_file, jwk_verify, False, "-verify")

    # write private keys to file; each private key deliberately keeps the
    # same kid suffix as the public key it belongs to (as in the original)
    _write_jwk_file(privateKey_unwrapkey_file, jwk_unwrapKey, True, "-wrapKey")
    _write_jwk_file(privateKey_sign_file, jwk_sign, True, "-verify")

    print(
        f"🔒 Wrote JWK representation of encryption public key (key_use=wrapKey) to {publicKey_wrapkey_file}"
    )
    print(
        f"🔒 Wrote JWK representation of signature validation public key (key_use=verify) to {publicKey_verify_file}"
    )
    print(
        "Please upload these keys when creating a destination in the self service portal."
    )
    print()
    print(
        f"🔒 Wrote JWK representation of decryption private key (key_use=unwrapKey) to {privateKey_unwrapkey_file}"
    )
    print(
        f"🔒 Wrote JWK representation of signing private key (key_use=sign) to {privateKey_sign_file}"
    )
    print("These keys can be used to sign and decrypt in your client application.")
# TESTING
# instead of passing a p12 keystore - create a self generated keypair for testing
def create_self_signed_cert():
    """Generate a throwaway RSA key pair and a matching self-signed certificate.

    Intended for testing only, as a stand-in for a real PKCS12 keystore.

    Returns:
        tuple: ``(keypair, cert)`` -- a pyOpenSSL ``PKey`` with a 4096-bit RSA
        key and a pyOpenSSL ``X509`` certificate signed with that key.
    """
    print("Creating Self Signed certificate..")
    print("This should only be used for testing purposes!")

    # create key pair
    keypair = openssl_crypto.PKey()
    keypair.generate_key(openssl_crypto.TYPE_RSA, 4096)

    # create self-signed cert
    cert = openssl_crypto.X509()
    subject = cert.get_subject()
    subject.C = "DE"
    subject.O = "Testbehoerde"
    subject.CN = "FIT Connect Testzertifikat"
    cert.set_serial_number(random.randint(50000000, 100000000))
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)  # ten years, in seconds
    # self-signed: the issuer is the subject itself
    cert.set_issuer(cert.get_subject())
    cert.set_pubkey(keypair)
    cert.sign(keypair, "sha512")
    return keypair, cert
if __name__ == "__main__":
    # Command line entry point: parse arguments, load (or generate) key
    # material and write the four JWK files.
    parser = argparse.ArgumentParser(
        description="Generate JWKs from PKCS12 (p12) file."
    )
    parser.add_argument(
        "-i",
        "--input",
        help="The pkcs12 file that will be used to generate JWKs.",
        required=True,
        type=pathlib.Path,
    )
    # optional input parameter specifying location for key storage
    # default location is the working directory
    parser.add_argument(
        "-o",
        "--output",
        default="",
        help="Directory to store the generated JWKs. Defaults to the working directory",
        type=pathlib.Path,
    )
    # the tool does not overwrite any files.
    # this behavior can be changed with the -f / --force input flag
    parser.add_argument(
        "-f",
        "--force",
        action="store_true",
        help="Overwrite existing JWK files. Default: disabled",
    )
    parser.add_argument(
        "-e",
        "--environment",
        choices=verify.TRUSTED_ROOT_CAS.keys(),
        help="Select the PKI environment from which the provided p12 file originates (test or production)",
        required=True,
    )
    args = parser.parse_args()

    if args.force:
        print("Warning: Existing JWK files at location will be overwritten!")

    # Passing the literal input "debug" skips the keystore and uses a freshly
    # generated self-signed certificate with an empty chain (testing only).
    if str(args.input) == "debug":
        private_key, certificate = create_self_signed_cert()
        certificate_chain = []
    else:
        (
            private_key,
            certificate,
            certificate_chain,
        ) = read_pkcs12(args.input, args.environment)

    # TODO: check hash and signature algorithms of all certificates in certificate chain
    write_jwk_files(
        private_key, certificate, certificate_chain, args.output, args.force
    )