1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2026-02-04 07:51:50 +00:00

Reformat everything.

This commit is contained in:
Felix Fontein 2025-11-01 12:08:41 +01:00
parent 3f2213791a
commit 340ff8586d
1008 changed files with 61301 additions and 58309 deletions

View file

@ -200,6 +200,7 @@ try:
from cryptography.hazmat.primitives import hashes
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.backends.openssl import backend
HAS_CRYPTOGRAPHY_PKCS12 = True
except ImportError:
HAS_CRYPTOGRAPHY_PKCS12 = False
@ -210,72 +211,73 @@ class JavaKeystore:
self.module = module
self.result = dict()
self.keytool_bin = module.get_bin_path('keytool', True)
self.keytool_bin = module.get_bin_path("keytool", True)
self.certificate = module.params['certificate']
self.keypass = module.params['private_key_passphrase']
self.keystore_path = module.params['dest']
self.name = module.params['name']
self.password = module.params['password']
self.private_key = module.params['private_key']
self.ssl_backend = module.params['ssl_backend']
self.keystore_type = module.params['keystore_type']
self.certificate = module.params["certificate"]
self.keypass = module.params["private_key_passphrase"]
self.keystore_path = module.params["dest"]
self.name = module.params["name"]
self.password = module.params["password"]
self.private_key = module.params["private_key"]
self.ssl_backend = module.params["ssl_backend"]
self.keystore_type = module.params["keystore_type"]
if self.ssl_backend == 'openssl':
self.openssl_bin = module.get_bin_path('openssl', True)
if self.ssl_backend == "openssl":
self.openssl_bin = module.get_bin_path("openssl", True)
else:
if not HAS_CRYPTOGRAPHY_PKCS12:
self.module.fail_json(msg=missing_required_lib('cryptography >= 3.0'))
self.module.fail_json(msg=missing_required_lib("cryptography >= 3.0"))
if module.params['certificate_path'] is None:
if module.params["certificate_path"] is None:
self.certificate_path = create_file(self.certificate)
self.module.add_cleanup_file(self.certificate_path)
else:
self.certificate_path = module.params['certificate_path']
self.certificate_path = module.params["certificate_path"]
if module.params['private_key_path'] is None:
if module.params["private_key_path"] is None:
self.private_key_path = create_file(self.private_key)
self.module.add_cleanup_file(self.private_key_path)
else:
self.private_key_path = module.params['private_key_path']
self.private_key_path = module.params["private_key_path"]
def update_permissions(self):
file_args = self.module.load_file_common_arguments(self.module.params, path=self.keystore_path)
return self.module.set_fs_attributes_if_different(file_args, False)
def read_certificate_fingerprint(self, cert_format='PEM'):
if self.ssl_backend == 'cryptography':
if cert_format == 'PEM':
def read_certificate_fingerprint(self, cert_format="PEM"):
if self.ssl_backend == "cryptography":
if cert_format == "PEM":
cert_loader = load_pem_x509_certificate
else:
cert_loader = load_der_x509_certificate
try:
with open(self.certificate_path, 'rb') as cert_file:
cert = cert_loader(
cert_file.read(),
backend=backend
)
with open(self.certificate_path, "rb") as cert_file:
cert = cert_loader(cert_file.read(), backend=backend)
except (OSError, ValueError) as e:
self.module.fail_json(msg=f"Unable to read the provided certificate: {e}")
fp = cert.fingerprint(hashes.SHA256()).hex().upper()
fingerprint = ':'.join([fp[i:i + 2] for i in range(0, len(fp), 2)])
fingerprint = ":".join([fp[i : i + 2] for i in range(0, len(fp), 2)])
else:
current_certificate_fingerprint_cmd = [
self.openssl_bin, "x509", "-noout", "-in", self.certificate_path, "-fingerprint", "-sha256"
self.openssl_bin,
"x509",
"-noout",
"-in",
self.certificate_path,
"-fingerprint",
"-sha256",
]
(rc, current_certificate_fingerprint_out, current_certificate_fingerprint_err) = self.module.run_command(
current_certificate_fingerprint_cmd,
environ_update=None,
check_rc=False
current_certificate_fingerprint_cmd, environ_update=None, check_rc=False
)
if rc != 0:
return self.module.fail_json(
msg=current_certificate_fingerprint_out,
err=current_certificate_fingerprint_err,
cmd=current_certificate_fingerprint_cmd,
rc=rc
rc=rc,
)
current_certificate_match = re.search(r"=([\w:]+)", current_certificate_fingerprint_out)
@ -283,7 +285,7 @@ class JavaKeystore:
return self.module.fail_json(
msg=f"Unable to find the current certificate fingerprint in {current_certificate_fingerprint_out}",
cmd=current_certificate_fingerprint_cmd,
rc=rc
rc=rc,
)
fingerprint = current_certificate_match.group(1)
@ -291,25 +293,33 @@ class JavaKeystore:
def read_stored_certificate_fingerprint(self):
stored_certificate_fingerprint_cmd = [
self.keytool_bin, "-list", "-alias", self.name,
"-keystore", self.keystore_path, "-v"
self.keytool_bin,
"-list",
"-alias",
self.name,
"-keystore",
self.keystore_path,
"-v",
]
(rc, stored_certificate_fingerprint_out, stored_certificate_fingerprint_err) = self.module.run_command(
stored_certificate_fingerprint_cmd, data=self.password, check_rc=False)
stored_certificate_fingerprint_cmd, data=self.password, check_rc=False
)
if rc != 0:
if f"keytool error: java.lang.Exception: Alias <{self.name}> does not exist" \
in stored_certificate_fingerprint_out:
if (
f"keytool error: java.lang.Exception: Alias <{self.name}> does not exist"
in stored_certificate_fingerprint_out
):
return "alias mismatch"
if re.match(
r"keytool error: java\.io\.IOException: [Kk]eystore( was tampered with, or)? password was incorrect",
stored_certificate_fingerprint_out
r"keytool error: java\.io\.IOException: [Kk]eystore( was tampered with, or)? password was incorrect",
stored_certificate_fingerprint_out,
):
return "password mismatch"
return self.module.fail_json(
msg=stored_certificate_fingerprint_out,
err=stored_certificate_fingerprint_err,
cmd=stored_certificate_fingerprint_cmd,
rc=rc
rc=rc,
)
if self.keystore_type not in (None, self.current_type()):
@ -320,69 +330,52 @@ class JavaKeystore:
return self.module.fail_json(
msg=f"Unable to find the stored certificate fingerprint in {stored_certificate_fingerprint_out}",
cmd=stored_certificate_fingerprint_cmd,
rc=rc
rc=rc,
)
return stored_certificate_match.group(1)
def current_type(self):
magic_bytes = b'\xfe\xed\xfe\xed'
with open(self.keystore_path, 'rb') as fd:
magic_bytes = b"\xfe\xed\xfe\xed"
with open(self.keystore_path, "rb") as fd:
header = fd.read(4)
if header == magic_bytes:
return 'jks'
return 'pkcs12'
return "jks"
return "pkcs12"
def cert_changed(self):
current_certificate_fingerprint = self.read_certificate_fingerprint()
stored_certificate_fingerprint = self.read_stored_certificate_fingerprint()
return current_certificate_fingerprint != stored_certificate_fingerprint
def cryptography_create_pkcs12_bundle(self, keystore_p12_path, key_format='PEM', cert_format='PEM'):
if key_format == 'PEM':
def cryptography_create_pkcs12_bundle(self, keystore_p12_path, key_format="PEM", cert_format="PEM"):
if key_format == "PEM":
key_loader = load_pem_private_key
else:
key_loader = load_der_private_key
if cert_format == 'PEM':
if cert_format == "PEM":
cert_loader = load_pem_x509_certificate
else:
cert_loader = load_der_x509_certificate
try:
with open(self.private_key_path, 'rb') as key_file:
private_key = key_loader(
key_file.read(),
password=to_bytes(self.keypass),
backend=backend
)
with open(self.private_key_path, "rb") as key_file:
private_key = key_loader(key_file.read(), password=to_bytes(self.keypass), backend=backend)
except TypeError:
# Re-attempt with no password to match existing behavior
try:
with open(self.private_key_path, 'rb') as key_file:
private_key = key_loader(
key_file.read(),
password=None,
backend=backend
)
with open(self.private_key_path, "rb") as key_file:
private_key = key_loader(key_file.read(), password=None, backend=backend)
except (OSError, TypeError, ValueError, UnsupportedAlgorithm) as e:
self.module.fail_json(
msg=f"The following error occurred while loading the provided private_key: {e}"
)
self.module.fail_json(msg=f"The following error occurred while loading the provided private_key: {e}")
except (OSError, ValueError, UnsupportedAlgorithm) as e:
self.module.fail_json(
msg=f"The following error occurred while loading the provided private_key: {e}"
)
self.module.fail_json(msg=f"The following error occurred while loading the provided private_key: {e}")
try:
with open(self.certificate_path, 'rb') as cert_file:
cert = cert_loader(
cert_file.read(),
backend=backend
)
with open(self.certificate_path, "rb") as cert_file:
cert = cert_loader(cert_file.read(), backend=backend)
except (OSError, ValueError, UnsupportedAlgorithm) as e:
self.module.fail_json(
msg=f"The following error occurred while loading the provided certificate: {e}"
)
self.module.fail_json(msg=f"The following error occurred while loading the provided certificate: {e}")
if self.password:
encryption = BestAvailableEncryption(to_bytes(self.password))
@ -390,21 +383,30 @@ class JavaKeystore:
encryption = NoEncryption()
pkcs12_bundle = serialize_key_and_certificates(
name=to_bytes(self.name),
key=private_key,
cert=cert,
cas=None,
encryption_algorithm=encryption
name=to_bytes(self.name), key=private_key, cert=cert, cas=None, encryption_algorithm=encryption
)
with open(keystore_p12_path, 'wb') as p12_file:
with open(keystore_p12_path, "wb") as p12_file:
p12_file.write(pkcs12_bundle)
self.result.update(msg="PKCS#12 bundle created by cryptography backend")
def openssl_create_pkcs12_bundle(self, keystore_p12_path):
export_p12_cmd = [self.openssl_bin, "pkcs12", "-export", "-name", self.name, "-in", self.certificate_path,
"-inkey", self.private_key_path, "-out", keystore_p12_path, "-passout", "stdin"]
export_p12_cmd = [
self.openssl_bin,
"pkcs12",
"-export",
"-name",
self.name,
"-in",
self.certificate_path,
"-inkey",
self.private_key_path,
"-out",
keystore_p12_path,
"-passout",
"stdin",
]
# when keypass is provided, add -passin
cmd_stdin = ""
@ -420,42 +422,49 @@ class JavaKeystore:
self.result = dict(msg=export_p12_out, cmd=export_p12_cmd, rc=rc)
if rc != 0:
self.result['err'] = export_p12_err
self.result["err"] = export_p12_err
self.module.fail_json(**self.result)
def create(self):
"""Create the keystore, or replace it with a rollback in case of
keytool failure.
keytool failure.
"""
if self.module.check_mode:
self.result['changed'] = True
self.result["changed"] = True
return self.result
keystore_p12_path = create_path()
self.module.add_cleanup_file(keystore_p12_path)
if self.ssl_backend == 'cryptography':
if self.ssl_backend == "cryptography":
self.cryptography_create_pkcs12_bundle(keystore_p12_path)
else:
self.openssl_create_pkcs12_bundle(keystore_p12_path)
if self.keystore_type == 'pkcs12':
if self.keystore_type == "pkcs12":
# Preserve properties of the destination file, if any.
self.module.atomic_move(os.path.abspath(keystore_p12_path), os.path.abspath(self.keystore_path))
self.update_permissions()
self.result['changed'] = True
self.result["changed"] = True
return self.result
import_keystore_cmd = [self.keytool_bin, "-importkeystore",
"-destkeystore", self.keystore_path,
"-srckeystore", keystore_p12_path,
"-srcstoretype", "pkcs12",
"-alias", self.name,
"-noprompt"]
import_keystore_cmd = [
self.keytool_bin,
"-importkeystore",
"-destkeystore",
self.keystore_path,
"-srckeystore",
keystore_p12_path,
"-srcstoretype",
"pkcs12",
"-alias",
self.name,
"-noprompt",
]
if self.keystore_type == 'jks':
keytool_help = self.module.run_command([self.keytool_bin, '-importkeystore', '-help'])
if '-deststoretype' in keytool_help[1] + keytool_help[2]:
if self.keystore_type == "jks":
keytool_help = self.module.run_command([self.keytool_bin, "-importkeystore", "-help"])
if "-deststoretype" in keytool_help[1] + keytool_help[2]:
import_keystore_cmd.insert(4, "-deststoretype")
import_keystore_cmd.insert(5, self.keystore_type)
@ -467,7 +476,7 @@ class JavaKeystore:
os.remove(self.keystore_path)
(rc, import_keystore_out, import_keystore_err) = self.module.run_command(
import_keystore_cmd, data=f'{self.password}\n{self.password}\n{self.password}', check_rc=False
import_keystore_cmd, data=f"{self.password}\n{self.password}\n{self.password}", check_rc=False
)
self.result = dict(msg=import_keystore_out, cmd=import_keystore_cmd, rc=rc)
@ -477,13 +486,13 @@ class JavaKeystore:
if keystore_backup is not None:
self.module.preserved_copy(keystore_backup, self.keystore_path)
os.remove(keystore_backup)
self.result['err'] = import_keystore_err
self.result["err"] = import_keystore_err
return self.module.fail_json(**self.result)
self.update_permissions()
if keystore_backup is not None:
os.remove(keystore_backup)
self.result['changed'] = True
self.result["changed"] = True
return self.result
def exists(self):
@ -499,49 +508,48 @@ def create_path():
def create_file(content):
tmpfd, tmpfile = tempfile.mkstemp()
with os.fdopen(tmpfd, 'w') as f:
with os.fdopen(tmpfd, "w") as f:
f.write(content)
return tmpfile
def main():
choose_between = (['certificate', 'certificate_path'],
['private_key', 'private_key_path'])
choose_between = (["certificate", "certificate_path"], ["private_key", "private_key_path"])
module = AnsibleModule(
argument_spec=dict(
name=dict(type='str', required=True),
dest=dict(type='path', required=True),
certificate=dict(type='str', no_log=True),
certificate_path=dict(type='path'),
private_key=dict(type='str', no_log=True),
private_key_path=dict(type='path', no_log=False),
private_key_passphrase=dict(type='str', no_log=True),
password=dict(type='str', required=True, no_log=True),
ssl_backend=dict(type='str', default='openssl', choices=['openssl', 'cryptography']),
keystore_type=dict(type='str', choices=['jks', 'pkcs12']),
force=dict(type='bool', default=False),
name=dict(type="str", required=True),
dest=dict(type="path", required=True),
certificate=dict(type="str", no_log=True),
certificate_path=dict(type="path"),
private_key=dict(type="str", no_log=True),
private_key_path=dict(type="path", no_log=False),
private_key_passphrase=dict(type="str", no_log=True),
password=dict(type="str", required=True, no_log=True),
ssl_backend=dict(type="str", default="openssl", choices=["openssl", "cryptography"]),
keystore_type=dict(type="str", choices=["jks", "pkcs12"]),
force=dict(type="bool", default=False),
),
required_one_of=choose_between,
mutually_exclusive=choose_between,
supports_check_mode=True,
add_file_common_args=True,
)
module.run_command_environ_update = dict(LANG='C', LC_ALL='C', LC_MESSAGES='C')
module.run_command_environ_update = dict(LANG="C", LC_ALL="C", LC_MESSAGES="C")
result = dict()
jks = JavaKeystore(module)
if jks.exists():
if module.params['force'] or jks.cert_changed():
if module.params["force"] or jks.cert_changed():
result = jks.create()
else:
result['changed'] = jks.update_permissions()
result["changed"] = jks.update_permissions()
else:
result = jks.create()
module.exit_json(**result)
if __name__ == '__main__':
if __name__ == "__main__":
main()