1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2026-02-03 23:41:51 +00:00
community.general/plugins/module_utils/manageiq.py
Alexei Znamensky 5b5f7e9e64
batch 1 - update Python idiom to 3.7 using pyupgrade (#11341)
* batch 1 - update Python idiom to 3.7 using pyupgrade

* add changelog frag

* add changelog frag
2025-12-30 16:15:24 +01:00

435 lines
15 KiB
Python

#
# Copyright (c) 2017, Daniel Korn <korndaniel1@gmail.com>
#
# This code is part of Ansible, but is an independent component.
# This particular file snippet, and this file snippet only, is BSD licensed.
# Modules you write using this snippet, which is embedded dynamically by Ansible
# still belong to the author of the module, and may assign their own license
# to the complete work.
#
# Simplified BSD License (see LICENSES/BSD-2-Clause.txt or https://opensource.org/licenses/BSD-2-Clause)
# SPDX-License-Identifier: BSD-2-Clause
from __future__ import annotations
import os
import traceback
import typing as t
from ansible.module_utils.basic import missing_required_lib
if t.TYPE_CHECKING:
from ansible.module_utils.basic import AnsibleModule
CLIENT_IMP_ERR = None
try:
from manageiq_client.api import ManageIQClient
HAS_CLIENT = True
except ImportError:
CLIENT_IMP_ERR = traceback.format_exc()
HAS_CLIENT = False
def manageiq_argument_spec() -> dict[str, t.Any]:
options = dict(
url=dict(default=os.environ.get("MIQ_URL", None)),
username=dict(default=os.environ.get("MIQ_USERNAME", None)),
password=dict(default=os.environ.get("MIQ_PASSWORD", None), no_log=True),
token=dict(default=os.environ.get("MIQ_TOKEN", None), no_log=True),
validate_certs=dict(default=True, type="bool", aliases=["verify_ssl"]),
ca_cert=dict(required=False, default=None, aliases=["ca_bundle_path"]),
)
return dict(
manageiq_connection=dict(type="dict", apply_defaults=True, options=options),
)
def check_client(module: AnsibleModule) -> None:
if not HAS_CLIENT:
module.fail_json(msg=missing_required_lib("manageiq-client"), exception=CLIENT_IMP_ERR)
def validate_connection_params(module: AnsibleModule) -> dict[str, t.Any]:
params: dict[str, t.Any] = module.params["manageiq_connection"]
error_str = "missing required argument: manageiq_connection[{}]"
url: str | None = params["url"]
token: str | None = params["token"]
username: str | None = params["username"]
password: str | None = params["password"]
if (url and username and password) or (url and token):
return params
for arg in ["url", "username", "password"]:
if params[arg] in (None, ""):
module.fail_json(msg=error_str.format(arg))
raise AssertionError("should be unreachable")
def manageiq_entities() -> dict[str, str]:
return {
"provider": "providers",
"host": "hosts",
"vm": "vms",
"category": "categories",
"cluster": "clusters",
"data store": "data_stores",
"group": "groups",
"resource pool": "resource_pools",
"service": "services",
"service template": "service_templates",
"template": "templates",
"tenant": "tenants",
"user": "users",
"blueprint": "blueprints",
}
class ManageIQ:
"""
class encapsulating ManageIQ API client.
"""
def __init__(self, module: AnsibleModule) -> None:
# handle import errors
check_client(module)
params = validate_connection_params(module)
url = params["url"]
username = params["username"]
password = params["password"]
token = params["token"]
verify_ssl = params["validate_certs"]
ca_bundle_path = params["ca_cert"]
self._module = module
self._api_url = f"{url}/api"
self._auth = dict(user=username, password=password, token=token)
try:
self._client = ManageIQClient(
self._api_url, self._auth, verify_ssl=verify_ssl, ca_bundle_path=ca_bundle_path
)
except Exception as e:
self.module.fail_json(msg=f"failed to open connection ({url}): {e}")
@property
def module(self) -> AnsibleModule:
"""Ansible module module
Returns:
the ansible module
"""
return self._module
@property
def api_url(self) -> str:
"""Base ManageIQ API
Returns:
the base ManageIQ API
"""
return self._api_url
@property
def client(self):
"""ManageIQ client
Returns:
the ManageIQ client
"""
return self._client
def find_collection_resource_by(self, collection_name, **params):
"""Searches the collection resource by the collection name and the param passed.
Returns:
the resource as an object if it exists in manageiq, None otherwise.
"""
try:
entity = self.client.collections.__getattribute__(collection_name).get(**params)
except ValueError:
return None
except Exception as e:
self.module.fail_json(msg=f"failed to find resource {e}")
return vars(entity)
def find_collection_resource_or_fail(self, collection_name, **params):
"""Searches the collection resource by the collection name and the param passed.
Returns:
the resource as an object if it exists in manageiq, Fail otherwise.
"""
resource = self.find_collection_resource_by(collection_name, **params)
if resource:
return resource
else:
msg = f"{collection_name} where {params} does not exist in manageiq"
self.module.fail_json(msg=msg)
def policies(self, resource_id, resource_type, resource_name):
manageiq = ManageIQ(self.module)
# query resource id, fail if resource does not exist
if resource_id is None:
resource_id = manageiq.find_collection_resource_or_fail(resource_type, name=resource_name)["id"]
return ManageIQPolicies(manageiq, resource_type, resource_id)
def query_resource_id(self, resource_type, resource_name):
"""Query the resource name in ManageIQ.
Returns:
the resource ID if it exists in ManageIQ, Fail otherwise.
"""
resource = self.find_collection_resource_by(resource_type, name=resource_name)
if resource:
return resource["id"]
else:
msg = f"{resource_name} {resource_type} does not exist in manageiq"
self.module.fail_json(msg=msg)
class ManageIQPolicies:
"""
Object to execute policies management operations of manageiq resources.
"""
def __init__(self, manageiq: ManageIQ, resource_type, resource_id):
self.manageiq = manageiq
self.module = self.manageiq.module
self.api_url = self.manageiq.api_url
self.client = self.manageiq.client
self.resource_type = resource_type
self.resource_id = resource_id
self.resource_url = f"{self.api_url}/{resource_type}/{resource_id}"
def query_profile_href(self, profile):
"""Add or Update the policy_profile href field
Example:
{name: STR, ...} => {name: STR, href: STR}
"""
resource = self.manageiq.find_collection_resource_or_fail("policy_profiles", **profile)
return dict(name=profile["name"], href=resource["href"])
def query_resource_profiles(self):
"""Returns a set of the profile objects objects assigned to the resource"""
url = "{resource_url}/policy_profiles?expand=resources"
try:
response = self.client.get(url.format(resource_url=self.resource_url))
except Exception as e:
msg = f"Failed to query {self.resource_type} policies: {e}"
self.module.fail_json(msg=msg)
resources = response.get("resources", [])
# clean the returned rest api profile object to look like:
# {profile_name: STR, profile_description: STR, policies: ARR<POLICIES>}
profiles = [self.clean_profile_object(profile) for profile in resources]
return profiles
def query_profile_policies(self, profile_id):
"""Returns a set of the policy objects assigned to the resource"""
url = "{api_url}/policy_profiles/{profile_id}?expand=policies"
try:
response = self.client.get(url.format(api_url=self.api_url, profile_id=profile_id))
except Exception as e:
msg = f"Failed to query {self.resource_type} policies: {e}"
self.module.fail_json(msg=msg)
resources = response.get("policies", [])
# clean the returned rest api policy object to look like:
# {name: STR, description: STR, active: BOOL}
policies = [self.clean_policy_object(policy) for policy in resources]
return policies
def clean_policy_object(self, policy):
"""Clean a policy object to have human readable form of:
{
name: STR,
description: STR,
active: BOOL
}
"""
name = policy.get("name")
description = policy.get("description")
active = policy.get("active")
return dict(name=name, description=description, active=active)
def clean_profile_object(self, profile):
"""Clean a profile object to have human readable form of:
{
profile_name: STR,
profile_description: STR,
policies: ARR<POLICIES>
}
"""
profile_id = profile["id"]
name = profile.get("name")
description = profile.get("description")
policies = self.query_profile_policies(profile_id)
return dict(profile_name=name, profile_description=description, policies=policies)
def profiles_to_update(self, profiles, action):
"""Create a list of policies we need to update in ManageIQ.
Returns:
Whether or not a change took place and a message describing the
operation executed.
"""
profiles_to_post = []
assigned_profiles = self.query_resource_profiles()
# make a list of assigned full profile names strings
# e.g. ['openscap profile', ...]
assigned_profiles_set = {profile["profile_name"] for profile in assigned_profiles}
for profile in profiles:
assigned = profile.get("name") in assigned_profiles_set
if (action == "unassign" and assigned) or (action == "assign" and not assigned):
# add/update the policy profile href field
# {name: STR, ...} => {name: STR, href: STR}
profile = self.query_profile_href(profile)
profiles_to_post.append(profile)
return profiles_to_post
def assign_or_unassign_profiles(self, profiles, action):
"""Perform assign/unassign action"""
# get a list of profiles needed to be changed
profiles_to_post = self.profiles_to_update(profiles, action)
if not profiles_to_post:
return dict(changed=False, msg=f"Profiles {profiles} already {action}ed, nothing to do")
# try to assign or unassign profiles to resource
url = f"{self.resource_url}/policy_profiles"
try:
response = self.client.post(url, action=action, resources=profiles_to_post)
except Exception as e:
msg = f"Failed to {action} profile: {e}"
self.module.fail_json(msg=msg)
# check all entities in result to be successful
for result in response["results"]:
if not result["success"]:
msg = f"Failed to {action}: {result['message']}"
self.module.fail_json(msg=msg)
# successfully changed all needed profiles
return dict(changed=True, msg=f"Successfully {action}ed profiles: {profiles}")
class ManageIQTags:
"""
Object to execute tags management operations of manageiq resources.
"""
def __init__(self, manageiq: ManageIQ, resource_type, resource_id):
self.manageiq = manageiq
self.module = self.manageiq.module
self.api_url = self.manageiq.api_url
self.client = self.manageiq.client
self.resource_type = resource_type
self.resource_id = resource_id
self.resource_url = f"{self.api_url}/{resource_type}/{resource_id}"
def full_tag_name(self, tag):
"""Returns the full tag name in manageiq"""
return f"/managed/{tag['category']}/{tag['name']}"
def clean_tag_object(self, tag):
"""Clean a tag object to have human readable form of:
{
full_name: STR,
name: STR,
display_name: STR,
category: STR
}
"""
full_name = tag.get("name")
categorization = tag.get("categorization", {})
return dict(
full_name=full_name,
name=categorization.get("name"),
display_name=categorization.get("display_name"),
category=categorization.get("category", {}).get("name"),
)
def query_resource_tags(self):
"""Returns a set of the tag objects assigned to the resource"""
url = "{resource_url}/tags?expand=resources&attributes=categorization"
try:
response = self.client.get(url.format(resource_url=self.resource_url))
except Exception as e:
msg = f"Failed to query {self.resource_type} tags: {e}"
self.module.fail_json(msg=msg)
resources = response.get("resources", [])
# clean the returned rest api tag object to look like:
# {full_name: STR, name: STR, display_name: STR, category: STR}
tags = [self.clean_tag_object(tag) for tag in resources]
return tags
def tags_to_update(self, tags, action):
"""Create a list of tags we need to update in ManageIQ.
Returns:
Whether or not a change took place and a message describing the
operation executed.
"""
tags_to_post = []
assigned_tags = self.query_resource_tags()
# make a list of assigned full tag names strings
# e.g. ['/managed/environment/prod', ...]
assigned_tags_set = {tag["full_name"] for tag in assigned_tags}
for tag in tags:
assigned = self.full_tag_name(tag) in assigned_tags_set
if assigned and action == "unassign":
tags_to_post.append(tag)
elif (not assigned) and action == "assign":
tags_to_post.append(tag)
return tags_to_post
def assign_or_unassign_tags(self, tags, action):
"""Perform assign/unassign action"""
# get a list of tags needed to be changed
tags_to_post = self.tags_to_update(tags, action)
if not tags_to_post:
return dict(changed=False, msg=f"Tags already {action}ed, nothing to do")
# try to assign or unassign tags to resource
url = f"{self.resource_url}/tags"
try:
response = self.client.post(url, action=action, resources=tags)
except Exception as e:
msg = f"Failed to {action} tag: {e}"
self.module.fail_json(msg=msg)
# check all entities in result to be successful
for result in response["results"]:
if not result["success"]:
msg = f"Failed to {action}: {result['message']}"
self.module.fail_json(msg=msg)
# successfully changed all needed tags
return dict(changed=True, msg=f"Successfully {action}ed tags")