mirror of
https://github.com/ansible-collections/community.general.git
synced 2026-02-03 23:41:51 +00:00
* Resolve E713 and E714 (not in/is tests). * Address UP018 (unnecessary str call). * UP045 requires Python 3.10+. * Address UP007 (X | Y for type annotations). * Address UP035 (import Callable from collections.abc). * Address UP006 (t.Dict -> dict). * Address UP009 (UTF-8 encoding comment). * Address UP034 (extraneous parantheses). * Address SIM910 (dict.get() with None default). * Address F401 (unused import). * Address UP020 (use builtin open). * Address B009 and B010 (getattr/setattr with constant name). * Address SIM300 (Yoda conditions). * UP029 isn't in use anyway. * Address FLY002 (static join). * Address B034 (re.sub positional args). * Address B020 (loop variable overrides input). * Address B017 (assert raise Exception). * Address SIM211 (if expression with false/true). * Address SIM113 (enumerate for loop). * Address UP036 (sys.version_info checks). * Remove unnecessary UP039. * Address SIM201 (not ==). * Address SIM212 (if expr with twisted arms). * Add changelog fragment. * Reformat.
245 lines
8.4 KiB
Python
245 lines
8.4 KiB
Python
# Copyright (c) Ansible project
|
|
# Simplified BSD License (see LICENSES/BSD-2-Clause.txt or https://opensource.org/licenses/BSD-2-Clause)
|
|
# SPDX-License-Identifier: BSD-2-Clause
|
|
|
|
from __future__ import annotations
|
|
|
|
import time
|
|
|
|
#
|
|
# DEPRECATED
|
|
#
|
|
# This module utils is deprecated and will be removed in community.general 13.0.0
|
|
#
|
|
|
|
|
|
class OneAndOneResources:
|
|
firewall_policy = "firewall_policy"
|
|
load_balancer = "load_balancer"
|
|
monitoring_policy = "monitoring_policy"
|
|
private_network = "private_network"
|
|
public_ip = "public_ip"
|
|
role = "role"
|
|
server = "server"
|
|
user = "user"
|
|
vpn = "vpn"
|
|
|
|
|
|
def get_resource(oneandone_conn, resource_type, resource_id):
|
|
switcher = {
|
|
"firewall_policy": oneandone_conn.get_firewall,
|
|
"load_balancer": oneandone_conn.get_load_balancer,
|
|
"monitoring_policy": oneandone_conn.get_monitoring_policy,
|
|
"private_network": oneandone_conn.get_private_network,
|
|
"public_ip": oneandone_conn.get_public_ip,
|
|
"role": oneandone_conn.get_role,
|
|
"server": oneandone_conn.get_server,
|
|
"user": oneandone_conn.get_user,
|
|
"vpn": oneandone_conn.get_vpn,
|
|
}
|
|
|
|
return switcher.get(resource_type)(resource_id)
|
|
|
|
|
|
def get_datacenter(oneandone_conn, datacenter, full_object=False):
|
|
"""
|
|
Validates the datacenter exists by ID or country code.
|
|
Returns the datacenter ID.
|
|
"""
|
|
for _datacenter in oneandone_conn.list_datacenters():
|
|
if datacenter in (_datacenter["id"], _datacenter["country_code"]):
|
|
if full_object:
|
|
return _datacenter
|
|
return _datacenter["id"]
|
|
|
|
|
|
def get_fixed_instance_size(oneandone_conn, fixed_instance_size, full_object=False):
|
|
"""
|
|
Validates the fixed instance size exists by ID or name.
|
|
Return the instance size ID.
|
|
"""
|
|
for _fixed_instance_size in oneandone_conn.fixed_server_flavors():
|
|
if fixed_instance_size in (_fixed_instance_size["id"], _fixed_instance_size["name"]):
|
|
if full_object:
|
|
return _fixed_instance_size
|
|
return _fixed_instance_size["id"]
|
|
|
|
|
|
def get_appliance(oneandone_conn, appliance, full_object=False):
|
|
"""
|
|
Validates the appliance exists by ID or name.
|
|
Return the appliance ID.
|
|
"""
|
|
for _appliance in oneandone_conn.list_appliances(q="IMAGE"):
|
|
if appliance in (_appliance["id"], _appliance["name"]):
|
|
if full_object:
|
|
return _appliance
|
|
return _appliance["id"]
|
|
|
|
|
|
def get_private_network(oneandone_conn, private_network, full_object=False):
|
|
"""
|
|
Validates the private network exists by ID or name.
|
|
Return the private network ID.
|
|
"""
|
|
for _private_network in oneandone_conn.list_private_networks():
|
|
if private_network in (_private_network["name"], _private_network["id"]):
|
|
if full_object:
|
|
return _private_network
|
|
return _private_network["id"]
|
|
|
|
|
|
def get_monitoring_policy(oneandone_conn, monitoring_policy, full_object=False):
|
|
"""
|
|
Validates the monitoring policy exists by ID or name.
|
|
Return the monitoring policy ID.
|
|
"""
|
|
for _monitoring_policy in oneandone_conn.list_monitoring_policies():
|
|
if monitoring_policy in (_monitoring_policy["name"], _monitoring_policy["id"]):
|
|
if full_object:
|
|
return _monitoring_policy
|
|
return _monitoring_policy["id"]
|
|
|
|
|
|
def get_firewall_policy(oneandone_conn, firewall_policy, full_object=False):
|
|
"""
|
|
Validates the firewall policy exists by ID or name.
|
|
Return the firewall policy ID.
|
|
"""
|
|
for _firewall_policy in oneandone_conn.list_firewall_policies():
|
|
if firewall_policy in (_firewall_policy["name"], _firewall_policy["id"]):
|
|
if full_object:
|
|
return _firewall_policy
|
|
return _firewall_policy["id"]
|
|
|
|
|
|
def get_load_balancer(oneandone_conn, load_balancer, full_object=False):
|
|
"""
|
|
Validates the load balancer exists by ID or name.
|
|
Return the load balancer ID.
|
|
"""
|
|
for _load_balancer in oneandone_conn.list_load_balancers():
|
|
if load_balancer in (_load_balancer["name"], _load_balancer["id"]):
|
|
if full_object:
|
|
return _load_balancer
|
|
return _load_balancer["id"]
|
|
|
|
|
|
def get_server(oneandone_conn, instance, full_object=False):
|
|
"""
|
|
Validates that the server exists whether by ID or name.
|
|
Returns the server if one was found.
|
|
"""
|
|
for server in oneandone_conn.list_servers(per_page=1000):
|
|
if instance in (server["id"], server["name"]):
|
|
if full_object:
|
|
return server
|
|
return server["id"]
|
|
|
|
|
|
def get_user(oneandone_conn, user, full_object=False):
|
|
"""
|
|
Validates that the user exists by ID or a name.
|
|
Returns the user if one was found.
|
|
"""
|
|
for _user in oneandone_conn.list_users(per_page=1000):
|
|
if user in (_user["id"], _user["name"]):
|
|
if full_object:
|
|
return _user
|
|
return _user["id"]
|
|
|
|
|
|
def get_role(oneandone_conn, role, full_object=False):
|
|
"""
|
|
Given a name, validates that the role exists
|
|
whether it is a proper ID or a name.
|
|
Returns the role if one was found, else None.
|
|
"""
|
|
for _role in oneandone_conn.list_roles(per_page=1000):
|
|
if role in (_role["id"], _role["name"]):
|
|
if full_object:
|
|
return _role
|
|
return _role["id"]
|
|
|
|
|
|
def get_vpn(oneandone_conn, vpn, full_object=False):
|
|
"""
|
|
Validates that the vpn exists by ID or a name.
|
|
Returns the vpn if one was found.
|
|
"""
|
|
for _vpn in oneandone_conn.list_vpns(per_page=1000):
|
|
if vpn in (_vpn["id"], _vpn["name"]):
|
|
if full_object:
|
|
return _vpn
|
|
return _vpn["id"]
|
|
|
|
|
|
def get_public_ip(oneandone_conn, public_ip, full_object=False):
|
|
"""
|
|
Validates that the public ip exists by ID or a name.
|
|
Returns the public ip if one was found.
|
|
"""
|
|
for _public_ip in oneandone_conn.list_public_ips(per_page=1000):
|
|
if public_ip in (_public_ip["id"], _public_ip["ip"]):
|
|
if full_object:
|
|
return _public_ip
|
|
return _public_ip["id"]
|
|
|
|
|
|
def wait_for_resource_creation_completion(oneandone_conn, resource_type, resource_id, wait_timeout, wait_interval):
|
|
"""
|
|
Waits for the resource create operation to complete based on the timeout period.
|
|
"""
|
|
wait_timeout = time.time() + wait_timeout
|
|
while wait_timeout > time.time():
|
|
time.sleep(wait_interval)
|
|
|
|
# Refresh the resource info
|
|
resource = get_resource(oneandone_conn, resource_type, resource_id)
|
|
|
|
if resource_type == OneAndOneResources.server:
|
|
resource_state = resource["status"]["state"]
|
|
else:
|
|
resource_state = resource["state"]
|
|
|
|
if (resource_type == OneAndOneResources.server and resource_state.lower() == "powered_on") or (
|
|
resource_type != OneAndOneResources.server and resource_state.lower() == "active"
|
|
):
|
|
return
|
|
elif resource_state.lower() == "failed":
|
|
raise Exception(f"{resource_type} creation failed for {resource_id}")
|
|
elif resource_state.lower() in ("active", "enabled", "deploying", "configuring"):
|
|
continue
|
|
else:
|
|
raise Exception(f"Unknown {resource_type} state {resource_state}")
|
|
|
|
raise Exception(f"Timed out waiting for {resource_type} completion for {resource_id}")
|
|
|
|
|
|
def wait_for_resource_deletion_completion(oneandone_conn, resource_type, resource_id, wait_timeout, wait_interval):
|
|
"""
|
|
Waits for the resource delete operation to complete based on the timeout period.
|
|
"""
|
|
wait_timeout = time.time() + wait_timeout
|
|
while wait_timeout > time.time():
|
|
time.sleep(wait_interval)
|
|
|
|
# Refresh the operation info
|
|
logs = oneandone_conn.list_logs(q="DELETE", period="LAST_HOUR", sort="-start_date")
|
|
|
|
if resource_type == OneAndOneResources.server:
|
|
_type = "VM"
|
|
elif resource_type == OneAndOneResources.private_network:
|
|
_type = "PRIVATENETWORK"
|
|
else:
|
|
raise Exception(f"Unsupported wait_for delete operation for {resource_type} resource")
|
|
|
|
for log in logs:
|
|
if (
|
|
log["resource"]["id"] == resource_id
|
|
and log["action"] == "DELETE"
|
|
and log["type"] == _type
|
|
and log["status"]["state"] == "OK"
|
|
):
|
|
return
|
|
raise Exception(f"Timed out waiting for {resource_type} deletion for {resource_id}")
|