1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2026-02-04 07:51:50 +00:00
community.general/plugins/connection/incus.py
patchback[bot] 5d16a88298
[PR #11347/e790b950 backport][stable-12] incus connection: fix regex (#11415)
* incus connection: fix regex (#11347)

* incus connection: fix regex

* updates

* Apply suggestions from code review

* expand regexp capture

* add changelog frag

* Update plugins/connection/incus.py

* split arguments after command option

* Update plugins/connection/incus.py

* remove *() and split from the last command

* add tests, make small adjustments

* remove redundant strip()

* add more tests

* adjusted changelog fragment

(cherry picked from commit e790b95067)

* Order imports.

(cherry picked from commit 76d51db8d0)

---------

Co-authored-by: Alexei Znamensky <103110+russoz@users.noreply.github.com>
Co-authored-by: Felix Fontein <felix@fontein.de>
2026-01-09 21:22:29 +01:00

308 lines
11 KiB
Python

# Based on lxd.py (c) 2016, Matt Clay <matt@mystile.com>
# (c) 2023, Stephane Graber <stgraber@stgraber.org>
# Copyright (c) 2023 Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import annotations
# YAML description of this plugin and of the options the Connection class reads
# through get_option(). Nesting is significant in YAML: each option's
# description/type/default/vars keys must be indented beneath the option name,
# and every option beneath "options:".
DOCUMENTATION = r"""
author: Stéphane Graber (@stgraber)
name: incus
short_description: Run tasks in Incus instances using the Incus CLI
description:
  - Run commands or put/fetch files to an existing Incus instance using Incus CLI.
version_added: "8.2.0"
notes:
  - When using this collection for Windows virtual machines, set C(ansible_shell_type) to C(powershell) or C(cmd) as a variable to the host in
    the inventory.
options:
  remote_addr:
    description:
      - The instance identifier.
    type: string
    default: inventory_hostname
    vars:
      - name: inventory_hostname
      - name: ansible_host
      - name: ansible_incus_host
  executable:
    description:
      - The shell to use for execution inside the instance.
    type: string
    default: /bin/sh
    vars:
      - name: ansible_executable
      - name: ansible_incus_executable
  incus_become_method:
    description:
      - Become command used to switch to a non-root user.
      - Is only used when O(remote_user) is not V(root).
    type: str
    default: /bin/su
    vars:
      - name: incus_become_method
    version_added: 10.4.0
  remote:
    description:
      - The name of the Incus remote to use (per C(incus remote list)).
      - Remotes are used to access multiple servers from a single client.
    type: string
    default: local
    vars:
      - name: ansible_incus_remote
  remote_user:
    description:
      - User to login/authenticate as.
      - Can be set from the CLI with the C(--user) or C(-u) options.
    type: string
    default: root
    vars:
      - name: ansible_user
    env:
      - name: ANSIBLE_REMOTE_USER
    ini:
      - section: defaults
        key: remote_user
    keyword:
      - name: remote_user
    version_added: 10.4.0
  project:
    description:
      - The name of the Incus project to use (per C(incus project list)).
      - Projects are used to divide the instances running on a server.
    type: string
    default: default
    vars:
      - name: ansible_incus_project
"""
import os
import re
from subprocess import PIPE, Popen, call
from ansible.errors import AnsibleConnectionFailure, AnsibleError, AnsibleFileNotFound
from ansible.module_utils.common.process import get_bin_path
from ansible.module_utils.common.text.converters import to_bytes, to_text
from ansible.plugins.connection import ConnectionBase
class Connection(ConnectionBase):
    """Incus based connections.

    Commands and file transfers are carried out by invoking the local
    ``incus`` CLI (``incus exec``, ``incus file push``, ``incus file pull``)
    against the instance named by the ``remote_addr`` option.
    """

    transport = "incus"
    has_pipelining = True

    def __init__(self, play_context, new_stdin, *args, **kwargs):
        """Locate the ``incus`` binary and apply Windows-specific settings.

        Raises:
            AnsibleError: If the ``incus`` executable cannot be found in PATH.
        """
        super().__init__(play_context, new_stdin, *args, **kwargs)

        # get_bin_path() reports a missing executable by raising ValueError,
        # not by returning a falsy value; translate it into the user-facing
        # error. The falsy check below is kept only as a safety net.
        try:
            self._incus_cmd = get_bin_path("incus")
        except ValueError as err:
            raise AnsibleError("incus command not found in PATH") from err

        if not self._incus_cmd:
            raise AnsibleError("incus command not found in PATH")

        if getattr(self._shell, "_IS_WINDOWS", False):
            # Initializing regular expression patterns to match on a PowerShell or cmd command line.
            self.powershell_regex_pattern = re.compile(
                r'^"?(?P<executable>(?:[a-z]:\\)?[a-z0-9 ()\\.]*powershell(?:\.exe)?)"?\s+(?P<args>.*)(?P<command>-c(?:ommand)?)\s+(?P<post_args>.*(\n.*)*)',
                re.IGNORECASE,
            )
            self.cmd_regex_pattern = re.compile(
                r'^"?(?P<executable>(?:[a-z]:\\)?[a-z0-9 ()\\.]*cmd(?:\.exe)?)"?\s+(?P<args>.*)(?P<command>/c)\s+(?P<post_args>.*)',
                re.IGNORECASE,
            )

            # Basic setup for a Windows host.
            self.has_native_async = True
            self.always_pipeline_modules = True
            self.module_implementation_preferences = (".ps1", ".exe", "")
            self.allow_executable = False

    def _connect(self):
        """Mark the connection as established (Incus itself needs no setup)."""
        super()._connect()

        if not self._connected:
            self._display.vvv(
                f"ESTABLISH Incus CONNECTION FOR USER: {self.get_option('remote_user')}", host=self._instance()
            )
            self._connected = True

    def _build_command(self, cmd) -> list[str]:
        """Build the local ``incus exec`` argument list that runs ``cmd``.

        Args:
            cmd: The command line to execute inside the instance.

        Returns:
            The full argument list, starting with the ``incus`` binary.
        """
        is_windows = getattr(self._shell, "_IS_WINDOWS", False)

        exec_cmd: list[str] = [
            self._incus_cmd,
            "--project",
            self.get_option("project"),
            "exec",
            *(["-T"] if is_windows else []),
            f"{self.get_option('remote')}:{self._instance()}",
            "--",
        ]

        if is_windows:
            if regex_match := self.powershell_regex_pattern.match(cmd):
                regex_pattern = self.powershell_regex_pattern
            elif regex_match := self.cmd_regex_pattern.match(cmd):
                regex_pattern = self.cmd_regex_pattern

            if regex_match:
                self._display.vvvvvv(
                    f'Found keyword: "{regex_match.group("command")}" based on regex: {regex_pattern.pattern}',
                    host=self._instance(),
                )

                # To avoid splitting on a space contained in the path, set the executable as the first argument.
                exec_cmd.append(regex_match.group("executable"))
                if args := regex_match.group("args"):
                    exec_cmd.extend(args.strip().split(" "))
                # Set the command argument depending on cmd or powershell and the rest of it
                exec_cmd.append(regex_match.group("command"))
                # Everything after the command switch is passed as one single argument.
                if post_args := regex_match.group("post_args"):
                    exec_cmd.append(post_args.strip())
            else:
                # For anything else using -EncodedCommand or else, just split on space.
                exec_cmd.extend(cmd.split(" "))
        else:
            if self.get_option("remote_user") != "root":
                # Implicit concatenation (rather than a backslash continuation
                # inside the literal) keeps source indentation out of the message.
                self._display.vvv(
                    f"INFO: Running as non-root user: {self.get_option('remote_user')}, "
                    f"trying to run 'incus exec' with become method: {self.get_option('incus_become_method')}",
                    host=self._instance(),
                )
                exec_cmd.extend([self.get_option("incus_become_method"), self.get_option("remote_user"), "-c"])

            exec_cmd.extend([self.get_option("executable"), "-c", cmd])

        return exec_cmd

    def _instance(self):
        """Return the instance name derived from the ``remote_addr`` option."""
        # Return only the leading part of the FQDN as the instance name
        # as Incus instance names cannot be a FQDN.
        return self.get_option("remote_addr").split(".")[0]

    def exec_command(self, cmd, in_data=None, sudoable=True):
        """Execute a command inside the Incus instance.

        Args:
            cmd: The command line to run.
            in_data: Optional data written to the command's stdin.
            sudoable: Forwarded unchanged to the base class implementation.

        Returns:
            A ``(returncode, stdout, stderr)`` tuple; both streams are text.

        Raises:
            AnsibleConnectionFailure: If ``incus`` reports that the instance
                is not running, was not found, or may not be accessed.
        """
        super().exec_command(cmd, in_data=in_data, sudoable=sudoable)

        self._display.vvv(f"EXEC {cmd}", host=self._instance())

        local_cmd = self._build_command(cmd)
        self._display.vvvvv(f"EXEC {local_cmd}", host=self._instance())

        local_cmd = [to_bytes(i, errors="surrogate_or_strict") for i in local_cmd]
        in_data = to_bytes(in_data, errors="surrogate_or_strict", nonstring="passthru")

        process = Popen(local_cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
        stdout, stderr = process.communicate(in_data)

        stdout = to_text(stdout)
        stderr = to_text(stderr)

        # Map well-known "Error: ..." messages printed by the incus CLI to
        # connection failures; anything else is returned to the caller as-is.
        if stderr.startswith("Error: "):
            failure = None
            trimmed = stderr.rstrip()
            if trimmed.endswith(": Instance is not running"):
                failure = "instance not running"
            elif trimmed.endswith(": Instance not found"):
                failure = "instance not found"
            elif ": User does not have permission " in stderr or ": User does not have entitlement " in stderr:
                failure = "instance access denied"

            if failure is not None:
                raise AnsibleConnectionFailure(
                    f"{failure}: {self._instance()} (remote={self.get_option('remote')}, project={self.get_option('project')})"
                )

        return process.returncode, stdout, stderr

    def _get_remote_uid_gid(self) -> tuple[int, int]:
        """Get the user and group ID of 'remote_user' from the instance.

        Returns:
            A ``(uid, gid)`` tuple parsed from the output of ``/bin/id``.

        Raises:
            AnsibleError: If either ``/bin/id`` invocation exits non-zero.
        """
        rc, uid_out, err = self.exec_command("/bin/id -u")
        if rc != 0:
            raise AnsibleError(f"Failed to get remote uid for user {self.get_option('remote_user')}: {err}")
        uid = uid_out.strip()

        rc, gid_out, err = self.exec_command("/bin/id -g")
        if rc != 0:
            raise AnsibleError(f"Failed to get remote gid for user {self.get_option('remote_user')}: {err}")
        gid = gid_out.strip()

        return int(uid), int(gid)

    def _run_file_command(self, local_cmd, description):
        """Run a local ``incus file`` command and fail on a non-zero exit.

        Args:
            local_cmd: Argument list for the local ``incus`` invocation.
            description: Short summary of the transfer, used in the error.

        Raises:
            AnsibleError: If the command exits with a non-zero status.
        """
        encoded_cmd = [to_bytes(i, errors="surrogate_or_strict") for i in local_cmd]
        rc = call(encoded_cmd)
        # A failed transfer must not be reported as success to the caller.
        if rc != 0:
            raise AnsibleError(f"failed to {description} (rc={rc})")

    def put_file(self, in_path, out_path):
        """Copy a local file into the Incus instance.

        Args:
            in_path: Path of the local source file.
            out_path: Destination path inside the instance.

        Raises:
            AnsibleFileNotFound: If ``in_path`` is not an existing file.
            AnsibleError: If ``incus file push`` exits with a non-zero status.
        """
        super().put_file(in_path, out_path)

        self._display.vvv(f"PUT {in_path} TO {out_path}", host=self._instance())

        if not os.path.isfile(to_bytes(in_path, errors="surrogate_or_strict")):
            raise AnsibleFileNotFound(f"input path is not a file: {in_path}")

        # For a non-root user on a non-Windows instance, push the file with
        # that user's uid/gid so it is owned by the user the tasks run as.
        ownership_args: list[str] = []
        if not getattr(self._shell, "_IS_WINDOWS", False) and self.get_option("remote_user") != "root":
            uid, gid = self._get_remote_uid_gid()
            ownership_args = ["--uid", str(uid), "--gid", str(gid)]

        local_cmd = [
            self._incus_cmd,
            "--project",
            self.get_option("project"),
            "file",
            "push",
            *ownership_args,
            "--quiet",
            in_path,
            f"{self.get_option('remote')}:{self._instance()}/{out_path}",
        ]

        self._display.vvvvv(f"PUT {local_cmd}", host=self._instance())

        self._run_file_command(local_cmd, f"push {in_path} to {out_path} on instance {self._instance()}")

    def fetch_file(self, in_path, out_path):
        """Copy a file from the Incus instance to the local machine.

        Args:
            in_path: Source path inside the instance.
            out_path: Local destination path.

        Raises:
            AnsibleError: If ``incus file pull`` exits with a non-zero status.
        """
        super().fetch_file(in_path, out_path)

        self._display.vvv(f"FETCH {in_path} TO {out_path}", host=self._instance())

        local_cmd = [
            self._incus_cmd,
            "--project",
            self.get_option("project"),
            "file",
            "pull",
            "--quiet",
            f"{self.get_option('remote')}:{self._instance()}/{in_path}",
            out_path,
        ]

        self._run_file_command(local_cmd, f"pull {in_path} from instance {self._instance()} to {out_path}")

    def close(self):
        """Mark the connection as closed (Incus itself needs no teardown)."""
        super().close()

        self._connected = False