From 9cdeb5a9b9b0372db170249dc34559889302c18f Mon Sep 17 00:00:00 2001 From: "patchback[bot]" <45432694+patchback[bot]@users.noreply.github.com> Date: Sun, 23 Nov 2025 08:36:34 +0100 Subject: [PATCH] [PR #11167/19757b3a backport][stable-12] Add type hints to action and test plugins and to plugin utils; fix some bugs, and improve input validation (#11191) Add type hints to action and test plugins and to plugin utils; fix some bugs, and improve input validation (#11167) * Add type hints to action and test plugins and to plugin utils. Also fix some bugs and add proper input validation. * Combine lines. * Extend changelog fragment. * Move task_vars initialization up. --------- (cherry picked from commit 19757b3a4c1ce017ac521e83a54553a2974eb42d) Co-authored-by: Felix Fontein Co-authored-by: Alexei Znamensky <103110+russoz@users.noreply.github.com> --- changelogs/fragments/11167-typing.yml | 15 +++++ plugins/action/iptables_state.py | 16 ++++-- plugins/action/shutdown.py | 56 +++++++++--------- plugins/plugin_utils/ansible_type.py | 25 +++++---- plugins/plugin_utils/keys_filter.py | 81 ++++++++++++++------------- plugins/plugin_utils/unsafe.py | 47 +++++++++++++++- plugins/test/a_module.py | 10 +++- plugins/test/ansible_type.py | 12 ++-- plugins/test/fqdn_valid.py | 26 +++++++-- tests/sanity/ignore-2.17.txt | 1 + tests/sanity/ignore-2.18.txt | 1 + 11 files changed, 194 insertions(+), 96 deletions(-) create mode 100644 changelogs/fragments/11167-typing.yml diff --git a/changelogs/fragments/11167-typing.yml b/changelogs/fragments/11167-typing.yml new file mode 100644 index 0000000000..8b2cfeefb1 --- /dev/null +++ b/changelogs/fragments/11167-typing.yml @@ -0,0 +1,15 @@ +minor_changes: + - "iptables_state action plugin - add type hints (https://github.com/ansible-collections/community.general/pull/11167)." + - "shutdown action plugin - add type hints (https://github.com/ansible-collections/community.general/pull/11167)." + - "ansible_type plugin utils - add type hints (https://github.com/ansible-collections/community.general/pull/11167)." + - "keys_filter.py plugin utils - add type hints (https://github.com/ansible-collections/community.general/pull/11167)." + - "unsafe.py plugin utils - add type hints (https://github.com/ansible-collections/community.general/pull/11167)." + - "a_module test plugin - add proper parameter checking and type hints (https://github.com/ansible-collections/community.general/pull/11167)." + - "ansible_type test plugin - add type hints (https://github.com/ansible-collections/community.general/pull/11167)." + - "fqdn_valid test plugin - add proper parameter checking, and add type hints (https://github.com/ansible-collections/community.general/pull/11167)." +bugfixes: + - "keys_filter.py plugin utils - fixed requirements check so that other sequences than lists and strings are checked, + and corrected broken formatting during error reporting + (https://github.com/ansible-collections/community.general/pull/11167)." + - "ansible_type test plugin - fix parameter checking (https://github.com/ansible-collections/community.general/pull/11167)." + - "ansible_type plugin utils - avoid potential concatenation of non-strings when ``alias`` has non-string values (https://github.com/ansible-collections/community.general/pull/11167)." diff --git a/plugins/action/iptables_state.py b/plugins/action/iptables_state.py index 371a732fcc..e745e48c8f 100644 --- a/plugins/action/iptables_state.py +++ b/plugins/action/iptables_state.py @@ -5,6 +5,7 @@ from __future__ import annotations import time +import typing as t from ansible.plugins.action import ActionBase from ansible.errors import AnsibleActionFail, AnsibleConnectionFailure @@ -20,7 +21,7 @@ class ActionModule(ActionBase): DEFAULT_SUDOABLE = True @staticmethod - def msg_error__async_and_poll_not_zero(task_poll, task_async, max_timeout): + def msg_error__async_and_poll_not_zero(task_poll, task_async, max_timeout) -> str: return ( "This module doesn't support async>0 and poll>0 when its 'state' param " "is set to 'restored'. To enable its rollback feature (that needs the " @@ -30,7 +31,7 @@ class ActionModule(ActionBase): ) @staticmethod - def msg_warning__no_async_is_no_rollback(task_poll, task_async, max_timeout): + def msg_warning__no_async_is_no_rollback(task_poll, task_async, max_timeout) -> str: return ( "Attempts to restore iptables state without rollback in case of mistake " "may lead the ansible controller to loose access to the hosts and never " @@ -41,7 +42,7 @@ class ActionModule(ActionBase): ) @staticmethod - def msg_warning__async_greater_than_timeout(task_poll, task_async, max_timeout): + def msg_warning__async_greater_than_timeout(task_poll, task_async, max_timeout) -> str: return ( "You attempt to restore iptables state with rollback in case of mistake, " "but with settings that will lead this rollback to happen AFTER that the " @@ -50,7 +51,9 @@ class ActionModule(ActionBase): f"'ansible_timeout' (={max_timeout}) (recommended)." ) - def _async_result(self, async_status_args, task_vars, timeout): + def _async_result( + self, async_status_args: dict[str, t.Any], task_vars: dict[str, t.Any], timeout: int + ) -> dict[str, t.Any]: """ Retrieve results of the asynchronous task, and display them in place of the async wrapper results (those with the ansible_job_id key). @@ -81,10 +84,13 @@ class ActionModule(ActionBase): return async_result - def run(self, tmp=None, task_vars=None): + def run(self, tmp: str | None = None, task_vars: dict[str, t.Any] | None = None) -> dict[str, t.Any]: self._supports_check_mode = True self._supports_async = True + if task_vars is None: + task_vars = {} + result = super().run(tmp, task_vars) del tmp # tmp no longer has any effect diff --git a/plugins/action/shutdown.py b/plugins/action/shutdown.py index 1bf97a6fb4..46067e462c 100644 --- a/plugins/action/shutdown.py +++ b/plugins/action/shutdown.py @@ -7,19 +7,25 @@ from __future__ import annotations +import typing as t + from ansible.errors import AnsibleError, AnsibleConnectionFailure from ansible.module_utils.common.text.converters import to_native, to_text from ansible.module_utils.common.collections import is_string from ansible.plugins.action import ActionBase from ansible.utils.display import Display +if t.TYPE_CHECKING: + + class Distribution(t.TypedDict): + name: str + version: str + family: str + + display = Display() -def fmt(mapping, key): - return to_native(mapping[key]).strip() - - class TimedOutException(Exception): pass @@ -60,25 +66,23 @@ class ActionModule(ActionBase): def delay(self): return self._check_delay("delay", self.DEFAULT_PRE_SHUTDOWN_DELAY) - def _check_delay(self, key, default): + def _check_delay(self, key: str, default: int) -> int: """Ensure that the value is positive or zero""" value = int(self._task.args.get(key, default)) if value < 0: value = 0 return value - def _get_value_from_facts(self, variable_name, distribution, default_value): + @staticmethod + def _get_value_from_facts(data: dict[str, str], distribution: Distribution, default_value: str) -> str: """Get dist+version specific args first, then distribution, then family, lastly use default""" - attr = getattr(self, variable_name) - value = attr.get( + return data.get( distribution["name"] + distribution["version"], - attr.get(distribution["name"], attr.get(distribution["family"], getattr(self, default_value))), + data.get(distribution["name"], data.get(distribution["family"], default_value)), ) - return value - def get_distribution(self, task_vars): + def get_distribution(self, task_vars: dict[str, t.Any]) -> Distribution: # FIXME: only execute the module if we don't already have the facts we need - distribution = {} display.debug(f"{self._task.action}: running setup module to get distribution") module_output = self._execute_module( task_vars=task_vars, module_name="ansible.legacy.setup", module_args={"gather_subset": "min"} @@ -86,20 +90,20 @@ class ActionModule(ActionBase): try: if module_output.get("failed", False): raise AnsibleError( - f"Failed to determine system distribution. {fmt(module_output, 'module_stdout')}, {fmt(module_output, 'module_stderr')}" + f"Failed to determine system distribution. {to_native(module_output['module_stdout'])}, {to_native(module_output['module_stderr'])}" ) - distribution["name"] = module_output["ansible_facts"]["ansible_distribution"].lower() - distribution["version"] = to_text( - module_output["ansible_facts"]["ansible_distribution_version"].split(".")[0] - ) - distribution["family"] = to_text(module_output["ansible_facts"]["ansible_os_family"].lower()) + distribution: Distribution = { + "name": module_output["ansible_facts"]["ansible_distribution"].lower(), + "version": to_text(module_output["ansible_facts"]["ansible_distribution_version"].split(".")[0]), + "family": to_text(module_output["ansible_facts"]["ansible_os_family"].lower()), + } display.debug(f"{self._task.action}: distribution: {distribution}") return distribution except KeyError as ke: raise AnsibleError(f'Failed to get distribution information. Missing "{ke.args[0]}" in output.') from ke - def get_shutdown_command(self, task_vars, distribution): - def find_command(command, find_search_paths): + def get_shutdown_command(self, task_vars: dict[str, t.Any], distribution: Distribution) -> str: + def find_command(command: str, find_search_paths: list[str]) -> list[str]: display.debug( f'{self._task.action}: running find module looking in {find_search_paths} to get path for "{command}"' ) @@ -111,7 +115,7 @@ class ActionModule(ActionBase): ) return [x["path"] for x in find_result["files"]] - shutdown_bin = self._get_value_from_facts("SHUTDOWN_COMMANDS", distribution, "DEFAULT_SHUTDOWN_COMMAND") + shutdown_bin = self._get_value_from_facts(self.SHUTDOWN_COMMANDS, distribution, self.DEFAULT_SHUTDOWN_COMMAND) default_search_paths = ["/sbin", "/usr/sbin", "/usr/local/sbin"] search_paths = self._task.args.get("search_paths", default_search_paths) @@ -146,7 +150,7 @@ class ActionModule(ActionBase): return f"{full_path[0]} poweroff" # done, since we cannot use args with systemd shutdown # systemd case taken care of, here we add args to the command - args = self._get_value_from_facts("SHUTDOWN_COMMAND_ARGS", distribution, "DEFAULT_SHUTDOWN_COMMAND_ARGS") + args = self._get_value_from_facts(self.SHUTDOWN_COMMAND_ARGS, distribution, self.DEFAULT_SHUTDOWN_COMMAND_ARGS) # Convert seconds to minutes. If less that 60, set it to 0. delay_sec = self.delay shutdown_message = self._task.args.get("msg", self.DEFAULT_SHUTDOWN_MESSAGE) @@ -154,8 +158,8 @@ class ActionModule(ActionBase): af = args.format(delay_sec=delay_sec, delay_min=delay_sec // 60, message=shutdown_message) return f"{full_path[0]} {af}" - def perform_shutdown(self, task_vars, distribution): - result = {} + def perform_shutdown(self, task_vars, distribution) -> dict[str, t.Any]: + result: dict[str, t.Any] = {} shutdown_result = {} shutdown_command_exec = self.get_shutdown_command(task_vars, distribution) @@ -176,7 +180,7 @@ class ActionModule(ActionBase): result["failed"] = True result["shutdown"] = False result["msg"] = ( - f"Shutdown command failed. Error was {fmt(shutdown_result, 'stdout')}, {fmt(shutdown_result, 'stderr')}" + f"Shutdown command failed. Error was {to_native(shutdown_result['stdout'])}, {to_native(shutdown_result['stderr'])}" ) return result @@ -184,7 +188,7 @@ class ActionModule(ActionBase): result["shutdown_command"] = shutdown_command_exec return result - def run(self, tmp=None, task_vars=None): + def run(self, tmp: str | None = None, task_vars: dict[str, t.Any] | None = None) -> dict[str, t.Any]: self._supports_check_mode = True self._supports_async = True diff --git a/plugins/plugin_utils/ansible_type.py b/plugins/plugin_utils/ansible_type.py index d27adefe6c..f9683dbfe2 100644 --- a/plugins/plugin_utils/ansible_type.py +++ b/plugins/plugin_utils/ansible_type.py @@ -4,9 +4,11 @@ from __future__ import annotations -from ansible.errors import AnsibleFilterError +import typing as t from collections.abc import Mapping +from ansible.errors import AnsibleFilterError + try: # Introduced with Data Tagging (https://github.com/ansible/ansible/pull/84621): from ansible.module_utils.datatag import native_type_name as _native_type_name @@ -16,7 +18,7 @@ except ImportError: HAS_NATIVE_TYPE_NAME = False -def _atype(data, alias, *, use_native_type: bool = False): +def _atype(data: t.Any, alias: Mapping, *, use_native_type: bool = False) -> str: """ Returns the name of the type class. """ @@ -30,10 +32,10 @@ def _atype(data, alias, *, use_native_type: bool = False): data_type = "dict" elif data_type == "_AnsibleLazyTemplateList": data_type = "list" - return alias.get(data_type, data_type) + return str(alias.get(data_type, data_type)) -def _ansible_type(data, alias, *, use_native_type: bool = False): +def _ansible_type(data: t.Any, alias: t.Any, *, use_native_type: bool = False) -> str: """ Returns the Ansible data type. """ @@ -42,21 +44,20 @@ def _ansible_type(data, alias, *, use_native_type: bool = False): alias = {} if not isinstance(alias, Mapping): - msg = "The argument alias must be a dictionary. %s is %s" - raise AnsibleFilterError(msg % (alias, type(alias))) + raise AnsibleFilterError(f"The argument alias must be a dictionary. {alias!r} is {type(alias)}") data_type = _atype(data, alias, use_native_type=use_native_type) if data_type == "list" and len(data) > 0: - items = [_atype(i, alias, use_native_type=use_native_type) for i in data] - items_type = "|".join(sorted(set(items))) + items = {_atype(i, alias, use_native_type=use_native_type) for i in data} + items_type = "|".join(sorted(items)) return f"{data_type}[{items_type}]" if data_type == "dict" and len(data) > 0: - keys = [_atype(i, alias, use_native_type=use_native_type) for i in data.keys()] - vals = [_atype(i, alias, use_native_type=use_native_type) for i in data.values()] - keys_type = "|".join(sorted(set(keys))) - vals_type = "|".join(sorted(set(vals))) + keys = {_atype(i, alias, use_native_type=use_native_type) for i in data.keys()} + vals = {_atype(i, alias, use_native_type=use_native_type) for i in data.values()} + keys_type = "|".join(sorted(keys)) + vals_type = "|".join(sorted(vals)) return f"{data_type}[{keys_type}, {vals_type}]" return data_type diff --git a/plugins/plugin_utils/keys_filter.py b/plugins/plugin_utils/keys_filter.py index 01c4423cd7..2d49b32725 100644 --- a/plugins/plugin_utils/keys_filter.py +++ b/plugins/plugin_utils/keys_filter.py @@ -6,12 +6,17 @@ from __future__ import annotations import re +import typing as t -from ansible.errors import AnsibleFilterError from collections.abc import Mapping, Sequence +from ansible.errors import AnsibleFilterError +from ansible.module_utils.common.collections import is_sequence -def _keys_filter_params(data, matching_parameter): + +def _keys_filter_params( + data: t.Any, matching_parameter: t.Any +) -> tuple[Sequence[Mapping[str, t.Any]], t.Literal["equal", "starts_with", "ends_with", "regex"]]: """test parameters: * data must be a list of dictionaries. All keys must be strings. * matching_parameter is member of a list. @@ -21,27 +26,27 @@ def _keys_filter_params(data, matching_parameter): ml = ["equal", "starts_with", "ends_with", "regex"] if not isinstance(data, Sequence): - msg = "First argument must be a list. %s is %s" - raise AnsibleFilterError(msg % (data, type(data))) + msg = f"First argument must be a list. {data!r} is {type(data)}" + raise AnsibleFilterError(msg) for elem in data: if not isinstance(elem, Mapping): - msg = "The data items must be dictionaries. %s is %s" - raise AnsibleFilterError(msg % (elem, type(elem))) + msg = f"The data items must be dictionaries. {elem} is {type(elem)}" + raise AnsibleFilterError(msg) for elem in data: if not all(isinstance(item, str) for item in elem.keys()): - msg = "Top level keys must be strings. keys: %s" - raise AnsibleFilterError(msg % elem.keys()) + msg = f"Top level keys must be strings. keys: {list(elem.keys())}" + raise AnsibleFilterError(msg) if mp not in ml: - msg = "The matching_parameter must be one of %s. matching_parameter=%s" - raise AnsibleFilterError(msg % (ml, mp)) + msg = f"The matching_parameter must be one of {ml}. matching_parameter={mp!r}" + raise AnsibleFilterError(msg) - return + return data, mp -def _keys_filter_target_str(target, matching_parameter): +def _keys_filter_target_str(target: t.Any, matching_parameter: t.Any) -> tuple[str, ...] | re.Pattern: """ Test: * target is a non-empty string or list. @@ -54,18 +59,18 @@ def _keys_filter_target_str(target, matching_parameter): """ if not isinstance(target, Sequence): - msg = "The target must be a string or a list. target is %s." - raise AnsibleFilterError(msg % type(target)) + msg = f"The target must be a string or a list. target is {type(target)}." + raise AnsibleFilterError(msg) if len(target) == 0: msg = "The target can't be empty." raise AnsibleFilterError(msg) - if isinstance(target, list): + if is_sequence(target): for elem in target: if not isinstance(elem, str): - msg = "The target items must be strings. %s is %s" - raise AnsibleFilterError(msg % (elem, type(elem))) + msg = f"The target items must be strings. {elem!r} is {type(elem)}" + raise AnsibleFilterError(msg) if matching_parameter == "regex": if isinstance(target, str): @@ -77,19 +82,19 @@ def _keys_filter_target_str(target, matching_parameter): else: r = target[0] try: - tt = re.compile(r) + return re.compile(r) except re.error as e: - msg = "The target must be a valid regex if matching_parameter=regex. target is %s" - raise AnsibleFilterError(msg % r) from e + msg = f"The target must be a valid regex if matching_parameter=regex. target is {r}" + raise AnsibleFilterError(msg) from e elif isinstance(target, str): - tt = (target,) + return (target,) else: - tt = tuple(set(target)) - - return tt + return tuple(set(target)) -def _keys_filter_target_dict(target, matching_parameter): +def _keys_filter_target_dict( + target: t.Any, matching_parameter: t.Any +) -> list[tuple[str, str]] | list[tuple[re.Pattern, str]]: """ Test: * target is a list of dictionaries with attributes 'after' and 'before'. @@ -101,8 +106,8 @@ def _keys_filter_target_dict(target, matching_parameter): """ if not isinstance(target, list): - msg = "The target must be a list. target is %s." - raise AnsibleFilterError(msg % (target, type(target))) + msg = f"The target must be a list. target is {target!r} of type {type(target)}." + raise AnsibleFilterError(msg) if len(target) == 0: msg = "The target can't be empty." @@ -110,25 +115,25 @@ def _keys_filter_target_dict(target, matching_parameter): for elem in target: if not isinstance(elem, Mapping): - msg = "The target items must be dictionaries. %s is %s" - raise AnsibleFilterError(msg % (elem, type(elem))) + msg = f"The target items must be dictionaries. {elem!r}%s is {type(elem)}" + raise AnsibleFilterError(msg) if not all(k in elem for k in ("before", "after")): msg = "All dictionaries in target must include attributes: after, before." raise AnsibleFilterError(msg) if not isinstance(elem["before"], str): - msg = "The attributes before must be strings. %s is %s" - raise AnsibleFilterError(msg % (elem["before"], type(elem["before"]))) + msg = f"The attributes before must be strings. {elem['before']!r} is {type(elem['before'])}" + raise AnsibleFilterError(msg) if not isinstance(elem["after"], str): - msg = "The attributes after must be strings. %s is %s" - raise AnsibleFilterError(msg % (elem["after"], type(elem["after"]))) + msg = f"The attributes after must be strings. {elem['after']!r} is {type(elem['after'])}" + raise AnsibleFilterError(msg) - before = [d["before"] for d in target] - after = [d["after"] for d in target] + before: list[str] = [d["before"] for d in target] + after: list[str] = [d["after"] for d in target] if matching_parameter == "regex": try: tr = map(re.compile, before) - tz = list(zip(tr, after)) + return list(zip(tr, after)) except re.error as e: msg = ( "The attributes before must be valid regex if matching_parameter=regex." @@ -136,6 +141,4 @@ def _keys_filter_target_dict(target, matching_parameter): ) raise AnsibleFilterError(msg % before) from e else: - tz = list(zip(before, after)) - - return tz + return list(zip(before, after)) diff --git a/plugins/plugin_utils/unsafe.py b/plugins/plugin_utils/unsafe.py index a60e5d939b..c4efaf26e6 100644 --- a/plugins/plugin_utils/unsafe.py +++ b/plugins/plugin_utils/unsafe.py @@ -5,8 +5,9 @@ from __future__ import annotations import re +import typing as t -from collections.abc import Mapping, Set +from collections.abc import Mapping, Sequence, Set from ansible.module_utils.common.collections import is_sequence from ansible.utils.unsafe_proxy import ( AnsibleUnsafe, @@ -17,14 +18,54 @@ _RE_TEMPLATE_CHARS = re.compile("[{}]") _RE_TEMPLATE_CHARS_BYTES = re.compile(b"[{}]") -def make_unsafe(value): +@t.overload +def make_unsafe(value: None) -> None: ... + + +@t.overload +def make_unsafe(value: Mapping) -> dict: ... + + +@t.overload +def make_unsafe(value: Set) -> set: ... + + +@t.overload +def make_unsafe(value: tuple) -> tuple: ... + + +@t.overload +def make_unsafe(value: list) -> list: ... + + +@t.overload +def make_unsafe(value: Sequence) -> Sequence: ... + + +@t.overload +def make_unsafe(value: str) -> str: ... + + +@t.overload +def make_unsafe(value: bool) -> bool: ... + + +@t.overload +def make_unsafe(value: int) -> int: ... + + +@t.overload +def make_unsafe(value: float) -> float: ... + + +def make_unsafe(value: t.Any) -> t.Any: if value is None or isinstance(value, AnsibleUnsafe): return value if isinstance(value, Mapping): return {make_unsafe(key): make_unsafe(val) for key, val in value.items()} elif isinstance(value, Set): - return set(make_unsafe(elt) for elt in value) + return {make_unsafe(elt) for elt in value} elif is_sequence(value): return type(value)(make_unsafe(elt) for elt in value) elif isinstance(value, bytes): diff --git a/plugins/test/a_module.py b/plugins/test/a_module.py index 44285f0d15..fbd1e1f004 100644 --- a/plugins/test/a_module.py +++ b/plugins/test/a_module.py @@ -38,7 +38,11 @@ _value: type: boolean """ +import typing as t +from collections.abc import Callable + from ansible.plugins.loader import action_loader, module_loader +from ansible.errors import AnsibleFilterError try: from ansible.errors import AnsiblePluginRemovedError @@ -46,12 +50,14 @@ except ImportError: AnsiblePluginRemovedError = Exception # type: ignore -def a_module(term): +def a_module(term: t.Any) -> bool: """ Example: - 'community.general.ufw' is community.general.a_module - 'community.general.does_not_exist' is not community.general.a_module """ + if not isinstance(term, str): + raise AnsibleFilterError(f"Parameter must be a string, got {term!r} of type {type(term)}") try: for loader in (action_loader, module_loader): data = loader.find_plugin(term) @@ -65,7 +71,7 @@ def a_module(term): class TestModule: """Ansible jinja2 tests""" - def tests(self): + def tests(self) -> dict[str, Callable]: return { "a_module": a_module, } diff --git a/plugins/test/ansible_type.py b/plugins/test/ansible_type.py index d298fd8245..262645d427 100644 --- a/plugins/test/ansible_type.py +++ b/plugins/test/ansible_type.py @@ -222,19 +222,21 @@ _value: type: bool """ -from collections.abc import Sequence +import typing as t +from collections.abc import Callable, Sequence from ansible.errors import AnsibleFilterError from ansible_collections.community.general.plugins.plugin_utils.ansible_type import _ansible_type -def ansible_type(data, dtype, alias=None): +def ansible_type(data: t.Any, dtype: t.Any, alias: t.Any = None) -> bool: """Validates data type""" if not isinstance(dtype, Sequence): - msg = "The argument dtype must be a string or a list. dtype is %s." - raise AnsibleFilterError(msg % (dtype, type(dtype))) + msg = f"The argument dtype must be a string or a list. dtype is {dtype!r} of type {type(dtype)}." + raise AnsibleFilterError(msg) + data_types: Sequence if isinstance(dtype, str): data_types = [dtype] else: @@ -245,5 +247,5 @@ def ansible_type(data, dtype, alias=None): class TestModule: - def tests(self): + def tests(self) -> dict[str, Callable]: return {"ansible_type": ansible_type} diff --git a/plugins/test/fqdn_valid.py b/plugins/test/fqdn_valid.py index c46d55d36f..8a3377c35c 100644 --- a/plugins/test/fqdn_valid.py +++ b/plugins/test/fqdn_valid.py @@ -63,7 +63,10 @@ _value: type: bool """ -from ansible.errors import AnsibleError +import typing as t +from collections.abc import Callable + +from ansible.errors import AnsibleFilterError ANOTHER_LIBRARY_IMPORT_ERROR: ImportError | None try: @@ -74,7 +77,7 @@ else: ANOTHER_LIBRARY_IMPORT_ERROR = None -def fqdn_valid(name, min_labels=1, allow_underscores=False): +def fqdn_valid(name: t.Any, min_labels: t.Any = 1, allow_underscores: t.Any = False) -> bool: """ Example: - 'srv.example.com' is community.general.fqdn_valid @@ -82,7 +85,22 @@ def fqdn_valid(name, min_labels=1, allow_underscores=False): """ if ANOTHER_LIBRARY_IMPORT_ERROR: - raise AnsibleError("Python package fqdn must be installed to use this test.") from ANOTHER_LIBRARY_IMPORT_ERROR + raise AnsibleFilterError( + "Python package fqdn must be installed to use this test." + ) from ANOTHER_LIBRARY_IMPORT_ERROR + + if not isinstance(name, str): + raise AnsibleFilterError(f"The name parameter must be a string, got {name!r} of type {type(name)}") + + if not isinstance(min_labels, int): + raise AnsibleFilterError( + f"The min_labels parameter must be an integer, got {min_labels!r} of type {type(min_labels)}" + ) + + if not isinstance(allow_underscores, bool): + raise AnsibleFilterError( + f"The allow_underscores parameter must be a boolean, got {allow_underscores!r} of type {type(allow_underscores)}" + ) fobj = FQDN(name, min_labels=min_labels, allow_underscores=allow_underscores) return fobj.is_valid @@ -93,7 +111,7 @@ class TestModule: https://pypi.org/project/fqdn/ """ - def tests(self): + def tests(self) -> dict[str, Callable]: return { "fqdn_valid": fqdn_valid, } diff --git a/tests/sanity/ignore-2.17.txt b/tests/sanity/ignore-2.17.txt index f6b058ec69..b37d75a1f5 100644 --- a/tests/sanity/ignore-2.17.txt +++ b/tests/sanity/ignore-2.17.txt @@ -9,4 +9,5 @@ plugins/modules/rhevm.py validate-modules:parameter-state-invalid-choice plugins/modules/udm_user.py import-3.11 # Uses deprecated stdlib library 'crypt' plugins/modules/udm_user.py import-3.12 # Uses deprecated stdlib library 'crypt' plugins/modules/xfconf.py validate-modules:return-syntax-error +plugins/plugin_utils/unsafe.py pep8:E704 tests/unit/plugins/modules/test_gio_mime.yaml no-smart-quotes diff --git a/tests/sanity/ignore-2.18.txt b/tests/sanity/ignore-2.18.txt index f6b058ec69..b37d75a1f5 100644 --- a/tests/sanity/ignore-2.18.txt +++ b/tests/sanity/ignore-2.18.txt @@ -9,4 +9,5 @@ plugins/modules/rhevm.py validate-modules:parameter-state-invalid-choice plugins/modules/udm_user.py import-3.11 # Uses deprecated stdlib library 'crypt' plugins/modules/udm_user.py import-3.12 # Uses deprecated stdlib library 'crypt' plugins/modules/xfconf.py validate-modules:return-syntax-error +plugins/plugin_utils/unsafe.py pep8:E704 tests/unit/plugins/modules/test_gio_mime.yaml no-smart-quotes