1
0
Fork 0
mirror of https://github.com/ansible-collections/hetzner.hcloud.git synced 2026-02-03 23:51:48 +00:00
hetzner.hcloud/plugins/module_utils/_vendor/hcloud/firewalls/client.py
Jonas L. cfa0d181f7
refactor: mark module_utils modules as private (#782)
##### SUMMARY

All `module_utils` are now marked as **private**. None of the modules
were intended for public use.

Similar to
https://togithub.com/ansible-collections/community.general/issues/11312
2026-01-06 08:43:46 +01:00

507 lines
18 KiB
Python

from __future__ import annotations
from typing import TYPE_CHECKING, Any, NamedTuple
from ..actions import (
ActionSort,
ActionsPageResult,
ActionStatus,
BoundAction,
ResourceActionsClient,
)
from ..actions.client import ResourceClientBaseActionsMixin
from ..core import BoundModelBase, Meta, ResourceClientBase
from .domain import (
CreateFirewallResponse,
Firewall,
FirewallResource,
FirewallResourceAppliedToResources,
FirewallResourceLabelSelector,
FirewallRule,
)
if TYPE_CHECKING:
from .._client import Client
__all__ = [
"BoundFirewall",
"FirewallsPageResult",
"FirewallsClient",
]
class BoundFirewall(BoundModelBase[Firewall], Firewall):
_client: FirewallsClient
model = Firewall
def __init__(
self,
client: FirewallsClient,
data: dict[str, Any],
complete: bool = True,
):
rules = data.get("rules", [])
if rules:
rules = [
FirewallRule(
direction=rule["direction"],
source_ips=rule["source_ips"],
destination_ips=rule["destination_ips"],
protocol=rule["protocol"],
port=rule["port"],
description=rule["description"],
)
for rule in rules
]
data["rules"] = rules
applied_to = data.get("applied_to", [])
if applied_to:
# pylint: disable=import-outside-toplevel
from ..servers import BoundServer
data_applied_to = []
for firewall_resource in applied_to:
applied_to_resources = None
if firewall_resource.get("applied_to_resources"):
applied_to_resources = [
FirewallResourceAppliedToResources(
type=resource["type"],
server=(
BoundServer(
client._parent.servers,
resource.get("server"),
complete=False,
)
if resource.get("server") is not None
else None
),
)
for resource in firewall_resource.get("applied_to_resources")
]
if firewall_resource["type"] == FirewallResource.TYPE_SERVER:
data_applied_to.append(
FirewallResource(
type=firewall_resource["type"],
server=BoundServer(
client._parent.servers,
firewall_resource["server"],
complete=False,
),
applied_to_resources=applied_to_resources,
)
)
elif firewall_resource["type"] == FirewallResource.TYPE_LABEL_SELECTOR:
data_applied_to.append(
FirewallResource(
type=firewall_resource["type"],
label_selector=FirewallResourceLabelSelector(
selector=firewall_resource["label_selector"]["selector"]
),
applied_to_resources=applied_to_resources,
)
)
data["applied_to"] = data_applied_to
super().__init__(client, data, complete)
def get_actions_list(
self,
status: list[ActionStatus] | None = None,
sort: list[ActionSort] | None = None,
page: int | None = None,
per_page: int | None = None,
) -> ActionsPageResult:
"""
Returns a paginated list of Actions for a Firewall.
:param status: Filter the Actions by status.
:param sort: Sort Actions by field and direction.
:param page: Page number to get.
:param per_page: Maximum number of Actions returned per page.
"""
return self._client.get_actions_list(
self,
status=status,
sort=sort,
page=page,
per_page=per_page,
)
def get_actions(
self,
status: list[ActionStatus] | None = None,
sort: list[ActionSort] | None = None,
) -> list[BoundAction]:
"""
Returns all Actions for a Firewall.
:param status: Filter the Actions by status.
:param sort: Sort Actions by field and direction.
"""
return self._client.get_actions(
self,
status=status,
sort=sort,
)
def update(
self,
name: str | None = None,
labels: dict[str, str] | None = None,
) -> BoundFirewall:
"""Updates the name or labels of a Firewall.
:param labels: Dict[str, str] (optional)
User-defined labels (key-value pairs)
:param name: str (optional)
New Name to set
:return: :class:`BoundFirewall <hcloud.firewalls.client.BoundFirewall>`
"""
return self._client.update(self, name=name, labels=labels)
def delete(self) -> bool:
"""Deletes a Firewall.
:return: boolean
"""
return self._client.delete(self)
def set_rules(self, rules: list[FirewallRule]) -> list[BoundAction]:
"""Sets the rules of a Firewall. All existing rules will be overwritten. Pass an empty rules array to remove all rules.
:param rules: List[:class:`FirewallRule <hcloud.firewalls.domain.FirewallRule>`]
:return: List[:class:`BoundAction <hcloud.actions.client.BoundAction>`]
"""
return self._client.set_rules(self, rules=rules)
def apply_to_resources(
self,
resources: list[FirewallResource],
) -> list[BoundAction]:
"""Applies one Firewall to multiple resources.
:param resources: List[:class:`FirewallResource <hcloud.firewalls.domain.FirewallResource>`]
:return: List[:class:`BoundAction <hcloud.actions.client.BoundAction>`]
"""
return self._client.apply_to_resources(self, resources=resources)
def remove_from_resources(
self,
resources: list[FirewallResource],
) -> list[BoundAction]:
"""Removes one Firewall from multiple resources.
:param resources: List[:class:`FirewallResource <hcloud.firewalls.domain.FirewallResource>`]
:return: List[:class:`BoundAction <hcloud.actions.client.BoundAction>`]
"""
return self._client.remove_from_resources(self, resources=resources)
class FirewallsPageResult(NamedTuple):
firewalls: list[BoundFirewall]
meta: Meta
class FirewallsClient(
ResourceClientBaseActionsMixin,
ResourceClientBase,
):
_base_url = "/firewalls"
actions: ResourceActionsClient
"""Firewalls scoped actions client
:type: :class:`ResourceActionsClient <hcloud.actions.client.ResourceActionsClient>`
"""
def __init__(self, client: Client):
super().__init__(client)
self.actions = ResourceActionsClient(client, self._base_url)
def get_actions_list(
self,
firewall: Firewall | BoundFirewall,
status: list[ActionStatus] | None = None,
sort: list[ActionSort] | None = None,
page: int | None = None,
per_page: int | None = None,
) -> ActionsPageResult:
"""
Returns a paginated list of Actions for a Firewall.
:param firewall: Firewall to get the Actions for.
:param status: Filter the Actions by status.
:param sort: Sort Actions by field and direction.
:param page: Page number to get.
:param per_page: Maximum number of Actions returned per page.
"""
return self._get_actions_list(
f"{self._base_url}/{firewall.id}",
status=status,
sort=sort,
page=page,
per_page=per_page,
)
def get_actions(
self,
firewall: Firewall | BoundFirewall,
status: list[ActionStatus] | None = None,
sort: list[ActionSort] | None = None,
) -> list[BoundAction]:
"""
Returns all Actions for a Firewall.
:param firewall: Firewall to get the Actions for.
:param status: Filter the Actions by status.
:param sort: Sort Actions by field and direction.
"""
return self._iter_pages(
self.get_actions_list,
firewall,
status=status,
sort=sort,
)
def get_by_id(self, id: int) -> BoundFirewall:
"""Returns a specific Firewall object.
:param id: int
:return: :class:`BoundFirewall <hcloud.firewalls.client.BoundFirewall>`
"""
response = self._client.request(url=f"{self._base_url}/{id}", method="GET")
return BoundFirewall(self, response["firewall"])
def get_list(
self,
label_selector: str | None = None,
page: int | None = None,
per_page: int | None = None,
name: str | None = None,
sort: list[str] | None = None,
) -> FirewallsPageResult:
"""Get a list of floating ips from this account
:param label_selector: str (optional)
Can be used to filter Firewalls by labels. The response will only contain Firewalls matching the label selector values.
:param page: int (optional)
Specifies the page to fetch
:param per_page: int (optional)
Specifies how many results are returned by page
:param name: str (optional)
Can be used to filter networks by their name.
:param sort: List[str] (optional)
Choices: id name created (You can add one of ":asc", ":desc" to modify sort order. ( ":asc" is default))
:return: (List[:class:`BoundFirewall <hcloud.firewalls.client.BoundFirewall>`], :class:`Meta <hcloud.core.domain.Meta>`)
"""
params: dict[str, Any] = {}
if label_selector is not None:
params["label_selector"] = label_selector
if page is not None:
params["page"] = page
if per_page is not None:
params["per_page"] = per_page
if name is not None:
params["name"] = name
if sort is not None:
params["sort"] = sort
response = self._client.request(url=self._base_url, method="GET", params=params)
firewalls = [
BoundFirewall(self, firewall_data)
for firewall_data in response["firewalls"]
]
return FirewallsPageResult(firewalls, Meta.parse_meta(response))
def get_all(
self,
label_selector: str | None = None,
name: str | None = None,
sort: list[str] | None = None,
) -> list[BoundFirewall]:
"""Get all floating ips from this account
:param label_selector: str (optional)
Can be used to filter Firewalls by labels. The response will only contain Firewalls matching the label selector values.
:param name: str (optional)
Can be used to filter networks by their name.
:param sort: List[str] (optional)
Choices: id name created (You can add one of ":asc", ":desc" to modify sort order. ( ":asc" is default))
:return: List[:class:`BoundFirewall <hcloud.firewalls.client.BoundFirewall>`]
"""
return self._iter_pages(
self.get_list,
label_selector=label_selector,
name=name,
sort=sort,
)
def get_by_name(self, name: str) -> BoundFirewall | None:
"""Get Firewall by name
:param name: str
Used to get Firewall by name.
:return: :class:`BoundFirewall <hcloud.firewalls.client.BoundFirewall>`
"""
return self._get_first_by(self.get_list, name=name)
def create(
self,
name: str,
rules: list[FirewallRule] | None = None,
labels: str | None = None,
resources: list[FirewallResource] | None = None,
) -> CreateFirewallResponse:
"""Creates a new Firewall.
:param name: str
Firewall Name
:param rules: List[:class:`FirewallRule <hcloud.firewalls.domain.FirewallRule>`] (optional)
:param labels: Dict[str, str] (optional)
User-defined labels (key-value pairs)
:param resources: List[:class:`FirewallResource <hcloud.firewalls.domain.FirewallResource>`] (optional)
:return: :class:`CreateFirewallResponse <hcloud.firewalls.domain.CreateFirewallResponse>`
"""
data: dict[str, Any] = {"name": name}
if labels is not None:
data["labels"] = labels
if rules is not None:
data.update({"rules": []})
for rule in rules:
data["rules"].append(rule.to_payload())
if resources is not None:
data.update({"apply_to": []})
for resource in resources:
data["apply_to"].append(resource.to_payload())
response = self._client.request(url=self._base_url, json=data, method="POST")
actions = []
if response.get("actions") is not None:
actions = [
BoundAction(self._parent.actions, action_data)
for action_data in response["actions"]
]
result = CreateFirewallResponse(
firewall=BoundFirewall(self, response["firewall"]), actions=actions
)
return result
def update(
self,
firewall: Firewall | BoundFirewall,
labels: dict[str, str] | None = None,
name: str | None = None,
) -> BoundFirewall:
"""Updates the description or labels of a Firewall.
:param firewall: :class:`BoundFirewall <hcloud.firewalls.client.BoundFirewall>` or :class:`Firewall <hcloud.firewalls.domain.Firewall>`
:param labels: Dict[str, str] (optional)
User-defined labels (key-value pairs)
:param name: str (optional)
New name to set
:return: :class:`BoundFirewall <hcloud.firewalls.client.BoundFirewall>`
"""
data: dict[str, Any] = {}
if labels is not None:
data["labels"] = labels
if name is not None:
data["name"] = name
response = self._client.request(
url=f"{self._base_url}/{firewall.id}",
method="PUT",
json=data,
)
return BoundFirewall(self, response["firewall"])
def delete(self, firewall: Firewall | BoundFirewall) -> bool:
"""Deletes a Firewall.
:param firewall: :class:`BoundFirewall <hcloud.firewalls.client.BoundFirewall>` or :class:`Firewall <hcloud.firewalls.domain.Firewall>`
:return: boolean
"""
self._client.request(
url=f"{self._base_url}/{firewall.id}",
method="DELETE",
)
# Return always true, because the API does not return an action for it. When an error occurs a HcloudAPIException will be raised
return True
def set_rules(
self,
firewall: Firewall | BoundFirewall,
rules: list[FirewallRule],
) -> list[BoundAction]:
"""Sets the rules of a Firewall. All existing rules will be overwritten. Pass an empty rules array to remove all rules.
:param firewall: :class:`BoundFirewall <hcloud.firewalls.client.BoundFirewall>` or :class:`Firewall <hcloud.firewalls.domain.Firewall>`
:param rules: List[:class:`FirewallRule <hcloud.firewalls.domain.FirewallRule>`]
:return: List[:class:`BoundAction <hcloud.actions.client.BoundAction>`]
"""
data: dict[str, Any] = {"rules": []}
for rule in rules:
data["rules"].append(rule.to_payload())
response = self._client.request(
url=f"{self._base_url}/{firewall.id}/actions/set_rules",
method="POST",
json=data,
)
return [
BoundAction(self._parent.actions, action_data)
for action_data in response["actions"]
]
def apply_to_resources(
self,
firewall: Firewall | BoundFirewall,
resources: list[FirewallResource],
) -> list[BoundAction]:
"""Applies one Firewall to multiple resources.
:param firewall: :class:`BoundFirewall <hcloud.firewalls.client.BoundFirewall>` or :class:`Firewall <hcloud.firewalls.domain.Firewall>`
:param resources: List[:class:`FirewallResource <hcloud.firewalls.domain.FirewallResource>`]
:return: List[:class:`BoundAction <hcloud.actions.client.BoundAction>`]
"""
data: dict[str, Any] = {"apply_to": []}
for resource in resources:
data["apply_to"].append(resource.to_payload())
response = self._client.request(
url=f"{self._base_url}/{firewall.id}/actions/apply_to_resources",
method="POST",
json=data,
)
return [
BoundAction(self._parent.actions, action_data)
for action_data in response["actions"]
]
def remove_from_resources(
self,
firewall: Firewall | BoundFirewall,
resources: list[FirewallResource],
) -> list[BoundAction]:
"""Removes one Firewall from multiple resources.
:param firewall: :class:`BoundFirewall <hcloud.firewalls.client.BoundFirewall>` or :class:`Firewall <hcloud.firewalls.domain.Firewall>`
:param resources: List[:class:`FirewallResource <hcloud.firewalls.domain.FirewallResource>`]
:return: List[:class:`BoundAction <hcloud.actions.client.BoundAction>`]
"""
data: dict[str, Any] = {"remove_from": []}
for resource in resources:
data["remove_from"].append(resource.to_payload())
response = self._client.request(
url=f"{self._base_url}/{firewall.id}/actions/remove_from_resources",
method="POST",
json=data,
)
return [
BoundAction(self._parent.actions, action_data)
for action_data in response["actions"]
]