1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2026-02-04 16:01:55 +00:00
community.general/plugins/modules/crypttab.py
Alexei Znamensky 41923e43bd
fix ruff case SIM103 (#11119)
* fix ruff case SIM103

* add changelog frag
2025-11-12 21:12:47 +01:00

345 lines
11 KiB
Python

#!/usr/bin/python
# Copyright (c) 2014, Steve <yo@groks.org>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import annotations
DOCUMENTATION = r"""
module: crypttab
short_description: Encrypted Linux block devices
description:
- Control Linux encrypted block devices that are set up during system boot in C(/etc/crypttab).
extends_documentation_fragment:
- community.general.attributes
attributes:
check_mode:
support: full
diff_mode:
support: none
options:
name:
description:
- Name of the encrypted block device as it appears in the C(/etc/crypttab) file, or optionally prefixed with V(/dev/mapper/),
as it appears in the filesystem. V(/dev/mapper/) is stripped from O(name).
type: str
required: true
state:
description:
- Use V(present) to add a line to C(/etc/crypttab) or update its definition if already present.
- Use V(absent) to remove a line with matching O(name).
- Use V(opts_present) to add options to those already present; options with different values are updated.
- Use V(opts_absent) to remove options from the existing set.
type: str
required: true
choices: [absent, opts_absent, opts_present, present]
backing_device:
description:
- Path to the underlying block device or file, or the UUID of a block-device prefixed with V(UUID=).
type: str
password:
description:
- Encryption password, the path to a file containing the password, or V(-) or unset if the password should be entered
at boot.
type: path
opts:
description:
- A comma-delimited list of options. See V(crypttab(5\)) for details.
type: str
path:
description:
- Path to file to use instead of V(/etc/crypttab).
- This might be useful in a chroot environment.
type: path
default: /etc/crypttab
author:
- Steve (@groks)
"""
EXAMPLES = r"""
- name: Set the options explicitly a device which must already exist
community.general.crypttab:
name: luks-home
state: present
opts: discard,cipher=aes-cbc-essiv:sha256
- name: Add the 'discard' option to any existing options for all devices
community.general.crypttab:
name: '{{ item.device }}'
state: opts_present
opts: discard
loop: '{{ ansible_mounts }}'
when: "'/dev/mapper/luks-' in item.device"
- name: Add entry to /etc/crypttab for luks-home with password file
community.general.crypttab:
name: luks-home
backing_device: UUID=123e4567-e89b-12d3-a456-426614174000
password: /root/keys/luks-home.key
opts: discard,cipher=aes-cbc-essiv:sha256
state: present
"""
import os
import traceback
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.common.text.converters import to_bytes
def main():
module = AnsibleModule(
argument_spec=dict(
name=dict(type="str", required=True),
state=dict(type="str", required=True, choices=["absent", "opts_absent", "opts_present", "present"]),
backing_device=dict(type="str"),
password=dict(type="path"),
opts=dict(type="str"),
path=dict(type="path", default="/etc/crypttab"),
),
supports_check_mode=True,
)
backing_device = module.params["backing_device"]
password = module.params["password"]
opts = module.params["opts"]
state = module.params["state"]
path = module.params["path"]
name = module.params["name"]
if name.startswith("/dev/mapper/"):
name = name[len("/dev/mapper/") :]
if state != "absent" and backing_device is None and password is None and opts is None:
module.fail_json(msg="expected one or more of 'backing_device', 'password' or 'opts'", **module.params)
if "opts" in state and (backing_device is not None or password is not None):
module.fail_json(msg=f"cannot update 'backing_device' or 'password' when state={state}", **module.params)
for arg_name, arg in (("name", name), ("backing_device", backing_device), ("password", password), ("opts", opts)):
if arg is not None and (" " in arg or "\t" in arg or arg == ""):
module.fail_json(msg=f"invalid '{arg_name}': contains white space or is empty", **module.params)
try:
crypttab = Crypttab(path)
existing_line = crypttab.match(name)
except Exception as e:
module.fail_json(
msg=f"failed to open and parse crypttab file: {e}", exception=traceback.format_exc(), **module.params
)
if "present" in state and existing_line is None and backing_device is None:
module.fail_json(msg="'backing_device' required to add a new entry", **module.params)
changed, reason = False, "?"
if state == "absent":
if existing_line is not None:
changed, reason = existing_line.remove()
elif state == "present":
if existing_line is not None:
changed, reason = existing_line.set(backing_device, password, opts)
else:
changed, reason = crypttab.add(Line(None, name, backing_device, password, opts))
elif state == "opts_present":
if existing_line is not None:
changed, reason = existing_line.opts.add(opts)
else:
changed, reason = crypttab.add(Line(None, name, backing_device, password, opts))
elif state == "opts_absent":
if existing_line is not None:
changed, reason = existing_line.opts.remove(opts)
if changed and not module.check_mode:
with open(path, "wb") as f:
f.write(to_bytes(crypttab, errors="surrogate_or_strict"))
module.exit_json(changed=changed, msg=reason, **module.params)
class Crypttab:
_lines: list[str]
def __init__(self, path):
self.path = path
self._lines = []
if not os.path.exists(path):
if not os.path.exists(os.path.dirname(path)):
os.makedirs(os.path.dirname(path))
open(path, "a").close()
with open(path, "r") as f:
for line in f.readlines():
self._lines.append(Line(line))
def add(self, line):
self._lines.append(line)
return True, "added line"
def lines(self):
for line in self._lines:
if line.valid():
yield line
def match(self, name):
for line in self.lines():
if line.name == name:
return line
return None
def __str__(self):
lines = []
for line in self._lines:
lines.append(str(line))
crypttab = "\n".join(lines)
if len(crypttab) == 0:
crypttab += "\n"
if crypttab[-1] != "\n":
crypttab += "\n"
return crypttab
class Line:
def __init__(self, line=None, name=None, backing_device=None, password=None, opts=None):
self.line = line
self.name = name
self.backing_device = backing_device
self.password = password
self.opts = Options(opts)
if line is not None:
self.line = self.line.rstrip("\n")
if self._line_valid(line):
self.name, backing_device, password, opts = self._split_line(line)
self.set(backing_device, password, opts)
def set(self, backing_device, password, opts):
changed = False
if backing_device is not None and self.backing_device != backing_device:
self.backing_device = backing_device
changed = True
if password is not None and self.password != password:
self.password = password
changed = True
if opts is not None:
opts = Options(opts)
if opts != self.opts:
self.opts = opts
changed = True
return changed, "updated line"
def _line_valid(self, line):
return line.strip() and not line.startswith("#") and len(line.split()) in (2, 3, 4)
def _split_line(self, line):
fields = line.split()
try:
field2 = fields[2]
except IndexError:
field2 = None
try:
field3 = fields[3]
except IndexError:
field3 = None
return (fields[0], fields[1], field2, field3)
def remove(self):
self.line, self.name, self.backing_device = "", None, None
return True, "removed line"
def valid(self):
return self.name is not None and self.backing_device is not None
def __str__(self):
if self.valid():
fields = [self.name, self.backing_device]
if self.password is not None or self.opts:
if self.password is not None:
fields.append(self.password)
else:
fields.append("none")
if self.opts:
fields.append(str(self.opts))
return " ".join(fields)
return self.line
class Options(dict):
"""opts_string looks like: 'discard,foo=bar,baz=greeble'"""
def __init__(self, opts_string):
super().__init__()
self.itemlist = []
if opts_string is not None:
for opt in opts_string.split(","):
kv = opt.split("=")
if len(kv) > 1:
k, v = (kv[0], kv[1])
else:
k, v = (kv[0], None)
self[k] = v
def add(self, opts_string):
changed = False
for k, v in Options(opts_string).items():
if k in self:
if self[k] != v:
changed = True
else:
changed = True
self[k] = v
return changed, "updated options"
def remove(self, opts_string):
changed = False
for k in Options(opts_string):
if k in self:
del self[k]
changed = True
return changed, "removed options"
def keys(self):
return self.itemlist
def values(self):
return [self[key] for key in self]
def items(self):
return [(key, self[key]) for key in self]
def __iter__(self):
return iter(self.itemlist)
def __setitem__(self, key, value):
if key not in self:
self.itemlist.append(key)
super().__setitem__(key, value)
def __delitem__(self, key):
self.itemlist.remove(key)
super().__delitem__(key)
def __ne__(self, obj):
return not (isinstance(obj, Options) and sorted(self.items()) == sorted(obj.items()))
def __str__(self):
ret = []
for k, v in self.items():
if v is None:
ret.append(k)
else:
ret.append(f"{k}={v}")
return ",".join(ret)
if __name__ == "__main__":
main()