Newer
Older
# SPDX-FileCopyrightText: 2022 FIT-Connect contributors
#
# SPDX-License-Identifier: EUPL-1.2
import base64
import os

from cryptography import x509
from cryptography.hazmat.primitives import hashes
from OpenSSL import crypto as openssl_crypto
# Directories that contain the trusted root CA certificates, keyed by
# environment name.
TRUSTED_ROOT_CAS = {
    "prod": "trusted-root-certificates/v-pki-prod",
    "test": "trusted-root-certificates/v-pki-test",
    "unit-test": "trusted-root-certificates/unit-tests",
}

# Minimum accepted public key size in bits.
REQUIRED_KEY_LENGTH = 4096
def check_key_length(certificate):
    """Return True if the certificate's public key is large enough.

    The size reported by ``certificate.get_pubkey().bits()`` must be at
    least ``REQUIRED_KEY_LENGTH``; otherwise False is returned.
    """
    key_bits = certificate.get_pubkey().bits()
    return key_bits >= REQUIRED_KEY_LENGTH
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def check_cert_key_usage(cert):
    """Check that the certificate's keyUsage extension allows signing and encryption.

    Every extension whose short name is ``b"keyUsage"`` is printed and its
    string form is searched for "Digital Signature" and "Key Encipherment".

    Returns:
        True only if a keyUsage extension exists and both usages were found;
        otherwise an error is printed and False is returned.
    """
    has_key_usage = False
    may_sign = False
    may_encrypt = False

    extensions = (
        cert.get_extension(index) for index in range(cert.get_extension_count())
    )
    for extension in extensions:
        if extension.get_short_name() != b"keyUsage":
            continue
        has_key_usage = True
        usage_text = str(extension)
        print("Info: KeyUsage:", usage_text)
        may_sign = may_sign or "Digital Signature" in usage_text
        may_encrypt = may_encrypt or "Key Encipherment" in usage_text

    # Checked in this order so that the first failing requirement is reported.
    requirements = (
        (has_key_usage, "Error: KeyUsage Extension not found!"),
        (may_sign, "Error: KeyUsage Digital Signature is missing!"),
        (may_encrypt, "Error: KeyUsage Key Encipherment is missing!"),
    )
    for satisfied, message in requirements:
        if not satisfied:
            print(message)
            return False
    return True
def x5c_to_cert(x5c):
    """Load one ``x5c`` entry as a certificate via ``x509.load_pem_x509_certificate``.

    If the string does not already contain the PEM begin marker, it is
    wrapped in BEGIN/END CERTIFICATE lines before being parsed.
    """
    pem_header = "-----BEGIN CERTIFICATE-----"
    pem_footer = "-----END CERTIFICATE-----"
    if pem_header in x5c:
        pem_text = x5c
    else:
        pem_text = "\n".join((pem_header, x5c, pem_footer))
    return x509.load_pem_x509_certificate(pem_text.encode())
def validate_jwk_x5c_chain(jwks, base_cert):
    """Validate the ``x5c`` certificate chain of every JWK in ``jwks``.

    For each JWK the chain must be present and non-empty, its first (leaf)
    certificate must equal ``base_cert``, and the subject of every following
    certificate must equal the issuer of the certificate before it.

    Args:
        jwks: iterable of JWK dicts, each carrying an ``"x5c"`` list.
        base_cert: certificate the leaf of every chain is compared against.

    Returns:
        True if all chains satisfy the rules above (or ``jwks`` is empty);
        otherwise an error is printed and False is returned.
    """
    for tmp_jwk in jwks:
        x5c_chain = tmp_jwk.get("x5c")
        # A missing or empty chain must not pass: the leaf would otherwise
        # never be compared against base_cert.
        if not x5c_chain:
            print('Error! jwk("x5c") field is missing or empty.')
            return False

        expected_subject = None
        for position, x5c_cert in enumerate(x5c_chain):
            cert = x5c_to_cert(x5c_cert)
            if position == 0:
                # the leaf certificate in the x5c chain has to be the certificate with the key!
                if cert != base_cert:
                    print('Error! Leaf certificate in jwk("x5c") field is incorrect.')
                    return False
            elif expected_subject != cert.subject:
                # each subsequent certificate has to be the one used to certify the previous
                print(
                    "Error, each subsequent certificate in jwk x5c chain "
                    "has to be the one issuing the previous certificate."
                )
                return False
            expected_subject = cert.issuer
    return True
