1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2026-02-04 07:51:50 +00:00

add sssd_info module (#11120)

* add sssd_info module

* fix f-stings and remove support python2

* fix imports custom lib

* fix whitespace and add missing_required_lib

* fix str and add version

* try add mock test

* fix module and mock tests check

* fix required in main module

* fix spaces

* fix linters

* add final newline

* fix version of module

* fix description and error handling

* swap literal to dict

* fix str

* remove comment in methods

* remove _get in methods

* fix name method in test

* add botmeta

* fix description of server_type

* fix name of maintainer

* remove choices

* fix author

* fix type hint

* fix result

* fix spaces

* fix choices and empty returns

* fix mypy test result

* fix result

* run andebox yaml-doc

* remake simple try/exc for result

* fix tests

* add any type for testing mypy

* ruff formated

* fix docs

* remove unittest.main

* rename acc on git for official name

---------

Co-authored-by: Александр Габидуллин <agabidullin@astralinux.ru>
This commit is contained in:
Aleksandr Gabidullin 2025-12-22 18:55:28 +04:00 committed by GitHub
parent 02b185932c
commit 61b559c4fd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 444 additions and 0 deletions

2
.github/BOTMETA.yml vendored
View file

@ -1502,6 +1502,8 @@ files:
maintainers: vbotka maintainers: vbotka
$tests/fqdn_valid.py: $tests/fqdn_valid.py:
maintainers: vbotka maintainers: vbotka
$modules/sssd_info.py:
maintainers: a-gabidullin
######################### #########################
docs/docsite/rst/filter_guide.rst: {} docs/docsite/rst/filter_guide.rst: {}
docs/docsite/rst/filter_guide_abstract_informations.rst: {} docs/docsite/rst/filter_guide_abstract_informations.rst: {}

View file

@ -0,0 +1,239 @@
#!/usr/bin/python
# Copyright (c) 2025 Aleksandr Gabidullin <qualittv@gmail.com>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import annotations
DOCUMENTATION = r"""
module: sssd_info
version_added: 12.2.0
short_description: Check SSSD domain status using D-Bus
description:
- Check the online status of SSSD domains, list domains, and retrieve active servers using D-Bus.
author: "Aleksandr Gabidullin (@a-gabidullin)"
requirements:
- dbus
attributes:
check_mode:
support: full
diff_mode:
support: none
platform:
platforms: posix
description: This action requires a system with D-Bus and SSSD running.
support: full
options:
action:
description:
- The action to perform.
type: str
required: true
choices:
domain_status: Check if domain is online.
domain_list: List all configured domains.
active_servers: Get active servers for domain.
list_servers: List all servers for domain.
domain:
description:
- Domain name to check.
- Required unless O(action=domain_list).
- When O(action=domain_list), this parameter is ignored and the module returns a list of all configured domains.
type: str
server_type:
description:
- Required parameter when O(action=active_servers) and O(action=list_servers).
- Optional and ignored for all other actions.
- At this point, the module supports ONLY the types C(IPA) for FreeIPA servers and C(AD).
type: str
choices: ['IPA', 'AD']
extends_documentation_fragment:
- community.general.attributes
"""
EXAMPLES = r"""
- name: Check SSSD domain status
community.general.sssd_info:
action: domain_status
domain: example.com
register: sssd_status_result
- name: Get domain list
community.general.sssd_info:
action: domain_list
register: domain_list_result
- name: Get active IPA servers for a domain
community.general.sssd_info:
action: active_servers
domain: example.com
server_type: IPA
register: active_servers_result
- name: List servers for a domain
community.general.sssd_info:
action: list_servers
domain: example.com
server_type: AD
register: list_servers_result
"""
RETURN = r"""
online:
description: The online status of the SSSD domain.
type: str
returned: when O(action=domain_status)
sample: online
domain_list:
description: List of SSSD domains.
type: list
elements: str
returned: when O(action=domain_list)
sample: ["ipa.domain", "winad.test"]
servers:
description: Active servers for the specified domain and type.
type: dict
returned: when O(action=active_servers)
sample: {"Global Catalog": "server1.winad.test", "Domain Server": "server2.winad.test"}
list_servers:
description: List of servers for the specified domain.
type: list
elements: str
returned: when O(action=list_servers)
sample: ["server1.winad.test", "server2.winad.test"]
"""
from ansible.module_utils.basic import AnsibleModule
from typing import Any
from ansible_collections.community.general.plugins.module_utils import deps
with deps.declare("dbus"):
import dbus
class SSSDHandler:
"""SSSD D-Bus handler"""
BUS_NAME = "org.freedesktop.sssd.infopipe"
DOMAIN_INTERFACE = "org.freedesktop.sssd.infopipe.Domains.Domain"
INFOPIPE_INTERFACE = "org.freedesktop.sssd.infopipe"
def __init__(self) -> None:
"""Initialize SSSD D-Bus connection."""
self.bus = dbus.SystemBus()
self.sssd_obj = self.bus.get_object(self.BUS_NAME, "/org/freedesktop/sssd/infopipe")
self.infopipe_iface = dbus.Interface(self.sssd_obj, dbus_interface=self.INFOPIPE_INTERFACE)
def domain_path(self, domain: str) -> str:
return f"/org/freedesktop/sssd/infopipe/Domains/{domain.replace('.', '_2e')}"
def domain_object(self, domain: str) -> dbus.proxies.ProxyObject:
domain_path = self.domain_path(domain)
try:
return self.bus.get_object(self.BUS_NAME, domain_path)
except dbus.exceptions.DBusException as e:
raise Exception(f"Domain not found: {domain}. Error: {e}") from e
def check_domain_status(self, domain: str) -> str:
domain_obj = self.domain_object(domain)
iface = dbus.Interface(domain_obj, dbus_interface=self.DOMAIN_INTERFACE)
return "online" if iface.IsOnline() else "offline"
def active_servers(self, domain: str, server_type: str) -> dict[str, str]:
"""Get active servers for domain.
Args:
domain: Domain name to get servers for.
server_type: Type of servers ('IPA' or 'AD').
Returns:
Dictionary with server types as keys and server names as values.
"""
domain_obj = self.domain_object(domain)
iface = dbus.Interface(domain_obj, dbus_interface=self.DOMAIN_INTERFACE)
if server_type == "IPA":
server_name = f"{server_type} Server"
return {server_name: iface.ActiveServer(server_type)}
else:
return {
"AD Global Catalog": iface.ActiveServer(f"sd_gc_{domain}"),
"AD Domain Controller": iface.ActiveServer(f"sd_{domain}"),
}
def list_servers(self, domain: str, server_type: str) -> list[str]:
"""List all servers for domain.
Args:
domain: Domain name to list servers for.
server_type: Type of servers ('IPA' or 'AD').
Returns:
List of server names.
"""
domain_obj = self.domain_object(domain)
iface = dbus.Interface(domain_obj, dbus_interface=self.DOMAIN_INTERFACE)
if server_type == "IPA":
return iface.ListServers(server_type)
else:
return iface.ListServers(f"sd_{domain}")
def domain_list(self) -> list[str]:
"""Get list of all domains.
Returns:
List of domain names.
"""
response = self.infopipe_iface.ListDomains()
return [domain.rsplit("/", maxsplit=1)[-1].replace("_2e", ".") for domain in response]
def main() -> None:
"""Main function for the Ansible module."""
module = AnsibleModule(
argument_spec=dict(
action=dict(
type="str",
required=True,
choices=["domain_status", "domain_list", "active_servers", "list_servers"],
),
domain=dict(type="str"),
server_type=dict(type="str", choices=["IPA", "AD"]),
),
supports_check_mode=True,
required_if=[
("action", "domain_status", ["domain"]),
("action", "list_servers", ["domain", "server_type"]),
("action", "active_servers", ["domain", "server_type"]),
],
)
deps.validate(module)
action = module.params["action"]
domain = module.params.get("domain")
server_type = module.params.get("server_type")
sssd = SSSDHandler()
result: dict[str, Any] = {}
try:
if action == "domain_status":
result["online"] = sssd.check_domain_status(domain)
elif action == "domain_list":
result["domain_list"] = sssd.domain_list()
elif action == "active_servers":
result["servers"] = sssd.active_servers(domain, server_type)
elif action == "list_servers":
result["list_servers"] = sssd.list_servers(domain, server_type)
except Exception as e:
module.fail_json(msg=f"Error: {e}")
module.exit_json(**result)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,203 @@
# Copyright (c) 2025 Aleksandr Gabidullin <qualittv@gmail.com>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import annotations
import sys
import unittest
from unittest.mock import Mock, patch
class TestSssdInfo(unittest.TestCase):
"""Unit tests for the sssd_info module."""
@classmethod
def setUpClass(cls):
"""Mock dbus module before importing the module."""
# Mock the entire dbus module
cls.mock_dbus = Mock()
cls.mock_dbus.SystemBus = Mock()
cls.mock_dbus.Interface = Mock()
# Create mock exceptions
class MockDBusException(Exception):
def __init__(self, *args, **kwargs):
super().__init__(*args)
self._dbus_error_name = kwargs.get("dbus_error_name", "org.freedesktop.DBus.Error.UnknownObject")
def get_dbus_name(self):
return self._dbus_error_name
cls.mock_dbus.exceptions = Mock()
cls.mock_dbus.exceptions.DBusException = MockDBusException
# Mock the dbus module in sys.modules
sys.modules["dbus"] = cls.mock_dbus
@classmethod
def tearDownClass(cls):
"""Clean up after all tests."""
# Remove the mocked dbus module
if "dbus" in sys.modules and sys.modules["dbus"] == cls.mock_dbus:
del sys.modules["dbus"]
def setUp(self):
"""Set up test fixtures."""
# Ensure the mocked dbus is in sys.modules
sys.modules["dbus"] = self.mock_dbus
# Reset all mocks
self.mock_dbus.reset_mock()
self.mock_dbus.SystemBus.reset_mock()
self.mock_dbus.Interface.reset_mock()
# Create fresh mocks for each test
self.mock_bus = Mock()
self.mock_sssd_obj = Mock()
self.mock_infopipe_iface = Mock()
self.mock_domain_obj = Mock()
self.mock_domain_iface = Mock()
# Configure the mock chain
self.mock_dbus.SystemBus.return_value = self.mock_bus
self.mock_bus.get_object.return_value = self.mock_sssd_obj
# Fix the Interface mock to accept dbus_interface parameter
def interface_side_effect(obj, dbus_interface=None):
if dbus_interface == "org.freedesktop.sssd.infopipe":
return self.mock_infopipe_iface
elif dbus_interface == "org.freedesktop.sssd.infopipe.Domains.Domain":
return self.mock_domain_iface
return Mock()
self.mock_dbus.Interface.side_effect = interface_side_effect
def tearDown(self):
"""Clean up after test."""
# Restore the mocked dbus module in sys.modules
sys.modules["dbus"] = self.mock_dbus
def test_domain_list_success(self):
"""Test successful retrieval of domain list."""
# Mock the ListDomains response
self.mock_infopipe_iface.ListDomains.return_value = [
"/org/freedesktop/sssd/infopipe/Domains/ipa_2eexample_2ecom",
"/org/freedesktop/sssd/infopipe/Domains/ad_2eexample_2ecom",
]
# Import the module (dbus is already mocked in sys.modules)
from ansible_collections.community.general.plugins.modules import sssd_info
# Mock AnsibleModule
with patch.object(sssd_info, "AnsibleModule") as mock_module_class:
mock_module = Mock()
mock_module.params = {"action": "domain_list"}
mock_module.fail_json = Mock(side_effect=Exception("fail_json called"))
mock_module.exit_json = Mock()
mock_module_class.return_value = mock_module
# Run the module
sssd_info.main()
# Verify exit_json was called with correct results
mock_module.exit_json.assert_called_once()
result = (
mock_module.exit_json.call_args[0][0]
if mock_module.exit_json.call_args[0]
else mock_module.exit_json.call_args[1]
)
self.assertIn("domain_list", result)
self.assertEqual(result["domain_list"], ["ipa.example.com", "ad.example.com"])
def test_domain_status_online(self):
"""Test checking online domain status."""
# Mock domain status as online
self.mock_domain_iface.IsOnline.return_value = True
# Setup mock chain for domain object
self.mock_bus.get_object.return_value = self.mock_domain_obj
# Import the module
from ansible_collections.community.general.plugins.modules import sssd_info
# Mock AnsibleModule
with patch.object(sssd_info, "AnsibleModule") as mock_module_class:
mock_module = Mock()
mock_module.params = {"action": "domain_status", "domain": "example.com"}
mock_module.fail_json = Mock(side_effect=Exception("fail_json called"))
mock_module.exit_json = Mock()
mock_module_class.return_value = mock_module
# Run the module
sssd_info.main()
# Verify exit_json was called with correct results
mock_module.exit_json.assert_called_once()
result = (
mock_module.exit_json.call_args[0][0]
if mock_module.exit_json.call_args[0]
else mock_module.exit_json.call_args[1]
)
self.assertIn("online", result)
self.assertEqual(result["online"], "online")
def test_domain_status_offline(self):
"""Test checking offline domain status."""
# Mock domain status as offline
self.mock_domain_iface.IsOnline.return_value = False
# Setup mock chain for domain object
self.mock_bus.get_object.return_value = self.mock_domain_obj
# Import the module
from ansible_collections.community.general.plugins.modules import sssd_info
# Mock AnsibleModule
with patch.object(sssd_info, "AnsibleModule") as mock_module_class:
mock_module = Mock()
mock_module.params = {"action": "domain_status", "domain": "example.com"}
mock_module.fail_json = Mock(side_effect=Exception("fail_json called"))
mock_module.exit_json = Mock()
mock_module_class.return_value = mock_module
# Run the module
sssd_info.main()
# Verify exit_json was called with correct results
mock_module.exit_json.assert_called_once()
result = (
mock_module.exit_json.call_args[0][0]
if mock_module.exit_json.call_args[0]
else mock_module.exit_json.call_args[1]
)
self.assertIn("online", result)
self.assertEqual(result["online"], "offline")
def test_domain_not_found(self):
"""Test error when domain is not found."""
# Mock DBusException for domain not found
from ansible_collections.community.general.plugins.modules import sssd_info
# Mock AnsibleModule
with patch.object(sssd_info, "AnsibleModule") as mock_module_class:
mock_module = Mock()
mock_module.params = {"action": "domain_status", "domain": "nonexistent.com"}
mock_module.fail_json = Mock()
mock_module.exit_json = Mock()
mock_module_class.return_value = mock_module
# Mock the exception in domain_object
with patch.object(sssd_info.SSSDHandler, "domain_object") as mock_get_domain:
mock_get_domain.side_effect = Exception("Domain not found: nonexistent.com. Error: Domain not found")
# Run the module
sssd_info.main()
# Verify fail_json was called with error message
mock_module.fail_json.assert_called_once()
error_msg = mock_module.fail_json.call_args[1].get("msg", "")
self.assertIn("Domain not found: nonexistent.com", error_msg)