from __future__ import annotations import time from http import HTTPStatus from random import uniform from typing import Protocol try: import requests except ImportError: requests = None from ._exceptions import APIException from ._version import __version__ from .actions import ActionsClient from .certificates import CertificatesClient from .datacenters import DatacentersClient from .firewalls import FirewallsClient from .floating_ips import FloatingIPsClient from .images import ImagesClient from .isos import IsosClient from .load_balancer_types import LoadBalancerTypesClient from .load_balancers import LoadBalancersClient from .locations import LocationsClient from .networks import NetworksClient from .placement_groups import PlacementGroupsClient from .primary_ips import PrimaryIPsClient from .server_types import ServerTypesClient from .servers import ServersClient from .ssh_keys import SSHKeysClient from .volumes import VolumesClient from .zones import ZonesClient class BackoffFunction(Protocol): def __call__(self, retries: int) -> float: """ Return a interval in seconds to wait between each API call. :param retries: Number of calls already made. """ def constant_backoff_function(interval: float) -> BackoffFunction: """ Return a backoff function, implementing a constant backoff. :param interval: Constant interval to return. """ # pylint: disable=unused-argument def func(retries: int) -> float: return interval return func def exponential_backoff_function( *, base: float, multiplier: int, cap: float, jitter: bool = False, ) -> BackoffFunction: """ Return a backoff function, implementing a truncated exponential backoff with optional full jitter. :param base: Base for the exponential backoff algorithm. :param multiplier: Multiplier for the exponential backoff algorithm. :param cap: Value at which the interval is truncated. :param jitter: Whether to add jitter. """ def func(retries: int) -> float: interval = base * multiplier**retries # Exponential backoff interval = min(cap, interval) # Cap backoff if jitter: interval = uniform(base, interval) # Add jitter return interval return func def _build_user_agent( application_name: str | None, application_version: str | None, ) -> str: """Build the user agent of the hcloud-python instance with the user application name (if specified) :return: The user agent of this hcloud-python instance """ parts = [] for name, version in [ (application_name, application_version), ("hcloud-python", __version__), ]: if name is not None: parts.append(name if version is None else f"{name}/{version}") return " ".join(parts) class Client: """ Client for the Hetzner Cloud API. The Hetzner Cloud API reference is available at https://docs.hetzner.cloud. Make sure to follow our API changelog available at https://docs.hetzner.cloud/changelog (or the RRS feed available at https://docs.hetzner.cloud/changelog/feed.rss) to be notified about additions, deprecations and removals. **Retry mechanism** The :attr:`Client.request` method will retry failed requests that match certain criteria. The default retry interval is defined by an exponential backoff algorithm truncated to 60s with jitter. The default maximal number of retries is 5. The following rules define when a request can be retried: - When the client returned a network timeout error. - When the API returned an HTTP error, with the status code: - ``502`` Bad Gateway - ``504`` Gateway Timeout - When the API returned an application error, with the code: - ``conflict`` - ``rate_limit_exceeded`` Changes to the retry policy might occur between releases, and will not be considered breaking changes. """ def __init__( self, token: str, api_endpoint: str = "https://api.hetzner.cloud/v1", application_name: str | None = None, application_version: str | None = None, poll_interval: int | float | BackoffFunction = 1.0, poll_max_retries: int = 120, timeout: float | tuple[float, float] | None = None, ): """Create a new Client instance :param token: Hetzner Cloud API token :param api_endpoint: Hetzner Cloud API endpoint :param application_name: Your application name :param application_version: Your application _version :param poll_interval: Interval in seconds to use when polling actions from the API. You may pass a function to compute a custom poll interval. :param poll_max_retries: Max retries before timeout when polling actions from the API. :param timeout: Requests timeout in seconds """ self._client = ClientBase( token=token, endpoint=api_endpoint, application_name=application_name, application_version=application_version, poll_interval=poll_interval, poll_max_retries=poll_max_retries, timeout=timeout, ) self.datacenters = DatacentersClient(self) """DatacentersClient Instance :type: :class:`DatacentersClient ` """ self.locations = LocationsClient(self) """LocationsClient Instance :type: :class:`LocationsClient ` """ self.servers = ServersClient(self) """ServersClient Instance :type: :class:`ServersClient ` """ self.server_types = ServerTypesClient(self) """ServerTypesClient Instance :type: :class:`ServerTypesClient ` """ self.volumes = VolumesClient(self) """VolumesClient Instance :type: :class:`VolumesClient ` """ self.actions = ActionsClient(self) """ActionsClient Instance :type: :class:`ActionsClient ` """ self.images = ImagesClient(self) """ImagesClient Instance :type: :class:`ImagesClient ` """ self.isos = IsosClient(self) """ImagesClient Instance :type: :class:`IsosClient ` """ self.ssh_keys = SSHKeysClient(self) """SSHKeysClient Instance :type: :class:`SSHKeysClient ` """ self.floating_ips = FloatingIPsClient(self) """FloatingIPsClient Instance :type: :class:`FloatingIPsClient ` """ self.primary_ips = PrimaryIPsClient(self) """PrimaryIPsClient Instance :type: :class:`PrimaryIPsClient ` """ self.networks = NetworksClient(self) """NetworksClient Instance :type: :class:`NetworksClient ` """ self.certificates = CertificatesClient(self) """CertificatesClient Instance :type: :class:`CertificatesClient ` """ self.load_balancers = LoadBalancersClient(self) """LoadBalancersClient Instance :type: :class:`LoadBalancersClient ` """ self.load_balancer_types = LoadBalancerTypesClient(self) """LoadBalancerTypesClient Instance :type: :class:`LoadBalancerTypesClient ` """ self.firewalls = FirewallsClient(self) """FirewallsClient Instance :type: :class:`FirewallsClient ` """ self.placement_groups = PlacementGroupsClient(self) """PlacementGroupsClient Instance :type: :class:`PlacementGroupsClient ` """ self.zones = ZonesClient(self) """ZonesClient Instance :type: :class:`ZonesClient ` """ def request( # type: ignore[no-untyped-def] self, method: str, url: str, **kwargs, ) -> dict: """Perform a request to the Hetzner Cloud API. :param method: Method to perform the request. :param url: URL to perform the request. :param timeout: Requests timeout in seconds. """ return self._client.request(method, url, **kwargs) class ClientBase: def __init__( self, token: str, *, endpoint: str, application_name: str | None = None, application_version: str | None = None, poll_interval: int | float | BackoffFunction = 1.0, poll_max_retries: int = 120, timeout: float | tuple[float, float] | None = None, ): self._token = token self._endpoint = endpoint self._user_agent = _build_user_agent(application_name, application_version) self._headers = { "User-Agent": self._user_agent, "Authorization": f"Bearer {self._token}", "Accept": "application/json", } if isinstance(poll_interval, (int, float)): poll_interval_func = constant_backoff_function(poll_interval) else: poll_interval_func = poll_interval self._poll_interval_func = poll_interval_func self._poll_max_retries = poll_max_retries self._retry_interval_func = exponential_backoff_function( base=1.0, multiplier=2, cap=60.0, jitter=True ) self._retry_max_retries = 5 self._timeout = timeout self._session = requests.Session() def request( # type: ignore[no-untyped-def] self, method: str, url: str, **kwargs, ) -> dict: """Perform a request to the provided URL. :param method: Method to perform the request. :param url: URL to perform the request. :param timeout: Requests timeout in seconds. :return: Response """ kwargs.setdefault("timeout", self._timeout) url = self._endpoint + url headers = self._headers retries = 0 while True: try: response = self._session.request( method=method, url=url, headers=headers, **kwargs, ) return self._read_response(response) except APIException as exception: if retries < self._retry_max_retries and self._retry_policy(exception): time.sleep(self._retry_interval_func(retries)) retries += 1 continue raise except requests.exceptions.Timeout: if retries < self._retry_max_retries: time.sleep(self._retry_interval_func(retries)) retries += 1 continue raise def _read_response(self, response) -> dict: correlation_id = response.headers.get("X-Correlation-Id") payload = {} try: if len(response.content) > 0: payload = response.json() except (TypeError, ValueError) as exc: raise APIException( code=response.status_code, message=response.reason, details={"content": response.content}, correlation_id=correlation_id, ) from exc if not response.ok: if not payload or "error" not in payload: raise APIException( code=response.status_code, message=response.reason, details={"content": response.content}, correlation_id=correlation_id, ) error: dict = payload["error"] raise APIException( code=error["code"], message=error["message"], details=error.get("details"), correlation_id=correlation_id, ) return payload def _retry_policy(self, exception: APIException) -> bool: if isinstance(exception.code, str): return exception.code in ( "rate_limit_exceeded", "conflict", ) if isinstance(exception.code, int): return exception.code in ( HTTPStatus.BAD_GATEWAY, HTTPStatus.GATEWAY_TIMEOUT, ) return False