1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2026-02-04 16:01:55 +00:00
community.general/plugins/modules/scaleway_compute.py
patchback[bot] b769b0bc01
[PR #11400/236b9c0e backport][stable-12] Sort imports with ruff check --fix (#11409)
Sort imports with ruff check --fix (#11400)

Sort imports with ruff check --fix.

(cherry picked from commit 236b9c0e04)

Co-authored-by: Felix Fontein <felix@fontein.de>
2026-01-09 19:36:52 +01:00

704 lines
23 KiB
Python

#!/usr/bin/python
#
# Scaleway Compute management module
#
# Copyright (C) 2018 Online SAS.
# https://www.scaleway.com
#
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import annotations
# Module documentation in YAML. Nesting is significant: every option and its
# fields must be indented under ``options`` so documentation tooling can parse it.
DOCUMENTATION = r"""
module: scaleway_compute
short_description: Scaleway compute management module
author: Remy Leone (@remyleone)
description:
  - This module manages compute instances on Scaleway.
extends_documentation_fragment:
  - community.general.scaleway
  - community.general.attributes
  - community.general.scaleway.actiongroup_scaleway
attributes:
  check_mode:
    support: full
  diff_mode:
    support: none
  action_group:
    version_added: 11.3.0
options:
  public_ip:
    type: str
    description:
      - Manage public IP on a Scaleway server.
      - Could be Scaleway IP address UUID.
      - V(dynamic) Means that IP is destroyed at the same time the host is destroyed.
      - V(absent) Means no public IP at all.
    default: absent
  enable_ipv6:
    description:
      - Enable public IPv6 connectivity on the instance.
    default: false
    type: bool
  image:
    type: str
    description:
      - Image identifier used to start the instance with.
    required: true
  name:
    type: str
    description:
      - Name of the instance.
  organization:
    type: str
    description:
      - Organization identifier.
      - Exactly one of O(project) and O(organization) must be specified.
  project:
    type: str
    description:
      - Project identifier.
      - Exactly one of O(project) and O(organization) must be specified.
    version_added: 4.3.0
  state:
    type: str
    description:
      - Indicate desired state of the instance.
    default: present
    choices:
      - present
      - absent
      - running
      - restarted
      - stopped
  tags:
    type: list
    elements: str
    description:
      - List of tags to apply to the instance (5 max).
    default: []
  region:
    type: str
    description:
      - Scaleway compute zone.
    required: true
    choices:
      - ams1
      - EMEA-NL-EVS
      - ams2
      - ams3
      - par1
      - EMEA-FR-PAR1
      - par2
      - EMEA-FR-PAR2
      - par3
      - waw1
      - EMEA-PL-WAW1
      - waw2
      - waw3
  commercial_type:
    type: str
    description:
      - Commercial name of the compute node.
    required: true
  wait:
    description:
      - Wait for the instance to reach its desired state before returning.
    type: bool
    default: false
  wait_timeout:
    type: int
    description:
      - Time to wait for the server to reach the expected state.
    default: 300
  wait_sleep_time:
    type: int
    description:
      - Time to wait before every attempt to check the state of the server.
    default: 3
  security_group:
    type: str
    description:
      - Security group unique identifier.
      - If no value provided, the default security group or current security group is used.
"""
# Example tasks in YAML. Module arguments must be nested under the module
# name, and list items under their key, for the examples to be valid playbook tasks.
EXAMPLES = r"""
- name: Create a server
  community.general.scaleway_compute:
    name: foobar
    state: present
    image: 89ee4018-f8c3-4dc4-a6b5-bca14f985ebe
    project: 951df375-e094-4d26-97c1-ba548eeb9c42
    region: ams1
    commercial_type: VC1S
    tags:
      - test
      - www
- name: Create a server attached to a security group
  community.general.scaleway_compute:
    name: foobar
    state: present
    image: 89ee4018-f8c3-4dc4-a6b5-bca14f985ebe
    project: 951df375-e094-4d26-97c1-ba548eeb9c42
    region: ams1
    commercial_type: VC1S
    security_group: 4a31b633-118e-4900-bd52-facf1085fc8d
    tags:
      - test
      - www
- name: Destroy it right after
  community.general.scaleway_compute:
    name: foobar
    state: absent
    image: 89ee4018-f8c3-4dc4-a6b5-bca14f985ebe
    project: 951df375-e094-4d26-97c1-ba548eeb9c42
    region: ams1
    commercial_type: VC1S
"""
# No return values are documented for this module.
RETURN = r"""
"""
import datetime
import time
from ansible.module_utils.basic import AnsibleModule
from ansible_collections.community.general.plugins.module_utils.datetime import now
from ansible_collections.community.general.plugins.module_utils.scaleway import (
SCALEWAY_LOCATION,
Scaleway,
scaleway_argument_spec,
)
# Server state names known to this module (presumably the values of the
# API's ``state`` field; confirm against the Scaleway API reference).
SCALEWAY_SERVER_STATES = (
    "stopped",
    "stopping",
    "starting",
    "running",
    "locked",
)

# States treated as "still in transition": wait_to_complete_state_transition()
# keeps polling while the fetched state is one of these.
SCALEWAY_TRANSITIONS_STATES = (
    "stopping",
    "starting",
    "pending",
)
def check_image_id(compute_api, image_id):
    """Fail the module run unless the image can be fetched from the API.

    Issues ``GET images/<image_id>`` and reports a non-OK response through
    ``fail_json``. Returns nothing.
    """
    response = compute_api.get(path=f"images/{image_id}")
    if response.ok:
        return

    module = compute_api.module
    module.fail_json(
        msg=f"Error in getting image {image_id} on {module.params.get('api_url')} : {response.json}"
    )
def fetch_state(compute_api, server):
    """Return the current state of ``server`` as reported by the API.

    ``server`` is a mapping holding at least ``"id"``. An HTTP 404 yields the
    pseudo-state ``"absent"``; any other non-OK response, or a body without
    ``server.state``, is reported through ``fail_json``.
    """
    module = compute_api.module
    server_id = server["id"]
    module.debug(f"fetch_state of server: {server_id}")

    response = compute_api.get(path=f"servers/{server_id}")
    if response.status_code == 404:
        return "absent"

    if not response.ok:
        module.fail_json(msg=f"Error during state fetching: ({response.status_code}) {response.json}")

    try:
        state = response.json["server"]["state"]
    except KeyError:
        module.fail_json(msg=f"Could not fetch state in {response.json}")
    else:
        module.debug(f"Server {server_id} in state: {state}")
        return state
def wait_to_complete_state_transition(compute_api, server, wait=None):
    """Poll the server until it is no longer in a transitional state.

    :param compute_api: API wrapper exposing ``module`` (params, debug, fail_json).
    :param server: mapping with the ``"id"`` of the server to poll.
    :param wait: ``True``/``False`` forces or disables waiting; ``None`` defers
        to the module's ``wait`` parameter.

    Returns ``None``. When the ``wait_timeout`` window elapses while the server
    is still in one of ``SCALEWAY_TRANSITIONS_STATES``, ``fail_json`` is called.
    """
    module = compute_api.module
    if wait is None:
        wait = module.params["wait"]
    if not wait:
        return

    wait_timeout = module.params["wait_timeout"]
    wait_sleep_time = module.params["wait_sleep_time"]

    end = now() + datetime.timedelta(seconds=wait_timeout)
    while now() < end:
        module.debug("We are going to wait for the server to finish its transition")
        # Fetch once per iteration and reuse the value for logging: a second
        # fetch would cost an extra API request and could log a different
        # state than the one that ended the loop.
        state = fetch_state(compute_api, server)
        if state not in SCALEWAY_TRANSITIONS_STATES:
            module.debug("It seems that the server is not in transition anymore.")
            module.debug(f"Server in state: {state}")
            break
        time.sleep(wait_sleep_time)
    else:
        # Loop ended because the deadline passed, not because of ``break``.
        module.fail_json(msg="Server takes too long to finish its transition")
def public_ip_payload(compute_api, public_ip):
    """Translate the ``public_ip`` option into server payload fields.

    :param compute_api: API wrapper used to list existing IPs.
    :param public_ip: ``"absent"``, ``"dynamic"``/``"allocated"``, or the ID of
        an existing IP.
    :returns: ``{"dynamic_ip_required": bool}`` for the keyword values, or
        ``{"public_ip": <id>}`` when the ID is found in the ``ips`` listing.

    An ID that is not in the listing is reported through ``fail_json`` instead
    of silently returning ``None``, which the caller cannot merge into the
    wished server.
    """
    # We don't want a public ip
    if public_ip == "absent":
        return {"dynamic_ip_required": False}

    # IP is only attached to the instance and is released as soon as the instance terminates
    if public_ip in ("dynamic", "allocated"):
        return {"dynamic_ip_required": True}

    # We check that the IP we want to attach exists, if so its ID is returned
    response = compute_api.get("ips")
    if not response.ok:
        msg = f"Error during public IP validation: ({response.status_code}) {response.json}"
        compute_api.module.fail_json(msg=msg)

    ip_list = []
    try:
        ip_list = response.json["ips"]
    except KeyError:
        compute_api.module.fail_json(msg=f"Error in getting the IP information from: {response.json}")

    if any(ip["id"] == public_ip for ip in ip_list):
        return {"public_ip": public_ip}

    compute_api.module.fail_json(
        msg=f"public_ip must be 'absent', 'dynamic' or the ID of an existing IP: {public_ip!r} was not found"
    )
def create_server(compute_api, server):
    """Create a server from the wished-server mapping and return the API's server object.

    :param compute_api: API wrapper used to POST to ``servers``.
    :param server: wished-server mapping (``name``, ``image``, ``commercial_type``,
        ``enable_ipv6``, ``tags``, ``project``, ``organization``,
        ``security_group`` plus the IP keys produced by ``public_ip_payload``).
    :returns: the ``server`` entry of the response body.

    Non-OK responses and bodies without a ``server`` key go through ``fail_json``.
    """
    module = compute_api.module
    module.debug("Starting a create_server")
    target_server = None
    data = {
        "enable_ipv6": server["enable_ipv6"],
        "tags": server["tags"],
        "commercial_type": server["commercial_type"],
        "image": server["image"],
        "name": server["name"],
    }

    # public_ip_payload() yields either "dynamic_ip_required" or "public_ip",
    # never both, so indexing "dynamic_ip_required" unconditionally raised
    # KeyError whenever an IP ID was requested. Forward whichever is present.
    for ip_key in ("dynamic_ip_required", "public_ip"):
        if ip_key in server:
            data[ip_key] = server[ip_key]

    # Optional identifiers are only sent when the user supplied a value.
    for optional_key in ("project", "organization", "security_group"):
        if server[optional_key]:
            data[optional_key] = server[optional_key]

    response = compute_api.post(path="servers", data=data)

    if not response.ok:
        msg = f"Error during server creation: ({response.status_code}) {response.json}"
        module.fail_json(msg=msg)

    try:
        target_server = response.json["server"]
    except KeyError:
        module.fail_json(msg=f"Error in getting the server information from: {response.json}")

    wait_to_complete_state_transition(compute_api=compute_api, server=target_server)

    return target_server
def restart_server(compute_api, server):
    """Send the ``reboot`` action to ``server`` and return the API response."""
    return perform_action(compute_api, server, "reboot")
def stop_server(compute_api, server):
    """Send the ``poweroff`` action to ``server`` and return the API response."""
    return perform_action(compute_api, server, "poweroff")
def start_server(compute_api, server):
    """Send the ``poweron`` action to ``server`` and return the API response."""
    return perform_action(compute_api, server, "poweron")
def perform_action(compute_api, server, action):
    """POST ``action`` to the server's action endpoint, then wait for the transition.

    A non-OK response is reported through ``fail_json``. The raw response is
    returned to the caller.
    """
    action_path = f"servers/{server['id']}/action"
    response = compute_api.post(path=action_path, data={"action": action})

    if not response.ok:
        compute_api.module.fail_json(
            msg=f"Error during server {action}: ({response.status_code}) {response.json}"
        )

    # Honours the module's ``wait`` parameter (no explicit override here).
    wait_to_complete_state_transition(compute_api=compute_api, server=server)
    return response
def remove_server(compute_api, server):
    """DELETE the server, then wait for any transition to finish.

    A non-OK response is reported through ``fail_json``. The raw response is
    returned to the caller.
    """
    module = compute_api.module
    module.debug("Starting remove server strategy")

    response = compute_api.delete(path=f"servers/{server['id']}")
    if not response.ok:
        module.fail_json(
            msg=f"Error during server deletion: ({response.status_code}) {response.json}"
        )

    wait_to_complete_state_transition(compute_api=compute_api, server=server)
    return response
def present_strategy(compute_api, wished_server):
    """Ensure a server matching ``wished_server`` exists with the wished attributes.

    Returns ``(changed, result)`` where ``result`` is the server object, or a
    ``{"status": ...}`` mapping describing the pending change in check mode.
    """
    module = compute_api.module
    module.debug("Starting present strategy")

    matches = find(compute_api=compute_api, wished_server=wished_server, per_page=1)
    if matches:
        changed = False
        target_server = matches[0]
    elif module.check_mode:
        return True, {"status": "A server would be created."}
    else:
        changed = True
        target_server = create_server(compute_api=compute_api, server=wished_server)

    needs_patch = server_attributes_should_be_changed(
        compute_api=compute_api, target_server=target_server, wished_server=wished_server
    )
    if not needs_patch:
        return changed, target_server

    if module.check_mode:
        return True, {"status": f"Server {target_server['id']} attributes would be changed."}

    patched_server = server_change_attributes(
        compute_api=compute_api, target_server=target_server, wished_server=wished_server
    )
    return True, patched_server
def absent_strategy(compute_api, wished_server):
    """Ensure no server matching ``wished_server`` exists.

    Returns ``(changed, {"status": ...})``. An existing server is stopped
    first (always waiting for transitions), then deleted.
    """
    module = compute_api.module
    module.debug("Starting absent strategy")

    matches = find(compute_api=compute_api, wished_server=wished_server, per_page=1)
    if not matches:
        return False, {"status": "Server already absent."}

    target_server = matches[0]
    if module.check_mode:
        return True, {"status": f"Server {target_server['id']} would be made absent."}

    # A server MUST be stopped to be deleted.
    while fetch_state(compute_api=compute_api, server=target_server) != "stopped":
        # wait=True overrides the module's ``wait`` parameter for this sequence.
        wait_to_complete_state_transition(compute_api=compute_api, server=target_server, wait=True)
        stop_response = stop_server(compute_api=compute_api, server=target_server)

        if not stop_response.ok:
            module.fail_json(
                msg=f"Error while stopping a server before removing it [{stop_response.status_code}: {stop_response.json}]"
            )

        wait_to_complete_state_transition(compute_api=compute_api, server=target_server, wait=True)

    delete_response = remove_server(compute_api=compute_api, server=target_server)
    if not delete_response.ok:
        module.fail_json(
            msg=f"Error while removing server [{delete_response.status_code}: {delete_response.json}]"
        )

    return True, {"status": f"Server {target_server['id']} deleted"}
def running_strategy(compute_api, wished_server):
    """Ensure a server matching ``wished_server`` exists, is up to date and is running.

    Returns ``(changed, result)`` where ``result`` is the server object, or a
    ``{"status": ...}`` mapping describing the first pending change in check mode.
    """
    module = compute_api.module
    module.debug("Starting running strategy")
    changed = False
    query_results = find(compute_api=compute_api, wished_server=wished_server, per_page=1)

    if not query_results:
        changed = True
        if module.check_mode:
            return changed, {"status": "A server would be created before being run."}

        target_server = create_server(compute_api=compute_api, server=wished_server)
    else:
        target_server = query_results[0]

    if server_attributes_should_be_changed(
        compute_api=compute_api, target_server=target_server, wished_server=wished_server
    ):
        changed = True

        if module.check_mode:
            return changed, {"status": f"Server {target_server['id']} attributes would be changed before running it."}

        target_server = server_change_attributes(
            compute_api=compute_api, target_server=target_server, wished_server=wished_server
        )

    current_state = fetch_state(compute_api=compute_api, server=target_server)
    if current_state not in ("running", "starting"):
        module.debug(f"running_strategy: Server in state: {current_state}")
        changed = True

        if module.check_mode:
            # The pending change at this point is the power-on, not an
            # attribute patch, so the status says so.
            return changed, {"status": f"Server {target_server['id']} would be started."}

        response = start_server(compute_api=compute_api, server=target_server)
        if not response.ok:
            msg = f"Error while running server [{response.status_code}: {response.json}]"
            module.fail_json(msg=msg)

    return changed, target_server
def stop_strategy(compute_api, wished_server):
    """Ensure a server matching ``wished_server`` exists, is up to date and is stopped.

    Returns ``(changed, result)`` where ``result`` is the server object, or a
    ``{"status": ...}`` mapping describing the first pending change in check mode.
    """
    module = compute_api.module
    module.debug("Starting stop strategy")
    query_results = find(compute_api=compute_api, wished_server=wished_server, per_page=1)

    changed = False

    if not query_results:
        # Creating the server is a change. Flag it before the check-mode
        # return so check mode reports the same ``changed`` as a real run.
        changed = True
        if module.check_mode:
            return changed, {"status": "A server would be created before being stopped."}

        target_server = create_server(compute_api=compute_api, server=wished_server)
    else:
        target_server = query_results[0]

    module.debug("stop_strategy: Servers are found.")

    if server_attributes_should_be_changed(
        compute_api=compute_api, target_server=target_server, wished_server=wished_server
    ):
        changed = True

        if module.check_mode:
            return changed, {"status": f"Server {target_server['id']} attributes would be changed before stopping it."}

        target_server = server_change_attributes(
            compute_api=compute_api, target_server=target_server, wished_server=wished_server
        )

    wait_to_complete_state_transition(compute_api=compute_api, server=target_server)

    current_state = fetch_state(compute_api=compute_api, server=target_server)
    if current_state not in ("stopped",):
        module.debug(f"stop_strategy: Server in state: {current_state}")

        changed = True

        if module.check_mode:
            return changed, {"status": f"Server {target_server['id']} would be stopped."}

        response = stop_server(compute_api=compute_api, server=target_server)
        module.debug(response.json)
        module.debug(response.ok)

        if not response.ok:
            msg = f"Error while stopping server [{response.status_code}: {response.json}]"
            module.fail_json(msg=msg)

    return changed, target_server
def restart_strategy(compute_api, wished_server):
    """Ensure a server matching ``wished_server`` exists, is up to date, then reboot it.

    Always reports ``changed=True``. The second element of the returned pair
    is the server object, or a ``{"status": ...}`` mapping in check mode.
    """
    module = compute_api.module
    module.debug("Starting restart strategy")

    matches = find(compute_api=compute_api, wished_server=wished_server, per_page=1)
    if matches:
        target_server = matches[0]
    elif module.check_mode:
        return True, {"status": "A server would be created before being rebooted."}
    else:
        target_server = create_server(compute_api=compute_api, server=wished_server)

    needs_patch = server_attributes_should_be_changed(
        compute_api=compute_api, target_server=target_server, wished_server=wished_server
    )
    if needs_patch:
        if module.check_mode:
            return True, {"status": f"Server {target_server['id']} attributes would be changed before rebooting it."}

        target_server = server_change_attributes(
            compute_api=compute_api, target_server=target_server, wished_server=wished_server
        )

    # From here on a reboot is requested unconditionally, hence changed=True.
    if module.check_mode:
        return True, {"status": f"Server {target_server['id']} would be rebooted."}

    wait_to_complete_state_transition(compute_api=compute_api, server=target_server)

    # The state is re-fetched before each branch, so the second check sees
    # whatever state the first reboot (if any) left the server in.
    state_before_first = fetch_state(compute_api=compute_api, server=target_server)
    if state_before_first == "running":
        response = restart_server(compute_api=compute_api, server=target_server)
        wait_to_complete_state_transition(compute_api=compute_api, server=target_server)
        if not response.ok:
            module.fail_json(
                msg=f"Error while restarting server that was running [{response.status_code}: {response.json}]."
            )

    state_before_second = fetch_state(compute_api=compute_api, server=target_server)
    if state_before_second == "stopped":
        response = restart_server(compute_api=compute_api, server=target_server)
        wait_to_complete_state_transition(compute_api=compute_api, server=target_server)
        if not response.ok:
            module.fail_json(
                msg=f"Error while restarting server that was stopped [{response.status_code}: {response.json}]."
            )

    return True, target_server
# Dispatch table from the ``state`` option to the function implementing it.
# Key order is kept because main() derives the ``state`` choices from it.
state_strategy = dict(
    present=present_strategy,
    restarted=restart_strategy,
    stopped=stop_strategy,
    running=running_strategy,
    absent=absent_strategy,
)
def find(compute_api, wished_server, per_page=1):
    """Search servers by name and return the list found in the response.

    :param compute_api: API wrapper used to GET ``servers``.
    :param wished_server: mapping whose ``"name"`` is used as the search filter.
    :param per_page: page size forwarded to the API.
    :returns: the ``servers`` entry of the response body.

    Non-OK responses and bodies without a ``servers`` entry are reported
    through ``fail_json``, matching the other API helpers in this module.
    """
    compute_api.module.debug("Getting inside find")
    # Only the name attribute is accepted in the Compute query API
    response = compute_api.get("servers", params={"name": wished_server["name"], "per_page": per_page})

    if not response.ok:
        msg = f"Error during server search: ({response.status_code}) {response.json}"
        compute_api.module.fail_json(msg=msg)

    try:
        return response.json["servers"]
    except (KeyError, TypeError):
        # TypeError covers a non-subscriptable body (e.g. None), presumably
        # possible when the response is not JSON; verify against the helper.
        compute_api.module.fail_json(msg=f"Error in getting the server list from: {response.json}")
# Server attributes that are compared against the wished server and sent in
# the PATCH request when an existing server is updated.
PATCH_MUTABLE_SERVER_ATTRIBUTES = (
    "ipv6",
    "tags",
    "name",
    "dynamic_ip_required",
    "security_group",
)
def server_attributes_should_be_changed(compute_api, target_server, wished_server):
    """Tell whether any patchable attribute differs between the two servers.

    Only keys of ``PATCH_MUTABLE_SERVER_ATTRIBUTES`` present in both mappings
    are compared. Returns ``True`` on the first difference, else ``False``.
    """
    module = compute_api.module
    module.debug("Checking if server attributes should be changed")
    module.debug(f"Current Server: {target_server}")
    module.debug(f"Wished Server: {wished_server}")

    shared_attributes = [
        attr for attr in PATCH_MUTABLE_SERVER_ATTRIBUTES if attr in target_server and attr in wished_server
    ]
    debug_dict = {attr: (target_server[attr], wished_server[attr]) for attr in shared_attributes}
    module.debug(f"Debug dict {debug_dict}")

    try:
        for attr in PATCH_MUTABLE_SERVER_ATTRIBUTES:
            if attr not in target_server or attr not in wished_server:
                continue

            current, wanted = target_server[attr], wished_server[attr]
            if isinstance(current, dict):
                # When you are working with dict, only ID matter as we ask user to put only the resource ID in the playbook
                if wanted and "id" in current.keys() and current["id"] != wanted:
                    return True
            elif current != wanted:
                # Handling other structure compare simply the two objects content
                return True
        return False
    except AttributeError:
        module.fail_json(msg="Error while checking if attributes should be changed")
def server_change_attributes(compute_api, target_server, wished_server):
    """PATCH the server with the wished values and return the updated server object.

    The payload covers every key of ``PATCH_MUTABLE_SERVER_ATTRIBUTES`` present
    in both mappings. Non-OK responses and bodies without a ``server`` key are
    reported through ``fail_json``.
    """
    module = compute_api.module
    module.debug("Starting patching server attributes")

    patch_payload = {}
    for attr in PATCH_MUTABLE_SERVER_ATTRIBUTES:
        if attr not in target_server or attr not in wished_server:
            continue

        current, wanted = target_server[attr], wished_server[attr]
        if not isinstance(current, dict):
            patch_payload[attr] = wanted
        elif "id" in current and wanted:
            # When you are working with dict, only ID matter as we ask user to put only the resource ID in the playbook.
            # Keep every current field except the ID, then set the user-specified ID.
            replacement = {field: value for field, value in current.items() if field != "id"}
            replacement["id"] = wanted
            patch_payload[attr] = replacement

    response = compute_api.patch(path=f"servers/{target_server['id']}", data=patch_payload)
    if not response.ok:
        module.fail_json(
            msg=f"Error during server attributes patching: ({response.status_code}) {response.json}"
        )

    try:
        target_server = response.json["server"]
    except KeyError:
        module.fail_json(msg=f"Error in getting the server information from: {response.json}")

    wait_to_complete_state_transition(compute_api=compute_api, server=target_server)

    return target_server
def core(module):
    """Build the wished server from the module parameters and run the state strategy.

    Stores the regional endpoint in ``module.params["api_url"]``, validates the
    image, merges the public IP payload and exits through ``exit_json`` with
    the strategy's ``(changed, summary)`` result.
    """
    params = module.params
    region = params["region"]

    wished_keys = (
        "state",
        "image",
        "name",
        "commercial_type",
        "enable_ipv6",
        "tags",
        "organization",
        "project",
        "security_group",
    )
    wished_server = {key: params[key] for key in wished_keys}

    # The endpoint must be set before the API wrapper is built.
    params["api_url"] = SCALEWAY_LOCATION[region]["api_endpoint"]
    compute_api = Scaleway(module=module)

    check_image_id(compute_api, wished_server["image"])

    # IP parameters of the wished server depends on the configuration
    wished_server.update(public_ip_payload(compute_api=compute_api, public_ip=params["public_ip"]))

    strategy = state_strategy[wished_server["state"]]
    changed, summary = strategy(compute_api=compute_api, wished_server=wished_server)
    module.exit_json(changed=changed, msg=summary)
def main():
    """Declare the module arguments, build the AnsibleModule and hand over to core()."""
    argument_spec = scaleway_argument_spec()
    argument_spec.update(
        {
            "image": {"required": True},
            "name": {},
            "region": {"required": True, "choices": list(SCALEWAY_LOCATION.keys())},
            "commercial_type": {"required": True},
            "enable_ipv6": {"default": False, "type": "bool"},
            "public_ip": {"default": "absent"},
            "state": {"choices": list(state_strategy.keys()), "default": "present"},
            "tags": {"type": "list", "elements": "str", "default": []},
            "organization": {},
            "project": {},
            "wait": {"type": "bool", "default": False},
            "wait_timeout": {"type": "int", "default": 300},
            "wait_sleep_time": {"type": "int", "default": 3},
            "security_group": {},
        }
    )

    # Exactly one of organization/project: both constraints list the same pair.
    owner_options = ("organization", "project")
    module = AnsibleModule(
        argument_spec=argument_spec,
        supports_check_mode=True,
        mutually_exclusive=[owner_options],
        required_one_of=[owner_options],
    )

    core(module)


# Run the module only when executed as a script, not when imported.
if __name__ == "__main__":
    main()