# Copyright (c) 2018, Ansible Project # Copyright (c) 2018, Abhijeet Kasurde # # GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) # SPDX-License-Identifier: GPL-3.0-or-later from __future__ import annotations import os from unittest.mock import Mock, patch from ansible.module_utils.basic import AnsibleModule # noqa: F401 # pylint: disable=unused-import from ansible_collections.community.internal_test_tools.tests.unit.plugins.modules.utils import ( ModuleTestCase, set_module_args, ) from ansible_collections.community.general.plugins.modules.java_keystore import JavaKeystore, create_module class TestCreateJavaKeystore(ModuleTestCase): """Test the creation of a Java keystore.""" def setUp(self): """Setup.""" super().setUp() orig_exists = os.path.exists self.mock_create_file = patch("ansible_collections.community.general.plugins.modules.java_keystore.create_file") self.mock_create_path = patch("ansible_collections.community.general.plugins.modules.java_keystore.create_path") self.mock_current_type = patch( "ansible_collections.community.general.plugins.modules.java_keystore.JavaKeystore.current_type" ) self.mock_run_command = patch("ansible.module_utils.basic.AnsibleModule.run_command") self.mock_get_bin_path = patch("ansible.module_utils.basic.AnsibleModule.get_bin_path") self.mock_preserved_copy = patch("ansible.module_utils.basic.AnsibleModule.preserved_copy") self.mock_atomic_move = patch("ansible.module_utils.basic.AnsibleModule.atomic_move") self.mock_os_path_exists = patch( "os.path.exists", side_effect=lambda path: True if path == "/path/to/keystore.jks" else orig_exists(path) ) self.mock_selinux_context = patch( "ansible.module_utils.basic.AnsibleModule.selinux_context", side_effect=lambda path: ["unconfined_u", "object_r", "user_home_t", "s0"], ) self.mock_is_special_selinux_path = patch( "ansible.module_utils.basic.AnsibleModule.is_special_selinux_path", side_effect=lambda path: (False, None) ) self.run_command = self.mock_run_command.start() self.get_bin_path = self.mock_get_bin_path.start() self.preserved_copy = self.mock_preserved_copy.start() self.atomic_move = self.mock_atomic_move.start() self.create_file = self.mock_create_file.start() self.create_path = self.mock_create_path.start() self.current_type = self.mock_current_type.start() self.selinux_context = self.mock_selinux_context.start() self.is_special_selinux_path = self.mock_is_special_selinux_path.start() self.os_path_exists = self.mock_os_path_exists.start() def tearDown(self): """Teardown.""" super().tearDown() self.mock_create_file.stop() self.mock_create_path.stop() self.mock_current_type.stop() self.mock_run_command.stop() self.mock_get_bin_path.stop() self.mock_preserved_copy.stop() self.mock_atomic_move.stop() self.mock_selinux_context.stop() self.mock_is_special_selinux_path.stop() self.mock_os_path_exists.stop() def test_create_jks_success(self): with set_module_args( dict( certificate="cert-foo", private_key="private-foo", dest="/path/to/keystore.jks", name="test", password="changeit", ) ): module = create_module() with patch("os.remove", return_value=True): self.create_path.side_effect = ["/tmp/tmpgrzm2ah7"] self.create_file.side_effect = ["/tmp/etacifitrec", "/tmp/yek_etavirp", ""] self.run_command.side_effect = [(0, "", ""), (0, "", "")] self.get_bin_path.side_effect = ["keytool", "openssl", ""] jks = JavaKeystore(module) assert jks.create() == { "changed": True, "cmd": [ "keytool", "-importkeystore", "-destkeystore", "/path/to/keystore.jks", "-srckeystore", "/tmp/tmpgrzm2ah7", "-srcstoretype", "pkcs12", "-alias", "test", "-noprompt", ], "msg": "", "rc": 0, } def test_create_jks_keypass_fail_export_pkcs12(self): with set_module_args( dict( certificate="cert-foo", private_key="private-foo", private_key_passphrase="passphrase-foo", dest="/path/to/keystore.jks", name="test", password="changeit", ) ): module = create_module() module.exit_json = Mock() module.fail_json = Mock() with patch("os.remove", return_value=True): self.create_path.side_effect = ["/tmp/tmp1cyp12xa"] self.create_file.side_effect = ["/tmp/tmpvalcrt32", "/tmp/tmpwh4key0c", ""] self.run_command.side_effect = [(1, "", "Oops"), (0, "", "")] self.get_bin_path.side_effect = ["keytool", "openssl", ""] jks = JavaKeystore(module) jks.create() module.fail_json.assert_called_once_with( cmd=[ "openssl", "pkcs12", "-export", "-name", "test", "-in", "/tmp/tmpvalcrt32", "-inkey", "/tmp/tmpwh4key0c", "-out", "/tmp/tmp1cyp12xa", "-passout", "stdin", "-passin", "stdin", ], msg="", err="Oops", rc=1, ) def test_create_jks_fail_export_pkcs12(self): with set_module_args( dict( certificate="cert-foo", private_key="private-foo", dest="/path/to/keystore.jks", name="test", password="changeit", ) ): module = create_module() module.exit_json = Mock() module.fail_json = Mock() with patch("os.remove", return_value=True): self.create_path.side_effect = ["/tmp/tmp1cyp12xa"] self.create_file.side_effect = ["/tmp/tmpvalcrt32", "/tmp/tmpwh4key0c", ""] self.run_command.side_effect = [(1, "", "Oops"), (0, "", "")] self.get_bin_path.side_effect = ["keytool", "openssl", ""] jks = JavaKeystore(module) jks.create() module.fail_json.assert_called_once_with( cmd=[ "openssl", "pkcs12", "-export", "-name", "test", "-in", "/tmp/tmpvalcrt32", "-inkey", "/tmp/tmpwh4key0c", "-out", "/tmp/tmp1cyp12xa", "-passout", "stdin", ], msg="", err="Oops", rc=1, ) def test_create_jks_fail_import_key(self): with set_module_args( dict( certificate="cert-foo", private_key="private-foo", dest="/path/to/keystore.jks", name="test", password="changeit", ) ): module = create_module() module.exit_json = Mock() module.fail_json = Mock() with patch("os.remove", return_value=True): self.create_path.side_effect = ["/tmp/tmpgrzm2ah7"] self.create_file.side_effect = ["/tmp/etacifitrec", "/tmp/yek_etavirp", ""] self.run_command.side_effect = [(0, "", ""), (1, "", "Oops")] self.get_bin_path.side_effect = ["keytool", "openssl", ""] jks = JavaKeystore(module) jks.create() module.fail_json.assert_called_once_with( cmd=[ "keytool", "-importkeystore", "-destkeystore", "/path/to/keystore.jks", "-srckeystore", "/tmp/tmpgrzm2ah7", "-srcstoretype", "pkcs12", "-alias", "test", "-noprompt", ], msg="", err="Oops", rc=1, ) class TestCertChanged(ModuleTestCase): """Test if the cert has changed.""" def setUp(self): """Setup.""" super().setUp() self.mock_create_file = patch("ansible_collections.community.general.plugins.modules.java_keystore.create_file") self.mock_current_type = patch( "ansible_collections.community.general.plugins.modules.java_keystore.JavaKeystore.current_type" ) self.mock_run_command = patch("ansible.module_utils.basic.AnsibleModule.run_command") self.mock_get_bin_path = patch("ansible.module_utils.basic.AnsibleModule.get_bin_path") self.mock_preserved_copy = patch("ansible.module_utils.basic.AnsibleModule.preserved_copy") self.mock_atomic_move = patch("ansible.module_utils.basic.AnsibleModule.atomic_move") self.run_command = self.mock_run_command.start() self.create_file = self.mock_create_file.start() self.get_bin_path = self.mock_get_bin_path.start() self.current_type = self.mock_current_type.start() self.preserved_copy = self.mock_preserved_copy.start() self.atomic_move = self.mock_atomic_move.start() def tearDown(self): """Teardown.""" super().tearDown() self.mock_create_file.stop() self.mock_current_type.stop() self.mock_run_command.stop() self.mock_get_bin_path.stop() self.mock_preserved_copy.stop() self.mock_atomic_move.stop() def test_cert_unchanged_same_fingerprint(self): with set_module_args( dict( certificate="cert-foo", private_key="private-foo", dest="/path/to/keystore.jks", name="foo", password="changeit", ) ): module = create_module() with patch("os.remove", return_value=True): self.create_file.side_effect = ["/tmp/placeholder", ""] self.run_command.side_effect = [(0, "foo=abcd:1234:efgh", ""), (0, "SHA256: abcd:1234:efgh", "")] self.get_bin_path.side_effect = ["keytool", "openssl", ""] self.current_type.side_effect = ["jks"] jks = JavaKeystore(module) result = jks.cert_changed() self.assertFalse(result, "Fingerprint is identical") def test_cert_changed_fingerprint_mismatch(self): with set_module_args( dict( certificate="cert-foo", private_key="private-foo", dest="/path/to/keystore.jks", name="foo", password="changeit", ) ): module = create_module() with patch("os.remove", return_value=True): self.create_file.side_effect = ["/tmp/placeholder", ""] self.run_command.side_effect = [(0, "foo=abcd:1234:efgh", ""), (0, "SHA256: wxyz:9876:stuv", "")] self.get_bin_path.side_effect = ["keytool", "openssl", ""] self.current_type.side_effect = ["jks"] jks = JavaKeystore(module) result = jks.cert_changed() self.assertTrue(result, "Fingerprint mismatch") def test_cert_changed_alias_does_not_exist(self): with set_module_args( dict( certificate="cert-foo", private_key="private-foo", dest="/path/to/keystore.jks", name="foo", password="changeit", ) ): module = create_module() with patch("os.remove", return_value=True): self.create_file.side_effect = ["/tmp/placeholder", ""] self.run_command.side_effect = [ (0, "foo=abcd:1234:efgh", ""), (1, "keytool error: java.lang.Exception: Alias does not exist", ""), ] self.get_bin_path.side_effect = ["keytool", "openssl", ""] jks = JavaKeystore(module) result = jks.cert_changed() self.assertTrue(result, "Alias mismatch detected") def test_cert_changed_password_mismatch(self): with set_module_args( dict( certificate="cert-foo", private_key="private-foo", dest="/path/to/keystore.jks", name="foo", password="changeit", ) ): module = create_module() with patch("os.remove", return_value=True): self.create_file.side_effect = ["/tmp/placeholder", ""] self.run_command.side_effect = [ (0, "foo=abcd:1234:efgh", ""), (1, "keytool error: java.io.IOException: Keystore password was incorrect", ""), ] self.get_bin_path.side_effect = ["keytool", "openssl", ""] jks = JavaKeystore(module) result = jks.cert_changed() self.assertTrue(result, "Password mismatch detected") def test_cert_changed_fail_read_cert(self): with set_module_args( dict( certificate="cert-foo", private_key="private-foo", dest="/path/to/keystore.jks", name="foo", password="changeit", ) ): module = create_module() module.exit_json = Mock() module.fail_json = Mock() with patch("os.remove", return_value=True): self.create_file.side_effect = ["/tmp/tmpdj6bvvme", ""] self.run_command.side_effect = [(1, "", "Oops"), (0, "SHA256: wxyz:9876:stuv", "")] self.get_bin_path.side_effect = ["keytool", "openssl", ""] self.current_type.side_effect = ["jks"] jks = JavaKeystore(module) jks.cert_changed() module.fail_json.assert_called_once_with( cmd=["openssl", "x509", "-noout", "-in", "/tmp/tmpdj6bvvme", "-fingerprint", "-sha256"], msg="", err="Oops", rc=1, ) def test_cert_changed_fail_read_keystore(self): with set_module_args( dict( certificate="cert-foo", private_key="private-foo", dest="/path/to/keystore.jks", name="foo", password="changeit", ) ): module = create_module() module.exit_json = Mock() module.fail_json = Mock(return_value=True) with patch("os.remove", return_value=True): self.create_file.side_effect = ["/tmp/placeholder", ""] self.run_command.side_effect = [(0, "foo: wxyz:9876:stuv", ""), (1, "", "Oops")] self.get_bin_path.side_effect = ["keytool", "openssl", ""] jks = JavaKeystore(module) jks.cert_changed() module.fail_json.assert_called_with( cmd=["keytool", "-list", "-alias", "foo", "-keystore", "/path/to/keystore.jks", "-v"], msg="", err="Oops", rc=1, )