1
0
Fork 0
mirror of https://github.com/ansible-collections/hetzner.hcloud.git synced 2026-02-04 08:01:49 +00:00
hetzner.hcloud/plugins/filter/all.py
Jonas L. cfa0d181f7
refactor: mark module_utils modules as private (#782)
##### SUMMARY

All `module_utils` are now marked as **private**. None of the modules
were intended for public use.

Similar to
https://togithub.com/ansible-collections/community.general/issues/11312
2026-01-06 08:43:46 +01:00

73 lines
2.2 KiB
Python

from __future__ import annotations
from typing import Literal
from ansible.errors import AnsibleFilterError
from ansible.module_utils.common.text.converters import to_native
from ..module_utils._vendor.hcloud.exp.zone import format_txt_record
# pylint: disable=unused-argument
def load_balancer_status(load_balancer: dict, *args, **kwargs) -> Literal["unknown", "unhealthy", "healthy"]:
"""
Return the status of a Load Balancer based on its targets.
"""
def targets_status(targets: list) -> Literal["unknown", "unhealthy", "healthy"]:
result = "healthy"
for target in targets:
# Label selector targets have child targets that must be checked
if target["type"] == "label_selector":
status = targets_status(target["targets"])
if status == "unhealthy":
return "unhealthy"
if status in (None, "unknown"):
result = "unknown"
continue
# Report missing health status as unknown
if not target.get("health_status"):
return "unknown"
for health_status in target.get("health_status"):
status = health_status.get("status")
if status == "unhealthy":
return "unhealthy"
if status in (None, "unknown"):
result = "unknown"
return result
try:
return targets_status(load_balancer["targets"])
except Exception as exc:
raise AnsibleFilterError(f"load_balancer_status - {to_native(exc)}", orig_exc=exc) from exc
# pylint: disable=unused-argument
def txt_record(record: str, *args, **kwargs) -> str:
"""
Format a TXT record by splitting it in quoted strings of 255 characters.
Existing quotes will be escaped.
"""
try:
return format_txt_record(record)
except Exception as exc:
raise AnsibleFilterError(f"txt_record - {to_native(exc)}", orig_exc=exc) from exc
class FilterModule:
"""
Hetzner Cloud filters.
"""
def filters(self):
return {
"load_balancer_status": load_balancer_status,
"txt_record": txt_record,
}