# Copyright (c) Ansible project # Simplified BSD License (see LICENSES/BSD-2-Clause.txt or https://opensource.org/licenses/BSD-2-Clause) # SPDX-License-Identifier: BSD-2-Clause from __future__ import annotations import time # # DEPRECATED # # This module utils is deprecated and will be removed in community.general 13.0.0 # class OneAndOneResources: firewall_policy = "firewall_policy" load_balancer = "load_balancer" monitoring_policy = "monitoring_policy" private_network = "private_network" public_ip = "public_ip" role = "role" server = "server" user = "user" vpn = "vpn" def get_resource(oneandone_conn, resource_type, resource_id): switcher = { "firewall_policy": oneandone_conn.get_firewall, "load_balancer": oneandone_conn.get_load_balancer, "monitoring_policy": oneandone_conn.get_monitoring_policy, "private_network": oneandone_conn.get_private_network, "public_ip": oneandone_conn.get_public_ip, "role": oneandone_conn.get_role, "server": oneandone_conn.get_server, "user": oneandone_conn.get_user, "vpn": oneandone_conn.get_vpn, } return switcher.get(resource_type)(resource_id) def get_datacenter(oneandone_conn, datacenter, full_object=False): """ Validates the datacenter exists by ID or country code. Returns the datacenter ID. """ for _datacenter in oneandone_conn.list_datacenters(): if datacenter in (_datacenter["id"], _datacenter["country_code"]): if full_object: return _datacenter return _datacenter["id"] def get_fixed_instance_size(oneandone_conn, fixed_instance_size, full_object=False): """ Validates the fixed instance size exists by ID or name. Return the instance size ID. """ for _fixed_instance_size in oneandone_conn.fixed_server_flavors(): if fixed_instance_size in (_fixed_instance_size["id"], _fixed_instance_size["name"]): if full_object: return _fixed_instance_size return _fixed_instance_size["id"] def get_appliance(oneandone_conn, appliance, full_object=False): """ Validates the appliance exists by ID or name. Return the appliance ID. """ for _appliance in oneandone_conn.list_appliances(q="IMAGE"): if appliance in (_appliance["id"], _appliance["name"]): if full_object: return _appliance return _appliance["id"] def get_private_network(oneandone_conn, private_network, full_object=False): """ Validates the private network exists by ID or name. Return the private network ID. """ for _private_network in oneandone_conn.list_private_networks(): if private_network in (_private_network["name"], _private_network["id"]): if full_object: return _private_network return _private_network["id"] def get_monitoring_policy(oneandone_conn, monitoring_policy, full_object=False): """ Validates the monitoring policy exists by ID or name. Return the monitoring policy ID. """ for _monitoring_policy in oneandone_conn.list_monitoring_policies(): if monitoring_policy in (_monitoring_policy["name"], _monitoring_policy["id"]): if full_object: return _monitoring_policy return _monitoring_policy["id"] def get_firewall_policy(oneandone_conn, firewall_policy, full_object=False): """ Validates the firewall policy exists by ID or name. Return the firewall policy ID. """ for _firewall_policy in oneandone_conn.list_firewall_policies(): if firewall_policy in (_firewall_policy["name"], _firewall_policy["id"]): if full_object: return _firewall_policy return _firewall_policy["id"] def get_load_balancer(oneandone_conn, load_balancer, full_object=False): """ Validates the load balancer exists by ID or name. Return the load balancer ID. """ for _load_balancer in oneandone_conn.list_load_balancers(): if load_balancer in (_load_balancer["name"], _load_balancer["id"]): if full_object: return _load_balancer return _load_balancer["id"] def get_server(oneandone_conn, instance, full_object=False): """ Validates that the server exists whether by ID or name. Returns the server if one was found. """ for server in oneandone_conn.list_servers(per_page=1000): if instance in (server["id"], server["name"]): if full_object: return server return server["id"] def get_user(oneandone_conn, user, full_object=False): """ Validates that the user exists by ID or a name. Returns the user if one was found. """ for _user in oneandone_conn.list_users(per_page=1000): if user in (_user["id"], _user["name"]): if full_object: return _user return _user["id"] def get_role(oneandone_conn, role, full_object=False): """ Given a name, validates that the role exists whether it is a proper ID or a name. Returns the role if one was found, else None. """ for _role in oneandone_conn.list_roles(per_page=1000): if role in (_role["id"], _role["name"]): if full_object: return _role return _role["id"] def get_vpn(oneandone_conn, vpn, full_object=False): """ Validates that the vpn exists by ID or a name. Returns the vpn if one was found. """ for _vpn in oneandone_conn.list_vpns(per_page=1000): if vpn in (_vpn["id"], _vpn["name"]): if full_object: return _vpn return _vpn["id"] def get_public_ip(oneandone_conn, public_ip, full_object=False): """ Validates that the public ip exists by ID or a name. Returns the public ip if one was found. """ for _public_ip in oneandone_conn.list_public_ips(per_page=1000): if public_ip in (_public_ip["id"], _public_ip["ip"]): if full_object: return _public_ip return _public_ip["id"] def wait_for_resource_creation_completion(oneandone_conn, resource_type, resource_id, wait_timeout, wait_interval): """ Waits for the resource create operation to complete based on the timeout period. """ wait_timeout = time.time() + wait_timeout while wait_timeout > time.time(): time.sleep(wait_interval) # Refresh the resource info resource = get_resource(oneandone_conn, resource_type, resource_id) if resource_type == OneAndOneResources.server: resource_state = resource["status"]["state"] else: resource_state = resource["state"] if (resource_type == OneAndOneResources.server and resource_state.lower() == "powered_on") or ( resource_type != OneAndOneResources.server and resource_state.lower() == "active" ): return elif resource_state.lower() == "failed": raise Exception(f"{resource_type} creation failed for {resource_id}") elif resource_state.lower() in ("active", "enabled", "deploying", "configuring"): continue else: raise Exception(f"Unknown {resource_type} state {resource_state}") raise Exception(f"Timed out waiting for {resource_type} completion for {resource_id}") def wait_for_resource_deletion_completion(oneandone_conn, resource_type, resource_id, wait_timeout, wait_interval): """ Waits for the resource delete operation to complete based on the timeout period. """ wait_timeout = time.time() + wait_timeout while wait_timeout > time.time(): time.sleep(wait_interval) # Refresh the operation info logs = oneandone_conn.list_logs(q="DELETE", period="LAST_HOUR", sort="-start_date") if resource_type == OneAndOneResources.server: _type = "VM" elif resource_type == OneAndOneResources.private_network: _type = "PRIVATENETWORK" else: raise Exception(f"Unsupported wait_for delete operation for {resource_type} resource") for log in logs: if ( log["resource"]["id"] == resource_id and log["action"] == "DELETE" and log["type"] == _type and log["status"]["state"] == "OK" ): return raise Exception(f"Timed out waiting for {resource_type} deletion for {resource_id}")