def verify_certificate_algorithms(certificate):
    """Check the hash and signature algorithms of ``certificate``.

    NOTE: the checks are currently disabled. After converting the certificate
    with ``to_cryptography()`` the function returns True unconditionally;
    everything after that early return is unreachable.
    """
    x509_certificate = certificate.to_cryptography()
    # TODO - the V-PKI uses the wrong algorithms, so the checks below are
    # skipped for now.
    return True
    # --- unreachable while the early return above is in place ---
    # Intended check 1: the signature hash algorithm must be SHA-512.
    if not x509_certificate.signature_hash_algorithm.__class__ == hashes.SHA512:
        print(
            "Certificate is using hash algorithm {} but needs {}".format(
                x509_certificate.signature_hash_algorithm, hashes.SHA512
            )
        )
        return False
    # base_certificate = openssl_crypto.X509.from_cryptography(base_certificate)
    # Intended check 2: the signature algorithm name must match.
    # NOTE(review): the comparison expects b"sha512WithRSAEncryption" while the
    # message says "RSASSA-PSS" - confirm which one is intended before
    # re-enabling.
    if certificate.get_signature_algorithm() != b"sha512WithRSAEncryption":
        print(
            "Certificate is using signature algorithm {} but needs {}".format(
                str(certificate.get_signature_algorithm()), "RSASSA-PSS"
            )
        )
        return False
    return True
def check_jwk_key_length(jwks):
    """Check that the modulus of every JWK in ``jwks`` has at least 4096 bits.

    The ``"n"`` member is base64url-decoded and the bit length of the
    resulting big-endian integer is used as the key length. Measuring the
    encoded text instead would overestimate the size by about a third.

    Args:
        jwks: iterable of JWK dicts with ``"n"`` (base64url modulus) and
            ``"kid"`` members.

    Returns:
        True if every key is long enough (or ``jwks`` is empty). False, after
        printing a message, if a modulus is missing, cannot be decoded, or is
        shorter than 4096 bits.
    """
    for tmp_jwk in jwks:
        kid = tmp_jwk.get("kid")
        b64encoded_string = tmp_jwk.get("n")
        if not b64encoded_string:
            print('JWK with id {} has no modulus ("n").'.format(kid))
            return False
        # JWK values are base64url without padding; restore it before decoding
        b64encoded_string += "=" * (-len(b64encoded_string) % 4)
        try:
            modulus_bytes = base64.urlsafe_b64decode(b64encoded_string)
        except ValueError:
            # binascii.Error (bad padding) is a ValueError subclass
            print('JWK with id {} has an invalid modulus ("n").'.format(kid))
            return False
        key_length = int.from_bytes(modulus_bytes, "big").bit_length()
        if key_length < 4096:
            print(
                "JWK with id {} has wrong key length. Is {} but should be 4096 at least.".format(
                    kid, key_length
                )
            )
            return False
    return True
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def verify_certificate_chain(cert, certificate_chain, environment):
    """Verify that certificates are from DOI CA and verify certificate chain.

    Every ``.pem`` file in the directory ``TRUSTED_ROOT_CAS[environment]`` is
    loaded into an ``X509Store`` as a trusted certificate, then ``cert`` is
    verified with ``certificate_chain`` passed as the ``chain`` argument.

    Args:
        cert: certificate to verify.
        certificate_chain: certificates handed to ``X509StoreContext`` as
            ``chain``.
        environment: key into ``TRUSTED_ROOT_CAS`` selecting the directory of
            trusted root certificates.

    Returns:
        True if verification succeeds. False if loading a root certificate or
        the verification raises; the exception is printed.

    Raises:
        RuntimeError: if ``verify_certificate()`` returns something other
            than None without raising (not expected to happen).

    Errors from looking up or listing the trusted-root directory happen
    before the ``try`` block and are not caught here.
    """
    # create list of file paths to trusted CA certificates in PEM format
    root_dir = TRUSTED_ROOT_CAS[environment]
    trusted_root_certs = [
        os.path.join(root_dir, filename) for filename in os.listdir(root_dir)
    ]
    try:
        # Create a certificate store and add trusted certs
        store = openssl_crypto.X509Store()
        # Add root certs from trusted_root_certs to certificate store
        for root_cert_path in trusted_root_certs:
            # skip all files other than pem files (e.g. .license files)
            if os.path.splitext(root_cert_path)[1] != ".pem":
                continue
            # read certificate; the context manager closes the file handle
            with open(root_cert_path, "r") as root_cert_file:
                root_cert_data = root_cert_file.read()
            root_cert = openssl_crypto.load_certificate(
                openssl_crypto.FILETYPE_PEM, root_cert_data
            )
            # Add root certificate to X509Store
            # WARNING: only add root certificates here!
            # `add_cert()` will treat the certificate as ultimately trusted!
            # see https://duo.com/labs/research/chain-of-fools#section2
            store.add_cert(root_cert)
        # Create a certificate context using the store, the certificate and the certificate chain to verify
        store_ctx = openssl_crypto.X509StoreContext(
            store, cert, chain=certificate_chain
        )
        # Verify the certificate, returns None if the certificate can be
        # validated and raises otherwise.
        # WARNING: `verify_certificate()` will succeed if any of the
        # certificates added to the X509Store signed the leaf certificate, even
        # if the root didn't sign the intermediate!
        if store_ctx.verify_certificate() is None:
            return True
    except Exception as e:
        print("Error", e)
        return False
    # `store_ctx.verify_certificate()` will either return None or raise an exception
    raise RuntimeError("Unexpected Error: This code should never be reached")