1
0
Fork 0
mirror of https://github.com/containers/ansible-podman-collections.git synced 2026-02-04 07:11:49 +00:00
ansible-podman-collections/plugins/module_utils/podman/common.py
Sergey f2e813671a
Run black -l 120 on all files, again (#943)
Signed-off-by: Sagi Shnaidman <sshnaidm@redhat.com>
2025-06-26 13:24:44 +03:00

452 lines
17 KiB
Python

# Copyright (c) 2020 Red Hat
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import json
import os
import shutil
from ansible.module_utils.six import raise_from
try:
from ansible.module_utils.compat.version import LooseVersion # noqa: F401
except ImportError:
try:
from distutils.version import LooseVersion # noqa: F401
except ImportError as exc:
raise_from(
ImportError(
"To use this plugin or module with ansible-core"
" < 2.11, you need to use Python < 3.12 with "
"distutils.version present"
),
exc,
)
# Alternative spellings of podman command line options.
# Each key is the long form of an option; the value lists every spelling to
# search for (the long form itself first, then its short flag or alias).
# createcommand() looks an option up here and falls back to the option name
# alone when there is no entry.
ARGUMENTS_OPTS_DICT = {
"--attach": ["--attach", "-a"],
"--cpu-shares": ["--cpu-shares", "-c"],
"--detach": ["--detach", "-d"],
"--env": ["--env", "-e"],
"--hostname": ["--hostname", "-h"],
"--interactive": ["--interactive", "-i"],
"--label": ["--label", "-l"],
"--memory": ["--memory", "-m"],
"--network": ["--network", "--net"],
"--publish": ["--publish", "-p"],
"--publish-all": ["--publish-all", "-P"],
"--quiet": ["--quiet", "-q"],
"--tty": ["--tty", "-t"],
"--user": ["--user", "-u"],
"--volume": ["--volume", "-v"],
"--workdir": ["--workdir", "-w"],
}
def run_podman_command(module, executable="podman", args=None, expected_rc=0, ignore_errors=False):
    """Run a podman command through the module and return its result.

    Args:
        module: object providing ``run_command(list)`` returning
            ``(rc, out, err)`` and ``fail_json(msg=...)``
        executable (str or list): program to run, or a list holding the
            program followed by leading arguments
        args (list): extra arguments appended after the executable
        expected_rc (int): return code regarded as success
        ignore_errors (bool): if True, never call ``module.fail_json``

    Returns:
        tuple: ``(rc, out, err)`` as returned by ``module.run_command``
    """
    # A list executable is copied so the caller's list is never mutated;
    # previously this case left ``command`` undefined and crashed.
    if isinstance(executable, list):
        command = list(executable)
    else:
        command = [executable]
    if args is not None:
        command.extend(args)
    rc, out, err = module.run_command(command)
    if not ignore_errors and rc != expected_rc:
        module.fail_json(msg="Failed to run {command} {args}: {err}".format(command=command, args=args, err=err))
    return rc, out, err
def run_generate_systemd_command(module, module_params, name, version):
    """Build and run ``<executable> generate systemd <name> --format json``.

    Args:
        module: object providing ``run_command``, ``fail_json``, ``log``
            and a ``params`` dict with a ``debug`` key
        module_params (dict): parameters; ``executable``,
            ``generate_systemd`` and ``debug`` are read
        name (str): name passed to ``generate systemd``
        version (str): podman version, compared against 4.0.0

    Returns:
        tuple: ``(rc, stdout, stderr)`` from ``module.run_command``
    """
    command = [module_params["executable"], "generate", "systemd", name, "--format", "json"]
    sysconf = module_params["generate_systemd"]
    gt4ver = LooseVersion(version) >= LooseVersion("4.0.0")

    restart_policy = sysconf.get("restart_policy")
    if restart_policy:
        allowed_policies = ["no", "on-success", "on-failure", "on-abnormal", "on-watchdog", "on-abort", "always"]
        if restart_policy not in allowed_policies:
            module.fail_json(
                'Restart policy for systemd unit file is "%s" and must be one of: '
                '"no", "on-success", "on-failure", "on-abnormal", "on-watchdog", "on-abort", or "always"'
                % restart_policy
            )
        command += ["--restart-policy", sysconf["restart_policy"]]

    if sysconf.get("restart_sec") is not None:
        command.append("--restart-sec=%s" % sysconf["restart_sec"])

    stop_timeout = sysconf.get("stop_timeout")
    legacy_time = sysconf.get("time")
    if stop_timeout is not None or legacy_time is not None:
        # Select correct parameter name based on version
        option = "stop-timeout" if gt4ver else "time"
        value = legacy_time if stop_timeout is None else stop_timeout
        command.append("--%s=%s" % (option, value))

    if sysconf.get("start_timeout") is not None:
        command.append("--start-timeout=%s" % sysconf["start_timeout"])
    if sysconf.get("no_header"):
        command.append("--no-header")
    # "names" counts as enabled when the key is absent
    if sysconf.get("names", True):
        command.append("--name")
    if sysconf.get("new"):
        command.append("--new")

    single_value_options = (
        ("container_prefix", "--container-prefix=%s"),
        ("pod_prefix", "--pod-prefix=%s"),
        ("separator", "--separator=%s"),
    )
    for key, template in single_value_options:
        if sysconf.get(key) is not None:
            command.append(template % sysconf[key])

    # Unit dependency options take either one string or a list of strings
    dependency_options = (
        ("after", "--after=%s"),
        ("wants", "--wants=%s"),
        ("requires", "--requires=%s"),
    )
    for key, template in dependency_options:
        units = sysconf.get(key)
        if units is None:
            continue
        if isinstance(units, str):
            units = [units]
        command += [template % unit for unit in units]

    for key, _template in dependency_options:
        if not gt4ver and sysconf.get(key) is not None:
            module.fail_json(
                msg="Systemd parameter '%s' is supported from "
                "podman version 4 only! Current version is %s" % (key, version)
            )

    if module.params["debug"] or module_params["debug"]:
        module.log("PODMAN-CONTAINER-DEBUG: systemd command: %s" % " ".join(command))
    rc, systemd, err = module.run_command(command)
    return rc, systemd, err
def compare_systemd_file_content(file_path, file_content):
    """Compare a unit file on disk with new content, ignoring comment lines.

    Lines starting with ``#`` are dropped from both sides before comparing.

    Args:
        file_path (str): path of the existing unit file
        file_content (str): new content for the unit file

    Returns:
        ``("", file_content)`` if the file does not exist, ``None`` if both
        contents are equal once comments are removed, otherwise a tuple
        ``(only_on_disk, only_in_new)`` of two lists of lines.
    """
    if not os.path.exists(file_path):
        # File does not exist, so all lines in file_content are different
        return "", file_content
    with open(file_path, "r") as unit_file:
        current_unit_file_content = unit_file.read()

    def remove_comments(content):
        return "\n".join([line for line in content.splitlines() if not line.startswith("#")])

    # Remove comments from both file contents before comparison
    current_nocmnt = remove_comments(current_unit_file_content)
    wanted_nocmnt = remove_comments(file_content)
    if current_nocmnt == wanted_nocmnt:
        return None

    # Split each side once and test membership against sets instead of
    # re-splitting the whole text for every single line.
    current_lines = current_nocmnt.splitlines()
    wanted_lines = wanted_nocmnt.splitlines()
    current_set = set(current_lines)
    wanted_set = set(wanted_lines)
    diff_in_file = [line for line in wanted_lines if line not in current_set]
    diff_in_string = [line for line in current_lines if line not in wanted_set]
    return diff_in_string, diff_in_file
def _record_systemd_diff(result, file_name, before, after):
    """Store the before/after text of one unit file in ``result["diff"]``."""
    if result["diff"].get("before") is None:
        result["diff"] = {"before": {}, "after": {}}
    # file_name already ends with ".service"; do not append the suffix again
    key = "systemd_{file_name}".format(file_name=file_name)
    result["diff"]["before"][key] = before
    result["diff"]["after"][key] = after


def _render_systemd_diff(entries):
    """Join diff entries into text, skipping entries whose content mentions PIDFile."""
    return "\n".join(
        ["{j} - {k}".format(j=j, k=k) for j, k in entries.items() if "PIDFile" not in k]
    ).strip()


def generate_systemd(module, module_params, name, version):
    """Generate systemd unit files and optionally write them to disk.

    Runs ``run_generate_systemd_command`` and parses its JSON output. When
    ``generate_systemd.path`` is set, the directory is created if missing and
    every unit is written to ``<path>/<unit>.service``. Each file is
    rewritten on every call; ``changed`` is set only when the directory or a
    file was missing, or when the content differs apart from comment lines.

    Args:
        module: object providing ``log`` and ``fail_json``
        module_params (dict): parameters; ``generate_systemd`` is read here
        name (str): name passed on to the generate command
        version (str): podman version passed on to the generate command

    Returns:
        dict: ``changed`` (bool), ``systemd`` (parsed units) and ``diff``
        (empty dict, or ``before``/``after`` text)
    """
    result = {
        "changed": False,
        "systemd": {},
        "diff": {},
    }
    sysconf = module_params["generate_systemd"]
    rc, systemd, err = run_generate_systemd_command(module, module_params, name, version)
    if rc != 0:
        module.log("PODMAN-CONTAINER-DEBUG: Error generating systemd: %s" % err)
        if sysconf:
            module.fail_json(msg="Error generating systemd: %s" % err)
        return result
    try:
        data = json.loads(systemd)
        result["systemd"] = data
        if sysconf.get("path"):
            full_path = os.path.expanduser(sysconf["path"])
            if not os.path.exists(full_path):
                os.makedirs(full_path)
                result["changed"] = True
            if not os.path.isdir(full_path):
                module.fail_json("Path %s is not a directory! Can not save systemd unit files there!" % full_path)
            for file_name, file_content in data.items():
                file_name += ".service"
                file_path = os.path.join(full_path, file_name)
                if not os.path.exists(file_path):
                    result["changed"] = True
                    _record_systemd_diff(result, file_name, "", file_content)
                else:
                    diff_ = compare_systemd_file_content(file_path, file_content)
                    if diff_:
                        result["changed"] = True
                        _record_systemd_diff(result, file_name, "\n".join(diff_[0]), "\n".join(diff_[1]))
                with open(file_path, "w") as f:
                    f.write(file_content)
            diff_before = _render_systemd_diff(result["diff"].get("before", {}))
            diff_after = _render_systemd_diff(result["diff"].get("after", {}))
            if diff_before or diff_after:
                result["diff"]["before"] = diff_before + "\n"
                result["diff"]["after"] = diff_after + "\n"
            else:
                result["diff"] = {}
        return result
    except Exception as e:
        module.log("PODMAN-CONTAINER-DEBUG: Error writing systemd: %s" % e)
        if sysconf:
            module.fail_json(msg="Error writing systemd: %s" % e)
        return result
def delete_systemd(module, module_params, name, version):
    """Remove the unit files that generate_systemd would write.

    Best effort: every failure is only logged and the function always
    returns None.

    Args:
        module: object providing ``log``
        module_params (dict): parameters; ``generate_systemd`` is read here
        name (str): name passed on to the generate command
        version (str): podman version passed on to the generate command
    """
    sysconf = module_params["generate_systemd"]
    if not sysconf.get("path"):
        # We don't know where systemd files are located, nothing to delete
        module.log("PODMAN-CONTAINER-DEBUG: Not deleting systemd file - no path!")
        return
    rc, systemd, err = run_generate_systemd_command(module, module_params, name, version)
    if rc != 0:
        module.log("PODMAN-CONTAINER-DEBUG: Error generating systemd: %s" % err)
        return
    try:
        # The generated JSON is only used for its keys: the unit names
        for unit_name in json.loads(systemd):
            unit_dir = os.path.expanduser(sysconf["path"])
            unit_path = os.path.join(unit_dir, unit_name + ".service")
            if os.path.exists(unit_path):
                os.unlink(unit_path)
    except Exception as e:
        module.log("PODMAN-CONTAINER-DEBUG: Error deleting systemd: %s" % e)
    return
def lower_keys(x):
    """Return a copy of ``x`` with every dictionary key lower-cased.

    Lists and dictionaries are walked recursively; any other value is
    returned unchanged.
    """
    if isinstance(x, dict):
        return {key.lower(): lower_keys(value) for key, value in x.items()}
    if isinstance(x, list):
        return [lower_keys(item) for item in x]
    return x
def remove_file_or_dir(path):
    """Delete ``path``, whether it is a regular file or a directory tree.

    Raises:
        ValueError: if ``path`` is neither a file nor a directory
    """
    if os.path.isfile(path):
        os.unlink(path)
        return
    if os.path.isdir(path):
        shutil.rmtree(path)
        return
    raise ValueError("file %s is not a file or dir." % path)
# Signal name -> number table.
# Generated from https://github.com/containers/podman/blob/main/pkg/signal/signal_linux.go
# and https://github.com/containers/podman/blob/main/pkg/signal/signal_linux_mipsx.go
_signal_map = {
    "ABRT": 6,
    "ALRM": 14,
    "BUS": 7,
    "CHLD": 17,
    "CLD": 17,
    "CONT": 18,
    "EMT": 7,
    "FPE": 8,
    "HUP": 1,
    "ILL": 4,
    "INT": 2,
    "IO": 29,
    "IOT": 6,
    "KILL": 9,
    "PIPE": 13,
    "POLL": 29,
    "PROF": 27,
    "PWR": 30,
    "QUIT": 3,
    "RTMAX": 64,
    "RTMIN": 34,
    "SEGV": 11,
    "STKFLT": 16,
    "STOP": 19,
    "SYS": 31,
    "TERM": 15,
    "TRAP": 5,
    "TSTP": 20,
    "TTIN": 21,
    "TTOU": 22,
    "URG": 23,
    "USR1": 10,
    "USR2": 12,
    "VTALRM": 26,
    "WINCH": 28,
    "XCPU": 24,
    "XFSZ": 25,
}
# Add the relative spellings "RTMIN+<i>" and "RTMAX-<i>" for every offset
# between the two real-time bounds.
for i in range(1, _signal_map["RTMAX"] - _signal_map["RTMIN"] + 1):
    _signal_map.update(
        {
            "RTMIN+{0}".format(i): _signal_map["RTMIN"] + i,
            "RTMAX-{0}".format(i): _signal_map["RTMAX"] - i,
        }
    )


def normalize_signal(signal_name_or_number):
    """Return the signal number, as a string, for a signal name or number.

    Input made only of digits is returned as is. Names are matched case
    insensitively, with or without a leading "SIG".

    Raises:
        RuntimeError: if the name is not in the signal table
    """
    text = str(signal_name_or_number)
    if text.isdigit():
        return text
    key = text.upper()
    if key.startswith("SIG"):
        key = key[3:]
    if key in _signal_map:
        return str(_signal_map[key])
    raise RuntimeError("Unknown signal '{0}'".format(text))
def get_podman_version(module, fail=True):
    """Return the version string printed by ``<executable> --version``.

    The executable comes from ``module.params["executable"]``, with
    "podman" used when that value is empty.

    Args:
        module: object providing ``params``, ``run_command`` and ``fail_json``
        fail (bool): call ``module.fail_json`` when no version can be read

    Returns:
        The text following the word "version" in the command output,
        stripped, or None when the command failed or printed no version.
    """
    executable = module.params["executable"] or "podman"
    rc, out, err = module.run_command([executable, b"--version"])
    if rc == 0 and out and "version" in out:
        return out.split("version")[1].strip()
    if fail:
        module.fail_json(msg="'%s --version' run failed! Error: %s" % (executable, err))
    return None
def createcommand(argument, info_config, boolean_type=False):
    """Returns list of values for given argument from CreateCommand
    from Podman container inspect output.

    Every spelling of the argument listed in ARGUMENTS_OPTS_DICT is searched
    for, both as ``--opt value`` and as ``--opt=value``. The trailing
    container command (``info_config["cmd"]``) is cut off first so that its
    own arguments are not scanned.

    Args:
        argument (str): argument name
        info_config (dict): dictionary with container info
        boolean_type (bool): if True, then argument is boolean type

    Returns:
        all_values: list of values for given argument from createcommand;
            ``[True]`` for an argument found without a value, ``[]`` when
            there is no "createcommand" key or the argument is absent
    """
    if "createcommand" not in info_config:
        return []
    cr_com = info_config["createcommand"]
    argument_values = ARGUMENTS_OPTS_DICT.get(argument, [argument])
    all_values = []
    # Remove command args from the list
    container_cmd = info_config.get("cmd")
    if container_cmd and container_cmd == cr_com[-len(container_cmd):]:
        cr_com = cr_com[: -len(container_cmd)]
    last_index = len(cr_com) - 1
    for arg in argument_values:
        key_value_prefix = "%s=" % arg
        for ind, cr_opt in enumerate(cr_com):
            if arg == cr_opt:
                if boolean_type:
                    # This is a boolean argument and doesn't have value
                    return [True]
                # Nothing after the option, or another option right after
                # it: a false/true switching argument. Checking the index
                # first avoids an IndexError when the option is last.
                if ind == last_index or cr_com[ind + 1].startswith("-"):
                    return [True]
                # This is a key=value argument
                all_values.append(cr_com[ind + 1])
            elif cr_opt.startswith(key_value_prefix):
                all_values.append(cr_opt.split("=", 1)[1])
    return all_values
def diff_generic(params, info_config, module_arg, cmd_arg, boolean_type=False):
    """
    Generic diff function for module arguments from CreateCommand
    in Podman inspection output.

    Both sides are normalized to comparable values: lists and dicts become
    sorted, lower-cased, comma-joined strings; booleans and integers become
    strings.

    Args:
        params (dict): module parameters
        info_config (dict): dictionary with container info
        module_arg (str): module argument name
        cmd_arg (str): command line argument name
        boolean_type (bool): if True, then argument is boolean type

    Returns:
        tuple: ``(before, after)`` - the normalized value found in
        CreateCommand and the normalized value requested in ``params``
    """
    before = createcommand(cmd_arg, info_config, boolean_type=boolean_type)
    if before == []:
        before = None
    after = params[module_arg]

    # An unset boolean and an explicit False are the same thing
    if boolean_type and (before, after) in [(None, False), (False, None)]:
        return False, False
    if before is None and after is None:
        return before, after

    if after is None:
        # Only the existing container has a value
        before = ",".join(sorted(before)) if len(before) > 1 else before[0]
        return before, after

    if isinstance(after, list):
        after = ",".join(sorted([str(item).lower() for item in after]))
        before = ",".join(sorted([str(item).lower() for item in before])) if before else ""
    elif isinstance(after, dict):
        # NOTE: this renames the key inside the dict taken from params
        if module_arg == "log_opt" and "max_size" in after:
            after["max-size"] = after.pop("max_size")
        pairs = [str(k).lower() + "=" + str(v).lower() for k, v in after.items() if v is not None]
        after = ",".join(sorted(pairs))
        before = ",".join(sorted([item.lower() for item in before])) if before else ""
    elif isinstance(after, bool):
        # bool is tested before int because bool is a subclass of int
        after = str(after).capitalize()
        if before is not None:
            before = str(before[0]).capitalize()
    elif isinstance(after, int):
        after = str(after)
        if before is not None:
            before = str(before[0])
    else:
        before = before[0] if before else None
    return before, after