1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2026-04-22 03:39:08 +00:00

Prepare main for 13.0.0 (#11834)

* Bump version to 13.0.0.

* Remove deprecated modules and plugins.

* Remove deprecated module utils.

* Remove leftovers.

* Remove mode=compatibility.

* Change default of is_pre740 from true to false.

* Change default of force_defaults from true to false.

* Remove support for ubuntu_legacy mechanism.

* Remove cpanm compatibility tests.
This commit is contained in:
Felix Fontein 2026-04-20 12:35:43 +02:00 committed by GitHub
parent 7ce198f0e7
commit 72c13c85ad
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
62 changed files with 104 additions and 13052 deletions

View file

@ -1,222 +0,0 @@
#
# Copyright (c) 2016 Allen Sanabria, <asanabria@linuxdynasty.org>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
# This module utils is deprecated and will be removed in community.general 13.0.0
from __future__ import annotations
"""
This module adds shared support for generic cloud modules
In order to use this module, include it as part of a custom
module as shown below.
from ansible.module_utils.cloud import CloudRetry
The 'cloud' module provides the following common classes:
* CloudRetry
- The base class to be used by other cloud providers, in order to
provide a backoff/retry decorator based on status codes.
- Example using the AWSRetry class which inherits from CloudRetry.
@AWSRetry.exponential_backoff(retries=10, delay=3)
get_ec2_security_group_ids_from_names()
@AWSRetry.jittered_backoff()
get_ec2_security_group_ids_from_names()
"""
import random
import syslog
import time
from functools import wraps
def _exponential_backoff(retries=10, delay=2, backoff=2, max_delay=60):
"""Customizable exponential backoff strategy.
Args:
retries (int): Maximum number of times to retry a request.
delay (float): Initial (base) delay.
backoff (float): base of the exponent to use for exponential
backoff.
max_delay (int): Optional. If provided each delay generated is capped
at this amount. Defaults to 60 seconds.
Returns:
Callable that returns a generator. This generator yields durations in
seconds to be used as delays for an exponential backoff strategy.
Usage:
>>> backoff = _exponential_backoff()
>>> backoff
<function backoff_backoff at 0x7f0d939facf8>
>>> list(backoff())
[2, 4, 8, 16, 32, 60, 60, 60, 60, 60]
"""
def backoff_gen():
for retry in range(0, retries):
sleep = delay * backoff**retry
yield sleep if max_delay is None else min(sleep, max_delay)
return backoff_gen
def _full_jitter_backoff(retries=10, delay=3, max_delay=60, _random=random):
"""Implements the "Full Jitter" backoff strategy described here
https://www.awsarchitectureblog.com/2015/03/backoff.html
Args:
retries (int): Maximum number of times to retry a request.
delay (float): Approximate number of seconds to sleep for the first
retry.
max_delay (int): The maximum number of seconds to sleep for any retry.
_random (random.Random or None): Makes this generator testable by
allowing developers to explicitly pass in the a seeded Random.
Returns:
Callable that returns a generator. This generator yields durations in
seconds to be used as delays for a full jitter backoff strategy.
Usage:
>>> backoff = _full_jitter_backoff(retries=5)
>>> backoff
<function backoff_backoff at 0x7f0d939facf8>
>>> list(backoff())
[3, 6, 5, 23, 38]
>>> list(backoff())
[2, 1, 6, 6, 31]
"""
def backoff_gen():
for retry in range(0, retries):
yield _random.randint(0, min(max_delay, delay * 2**retry))
return backoff_gen
class CloudRetry:
"""CloudRetry can be used by any cloud provider, in order to implement a
backoff algorithm/retry effect based on Status Code from Exceptions.
"""
# This is the base class of the exception.
# AWS Example botocore.exceptions.ClientError
base_class = None
@staticmethod
def status_code_from_exception(error):
"""Return the status code from the exception object
Args:
error (object): The exception itself.
"""
pass
@staticmethod
def found(response_code, catch_extra_error_codes=None):
"""Return True if the Response Code to retry on was found.
Args:
response_code (str): This is the Response Code that is being matched against.
"""
pass
@classmethod
def _backoff(cls, backoff_strategy, catch_extra_error_codes=None):
"""Retry calling the Cloud decorated function using the provided
backoff strategy.
Args:
backoff_strategy (callable): Callable that returns a generator. The
generator should yield sleep times for each retry of the decorated
function.
"""
def deco(f):
@wraps(f)
def retry_func(*args, **kwargs):
for delay in backoff_strategy():
try:
return f(*args, **kwargs)
except Exception as e:
if isinstance(e, cls.base_class): # pylint: disable=isinstance-second-argument-not-valid-type
response_code = cls.status_code_from_exception(e)
if cls.found(response_code, catch_extra_error_codes):
msg = f"{e}: Retrying in {delay} seconds..."
syslog.syslog(syslog.LOG_INFO, msg)
time.sleep(delay)
else:
# Return original exception if exception is not a ClientError
raise e
else:
# Return original exception if exception is not a ClientError
raise e
return f(*args, **kwargs)
return retry_func # true decorator
return deco
@classmethod
def exponential_backoff(cls, retries=10, delay=3, backoff=2, max_delay=60, catch_extra_error_codes=None):
"""
Retry calling the Cloud decorated function using an exponential backoff.
Kwargs:
retries (int): Number of times to retry a failed request before giving up
default=10
delay (int or float): Initial delay between retries in seconds
default=3
backoff (int or float): backoff multiplier e.g. value of 2 will
double the delay each retry
default=1.1
max_delay (int or None): maximum amount of time to wait between retries.
default=60
"""
return cls._backoff(
_exponential_backoff(retries=retries, delay=delay, backoff=backoff, max_delay=max_delay),
catch_extra_error_codes,
)
@classmethod
def jittered_backoff(cls, retries=10, delay=3, max_delay=60, catch_extra_error_codes=None):
"""
Retry calling the Cloud decorated function using a jittered backoff
strategy. More on this strategy here:
https://www.awsarchitectureblog.com/2015/03/backoff.html
Kwargs:
retries (int): Number of times to retry a failed request before giving up
default=10
delay (int): Initial delay between retries in seconds
default=3
max_delay (int): maximum amount of time to wait between retries.
default=60
"""
return cls._backoff(
_full_jitter_backoff(retries=retries, delay=delay, max_delay=max_delay), catch_extra_error_codes
)
@classmethod
def backoff(cls, tries=10, delay=3, backoff=1.1, catch_extra_error_codes=None):
"""
Retry calling the Cloud decorated function using an exponential backoff.
Compatibility for the original implementation of CloudRetry.backoff that
did not provide configurable backoff strategies. Developers should use
CloudRetry.exponential_backoff instead.
Kwargs:
tries (int): Number of times to try (not retry) before giving up
default=10
delay (int or float): Initial delay between retries in seconds
default=3
backoff (int or float): backoff multiplier e.g. value of 2 will
double the delay each retry
default=1.1
"""
return cls.exponential_backoff(
retries=tries - 1,
delay=delay,
backoff=backoff,
max_delay=None,
catch_extra_error_codes=catch_extra_error_codes,
)

View file

@ -1,194 +0,0 @@
# This code is part of Ansible, but is an independent component.
# This particular file snippet, and this file snippet only, is BSD licensed.
# Modules you write using this snippet, which is embedded dynamically by Ansible
# still belong to the author of the module, and may assign their own license
# to the complete work.
#
# Copyright (c) 2014, Toshio Kuratomi <tkuratomi@ansible.com>
#
# Simplified BSD License (see LICENSES/BSD-2-Clause.txt or https://opensource.org/licenses/BSD-2-Clause)
# SPDX-License-Identifier: BSD-2-Clause
# This module utils is deprecated and will be removed in community.general 13.0.0
from __future__ import annotations
import re
import typing as t
if t.TYPE_CHECKING:
from ansible.module_utils.basic import AnsibleModule
# Input patterns for is_input_dangerous function:
#
# 1. '"' in string and '--' in string or
# "'" in string and '--' in string
PATTERN_1 = re.compile(r"(\'|\").*--")
# 2. union \ intersect \ except + select
PATTERN_2 = re.compile(r"(UNION|INTERSECT|EXCEPT).*SELECT", re.IGNORECASE)
# 3. ';' and any KEY_WORDS
PATTERN_3 = re.compile(r";.*(SELECT|UPDATE|INSERT|DELETE|DROP|TRUNCATE|ALTER)", re.IGNORECASE)
class SQLParseError(Exception):
pass
class UnclosedQuoteError(SQLParseError):
pass
# maps a type of identifier to the maximum number of dot levels that are
# allowed to specify that identifier. For example, a database column can be
# specified by up to 4 levels: database.schema.table.column
_PG_IDENTIFIER_TO_DOT_LEVEL = dict(
database=1,
schema=2,
table=3,
column=4,
role=1,
tablespace=1,
sequence=3,
publication=1,
)
_MYSQL_IDENTIFIER_TO_DOT_LEVEL = dict(database=1, table=2, column=3, role=1, vars=1)
def _find_end_quote(identifier, quote_char):
accumulate = 0
while True:
try:
quote = identifier.index(quote_char)
except ValueError as e:
raise UnclosedQuoteError from e
accumulate = accumulate + quote
try:
next_char = identifier[quote + 1]
except IndexError:
return accumulate
if next_char == quote_char:
try:
identifier = identifier[quote + 2 :]
accumulate = accumulate + 2
except IndexError as e:
raise UnclosedQuoteError from e
else:
return accumulate
def _identifier_parse(identifier, quote_char):
if not identifier:
raise SQLParseError("Identifier name unspecified or unquoted trailing dot")
already_quoted = False
if identifier.startswith(quote_char):
already_quoted = True
try:
end_quote = _find_end_quote(identifier[1:], quote_char=quote_char) + 1
except UnclosedQuoteError:
already_quoted = False
else:
if end_quote < len(identifier) - 1:
if identifier[end_quote + 1] == ".":
dot = end_quote + 1
first_identifier = identifier[:dot]
next_identifier = identifier[dot + 1 :]
further_identifiers = _identifier_parse(next_identifier, quote_char)
further_identifiers.insert(0, first_identifier)
else:
raise SQLParseError("User escaped identifiers must escape extra quotes")
else:
further_identifiers = [identifier]
if not already_quoted:
try:
dot = identifier.index(".")
except ValueError:
identifier = identifier.replace(quote_char, quote_char * 2)
identifier = f"{quote_char}{identifier}{quote_char}"
further_identifiers = [identifier]
else:
if dot == 0 or dot >= len(identifier) - 1:
identifier = identifier.replace(quote_char, quote_char * 2)
identifier = f"{quote_char}{identifier}{quote_char}"
further_identifiers = [identifier]
else:
first_identifier = identifier[:dot]
next_identifier = identifier[dot + 1 :]
further_identifiers = _identifier_parse(next_identifier, quote_char)
first_identifier = first_identifier.replace(quote_char, quote_char * 2)
first_identifier = f"{quote_char}{first_identifier}{quote_char}"
further_identifiers.insert(0, first_identifier)
return further_identifiers
def pg_quote_identifier(identifier, id_type):
identifier_fragments = _identifier_parse(identifier, quote_char='"')
if len(identifier_fragments) > _PG_IDENTIFIER_TO_DOT_LEVEL[id_type]:
raise SQLParseError(
f"PostgreSQL does not support {id_type} with more than {_PG_IDENTIFIER_TO_DOT_LEVEL[id_type]} dots"
)
return ".".join(identifier_fragments)
def mysql_quote_identifier(identifier, id_type):
identifier_fragments = _identifier_parse(identifier, quote_char="`")
if (len(identifier_fragments) - 1) > _MYSQL_IDENTIFIER_TO_DOT_LEVEL[id_type]:
raise SQLParseError(
f"MySQL does not support {id_type} with more than {_MYSQL_IDENTIFIER_TO_DOT_LEVEL[id_type]} dots"
)
special_cased_fragments = []
for fragment in identifier_fragments:
if fragment == "`*`":
special_cased_fragments.append("*")
else:
special_cased_fragments.append(fragment)
return ".".join(special_cased_fragments)
def is_input_dangerous(string):
"""Check if the passed string is potentially dangerous.
Can be used to prevent SQL injections.
Note: use this function only when you can't use
psycopg2's cursor.execute method parametrized
(typically with DDL queries).
"""
if not string:
return False
return any(pattern.search(string) for pattern in (PATTERN_1, PATTERN_2, PATTERN_3))
def check_input(module: AnsibleModule, *args) -> None:
"""Wrapper for is_input_dangerous function."""
needs_to_check = args
dangerous_elements = []
for elem in needs_to_check:
if isinstance(elem, str):
if is_input_dangerous(elem):
dangerous_elements.append(elem)
elif isinstance(elem, list):
for e in elem:
if is_input_dangerous(e):
dangerous_elements.append(e)
elif elem is None or isinstance(elem, bool):
pass
else:
elem = str(elem)
if is_input_dangerous(elem):
dangerous_elements.append(elem)
if dangerous_elements:
module.fail_json(msg=f"Passed input '{', '.join(dangerous_elements)}' is potentially dangerous")

View file

@ -1,331 +0,0 @@
#
# Copyright (c) 2016 Dimension Data
#
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
#
# Authors:
# - Aimon Bustardo <aimon.bustardo@dimensiondata.com>
# - Mark Maglana <mmaglana@gmail.com>
# - Adam Friedman <tintoy@tintoy.io>
#
# Common functionality to be used by various module components
from __future__ import annotations
#
# DEPRECATED
#
# This module utils is deprecated and will be removed in community.general 13.0.0
#
import configparser
import os
import re
import traceback
from os.path import expanduser
from uuid import UUID
from ansible.module_utils.basic import AnsibleModule, missing_required_lib # noqa: F401, pylint: disable=unused-import
LIBCLOUD_IMP_ERR = None
try:
import libcloud.security
from libcloud.common.dimensiondata import ( # noqa: F401, pylint: disable=unused-import
API_ENDPOINTS,
DimensionDataAPIException,
DimensionDataStatus,
)
from libcloud.compute.base import Node, NodeLocation # noqa: F401, pylint: disable=unused-import
from libcloud.compute.providers import get_driver
from libcloud.compute.types import Provider
HAS_LIBCLOUD = True
except ImportError:
LIBCLOUD_IMP_ERR = traceback.format_exc()
HAS_LIBCLOUD = False
# MCP 2.x version pattern for location (datacenter) names.
#
# Note that this is not a totally reliable way of determining MCP version.
# Unfortunately, libcloud's NodeLocation currently makes no provision for extended properties.
# At some point we may therefore want to either enhance libcloud or enable overriding mcp_version
# by specifying it in the module parameters.
MCP_2_LOCATION_NAME_PATTERN = re.compile(r".*MCP\s?2.*")
class DimensionDataModule:
"""
The base class containing common functionality used by Dimension Data modules for Ansible.
"""
def __init__(self, module: AnsibleModule) -> None:
"""
Create a new DimensionDataModule.
Will fail if Apache libcloud is not present.
:param module: The underlying Ansible module.
:type module: AnsibleModule
"""
self.module = module
if not HAS_LIBCLOUD:
self.module.fail_json(msg=missing_required_lib("libcloud"), exception=LIBCLOUD_IMP_ERR)
# Credentials are common to all Dimension Data modules.
credentials = self.get_credentials()
self.user_id = credentials["user_id"]
self.key = credentials["key"]
# Region and location are common to all Dimension Data modules.
region = self.module.params["region"]
self.region = f"dd-{region}"
self.location = self.module.params["location"]
libcloud.security.VERIFY_SSL_CERT = self.module.params["validate_certs"]
self.driver = get_driver(Provider.DIMENSIONDATA)(self.user_id, self.key, region=self.region)
# Determine the MCP API version (this depends on the target datacenter).
self.mcp_version = self.get_mcp_version(self.location)
# Optional "wait-for-completion" arguments
if "wait" in self.module.params:
self.wait = self.module.params["wait"]
self.wait_time = self.module.params["wait_time"]
self.wait_poll_interval = self.module.params["wait_poll_interval"]
else:
self.wait = False
self.wait_time = 0
self.wait_poll_interval = 0
def get_credentials(self):
"""
Get user_id and key from module configuration, environment, or dotfile.
Order of priority is module, environment, dotfile.
To set in environment:
export MCP_USER='myusername'
export MCP_PASSWORD='mypassword'
To set in dot file place a file at ~/.dimensiondata with
the following contents:
[dimensiondatacloud]
MCP_USER: myusername
MCP_PASSWORD: mypassword
"""
if not HAS_LIBCLOUD:
self.module.fail_json(msg="libcloud is required for this module.")
user_id = None
key = None
# First, try the module configuration
if "mcp_user" in self.module.params:
if "mcp_password" not in self.module.params:
self.module.fail_json(
msg='"mcp_user" parameter was specified, but not "mcp_password" (either both must be specified, or neither).'
)
user_id = self.module.params["mcp_user"]
key = self.module.params["mcp_password"]
# Fall back to environment
if not user_id or not key:
user_id = os.environ.get("MCP_USER", None)
key = os.environ.get("MCP_PASSWORD", None)
# Finally, try dotfile (~/.dimensiondata)
if not user_id or not key:
home = expanduser("~")
config = configparser.RawConfigParser()
config.read(f"{home}/.dimensiondata")
try:
user_id = config.get("dimensiondatacloud", "MCP_USER")
key = config.get("dimensiondatacloud", "MCP_PASSWORD")
except (configparser.NoSectionError, configparser.NoOptionError):
pass
# One or more credentials not found. Function can't recover from this
# so it has to raise an error instead of fail silently.
if not user_id:
raise MissingCredentialsError("Dimension Data user id not found")
elif not key:
raise MissingCredentialsError("Dimension Data key not found")
# Both found, return data
return dict(user_id=user_id, key=key)
def get_mcp_version(self, location):
"""
Get the MCP version for the specified location.
"""
location = self.driver.ex_get_location_by_id(location)
if MCP_2_LOCATION_NAME_PATTERN.match(location.name):
return "2.0"
return "1.0"
def get_network_domain(self, locator, location):
"""
Retrieve a network domain by its name or Id.
"""
if is_uuid(locator):
network_domain = self.driver.ex_get_network_domain(locator)
else:
matching_network_domains = [
network_domain
for network_domain in self.driver.ex_list_network_domains(location=location)
if network_domain.name == locator
]
if matching_network_domains:
network_domain = matching_network_domains[0]
else:
network_domain = None
if network_domain:
return network_domain
raise UnknownNetworkError(f"Network '{locator}' could not be found")
def get_vlan(self, locator, location, network_domain):
"""
Get a VLAN object by its name or id
"""
if is_uuid(locator):
vlan = self.driver.ex_get_vlan(locator)
else:
matching_vlans = [
vlan for vlan in self.driver.ex_list_vlans(location, network_domain) if vlan.name == locator
]
if matching_vlans:
vlan = matching_vlans[0]
else:
vlan = None
if vlan:
return vlan
raise UnknownVLANError(f"VLAN '{locator}' could not be found")
@staticmethod
def argument_spec(**additional_argument_spec):
"""
Build an argument specification for a Dimension Data module.
:param additional_argument_spec: An optional dictionary representing the specification for additional module arguments (if any).
:return: A dict containing the argument specification.
"""
spec = dict(
region=dict(type="str", default="na"),
mcp_user=dict(type="str", required=False),
mcp_password=dict(type="str", required=False, no_log=True),
location=dict(type="str", required=True),
validate_certs=dict(type="bool", required=False, default=True),
)
if additional_argument_spec:
spec.update(additional_argument_spec)
return spec
@staticmethod
def argument_spec_with_wait(**additional_argument_spec):
"""
Build an argument specification for a Dimension Data module that includes "wait for completion" arguments.
:param additional_argument_spec: An optional dictionary representing the specification for additional module arguments (if any).
:return: A dict containing the argument specification.
"""
spec = DimensionDataModule.argument_spec(
wait=dict(type="bool", required=False, default=False),
wait_time=dict(type="int", required=False, default=600),
wait_poll_interval=dict(type="int", required=False, default=2),
)
if additional_argument_spec:
spec.update(additional_argument_spec)
return spec
@staticmethod
def required_together(*additional_required_together):
"""
Get the basic argument specification for Dimension Data modules indicating which arguments are must be specified together.
:param additional_required_together: An optional list representing the specification for additional module arguments that must be specified together.
:return: An array containing the argument specifications.
"""
required_together = [["mcp_user", "mcp_password"]]
if additional_required_together:
required_together.extend(additional_required_together)
return required_together
class LibcloudNotFound(Exception):
"""
Exception raised when Apache libcloud cannot be found.
"""
pass
class MissingCredentialsError(Exception):
"""
Exception raised when credentials for Dimension Data CloudControl cannot be found.
"""
pass
class UnknownNetworkError(Exception):
"""
Exception raised when a network or network domain cannot be found.
"""
pass
class UnknownVLANError(Exception):
"""
Exception raised when a VLAN cannot be found.
"""
pass
def get_dd_regions():
"""
Get the list of available regions whose vendor is Dimension Data.
"""
# Get endpoints
all_regions = API_ENDPOINTS.keys()
# Only Dimension Data endpoints (no prefix)
regions = [region[3:] for region in all_regions if region.startswith("dd-")]
return regions
def is_uuid(u, version=4):
"""
Test if valid v4 UUID
"""
try:
uuid_obj = UUID(u, version=version)
return str(uuid_obj) == u
except ValueError:
return False

View file

@ -74,10 +74,6 @@ _django_std_arg_fmts: dict[str, ArgFormatter] = dict(
# keys can be used in _django_args
_args_menu = dict(
std=(django_std_args, _django_std_arg_fmts),
database=(_database_dash, {"database": _django_std_arg_fmts["database_dash"]}), # deprecate, remove in 13.0.0
noinput=({}, {"noinput": cmd_runner_fmt.as_fixed("--noinput")}), # deprecate, remove in 13.0.0
dry_run=({}, {"dry_run": cmd_runner_fmt.as_bool("--dry-run")}), # deprecate, remove in 13.0.0
check=({}, {"check": cmd_runner_fmt.as_bool("--check")}), # deprecate, remove in 13.0.0
database_dash=(_database_dash, {}),
data=(_data, {}),
)

View file

@ -3019,10 +3019,6 @@ class KeycloakAPI:
except Exception as e:
self.fail_request(e, msg=f"Could not get groups for user {user_id} in realm {realm}: {e}")
def add_user_in_group(self, user_id, group_id, realm: str = "master"):
"""DEPRECATED: Call add_user_to_group(...) instead. This method is scheduled for removal in community.general 13.0.0."""
return self.add_user_to_group(user_id, group_id, realm)
def add_user_to_group(self, user_id, group_id, realm: str = "master"):
"""
Add a user to a group.

View file

@ -1,171 +0,0 @@
# This code is part of Ansible, but is an independent component.
# This particular file snippet, and this file snippet only, is BSD licensed.
# Modules you write using this snippet, which is embedded dynamically by Ansible
# still belong to the author of the module, and may assign their own license
# to the complete work.
#
# Copyright (c), Michael DeHaan <michael.dehaan@gmail.com>, 2012-2013
#
# Simplified BSD License (see LICENSES/BSD-2-Clause.txt or https://opensource.org/licenses/BSD-2-Clause)
# SPDX-License-Identifier: BSD-2-Clause
# This module utils is deprecated and will be removed in community.general 13.0.0
from __future__ import annotations
import hmac
import os
import re
from urllib.parse import urlparse
try:
from hashlib import sha1
except ImportError:
import sha as sha1 # type: ignore[no-redef]
HASHED_KEY_MAGIC = "|1|"
def is_ssh_url(url):
"""check if url is ssh"""
if "@" in url and "://" not in url:
return True
return any(url.startswith(scheme) for scheme in ("ssh://", "git+ssh://", "ssh+git://"))
def get_fqdn_and_port(repo_url):
"""chop the hostname and port out of a url"""
fqdn = None
port = None
ipv6_re = re.compile(r"(\[[^]]*\])(?::([0-9]+))?")
if "@" in repo_url and "://" not in repo_url:
# most likely an user@host:path or user@host/path type URL
repo_url = repo_url.split("@", 1)[1]
match = ipv6_re.match(repo_url)
# For this type of URL, colon specifies the path, not the port
if match:
fqdn, path = match.groups()
elif ":" in repo_url:
fqdn = repo_url.split(":")[0]
elif "/" in repo_url:
fqdn = repo_url.split("/")[0]
elif "://" in repo_url:
# this should be something we can parse with urlparse
parts = urlparse(repo_url)
fqdn = parts[1]
if "@" in fqdn:
fqdn = fqdn.split("@", 1)[1]
match = ipv6_re.match(fqdn)
if match:
fqdn, port = match.groups()
elif ":" in fqdn:
fqdn, port = fqdn.split(":")[0:2]
return fqdn, port
def check_hostkey(module, fqdn):
return not not_in_host_file(module, fqdn)
# this is a variant of code found in connection_plugins/paramiko.py and we should modify
# the paramiko code to import and use this.
def not_in_host_file(self, host):
if "USER" in os.environ:
user_host_file = os.path.expandvars("~${USER}/.ssh/known_hosts")
else:
user_host_file = "~/.ssh/known_hosts"
user_host_file = os.path.expanduser(user_host_file)
host_file_list = [
user_host_file,
"/etc/ssh/ssh_known_hosts",
"/etc/ssh/ssh_known_hosts2",
"/etc/openssh/ssh_known_hosts",
]
hfiles_not_found = 0
for hf in host_file_list:
if not os.path.exists(hf):
hfiles_not_found += 1
continue
try:
with open(hf) as host_fh:
data = host_fh.read()
except OSError:
hfiles_not_found += 1
continue
for line in data.split("\n"):
if line is None or " " not in line:
continue
tokens = line.split()
if tokens[0].find(HASHED_KEY_MAGIC) == 0:
# this is a hashed known host entry
try:
(kn_salt, kn_host) = tokens[0][len(HASHED_KEY_MAGIC) :].split("|", 2)
hash = hmac.new(kn_salt.decode("base64"), digestmod=sha1)
hash.update(host)
if hash.digest() == kn_host.decode("base64"):
return False
except Exception:
# invalid hashed host key, skip it
continue
else:
# standard host file entry
if host in tokens[0]:
return False
return True
def add_host_key(module, fqdn, port=22, key_type="rsa", create_dir=False):
"""use ssh-keyscan to add the hostkey"""
keyscan_cmd = module.get_bin_path("ssh-keyscan", True)
if "USER" in os.environ:
user_ssh_dir = os.path.expandvars("~${USER}/.ssh/")
user_host_file = os.path.expandvars("~${USER}/.ssh/known_hosts")
else:
user_ssh_dir = "~/.ssh/"
user_host_file = "~/.ssh/known_hosts"
user_ssh_dir = os.path.expanduser(user_ssh_dir)
if not os.path.exists(user_ssh_dir):
if create_dir:
try:
os.makedirs(user_ssh_dir, int("700", 8))
except Exception:
module.fail_json(msg=f"failed to create host key directory: {user_ssh_dir}")
else:
module.fail_json(msg=f"{user_ssh_dir} does not exist")
elif not os.path.isdir(user_ssh_dir):
module.fail_json(msg=f"{user_ssh_dir} is not a directory")
if port:
this_cmd = f"{keyscan_cmd} -t {key_type} -p {port} {fqdn}"
else:
this_cmd = f"{keyscan_cmd} -t {key_type} {fqdn}"
rc, out, err = module.run_command(this_cmd, environ_update={"LANGUAGE": "C", "LC_ALL": "C"})
# ssh-keyscan gives a 0 exit code and prints nothing on timeout
if rc != 0 or not out:
msg = "failed to retrieve hostkey"
if not out:
msg += f'. "{this_cmd}" returned no matches.'
else:
msg += f' using command "{this_cmd}". [stdout]: {out}'
if err:
msg += f" [stderr]: {err}"
module.fail_json(msg=msg)
module.append_to_file(user_host_file, out)
return rc, out, err

View file

@ -1,245 +0,0 @@
# Copyright (c) Ansible project
# Simplified BSD License (see LICENSES/BSD-2-Clause.txt or https://opensource.org/licenses/BSD-2-Clause)
# SPDX-License-Identifier: BSD-2-Clause
from __future__ import annotations
import time
#
# DEPRECATED
#
# This module utils is deprecated and will be removed in community.general 13.0.0
#
class OneAndOneResources:
firewall_policy = "firewall_policy"
load_balancer = "load_balancer"
monitoring_policy = "monitoring_policy"
private_network = "private_network"
public_ip = "public_ip"
role = "role"
server = "server"
user = "user"
vpn = "vpn"
def get_resource(oneandone_conn, resource_type, resource_id):
switcher = {
"firewall_policy": oneandone_conn.get_firewall,
"load_balancer": oneandone_conn.get_load_balancer,
"monitoring_policy": oneandone_conn.get_monitoring_policy,
"private_network": oneandone_conn.get_private_network,
"public_ip": oneandone_conn.get_public_ip,
"role": oneandone_conn.get_role,
"server": oneandone_conn.get_server,
"user": oneandone_conn.get_user,
"vpn": oneandone_conn.get_vpn,
}
return switcher.get(resource_type)(resource_id)
def get_datacenter(oneandone_conn, datacenter, full_object=False):
"""
Validates the datacenter exists by ID or country code.
Returns the datacenter ID.
"""
for _datacenter in oneandone_conn.list_datacenters():
if datacenter in (_datacenter["id"], _datacenter["country_code"]):
if full_object:
return _datacenter
return _datacenter["id"]
def get_fixed_instance_size(oneandone_conn, fixed_instance_size, full_object=False):
"""
Validates the fixed instance size exists by ID or name.
Return the instance size ID.
"""
for _fixed_instance_size in oneandone_conn.fixed_server_flavors():
if fixed_instance_size in (_fixed_instance_size["id"], _fixed_instance_size["name"]):
if full_object:
return _fixed_instance_size
return _fixed_instance_size["id"]
def get_appliance(oneandone_conn, appliance, full_object=False):
"""
Validates the appliance exists by ID or name.
Return the appliance ID.
"""
for _appliance in oneandone_conn.list_appliances(q="IMAGE"):
if appliance in (_appliance["id"], _appliance["name"]):
if full_object:
return _appliance
return _appliance["id"]
def get_private_network(oneandone_conn, private_network, full_object=False):
"""
Validates the private network exists by ID or name.
Return the private network ID.
"""
for _private_network in oneandone_conn.list_private_networks():
if private_network in (_private_network["name"], _private_network["id"]):
if full_object:
return _private_network
return _private_network["id"]
def get_monitoring_policy(oneandone_conn, monitoring_policy, full_object=False):
"""
Validates the monitoring policy exists by ID or name.
Return the monitoring policy ID.
"""
for _monitoring_policy in oneandone_conn.list_monitoring_policies():
if monitoring_policy in (_monitoring_policy["name"], _monitoring_policy["id"]):
if full_object:
return _monitoring_policy
return _monitoring_policy["id"]
def get_firewall_policy(oneandone_conn, firewall_policy, full_object=False):
"""
Validates the firewall policy exists by ID or name.
Return the firewall policy ID.
"""
for _firewall_policy in oneandone_conn.list_firewall_policies():
if firewall_policy in (_firewall_policy["name"], _firewall_policy["id"]):
if full_object:
return _firewall_policy
return _firewall_policy["id"]
def get_load_balancer(oneandone_conn, load_balancer, full_object=False):
"""
Validates the load balancer exists by ID or name.
Return the load balancer ID.
"""
for _load_balancer in oneandone_conn.list_load_balancers():
if load_balancer in (_load_balancer["name"], _load_balancer["id"]):
if full_object:
return _load_balancer
return _load_balancer["id"]
def get_server(oneandone_conn, instance, full_object=False):
"""
Validates that the server exists whether by ID or name.
Returns the server if one was found.
"""
for server in oneandone_conn.list_servers(per_page=1000):
if instance in (server["id"], server["name"]):
if full_object:
return server
return server["id"]
def get_user(oneandone_conn, user, full_object=False):
"""
Validates that the user exists by ID or a name.
Returns the user if one was found.
"""
for _user in oneandone_conn.list_users(per_page=1000):
if user in (_user["id"], _user["name"]):
if full_object:
return _user
return _user["id"]
def get_role(oneandone_conn, role, full_object=False):
"""
Given a name, validates that the role exists
whether it is a proper ID or a name.
Returns the role if one was found, else None.
"""
for _role in oneandone_conn.list_roles(per_page=1000):
if role in (_role["id"], _role["name"]):
if full_object:
return _role
return _role["id"]
def get_vpn(oneandone_conn, vpn, full_object=False):
"""
Validates that the vpn exists by ID or a name.
Returns the vpn if one was found.
"""
for _vpn in oneandone_conn.list_vpns(per_page=1000):
if vpn in (_vpn["id"], _vpn["name"]):
if full_object:
return _vpn
return _vpn["id"]
def get_public_ip(oneandone_conn, public_ip, full_object=False):
"""
Validates that the public ip exists by ID or a name.
Returns the public ip if one was found.
"""
for _public_ip in oneandone_conn.list_public_ips(per_page=1000):
if public_ip in (_public_ip["id"], _public_ip["ip"]):
if full_object:
return _public_ip
return _public_ip["id"]
def wait_for_resource_creation_completion(oneandone_conn, resource_type, resource_id, wait_timeout, wait_interval):
"""
Waits for the resource create operation to complete based on the timeout period.
"""
wait_timeout = time.time() + wait_timeout
while wait_timeout > time.time():
time.sleep(wait_interval)
# Refresh the resource info
resource = get_resource(oneandone_conn, resource_type, resource_id)
if resource_type == OneAndOneResources.server:
resource_state = resource["status"]["state"]
else:
resource_state = resource["state"]
if (resource_type == OneAndOneResources.server and resource_state.lower() == "powered_on") or (
resource_type != OneAndOneResources.server and resource_state.lower() == "active"
):
return
elif resource_state.lower() == "failed":
raise Exception(f"{resource_type} creation failed for {resource_id}")
elif resource_state.lower() in ("active", "enabled", "deploying", "configuring"):
continue
else:
raise Exception(f"Unknown {resource_type} state {resource_state}")
raise Exception(f"Timed out waiting for {resource_type} completion for {resource_id}")
def wait_for_resource_deletion_completion(oneandone_conn, resource_type, resource_id, wait_timeout, wait_interval):
"""
Waits for the resource delete operation to complete based on the timeout period.
"""
wait_timeout = time.time() + wait_timeout
while wait_timeout > time.time():
time.sleep(wait_interval)
# Refresh the operation info
logs = oneandone_conn.list_logs(q="DELETE", period="LAST_HOUR", sort="-start_date")
if resource_type == OneAndOneResources.server:
_type = "VM"
elif resource_type == OneAndOneResources.private_network:
_type = "PRIVATENETWORK"
else:
raise Exception(f"Unsupported wait_for delete operation for {resource_type} resource")
for log in logs:
if (
log["resource"]["id"] == resource_id
and log["action"] == "DELETE"
and log["type"] == _type
and log["status"]["state"] == "OK"
):
return
raise Exception(f"Timed out waiting for {resource_type} deletion for {resource_id}")

File diff suppressed because it is too large Load diff

View file

@ -104,22 +104,3 @@ def make_process_dict(include_injected, include_deps=False):
return results, raw_data
return process_dict
def make_process_list(mod_helper, **kwargs):
#
# ATTENTION!
#
# The function `make_process_list()` is deprecated and will be removed in community.general 13.0.0
#
process_dict = make_process_dict(mod_helper, **kwargs)
def process_list(rc, out, err):
res_dict, raw_data = process_dict(rc, out, err)
if kwargs.get("include_raw"):
mod_helper.vars.raw_output = raw_data
return [entry for name, entry in res_dict.items() if name == kwargs.get("name")]
return process_list

View file

@ -1,171 +0,0 @@
# This code is part of Ansible, but is an independent component.
# This particular file snippet, and this file snippet only, is BSD licensed.
# Modules you write using this snippet, which is embedded dynamically by Ansible
# still belong to the author of the module, and may assign their own license
# to the complete work.
# Copyright (c) 2020, Andrew Klychkov (@Andersson007) <aaklychkov@mail.ru>
#
# Simplified BSD License (see LICENSES/BSD-2-Clause.txt or https://opensource.org/licenses/BSD-2-Clause)
# SPDX-License-Identifier: BSD-2-Clause
# This module utils is deprecated and will be removed in community.general 13.0.0
from __future__ import annotations
from stringprep import (
in_table_a1,
in_table_b1,
in_table_c3,
in_table_c4,
in_table_c5,
in_table_c6,
in_table_c7,
in_table_c8,
in_table_c9,
in_table_c12,
in_table_c21_c22,
in_table_d1,
in_table_d2,
)
from unicodedata import normalize
def is_unicode_str(string):
return True if isinstance(string, str) else False
def mapping_profile(string):
"""RFC4013 Mapping profile implementation."""
# Regarding RFC4013,
# This profile specifies:
# - non-ASCII space characters [StringPrep, C.1.2] that can be
# mapped to SPACE (U+0020), and
# - the "commonly mapped to nothing" characters [StringPrep, B.1]
# that can be mapped to nothing.
tmp = []
for c in string:
# If not the "commonly mapped to nothing"
if not in_table_b1(c):
if in_table_c12(c):
# map non-ASCII space characters
# (that can be mapped) to Unicode space
tmp.append(" ")
else:
tmp.append(c)
return "".join(tmp)
def is_ral_string(string):
"""RFC3454 Check bidirectional category of the string"""
# Regarding RFC3454,
# Table D.1 lists the characters that belong
# to Unicode bidirectional categories "R" and "AL".
# If a string contains any RandALCat character, a RandALCat
# character MUST be the first character of the string, and a
# RandALCat character MUST be the last character of the string.
if in_table_d1(string[0]):
if not in_table_d1(string[-1]):
raise ValueError("RFC3454: incorrect bidirectional RandALCat string.")
return True
return False
def prohibited_output_profile(string):
"""RFC4013 Prohibited output profile implementation."""
# Implements:
# RFC4013, 2.3. Prohibited Output.
# This profile specifies the following characters as prohibited input:
# - Non-ASCII space characters [StringPrep, C.1.2]
# - ASCII control characters [StringPrep, C.2.1]
# - Non-ASCII control characters [StringPrep, C.2.2]
# - Private Use characters [StringPrep, C.3]
# - Non-character code points [StringPrep, C.4]
# - Surrogate code points [StringPrep, C.5]
# - Inappropriate for plain text characters [StringPrep, C.6]
# - Inappropriate for canonical representation characters [StringPrep, C.7]
# - Change display properties or deprecated characters [StringPrep, C.8]
# - Tagging characters [StringPrep, C.9]
# RFC4013, 2.4. Bidirectional Characters.
# RFC4013, 2.5. Unassigned Code Points.
# Determine how to handle bidirectional characters (RFC3454):
if is_ral_string(string):
# If a string contains any RandALCat characters,
# The string MUST NOT contain any LCat character:
is_prohibited_bidi_ch = in_table_d2
bidi_table = "D.2"
else:
# Forbid RandALCat characters in LCat string:
is_prohibited_bidi_ch = in_table_d1
bidi_table = "D.1"
RFC = "RFC4013"
for c in string:
# RFC4013 2.3. Prohibited Output:
if in_table_c12(c):
raise ValueError(f"{RFC}: prohibited non-ASCII space characters that cannot be replaced (C.1.2).")
if in_table_c21_c22(c):
raise ValueError(f"{RFC}: prohibited control characters (C.2.1).")
if in_table_c3(c):
raise ValueError(f"{RFC}: prohibited private Use characters (C.3).")
if in_table_c4(c):
raise ValueError(f"{RFC}: prohibited non-character code points (C.4).")
if in_table_c5(c):
raise ValueError(f"{RFC}: prohibited surrogate code points (C.5).")
if in_table_c6(c):
raise ValueError(f"{RFC}: prohibited inappropriate for plain text characters (C.6).")
if in_table_c7(c):
raise ValueError(f"{RFC}: prohibited inappropriate for canonical representation characters (C.7).")
if in_table_c8(c):
raise ValueError(f"{RFC}: prohibited change display properties / deprecated characters (C.8).")
if in_table_c9(c):
raise ValueError(f"{RFC}: prohibited tagging characters (C.9).")
# RFC4013, 2.4. Bidirectional Characters:
if is_prohibited_bidi_ch(c):
raise ValueError(f"{RFC}: prohibited bidi characters ({bidi_table}).")
# RFC4013, 2.5. Unassigned Code Points:
if in_table_a1(c):
raise ValueError(f"{RFC}: prohibited unassigned code points (A.1).")
def saslprep(string):
"""RFC4013 implementation.
Implements "SASLprep" profile (RFC4013) of the "stringprep" algorithm (RFC3454)
to prepare Unicode strings representing user names and passwords for comparison.
Regarding the RFC4013, the "SASLprep" profile is intended to be used by
Simple Authentication and Security Layer (SASL) mechanisms
(such as PLAIN, CRAM-MD5, and DIGEST-MD5), as well as other protocols
exchanging simple user names and/or passwords.
Args:
string (unicode string): Unicode string to validate and prepare.
Returns:
Prepared unicode string.
"""
# RFC4013: "The algorithm assumes all strings are
# comprised of characters from the Unicode [Unicode] character set."
# Validate the string is a Unicode string
if not is_unicode_str(string):
raise TypeError(f"input must be of type str, not {type(string)}")
# RFC4013: 2.1. Mapping.
string = mapping_profile(string)
# RFC4013: 2.2. Normalization.
# "This profile specifies using Unicode normalization form KC."
string = normalize("NFKC", string)
if not string:
return ""
# RFC4013: 2.3. Prohibited Output.
# RFC4013: 2.4. Bidirectional Characters.
# RFC4013: 2.5. Unassigned Code Points.
prohibited_output_profile(string)
return string