From 61b559c4fd77e0525c201370835d32fb3dc1daea Mon Sep 17 00:00:00 2001 From: Aleksandr Gabidullin <101321307+a-gabidullin@users.noreply.github.com> Date: Mon, 22 Dec 2025 18:55:28 +0400 Subject: [PATCH] add sssd_info module (#11120) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add sssd_info module * fix f-stings and remove support python2 * fix imports custom lib * fix whitespace and add missing_required_lib * fix str and add version * try add mock test * fix module and mock tests check * fix required in main module * fix spaces * fix linters * add final newline * fix version of module * fix description and error handling * swap literal to dict * fix str * remove comment in methods * remove _get in methods * fix name method in test * add botmeta * fix description of server_type * fix name of maintainer * remove choices * fix author * fix type hint * fix result * fix spaces * fix choices and empty returns * fix mypy test result * fix result * run andebox yaml-doc * remake simple try/exc for result * fix tests * add any type for testing mypy * ruff formated * fix docs * remove unittest.main * rename acc on git for official name --------- Co-authored-by: Александр Габидуллин --- .github/BOTMETA.yml | 2 + plugins/modules/sssd_info.py | 239 +++++++++++++++++++ tests/unit/plugins/modules/test_sssd_info.py | 203 ++++++++++++++++ 3 files changed, 444 insertions(+) create mode 100644 plugins/modules/sssd_info.py create mode 100644 tests/unit/plugins/modules/test_sssd_info.py diff --git a/.github/BOTMETA.yml b/.github/BOTMETA.yml index c50e006d7b..58c82c2859 100644 --- a/.github/BOTMETA.yml +++ b/.github/BOTMETA.yml @@ -1502,6 +1502,8 @@ files: maintainers: vbotka $tests/fqdn_valid.py: maintainers: vbotka + $modules/sssd_info.py: + maintainers: a-gabidullin ######################### docs/docsite/rst/filter_guide.rst: {} docs/docsite/rst/filter_guide_abstract_informations.rst: {} diff --git a/plugins/modules/sssd_info.py b/plugins/modules/sssd_info.py new file mode 100644 index 0000000000..11c415d5b5 --- /dev/null +++ b/plugins/modules/sssd_info.py @@ -0,0 +1,239 @@ +#!/usr/bin/python +# Copyright (c) 2025 Aleksandr Gabidullin +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import annotations + + +DOCUMENTATION = r""" +module: sssd_info +version_added: 12.2.0 +short_description: Check SSSD domain status using D-Bus +description: + - Check the online status of SSSD domains, list domains, and retrieve active servers using D-Bus. +author: "Aleksandr Gabidullin (@a-gabidullin)" +requirements: + - dbus +attributes: + check_mode: + support: full + diff_mode: + support: none + platform: + platforms: posix + description: This action requires a system with D-Bus and SSSD running. + support: full +options: + action: + description: + - The action to perform. + type: str + required: true + choices: + domain_status: Check if domain is online. + domain_list: List all configured domains. + active_servers: Get active servers for domain. + list_servers: List all servers for domain. + domain: + description: + - Domain name to check. + - Required unless O(action=domain_list). + - When O(action=domain_list), this parameter is ignored and the module returns a list of all configured domains. + type: str + server_type: + description: + - Required parameter when O(action=active_servers) and O(action=list_servers). + - Optional and ignored for all other actions. + - At this point, the module supports ONLY the types C(IPA) for FreeIPA servers and C(AD). + type: str + choices: ['IPA', 'AD'] +extends_documentation_fragment: + - community.general.attributes +""" + +EXAMPLES = r""" +- name: Check SSSD domain status + community.general.sssd_info: + action: domain_status + domain: example.com + register: sssd_status_result + +- name: Get domain list + community.general.sssd_info: + action: domain_list + register: domain_list_result + +- name: Get active IPA servers for a domain + community.general.sssd_info: + action: active_servers + domain: example.com + server_type: IPA + register: active_servers_result + +- name: List servers for a domain + community.general.sssd_info: + action: list_servers + domain: example.com + server_type: AD + register: list_servers_result +""" + +RETURN = r""" +online: + description: The online status of the SSSD domain. + type: str + returned: when O(action=domain_status) + sample: online +domain_list: + description: List of SSSD domains. + type: list + elements: str + returned: when O(action=domain_list) + sample: ["ipa.domain", "winad.test"] +servers: + description: Active servers for the specified domain and type. + type: dict + returned: when O(action=active_servers) + sample: {"Global Catalog": "server1.winad.test", "Domain Server": "server2.winad.test"} +list_servers: + description: List of servers for the specified domain. + type: list + elements: str + returned: when O(action=list_servers) + sample: ["server1.winad.test", "server2.winad.test"] +""" + + +from ansible.module_utils.basic import AnsibleModule +from typing import Any +from ansible_collections.community.general.plugins.module_utils import deps + +with deps.declare("dbus"): + import dbus + + +class SSSDHandler: + """SSSD D-Bus handler""" + + BUS_NAME = "org.freedesktop.sssd.infopipe" + DOMAIN_INTERFACE = "org.freedesktop.sssd.infopipe.Domains.Domain" + INFOPIPE_INTERFACE = "org.freedesktop.sssd.infopipe" + + def __init__(self) -> None: + """Initialize SSSD D-Bus connection.""" + self.bus = dbus.SystemBus() + self.sssd_obj = self.bus.get_object(self.BUS_NAME, "/org/freedesktop/sssd/infopipe") + self.infopipe_iface = dbus.Interface(self.sssd_obj, dbus_interface=self.INFOPIPE_INTERFACE) + + def domain_path(self, domain: str) -> str: + return f"/org/freedesktop/sssd/infopipe/Domains/{domain.replace('.', '_2e')}" + + def domain_object(self, domain: str) -> dbus.proxies.ProxyObject: + domain_path = self.domain_path(domain) + try: + return self.bus.get_object(self.BUS_NAME, domain_path) + except dbus.exceptions.DBusException as e: + raise Exception(f"Domain not found: {domain}. Error: {e}") from e + + def check_domain_status(self, domain: str) -> str: + domain_obj = self.domain_object(domain) + iface = dbus.Interface(domain_obj, dbus_interface=self.DOMAIN_INTERFACE) + return "online" if iface.IsOnline() else "offline" + + def active_servers(self, domain: str, server_type: str) -> dict[str, str]: + """Get active servers for domain. + + Args: + domain: Domain name to get servers for. + server_type: Type of servers ('IPA' or 'AD'). + + Returns: + Dictionary with server types as keys and server names as values. + """ + domain_obj = self.domain_object(domain) + iface = dbus.Interface(domain_obj, dbus_interface=self.DOMAIN_INTERFACE) + + if server_type == "IPA": + server_name = f"{server_type} Server" + return {server_name: iface.ActiveServer(server_type)} + else: + return { + "AD Global Catalog": iface.ActiveServer(f"sd_gc_{domain}"), + "AD Domain Controller": iface.ActiveServer(f"sd_{domain}"), + } + + def list_servers(self, domain: str, server_type: str) -> list[str]: + """List all servers for domain. + + Args: + domain: Domain name to list servers for. + server_type: Type of servers ('IPA' or 'AD'). + + Returns: + List of server names. + """ + domain_obj = self.domain_object(domain) + iface = dbus.Interface(domain_obj, dbus_interface=self.DOMAIN_INTERFACE) + if server_type == "IPA": + return iface.ListServers(server_type) + else: + return iface.ListServers(f"sd_{domain}") + + def domain_list(self) -> list[str]: + """Get list of all domains. + + Returns: + List of domain names. + """ + response = self.infopipe_iface.ListDomains() + return [domain.rsplit("/", maxsplit=1)[-1].replace("_2e", ".") for domain in response] + + +def main() -> None: + """Main function for the Ansible module.""" + module = AnsibleModule( + argument_spec=dict( + action=dict( + type="str", + required=True, + choices=["domain_status", "domain_list", "active_servers", "list_servers"], + ), + domain=dict(type="str"), + server_type=dict(type="str", choices=["IPA", "AD"]), + ), + supports_check_mode=True, + required_if=[ + ("action", "domain_status", ["domain"]), + ("action", "list_servers", ["domain", "server_type"]), + ("action", "active_servers", ["domain", "server_type"]), + ], + ) + + deps.validate(module) + + action = module.params["action"] + domain = module.params.get("domain") + server_type = module.params.get("server_type") + + sssd = SSSDHandler() + result: dict[str, Any] = {} + + try: + if action == "domain_status": + result["online"] = sssd.check_domain_status(domain) + elif action == "domain_list": + result["domain_list"] = sssd.domain_list() + elif action == "active_servers": + result["servers"] = sssd.active_servers(domain, server_type) + elif action == "list_servers": + result["list_servers"] = sssd.list_servers(domain, server_type) + + except Exception as e: + module.fail_json(msg=f"Error: {e}") + + module.exit_json(**result) + + +if __name__ == "__main__": + main() diff --git a/tests/unit/plugins/modules/test_sssd_info.py b/tests/unit/plugins/modules/test_sssd_info.py new file mode 100644 index 0000000000..0776bc7bc5 --- /dev/null +++ b/tests/unit/plugins/modules/test_sssd_info.py @@ -0,0 +1,203 @@ +# Copyright (c) 2025 Aleksandr Gabidullin +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import annotations + +import sys +import unittest +from unittest.mock import Mock, patch + + +class TestSssdInfo(unittest.TestCase): + """Unit tests for the sssd_info module.""" + + @classmethod + def setUpClass(cls): + """Mock dbus module before importing the module.""" + # Mock the entire dbus module + cls.mock_dbus = Mock() + cls.mock_dbus.SystemBus = Mock() + cls.mock_dbus.Interface = Mock() + + # Create mock exceptions + class MockDBusException(Exception): + def __init__(self, *args, **kwargs): + super().__init__(*args) + self._dbus_error_name = kwargs.get("dbus_error_name", "org.freedesktop.DBus.Error.UnknownObject") + + def get_dbus_name(self): + return self._dbus_error_name + + cls.mock_dbus.exceptions = Mock() + cls.mock_dbus.exceptions.DBusException = MockDBusException + + # Mock the dbus module in sys.modules + sys.modules["dbus"] = cls.mock_dbus + + @classmethod + def tearDownClass(cls): + """Clean up after all tests.""" + # Remove the mocked dbus module + if "dbus" in sys.modules and sys.modules["dbus"] == cls.mock_dbus: + del sys.modules["dbus"] + + def setUp(self): + """Set up test fixtures.""" + # Ensure the mocked dbus is in sys.modules + sys.modules["dbus"] = self.mock_dbus + + # Reset all mocks + self.mock_dbus.reset_mock() + self.mock_dbus.SystemBus.reset_mock() + self.mock_dbus.Interface.reset_mock() + + # Create fresh mocks for each test + self.mock_bus = Mock() + self.mock_sssd_obj = Mock() + self.mock_infopipe_iface = Mock() + self.mock_domain_obj = Mock() + self.mock_domain_iface = Mock() + + # Configure the mock chain + self.mock_dbus.SystemBus.return_value = self.mock_bus + self.mock_bus.get_object.return_value = self.mock_sssd_obj + + # Fix the Interface mock to accept dbus_interface parameter + def interface_side_effect(obj, dbus_interface=None): + if dbus_interface == "org.freedesktop.sssd.infopipe": + return self.mock_infopipe_iface + elif dbus_interface == "org.freedesktop.sssd.infopipe.Domains.Domain": + return self.mock_domain_iface + return Mock() + + self.mock_dbus.Interface.side_effect = interface_side_effect + + def tearDown(self): + """Clean up after test.""" + # Restore the mocked dbus module in sys.modules + sys.modules["dbus"] = self.mock_dbus + + def test_domain_list_success(self): + """Test successful retrieval of domain list.""" + # Mock the ListDomains response + self.mock_infopipe_iface.ListDomains.return_value = [ + "/org/freedesktop/sssd/infopipe/Domains/ipa_2eexample_2ecom", + "/org/freedesktop/sssd/infopipe/Domains/ad_2eexample_2ecom", + ] + + # Import the module (dbus is already mocked in sys.modules) + from ansible_collections.community.general.plugins.modules import sssd_info + + # Mock AnsibleModule + with patch.object(sssd_info, "AnsibleModule") as mock_module_class: + mock_module = Mock() + mock_module.params = {"action": "domain_list"} + mock_module.fail_json = Mock(side_effect=Exception("fail_json called")) + mock_module.exit_json = Mock() + mock_module_class.return_value = mock_module + + # Run the module + sssd_info.main() + + # Verify exit_json was called with correct results + mock_module.exit_json.assert_called_once() + result = ( + mock_module.exit_json.call_args[0][0] + if mock_module.exit_json.call_args[0] + else mock_module.exit_json.call_args[1] + ) + + self.assertIn("domain_list", result) + self.assertEqual(result["domain_list"], ["ipa.example.com", "ad.example.com"]) + + def test_domain_status_online(self): + """Test checking online domain status.""" + # Mock domain status as online + self.mock_domain_iface.IsOnline.return_value = True + + # Setup mock chain for domain object + self.mock_bus.get_object.return_value = self.mock_domain_obj + + # Import the module + from ansible_collections.community.general.plugins.modules import sssd_info + + # Mock AnsibleModule + with patch.object(sssd_info, "AnsibleModule") as mock_module_class: + mock_module = Mock() + mock_module.params = {"action": "domain_status", "domain": "example.com"} + mock_module.fail_json = Mock(side_effect=Exception("fail_json called")) + mock_module.exit_json = Mock() + mock_module_class.return_value = mock_module + + # Run the module + sssd_info.main() + + # Verify exit_json was called with correct results + mock_module.exit_json.assert_called_once() + result = ( + mock_module.exit_json.call_args[0][0] + if mock_module.exit_json.call_args[0] + else mock_module.exit_json.call_args[1] + ) + + self.assertIn("online", result) + self.assertEqual(result["online"], "online") + + def test_domain_status_offline(self): + """Test checking offline domain status.""" + # Mock domain status as offline + self.mock_domain_iface.IsOnline.return_value = False + + # Setup mock chain for domain object + self.mock_bus.get_object.return_value = self.mock_domain_obj + + # Import the module + from ansible_collections.community.general.plugins.modules import sssd_info + + # Mock AnsibleModule + with patch.object(sssd_info, "AnsibleModule") as mock_module_class: + mock_module = Mock() + mock_module.params = {"action": "domain_status", "domain": "example.com"} + mock_module.fail_json = Mock(side_effect=Exception("fail_json called")) + mock_module.exit_json = Mock() + mock_module_class.return_value = mock_module + + # Run the module + sssd_info.main() + + # Verify exit_json was called with correct results + mock_module.exit_json.assert_called_once() + result = ( + mock_module.exit_json.call_args[0][0] + if mock_module.exit_json.call_args[0] + else mock_module.exit_json.call_args[1] + ) + + self.assertIn("online", result) + self.assertEqual(result["online"], "offline") + + def test_domain_not_found(self): + """Test error when domain is not found.""" + # Mock DBusException for domain not found + from ansible_collections.community.general.plugins.modules import sssd_info + + # Mock AnsibleModule + with patch.object(sssd_info, "AnsibleModule") as mock_module_class: + mock_module = Mock() + mock_module.params = {"action": "domain_status", "domain": "nonexistent.com"} + mock_module.fail_json = Mock() + mock_module.exit_json = Mock() + mock_module_class.return_value = mock_module + + # Mock the exception in domain_object + with patch.object(sssd_info.SSSDHandler, "domain_object") as mock_get_domain: + mock_get_domain.side_effect = Exception("Domain not found: nonexistent.com. Error: Domain not found") + + # Run the module + sssd_info.main() + + # Verify fail_json was called with error message + mock_module.fail_json.assert_called_once() + error_msg = mock_module.fail_json.call_args[1].get("msg", "") + self.assertIn("Domain not found: nonexistent.com", error_msg)