1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2026-02-04 07:51:50 +00:00
community.general/plugins/module_utils/btrfs.py
patchback[bot] 8cd80d94a0
[PR #11049/396f467b backport][stable-12] Improve Python code: address unused variables (#11058)
Improve Python code: address unused variables (#11049)

* Address F841 (unused variable).

* Reformat.

* Add changelog fragment.

* More cleanup.

* Remove trailing whitespace.

* Readd removed code as a comment with TODO.

(cherry picked from commit 396f467bbb)

Co-authored-by: Felix Fontein <felix@fontein.de>
2025-11-09 09:59:42 +01:00

460 lines
17 KiB
Python

# Copyright (c) 2022, Gregory Furlong <gnfzdz@fzdz.io>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import annotations
from ansible.module_utils.common.text.converters import to_bytes
import re
import os
def normalize_subvolume_path(path):
"""
Normalizes btrfs subvolume paths to ensure exactly one leading slash, no trailing slashes and no consecutive slashes.
In addition, if the path is prefixed with a leading <FS_TREE>, this value is removed.
"""
fstree_stripped = re.sub(r"^<FS_TREE>", "", path)
result = re.sub(r"/+$", "", re.sub(r"/+", "/", f"/{fstree_stripped}"))
return result if len(result) > 0 else "/"
class BtrfsModuleException(Exception):
pass
class BtrfsCommands:
"""
Provides access to a subset of the Btrfs command line
"""
def __init__(self, module):
self.__module = module
self.__btrfs = self.__module.get_bin_path("btrfs", required=True)
def filesystem_show(self):
command = f"{self.__btrfs} filesystem show -d"
result = self.__module.run_command(command, check_rc=True)
stdout = [x.strip() for x in result[1].splitlines()]
filesystems = []
current = None
for line in stdout:
if line.startswith("Label"):
current = self.__parse_filesystem(line)
filesystems.append(current)
elif line.startswith("devid"):
current["devices"].append(self.__parse_filesystem_device(line))
return filesystems
def __parse_filesystem(self, line):
label = re.sub(r"\s*uuid:.*$", "", re.sub(r"^Label:\s*", "", line))
id = re.sub(r"^.*uuid:\s*", "", line)
filesystem = {}
filesystem["label"] = label.strip("'") if label != "none" else None
filesystem["uuid"] = id
filesystem["devices"] = []
filesystem["mountpoints"] = []
filesystem["subvolumes"] = []
filesystem["default_subvolid"] = None
return filesystem
def __parse_filesystem_device(self, line):
return re.sub(r"^.*path\s", "", line)
def subvolumes_list(self, filesystem_path):
command = f"{self.__btrfs} subvolume list -tap {filesystem_path}"
result = self.__module.run_command(command, check_rc=True)
stdout = [x.split("\t") for x in result[1].splitlines()]
subvolumes = [{"id": 5, "parent": None, "path": "/"}]
if len(stdout) > 2:
subvolumes.extend([self.__parse_subvolume_list_record(x) for x in stdout[2:]])
return subvolumes
def __parse_subvolume_list_record(self, item):
return {
"id": int(item[0]),
"parent": int(item[2]),
"path": normalize_subvolume_path(item[5]),
}
def subvolume_get_default(self, filesystem_path):
command = [self.__btrfs, "subvolume", "get-default", to_bytes(filesystem_path)]
result = self.__module.run_command(command, check_rc=True)
# ID [n] ...
return int(result[1].strip().split()[1])
def subvolume_set_default(self, filesystem_path, subvolume_id):
command = [self.__btrfs, "subvolume", "set-default", str(subvolume_id), to_bytes(filesystem_path)]
self.__module.run_command(command, check_rc=True)
def subvolume_create(self, subvolume_path):
command = [self.__btrfs, "subvolume", "create", to_bytes(subvolume_path)]
self.__module.run_command(command, check_rc=True)
def subvolume_snapshot(self, snapshot_source, snapshot_destination):
command = [self.__btrfs, "subvolume", "snapshot", to_bytes(snapshot_source), to_bytes(snapshot_destination)]
self.__module.run_command(command, check_rc=True)
def subvolume_delete(self, subvolume_path):
command = [self.__btrfs, "subvolume", "delete", to_bytes(subvolume_path)]
self.__module.run_command(command, check_rc=True)
class BtrfsInfoProvider:
"""
Utility providing details of the currently available btrfs filesystems
"""
def __init__(self, module):
self.__module = module
self.__btrfs_api = BtrfsCommands(module)
self.__findmnt_path = self.__module.get_bin_path("findmnt", required=True)
def get_filesystems(self):
filesystems = self.__btrfs_api.filesystem_show()
mountpoints = self.__find_mountpoints()
for filesystem in filesystems:
device_mountpoints = self.__filter_mountpoints_for_devices(mountpoints, filesystem["devices"])
filesystem["mountpoints"] = device_mountpoints
if len(device_mountpoints) > 0:
# any path within the filesystem can be used to query metadata
mountpoint = device_mountpoints[0]["mountpoint"]
filesystem["subvolumes"] = self.get_subvolumes(mountpoint)
filesystem["default_subvolid"] = self.get_default_subvolume_id(mountpoint)
return filesystems
def get_mountpoints(self, filesystem_devices):
mountpoints = self.__find_mountpoints()
return self.__filter_mountpoints_for_devices(mountpoints, filesystem_devices)
def get_subvolumes(self, filesystem_path):
return self.__btrfs_api.subvolumes_list(filesystem_path)
def get_default_subvolume_id(self, filesystem_path):
return self.__btrfs_api.subvolume_get_default(filesystem_path)
def __filter_mountpoints_for_devices(self, mountpoints, devices):
return [m for m in mountpoints if (m["device"] in devices)]
def __find_mountpoints(self):
command = f"{self.__findmnt_path} -t btrfs -nvP"
result = self.__module.run_command(command)
mountpoints = []
if result[0] == 0:
lines = result[1].splitlines()
for line in lines:
mountpoint = self.__parse_mountpoint_pairs(line)
mountpoints.append(mountpoint)
return mountpoints
def __parse_mountpoint_pairs(self, line):
pattern = re.compile(
r'^TARGET="(?P<target>.*)"\s+SOURCE="(?P<source>.*)"\s+FSTYPE="(?P<fstype>.*)"\s+OPTIONS="(?P<options>.*)"\s*$'
)
match = pattern.search(line)
if match is not None:
groups = match.groupdict()
return {
"mountpoint": groups["target"],
"device": groups["source"],
"subvolid": self.__extract_mount_subvolid(groups["options"]),
}
else:
raise BtrfsModuleException(f"Failed to parse findmnt result for line: '{line}'")
def __extract_mount_subvolid(self, mount_options):
for option in mount_options.split(","):
if option.startswith("subvolid="):
return int(option[len("subvolid=") :])
raise BtrfsModuleException(f"Failed to find subvolid for mountpoint in options '{mount_options}'")
class BtrfsSubvolume:
"""
Wrapper class providing convenience methods for inspection of a btrfs subvolume
"""
def __init__(self, filesystem, subvolume_id):
self.__filesystem = filesystem
self.__subvolume_id = subvolume_id
def get_filesystem(self):
return self.__filesystem
def is_mounted(self):
mountpoints = self.get_mountpoints()
return mountpoints is not None and len(mountpoints) > 0
def is_filesystem_root(self):
return self.__subvolume_id == 5
def is_filesystem_default(self):
return self.__filesystem.default_subvolid == self.__subvolume_id
def get_mounted_path(self):
mountpoints = self.get_mountpoints()
if mountpoints is not None and len(mountpoints) > 0:
return mountpoints[0]
elif self.parent is not None:
parent = self.__filesystem.get_subvolume_by_id(self.parent)
parent_path = parent.get_mounted_path()
if parent_path is not None:
return parent_path + os.path.sep + self.name
else:
return None
def get_mountpoints(self):
return self.__filesystem.get_mountpoints_by_subvolume_id(self.__subvolume_id)
def get_child_relative_path(self, absolute_child_path):
"""
Get the relative path from this subvolume to the named child subvolume.
The provided parameter is expected to be normalized as by normalize_subvolume_path.
"""
path = self.path
if absolute_child_path.startswith(path):
relative = absolute_child_path[len(path) :]
return re.sub(r"^/*", "", relative)
else:
raise BtrfsModuleException(f"Path '{absolute_child_path}' doesn't start with '{path}'")
def get_parent_subvolume(self):
parent_id = self.parent
return self.__filesystem.get_subvolume_by_id(parent_id) if parent_id is not None else None
def get_child_subvolumes(self):
return self.__filesystem.get_subvolume_children(self.__subvolume_id)
@property
def __info(self):
return self.__filesystem.get_subvolume_info_for_id(self.__subvolume_id)
@property
def id(self):
return self.__subvolume_id
@property
def name(self):
return self.path.split("/").pop()
@property
def path(self):
return self.__info["path"]
@property
def parent(self):
return self.__info["parent"]
class BtrfsFilesystem:
"""
Wrapper class providing convenience methods for inspection of a btrfs filesystem
"""
def __init__(self, info, provider, module):
self.__provider = provider
# constant for module execution
self.__uuid = info["uuid"]
self.__label = info["label"]
self.__devices = info["devices"]
# refreshable
self.__default_subvolid = info["default_subvolid"] if "default_subvolid" in info else None
self.__update_mountpoints(info["mountpoints"] if "mountpoints" in info else [])
self.__update_subvolumes(info["subvolumes"] if "subvolumes" in info else [])
@property
def uuid(self):
return self.__uuid
@property
def label(self):
return self.__label
@property
def default_subvolid(self):
return self.__default_subvolid
@property
def devices(self):
return list(self.__devices)
def refresh(self):
self.refresh_mountpoints()
self.refresh_subvolumes()
self.refresh_default_subvolume()
def refresh_mountpoints(self):
mountpoints = self.__provider.get_mountpoints(list(self.__devices))
self.__update_mountpoints(mountpoints)
def __update_mountpoints(self, mountpoints):
self.__mountpoints = dict()
for i in mountpoints:
subvolid = i["subvolid"]
mountpoint = i["mountpoint"]
if subvolid not in self.__mountpoints:
self.__mountpoints[subvolid] = []
self.__mountpoints[subvolid].append(mountpoint)
def refresh_subvolumes(self):
filesystem_path = self.get_any_mountpoint()
if filesystem_path is not None:
subvolumes = self.__provider.get_subvolumes(filesystem_path)
self.__update_subvolumes(subvolumes)
def __update_subvolumes(self, subvolumes):
# TODO strategy for retaining information on deleted subvolumes?
self.__subvolumes = dict()
for subvolume in subvolumes:
self.__subvolumes[subvolume["id"]] = subvolume
def refresh_default_subvolume(self):
filesystem_path = self.get_any_mountpoint()
if filesystem_path is not None:
self.__default_subvolid = self.__provider.get_default_subvolume_id(filesystem_path)
def contains_device(self, device):
return device in self.__devices
def contains_subvolume(self, subvolume):
return self.get_subvolume_by_name(subvolume) is not None
def get_subvolume_by_id(self, subvolume_id):
return BtrfsSubvolume(self, subvolume_id) if subvolume_id in self.__subvolumes else None
def get_subvolume_info_for_id(self, subvolume_id):
return self.__subvolumes[subvolume_id] if subvolume_id in self.__subvolumes else None
def get_subvolume_by_name(self, subvolume):
for subvolume_info in self.__subvolumes.values():
if subvolume_info["path"] == subvolume:
return BtrfsSubvolume(self, subvolume_info["id"])
return None
def get_any_mountpoint(self):
for subvol_mountpoints in self.__mountpoints.values():
if len(subvol_mountpoints) > 0:
return subvol_mountpoints[0]
# maybe error?
return None
def get_any_mounted_subvolume(self):
for subvolid, subvol_mountpoints in self.__mountpoints.items():
if len(subvol_mountpoints) > 0:
return self.get_subvolume_by_id(subvolid)
return None
def get_mountpoints_by_subvolume_id(self, subvolume_id):
return self.__mountpoints[subvolume_id] if subvolume_id in self.__mountpoints else []
def get_nearest_subvolume(self, subvolume):
"""Return the identified subvolume if existing, else the closest matching parent"""
subvolumes_by_path = self.__get_subvolumes_by_path()
while len(subvolume) > 1:
if subvolume in subvolumes_by_path:
return BtrfsSubvolume(self, subvolumes_by_path[subvolume]["id"])
else:
subvolume = re.sub(r"/[^/]+$", "", subvolume)
return BtrfsSubvolume(self, 5)
def get_mountpath_as_child(self, subvolume_name):
"""Find a path to the target subvolume through a mounted ancestor"""
nearest = self.get_nearest_subvolume(subvolume_name)
if nearest.path == subvolume_name:
nearest = nearest.get_parent_subvolume()
if nearest is None or nearest.get_mounted_path() is None:
raise BtrfsModuleException(f"Failed to find a path '{subvolume_name}' through a mounted parent subvolume")
else:
return nearest.get_mounted_path() + os.path.sep + nearest.get_child_relative_path(subvolume_name)
def get_subvolume_children(self, subvolume_id):
return [BtrfsSubvolume(self, x["id"]) for x in self.__subvolumes.values() if x["parent"] == subvolume_id]
def __get_subvolumes_by_path(self):
result = {}
for s in self.__subvolumes.values():
path = s["path"]
result[path] = s
return result
def is_mounted(self):
return self.__mountpoints is not None and len(self.__mountpoints) > 0
def get_summary(self):
subvolumes = []
sources = self.__subvolumes.values() if self.__subvolumes is not None else []
for subvolume in sources:
id = subvolume["id"]
subvolumes.append(
{
"id": id,
"path": subvolume["path"],
"parent": subvolume["parent"],
"mountpoints": self.get_mountpoints_by_subvolume_id(id),
}
)
return {
"default_subvolume": self.__default_subvolid,
"devices": self.__devices,
"label": self.__label,
"uuid": self.__uuid,
"subvolumes": subvolumes,
}
class BtrfsFilesystemsProvider:
"""
Provides methods to query available btrfs filesystems
"""
def __init__(self, module):
self.__module = module
self.__provider = BtrfsInfoProvider(module)
self.__filesystems = None
def get_matching_filesystem(self, criteria):
if criteria["device"] is not None:
criteria["device"] = os.path.realpath(criteria["device"])
self.__check_init()
matching = [f for f in self.__filesystems.values() if self.__filesystem_matches_criteria(f, criteria)]
if len(matching) == 1:
return matching[0]
else:
raise BtrfsModuleException(
f"Found {len(matching)} filesystems matching criteria uuid={criteria['uuid']} label={criteria['label']} device={criteria['device']}"
)
def __filesystem_matches_criteria(self, filesystem, criteria):
return (
(criteria["uuid"] is None or filesystem.uuid == criteria["uuid"])
and (criteria["label"] is None or filesystem.label == criteria["label"])
and (criteria["device"] is None or filesystem.contains_device(criteria["device"]))
)
def get_filesystem_for_device(self, device):
real_device = os.path.realpath(device)
self.__check_init()
for fs in self.__filesystems.values():
if fs.contains_device(real_device):
return fs
return None
def get_filesystems(self):
self.__check_init()
return list(self.__filesystems.values())
def __check_init(self):
if self.__filesystems is None:
self.__filesystems = dict()
for f in self.__provider.get_filesystems():
uuid = f["uuid"]
self.__filesystems[uuid] = BtrfsFilesystem(f, self.__provider, self.__module)