diff --git a/.github/BOTMETA.yml b/.github/BOTMETA.yml index 9066eaa4f5..89cafd830c 100644 --- a/.github/BOTMETA.yml +++ b/.github/BOTMETA.yml @@ -917,6 +917,8 @@ files: labels: logentries $modules/logentries_msg.py: maintainers: jcftang + $modules/logrotate.py: + maintainers: a-gabidullin $modules/logstash_plugin.py: maintainers: nerzhul $modules/lvg.py: diff --git a/plugins/modules/logrotate.py b/plugins/modules/logrotate.py new file mode 100644 index 0000000000..ab4d78d86f --- /dev/null +++ b/plugins/modules/logrotate.py @@ -0,0 +1,938 @@ +#!/usr/bin/python +# Copyright (c) 2026 Aleksandr Gabidullin +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import annotations + +DOCUMENTATION = r""" +module: logrotate +version_added: 12.3.0 +short_description: Manage logrotate configurations +description: + - Manage logrotate configuration files and settings. + - Create, update, or remove logrotate configurations for applications and services. +author: "Aleksandr Gabidullin (@a-gabidullin)" +requirements: + - logrotate >= 3.8.0 +attributes: + check_mode: + support: full + diff_mode: + support: full +options: + name: + description: + - Name of the logrotate configuration. + - This creates a file in O(config_dir) with this name. + type: str + required: true + aliases: [config_name] + state: + description: + - Whether the configuration should be present or absent. + type: str + choices: [present, absent] + config_dir: + description: + - Directory where logrotate configurations are stored. + - Default is V(/etc/logrotate.d) for system-wide configurations. + - Use V(~/.logrotate.d) for user-specific configurations. + - This directory must exist before using the module. + type: path + paths: + description: + - List of log file paths or patterns to rotate. + - Can include wildcards (for example V(/var/log/app/*.log)). + - Required when creating a new configuration (O(state=present) and config file does not exist). + - Optional when modifying existing configuration (for example to enable/disable). + type: list + elements: path + rotation_period: + description: + - How often to rotate the logs. + - If not specified, existing value be preserved when modifying configuration. + - When creating new configuration, logrotate default (daily) be used if not specified. + type: str + choices: [daily, weekly, monthly, yearly] + rotate_count: + description: + - Number of rotated log files to keep. + - Set to V(0) to disable rotation (keep only current log). + - Set to V(-1) to keep all rotated logs (not recommended). + type: int + compress: + description: + - Compress rotated log files. + type: bool + compressoptions: + description: + - Options to pass to compression program. + - For gzip, use V(-9) for best compression, V(-1) for fastest. + - For xz, use V(-9) for best compression. + type: str + compression_method: + description: + - Compression method to use. + - Requires logrotate 3.18.0 or later for V(xz) and V(zstd). + type: str + choices: [gzip, bzip2, xz, zstd, lzma, lz4] + delaycompress: + description: + - Postpone compression of the previous log file to the next rotation cycle. + - Useful for applications that keep writing to the old log file for some time. + type: bool + nodelaycompress: + description: + - Opposite of O(delaycompress). Ensure compression happens immediately. + - Note that in logrotate, V(nodelaycompress) is the default behavior. + type: bool + shred: + description: + - Use shred to securely delete rotated log files. + - Uses V(shred -u) to overwrite files before deleting. + type: bool + shredcycles: + description: + - Number of times to overwrite files when using O(shred=true). + type: int + missingok: + description: + - Do not issue an error if the log file is missing. + type: bool + ifempty: + description: + - Rotate the log file even if it is empty. + - Opposite of V(notifempty). + type: bool + notifempty: + description: + - Do not rotate the log file if it is empty. + - Opposite of V(ifempty). + type: bool + create: + description: + - Create new log file with specified permissions after rotation. + - Format is V(mode owner group) (for example V(0640 root adm)). + - Set to V(null) or omit to use V(nocreate). + type: str + copytruncate: + description: + - Copy the log file and then truncate it in place. + - Useful for applications that cannot be told to close their logfile. + type: bool + copy: + description: + - Copy the log file but do not truncate the original. + - Takes precedence over O(renamecopy) and O(copytruncate). + type: bool + renamecopy: + description: + - Rename and copy the log file, leaving the original in place. + type: bool + size: + description: + - Rotate log file when it grows bigger than specified size. + - Format is V(number)[k|M|G] (for example V(100M), V(1G)). + - Overrides O(rotation_period) when set. + type: str + minsize: + description: + - Rotate log file only if it has grown bigger than specified size. + - Format is V(number)[k|M|G] (for example V(100M), V(1G)). + - Used with time-based rotation to avoid rotating too small files. + type: str + maxsize: + description: + - Rotate log file when it grows bigger than specified size, but at most once per O(rotation_period). + - Format is V(number)[k|M|G] (for example V(100M), V(1G)). + type: str + maxage: + description: + - Remove rotated logs older than specified number of days. + type: int + dateext: + description: + - Use date as extension for rotated files (YYYYMMDD instead of sequential numbers). + type: bool + dateyesterday: + description: + - Use yesterday's date for O(dateext) instead of today's date. + - Useful for rotating logs that span midnight. + type: bool + dateformat: + description: + - Format for date extension. + - Use with O(dateext=true). + - Format specifiers are V(%Y) year, V(%m) month, V(%d) day, V(%s) seconds since epoch. + type: str + sharedscripts: + description: + - Run O(prerotate) and O(postrotate) scripts only once for all matching log files. + type: bool + prerotate: + description: + - Commands to execute before rotating the log file. + - Can be a single string or list of commands. + type: list + elements: str + postrotate: + description: + - Commands to execute after rotating the log file. + - Can be a single string or list of commands. + type: list + elements: str + firstaction: + description: + - Commands to execute once before all log files that match the wildcard pattern are rotated. + type: list + elements: str + lastaction: + description: + - Commands to execute once after all log files that match the wildcard pattern are rotated. + type: list + elements: str + preremove: + description: + - Commands to execute before removing rotated log files. + type: list + elements: str + su: + description: + - Set user and group for rotated files. + - Format is V(user group) (for example V(www-data adm)). + - Set to V(null) or omit to not set user/group. + type: str + olddir: + description: + - Move rotated logs into specified directory. + type: path + createolddir: + description: + - Create O(olddir) directory if it does not exist. + type: bool + noolddir: + description: + - Keep rotated logs in the same directory as the original log. + type: bool + extension: + description: + - Extension to use for rotated log files (including dot). + - Useful when O(compress=false). + type: str + mail: + description: + - Mail logs to specified address when removed. + - Set to V(null) or omit to not mail logs. + type: str + mailfirst: + description: + - Mail just-created log file, not the about-to-expire one. + type: bool + maillast: + description: + - Mail about-to-expire log file (default). + type: bool + include: + description: + - Include additional configuration files from specified directory. + type: path + tabooext: + description: + - List of extensions that logrotate should not touch. + - Default is V(.rpmorig .rpmsave .v .swp .rpmnew .cfsaved .rhn-cfg-tmp-*). + - Set to V(null) or empty list to clear defaults. + type: list + elements: str + enabled: + description: + - Whether the configuration should be enabled. + - When V(false), adds V(.disabled) extension to the config file. + type: bool + start: + description: + - Base number for rotated files. + - For example, V(1) gives files .1, .2, and so on instead of .0, .1. + type: int + syslog: + description: + - Send logrotate messages to syslog. + type: bool +extends_documentation_fragment: + - community.general.attributes +""" + +EXAMPLES = r""" +- name: Ensure logrotate config directory exists + ansible.builtin.file: + path: /etc/logrotate.d + state: directory + mode: '0755' + +- name: Configure log rotation for Nginx + community.general.logrotate: + name: nginx + paths: + - /var/log/nginx/*.log + rotation_period: daily + rotate_count: 14 + compress: true + compressoptions: "-9" + delaycompress: true + missingok: true + notifempty: true + create: "0640 www-data adm" + sharedscripts: true + postrotate: + - "[ -f /var/run/nginx.pid ] && kill -USR1 $(cat /var/run/nginx.pid)" + - "echo 'Nginx logs rotated'" + +- name: Configure size-based rotation for application logs + community.general.logrotate: + name: myapp + paths: + - /var/log/myapp/app.log + - /var/log/myapp/debug.log + size: 100M + rotate_count: 10 + compress: true + compressoptions: "-1" + dateext: true + dateyesterday: true + dateformat: -%Y%m%d.%s + missingok: true + copytruncate: true + +- name: Configure log rotation with secure deletion + community.general.logrotate: + name: secure-app + paths: + - /var/log/secure-app/*.log + rotation_period: weekly + rotate_count: 4 + shred: true + shredcycles: 3 + compress: true + compressoptions: "-9" + +- name: Configure log rotation with custom start number + community.general.logrotate: + name: custom-start + paths: + - /var/log/custom/*.log + rotation_period: monthly + rotate_count: 6 + start: 1 + compress: true + +- name: Configure log rotation with old directory + community.general.logrotate: + name: with-olddir + paths: + - /opt/app/logs/*.log + rotation_period: weekly + rotate_count: 4 + olddir: /var/log/archives + createolddir: true + compress: true + compression_method: zstd + +- name: Disable logrotate configuration + community.general.logrotate: + name: old-service + enabled: false + +- name: Remove logrotate configuration + community.general.logrotate: + name: deprecated-app + state: absent + +- name: Complex configuration with multiple scripts + community.general.logrotate: + name: complex-app + paths: + - /var/log/complex/*.log + rotation_period: monthly + rotate_count: 6 + compress: true + delaycompress: false + prerotate: | + echo "Starting rotation for complex app" + systemctl stop complex-app + postrotate: | + systemctl start complex-app + echo "Rotation completed" + logger -t logrotate "Complex app logs rotated" + firstaction: "echo 'First action: Starting batch rotation'" + lastaction: "echo 'Last action: Batch rotation complete'" + +- name: User-specific logrotate configuration + community.general.logrotate: + name: myuser-apps + config_dir: ~/.logrotate.d + paths: + - ~/app/*.log + - ~/.cache/*/*.log + rotation_period: daily + rotate_count: 30 + compress: true + su: "{{ ansible_user_id }} users" + +- name: Configuration with copy instead of move + community.general.logrotate: + name: copy-config + paths: + - /var/log/copy-app/*.log + rotation_period: daily + rotate_count: 7 + copy: true + +- name: Configuration with syslog notifications + community.general.logrotate: + name: syslog-config + paths: + - /var/log/syslog-app/*.log + rotation_period: daily + rotate_count: 14 + syslog: true + compress: true + +- name: Configuration without compression + community.general.logrotate: + name: nocompress-config + paths: + - /var/log/nocompress/*.log + rotation_period: daily + rotate_count: 7 + compress: false + +- name: Configuration with custom taboo extensions + community.general.logrotate: + name: taboo-config + paths: + - /var/log/taboo/*.log + rotation_period: daily + rotate_count: 7 + tabooext: [".backup", ".tmp", ".temp"] +""" + +RETURN = r""" +config_file: + description: Path to the created/updated logrotate configuration file. + type: str + returned: when O(state=present) + sample: /etc/logrotate.d/nginx +config_content: + description: The generated logrotate configuration content. + type: str + returned: when O(state=present) + sample: | + /var/log/nginx/*.log { + daily + rotate 14 + compress + compressoptions -9 + delaycompress + missingok + notifempty + create 0640 www-data adm + sharedscripts + postrotate + [ -f /var/run/nginx.pid ] && kill -USR1 $(cat /var/run/nginx.pid) + echo 'Nginx logs rotated' + endscript + } +enabled_state: + description: Current enabled state of the configuration. + type: bool + returned: always + sample: true +backup_file: + description: Path to the backup of the original configuration file, if it was backed up. + type: str + returned: when backup was made + sample: /etc/logrotate.d/nginx.20250101_120000 +""" + +import os +import re + +from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils.common.text.converters import to_native + + +class LogrotateConfig: + """Logrotate configuration manager.""" + + def __init__(self, module: AnsibleModule, logrotate_bin: str) -> None: + self.module = module + self.params = module.params + self.logrotate_bin = logrotate_bin + self.result: dict[str, object] = { + "changed": False, + "config_file": "", + "config_content": "", + "enabled_state": True, + } + + self.config_dir = self.params["config_dir"] + self.config_name = self.params["name"] + self.disabled_suffix = ".disabled" + + self.config_file = self._get_config_path(self.params["enabled"]) + + def _get_config_path(self, enabled: bool) -> str: + """Get config file path based on enabled state.""" + base_path = os.path.join(self.config_dir, self.config_name) + if not enabled: + return base_path + self.disabled_suffix + return base_path + + def _validate_parameters(self) -> None: + """Validate module parameters.""" + if self.params["state"] == "present": + existing_content = self._read_existing_config(any_state=True) + + if not existing_content and not self.params.get("paths"): + self.module.fail_json(msg="'paths' parameter is required when creating a new configuration") + + if self.params.get("size") and self.params.get("maxsize"): + self.module.fail_json(msg="'size' and 'maxsize' parameters are mutually exclusive") + + if self.params.get("copy") and self.params.get("copytruncate"): + self.module.fail_json(msg="'copy' and 'copytruncate' are mutually exclusive") + + if self.params.get("copy") and self.params.get("renamecopy"): + self.module.fail_json(msg="'copy' and 'renamecopy' are mutually exclusive") + + if self.params.get("ifempty") and self.params.get("notifempty"): + self.module.fail_json(msg="'ifempty' and 'notifempty' are mutually exclusive") + + if self.params.get("delaycompress") and self.params.get("nodelaycompress"): + self.module.fail_json(msg="'delaycompress' and 'nodelaycompress' are mutually exclusive") + + if self.params.get("olddir") and self.params.get("noolddir"): + self.module.fail_json(msg="'olddir' and 'noolddir' are mutually exclusive") + + if self.params.get("su"): + su_parts = self.params["su"].split() + if len(su_parts) != 2: + self.module.fail_json(msg="'su' parameter must be in format 'user group'") + + if self.params.get("shredcycles", 1) < 1: + self.module.fail_json(msg="'shredcycles' must be a positive integer") + + if self.params.get("start", 0) < 0: + self.module.fail_json(msg="'start' must be a non-negative integer") + + for size_param in ["size", "minsize", "maxsize"]: + if self.params.get(size_param): + if not re.match(r"^\d+[kMG]?$", self.params[size_param], re.I): + self.module.fail_json( + msg=f"'{size_param}' must be in format 'number[k|M|G]' (for example '100M', '1G')" + ) + + def _read_existing_config(self, any_state: bool = False) -> str | None: + """Read existing configuration file. + + Args: + any_state: If True, check both enabled and disabled versions. + If False, only check based on current enabled param. + """ + if any_state: + for suffix in ["", self.disabled_suffix]: + config_path = os.path.join(self.config_dir, self.config_name + suffix) + if os.path.exists(config_path): + self.result["enabled_state"] = suffix == "" + try: + with open(config_path, "r") as f: + return f.read() + except Exception as e: + self.module.fail_json(msg=f"Failed to read config file {config_path}: {to_native(e)}") + else: + config_path = self._get_config_path(self.params["enabled"]) + if os.path.exists(config_path): + try: + with open(config_path, "r") as f: + return f.read() + except Exception as e: + self.module.fail_json(msg=f"Failed to read config file {config_path}: {to_native(e)}") + + return None + + def _generate_config_content(self) -> str: + """Generate logrotate configuration content.""" + if not self.params.get("paths"): + existing_content = self._read_existing_config(any_state=True) + if existing_content: + lines = existing_content.strip().split("\n") + paths = [] + for line in lines: + line = line.strip() + if ( + line + and not line.startswith("#") + and not line.startswith("{") + and not line.startswith("}") + and "/" in line + ): + if not any( + keyword in line + for keyword in [" ", "\t", "daily", "weekly", "monthly", "yearly", "rotate", "compress"] + ): + paths.append(line) + + if paths: + self.params["paths"] = paths + else: + self.params["paths"] = [] + else: + self.module.fail_json( + msg="Cannot generate configuration: no paths specified and no existing configuration found" + ) + + lines = [] + + paths = self.params["paths"] + if isinstance(paths, str): + paths = [paths] + + for path in paths: + lines.append(path) + lines.append("{") + lines.append("") + + rotation_period = self.params.get("rotation_period") + if rotation_period is not None and not self.params.get("size") and not self.params.get("maxsize"): + lines.append(f" {rotation_period}") + + if self.params.get("size") is not None: + lines.append(f" size {self.params['size']}") + elif self.params.get("maxsize") is not None: + lines.append(f" maxsize {self.params['maxsize']}") + + if self.params.get("minsize") is not None: + lines.append(f" minsize {self.params['minsize']}") + + rotate_count = self.params.get("rotate_count") + if rotate_count is not None: + lines.append(f" rotate {rotate_count}") + + start_val = self.params.get("start") + if start_val is not None: + lines.append(f" start {start_val}") + + compress_val = self.params.get("compress") + if compress_val is not None: + if compress_val: + comp_method = self.params.get("compression_method") + if comp_method is not None and comp_method != "gzip": + lines.append(f" compresscmd /usr/bin/{comp_method}") + if comp_method == "zstd": + lines.append(f" uncompresscmd /usr/bin/{comp_method} -d") + elif comp_method == "lz4": + lines.append(f" uncompresscmd /usr/bin/{comp_method} -d") + else: + lines.append(f" uncompresscmd /usr/bin/{comp_method}un{comp_method}") + lines.append(f" compressext .{comp_method}") + lines.append(" compress") + + if self.params.get("compressoptions") is not None: + lines.append(f" compressoptions {self.params['compressoptions']}") + else: + lines.append(" nocompress") + + delaycompress_val = self.params.get("delaycompress") + if delaycompress_val is not None: + if delaycompress_val: + lines.append(" delaycompress") + + nodelaycompress_val = self.params.get("nodelaycompress") + if nodelaycompress_val is not None: + if nodelaycompress_val: + lines.append(" nodelaycompress") + + shred_val = self.params.get("shred") + if shred_val is not None and shred_val: + lines.append(" shred") + shredcycles_val = self.params.get("shredcycles") + if shredcycles_val is not None: + lines.append(f" shredcycles {shredcycles_val}") + + missingok_val = self.params.get("missingok") + if missingok_val is not None: + if not missingok_val: + lines.append(" nomissingok") + else: + lines.append(" missingok") + + ifempty_val = self.params.get("ifempty") + notifempty_val = self.params.get("notifempty") + + if ifempty_val is not None and ifempty_val: + lines.append(" ifempty") + elif notifempty_val is not None and notifempty_val: + lines.append(" notifempty") + + create_val = self.params.get("create") + if create_val is not None: + lines.append(f" create {create_val}") + + copy_val = self.params.get("copy") + renamecopy_val = self.params.get("renamecopy") + copytruncate_val = self.params.get("copytruncate") + + if copy_val is not None and copy_val: + lines.append(" copy") + elif renamecopy_val is not None and renamecopy_val: + lines.append(" renamecopy") + elif copytruncate_val is not None and copytruncate_val: + lines.append(" copytruncate") + + if self.params.get("maxage") is not None: + lines.append(f" maxage {self.params['maxage']}") + + dateext_val = self.params.get("dateext") + if dateext_val is not None and dateext_val: + lines.append(" dateext") + dateyesterday_val = self.params.get("dateyesterday") + if dateyesterday_val is not None and dateyesterday_val: + lines.append(" dateyesterday") + dateformat_val = self.params.get("dateformat") + if dateformat_val is not None: + lines.append(f" dateformat {dateformat_val}") + + sharedscripts_val = self.params.get("sharedscripts") + if sharedscripts_val is not None and sharedscripts_val: + lines.append(" sharedscripts") + + if self.params.get("su") is not None: + lines.append(f" su {self.params['su']}") + + noolddir_val = self.params.get("noolddir") + olddir_val = self.params.get("olddir") + + if noolddir_val is not None and noolddir_val: + lines.append(" noolddir") + elif olddir_val is not None: + lines.append(f" olddir {olddir_val}") + createolddir_val = self.params.get("createolddir") + if createolddir_val is not None and createolddir_val: + lines.append(" createolddir") + + if self.params.get("extension") is not None: + lines.append(f" extension {self.params['extension']}") + + mail_val = self.params.get("mail") + if mail_val is not None: + lines.append(f" mail {mail_val}") + mailfirst_val = self.params.get("mailfirst") + maillast_val = self.params.get("maillast") + if mailfirst_val is not None and mailfirst_val: + lines.append(" mailfirst") + elif maillast_val is not None and maillast_val: + lines.append(" maillast") + + if self.params.get("include") is not None: + lines.append(f" include {self.params['include']}") + + if self.params.get("tabooext") is not None: + tabooext = self.params["tabooext"] + if isinstance(tabooext, list): + tabooext = " ".join(tabooext) + if tabooext.strip(): + lines.append(f" tabooext {tabooext}") + + syslog_val = self.params.get("syslog") + if syslog_val is not None and syslog_val: + lines.append(" syslog") + + scripts = { + "prerotate": self.params.get("prerotate"), + "postrotate": self.params.get("postrotate"), + "firstaction": self.params.get("firstaction"), + "lastaction": self.params.get("lastaction"), + "preremove": self.params.get("preremove"), + } + + for script_name, script_content in scripts.items(): + if script_content is not None: + lines.append(f" {script_name}") + for command in script_content: + for line in command.strip().split("\n"): + if line.strip(): + lines.append(f" {line}") + lines.append(" endscript") + lines.append("") + + lines.append("}") + + return "\n".join(lines) + + def apply(self) -> dict[str, object]: + """Apply logrotate configuration.""" + self._validate_parameters() + state = self.params["state"] + + if not os.path.exists(self.config_dir): + self.module.fail_json( + msg=f"Config directory '{self.config_dir}' does not exist. " + f"Please create it manually or ensure the correct path is specified." + ) + + if state == "absent": + for suffix in ["", self.disabled_suffix]: + config_path = os.path.join(self.config_dir, self.config_name + suffix) + if os.path.exists(config_path): + if not self.module.check_mode: + os.remove(config_path) + self.result["changed"] = True + self.result["config_file"] = config_path + break + return self.result + + existing_content = self._read_existing_config(any_state=True) + current_enabled = self.result.get("enabled_state", True) + + target_enabled = self.params.get("enabled") + if target_enabled is None: + target_enabled = current_enabled + + only_changing_enabled = ( + existing_content is not None and not self.params.get("paths") and target_enabled != current_enabled + ) + + if only_changing_enabled: + old_path = self._get_config_path(not target_enabled) + new_path = self._get_config_path(target_enabled) + + if os.path.exists(old_path) and not os.path.exists(new_path): + self.result["changed"] = True + if not self.module.check_mode: + self.module.atomic_move(old_path, new_path, unsafe_writes=False) + + self.result["config_file"] = new_path + self.result["enabled_state"] = target_enabled + + try: + with open(new_path, "r") as f: + self.result["config_content"] = f.read() + except Exception: + self.result["config_content"] = existing_content + + return self.result + + new_content = self._generate_config_content() + self.result["config_content"] = new_content + self.result["config_file"] = self._get_config_path(target_enabled) + + needs_update = False + + if existing_content is None: + needs_update = True + elif existing_content != new_content: + needs_update = True + elif target_enabled != current_enabled: + needs_update = True + + if needs_update: + self.result["changed"] = True + + if not self.module.check_mode: + for suffix in ["", self.disabled_suffix]: + old_path = os.path.join(self.config_dir, self.config_name + suffix) + if os.path.exists(old_path): + backup_path = self.module.backup_local(old_path) + if backup_path: + self.result["backup_file"] = backup_path + os.remove(old_path) + + try: + config_file_path = str(self.result["config_file"]) + with open(config_file_path, "w") as f: + f.write(new_content) + os.chmod(config_file_path, 0o644) + except Exception as e: + self.module.fail_json( + msg=f"Failed to write config file {self.result['config_file']}: {to_native(e)}" + ) + + if not self.module.check_mode: + test_cmd = [self.logrotate_bin, "-d", str(self.result["config_file"])] + rc, stdout, stderr = self.module.run_command(test_cmd) + if rc != 0: + self.module.warn(f"logrotate configuration test failed: {stderr}") + + self.result["enabled_state"] = target_enabled + return self.result + + +def main() -> None: + """Main function.""" + module = AnsibleModule( + argument_spec=dict( + name=dict(type="str", required=True, aliases=["config_name"]), + state=dict(type="str", choices=["present", "absent"]), + config_dir=dict(type="path"), + paths=dict(type="list", elements="path"), + rotation_period=dict( + type="str", + choices=["daily", "weekly", "monthly", "yearly"], + ), + rotate_count=dict(type="int"), + compress=dict(type="bool"), + compressoptions=dict(type="str"), + compression_method=dict( + type="str", + choices=["gzip", "bzip2", "xz", "zstd", "lzma", "lz4"], + ), + delaycompress=dict(type="bool"), + nodelaycompress=dict(type="bool"), + shred=dict(type="bool"), + shredcycles=dict(type="int"), + missingok=dict(type="bool"), + ifempty=dict(type="bool"), + notifempty=dict(type="bool"), + create=dict(type="str"), + copytruncate=dict(type="bool"), + copy=dict(type="bool"), + renamecopy=dict(type="bool"), + size=dict(type="str"), + minsize=dict(type="str"), + maxsize=dict(type="str"), + maxage=dict(type="int"), + dateext=dict(type="bool"), + dateyesterday=dict(type="bool"), + dateformat=dict(type="str"), + sharedscripts=dict(type="bool"), + prerotate=dict(type="list", elements="str"), + postrotate=dict(type="list", elements="str"), + firstaction=dict(type="list", elements="str"), + lastaction=dict(type="list", elements="str"), + preremove=dict(type="list", elements="str"), + su=dict(type="str"), + olddir=dict(type="path"), + createolddir=dict(type="bool"), + noolddir=dict(type="bool"), + extension=dict(type="str"), + mail=dict(type="str"), + mailfirst=dict(type="bool"), + maillast=dict(type="bool"), + include=dict(type="path"), + tabooext=dict(type="list", elements="str"), + enabled=dict(type="bool"), + start=dict(type="int"), + syslog=dict(type="bool"), + ), + supports_check_mode=True, + ) + + logrotate_bin = module.get_bin_path("logrotate", required=True) + + logrotate_config = LogrotateConfig(module, logrotate_bin) + result = logrotate_config.apply() + module.exit_json(**result) + + +if __name__ == "__main__": + main() diff --git a/tests/unit/plugins/modules/test_logrotate.py b/tests/unit/plugins/modules/test_logrotate.py new file mode 100644 index 0000000000..1769703216 --- /dev/null +++ b/tests/unit/plugins/modules/test_logrotate.py @@ -0,0 +1,1059 @@ +# Copyright (c) 2026 Aleksandr Gabidullin +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import annotations + +import os +import shutil +import sys +import tempfile +import unittest +from unittest.mock import Mock, mock_open, patch + + +class TestLogrotateConfig(unittest.TestCase): + """Unit tests for the logrotate_config module.""" + + @classmethod + def setUpClass(cls): + """Set up test environment.""" + cls.test_dir = tempfile.mkdtemp() + cls.mock_ansible_basic = Mock() + cls.mock_ansible_basic.AnsibleModule = Mock() + cls.mock_converters = Mock() + cls.mock_converters.to_native = str + cls.patcher_basic = patch.dict( + "sys.modules", + { + "ansible.module_utils.basic": cls.mock_ansible_basic, + "ansible.module_utils.common.text.converters": cls.mock_converters, + }, + ) + cls.patcher_basic.start() + + @classmethod + def tearDownClass(cls): + """Clean up after all tests.""" + cls.patcher_basic.stop() + if os.path.exists(cls.test_dir): + shutil.rmtree(cls.test_dir) + + def setUp(self): + """Set up test fixtures.""" + self.mock_ansible_basic.AnsibleModule.reset_mock() + self.mock_module = Mock() + self.mock_module.params = {} + self.mock_module.fail_json = Mock(side_effect=Exception("fail_json called")) + self.mock_module.exit_json = Mock() + self.mock_module.check_mode = False + self.mock_module.get_bin_path = Mock(return_value="/usr/sbin/logrotate") + self.mock_module.atomic_move = Mock() + self.mock_module.warn = Mock() + self.mock_module.run_command = Mock(return_value=(0, "", "")) + self.mock_ansible_basic.AnsibleModule.return_value = self.mock_module + self.config_dir = os.path.join(self.test_dir, "logrotate.d") + os.makedirs(self.config_dir, exist_ok=True) + for module_name in list(sys.modules.keys()): + if "logrotate" in module_name or "ansible_collections.community.general.plugins.modules" in module_name: + del sys.modules[module_name] + + def tearDown(self): + """Clean up after test.""" + pass + + def _setup_module_params(self, **params): + """Helper to set up module parameters.""" + default_params = { + "name": "test", + "state": "present", + "config_dir": self.config_dir, + "paths": ["/var/log/test/*.log"], + "rotate_count": 7, + "compress": True, + "compression_method": "gzip", + "delaycompress": False, + "missingok": True, + "ifempty": False, + "notifempty": True, + "copytruncate": False, + "dateext": False, + "dateformat": "-%Y%m%d", + "sharedscripts": False, + "enabled": True, + } + default_params.update(params) + self.mock_module.params = default_params + + def test_create_new_configuration(self): + """Test creating a new logrotate configuration.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params() + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("os.makedirs"): + with patch("builtins.open", mock_open()) as mock_file: + with patch("os.chmod") as mock_chmod: + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + self.assertTrue(result["changed"]) + self.assertIn("config_file", result) + self.assertIn("config_content", result) + self.assertEqual(result["enabled_state"], True) + mock_file.assert_called_once() + mock_chmod.assert_called_once() + + def test_update_existing_configuration(self): + """Test updating an existing logrotate configuration.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(rotate_count=14) + config_path = os.path.join(self.config_dir, "test") + existing_content = """/var/log/test/*.log { + daily + rotate 7 + compress + delaycompress + missingok + notifempty +}""" + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return True + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open(read_data=existing_content)): + with patch("os.remove") as mock_remove: + with patch("os.chmod") as mock_chmod: + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + self.assertTrue(result["changed"]) + self.assertIn("14", result["config_content"]) + mock_remove.assert_called() + mock_chmod.assert_called_once() + + def test_remove_configuration(self): + """Test removing a logrotate configuration.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(state="absent") + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + return path in (config_path, config_path + ".disabled") + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("os.remove") as mock_remove: + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + self.assertTrue(result["changed"]) + self.assertTrue(mock_remove.called) + + def test_disable_configuration(self): + """Test disabling a logrotate configuration.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(enabled=False) + config_path = os.path.join(self.config_dir, "test") + existing_content = """/var/log/test/*.log { + daily + rotate 7 +}""" + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return True + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open(read_data=existing_content)): + with patch("os.remove"): + with patch("os.chmod"): + mock_file_write = mock_open() + with patch("builtins.open", mock_file_write): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + self.assertTrue(result["changed"]) + self.assertEqual(result["enabled_state"], False) + self.assertTrue(result["config_file"].endswith(".disabled")) + + def test_enable_configuration(self): + """Test enabling a disabled logrotate configuration.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(enabled=True) + config_path = os.path.join(self.config_dir, "test") + existing_content = """/var/log/test/*.log { + daily + rotate 7 +}""" + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path + ".disabled": + return True + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open(read_data=existing_content)): + with patch("os.remove"): + with patch("os.chmod"): + self.mock_module.atomic_move = Mock() + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + self.assertTrue(result["changed"]) + self.assertEqual(result["enabled_state"], True) + self.assertFalse(result["config_file"].endswith(".disabled")) + + def test_validation_missing_paths(self): + """Test validation when paths are missing for new configuration.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(paths=None) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + with self.assertRaises(Exception) as context: + config.apply() + self.assertIn("fail_json called", str(context.exception)) + + def test_validation_size_and_maxsize_exclusive(self): + """Test validation when both size and maxsize are specified.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(size="100M", maxsize="200M") + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + with self.assertRaises(Exception) as context: + config.apply() + self.assertIn("fail_json called", str(context.exception)) + + def test_check_mode(self): + """Test that no changes are made in check mode.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params() + self.mock_module.check_mode = True + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("os.makedirs"): + with patch("builtins.open", mock_open()): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + self.assertTrue(result["changed"]) + + def test_generate_config_with_scripts(self): + """Test generating configuration with pre/post scripts.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params( + prerotate=["echo 'Pre-rotation'"], + postrotate=["systemctl reload test", "logger 'Rotation done'"], + firstaction=["echo 'First action'"], + lastaction=["echo 'Last action'"], + ) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("os.makedirs"): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("prerotate", content) + self.assertIn("postrotate", content) + self.assertIn("firstaction", content) + self.assertIn("lastaction", content) + self.assertIn("systemctl reload test", content) + self.assertIn("echo 'Pre-rotation'", content) + + def test_compression_methods(self): + """Test different compression methods.""" + from ansible_collections.community.general.plugins.modules import logrotate + + compression_methods = ["gzip", "bzip2", "xz", "zstd", "lzma", "lz4"] + for method in compression_methods: + with self.subTest(method=method): + self._setup_module_params(compression_method=method) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path, current_config_path=config_path): + if path == self.config_dir: + return True + elif path == current_config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("os.makedirs"): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + if method != "gzip": + self.assertIn(f"compresscmd /usr/bin/{method}", content) + if method == "zstd" or method == "lz4": + self.assertIn(f"uncompresscmd /usr/bin/{method} -d", content) + else: + uncompress_cmd = f"{method}un{method}" + self.assertIn(f"uncompresscmd /usr/bin/{uncompress_cmd}", content) + + def test_size_based_rotation(self): + """Test size-based rotation configuration.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(size="100M", rotation_period="daily") + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("os.makedirs"): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("size 100M", content) + self.assertNotIn("daily", content) + + def test_logrotate_not_installed(self): + """Test error when logrotate is not installed.""" + from ansible_collections.community.general.plugins.modules import logrotate + + mock_module_for_test = Mock() + mock_module_for_test.params = { + "name": "test", + "state": "present", + "config_dir": self.config_dir, + "paths": ["/var/log/test/*.log"], + "rotate_count": 7, + "compress": True, + "compression_method": "gzip", + "delaycompress": False, + "missingok": True, + "ifempty": False, + "notifempty": True, + "copytruncate": False, + "dateext": False, + "dateformat": "-%Y%m%d", + "sharedscripts": False, + "enabled": True, + } + mock_module_for_test.fail_json = Mock(side_effect=Exception("fail_json called")) + mock_module_for_test.exit_json = Mock() + mock_module_for_test.check_mode = False + + def get_bin_path_side_effect(name, required=False): + if name == "logrotate" and required: + mock_module_for_test.fail_json(msg=f"Failed to find required executable '{name}' in PATH") + return None + + mock_module_for_test.get_bin_path = Mock(side_effect=get_bin_path_side_effect) + mock_module_for_test.atomic_move = Mock() + mock_module_for_test.warn = Mock() + mock_module_for_test.run_command = Mock(return_value=(0, "", "")) + + with patch( + "ansible_collections.community.general.plugins.modules.logrotate.AnsibleModule", + return_value=mock_module_for_test, + ): + with self.assertRaises(Exception) as context: + logrotate.main() + + self.assertIn("fail_json called", str(context.exception)) + + mock_module_for_test.get_bin_path.assert_called_once_with("logrotate", required=True) + mock_module_for_test.fail_json.assert_called_once() + + def test_parse_existing_config_paths(self): + """Test parsing paths from existing configuration.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(paths=None) + config_path = os.path.join(self.config_dir, "test") + existing_content = """/var/log/app1/*.log +{ + daily + rotate 7 + compress +}""" + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return True + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + mock_file_read = mock_open(read_data=existing_content) + with patch("builtins.open", mock_file_read): + with patch("os.remove"): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + self.assertTrue(result["changed"]) + self.assertIn("/var/log/app1/*.log", result["config_content"]) + + def test_nodelaycompress_parameter(self): + """Test nodelaycompress parameter.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(nodelaycompress=True) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("os.makedirs"): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("nodelaycompress", content) + self.assertTrue(result["changed"]) + + def test_shred_and_shredcycles_parameters(self): + """Test shred and shredcycles parameters.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(shred=True, shredcycles=3) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("os.makedirs"): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("shred", content) + self.assertIn("shredcycles 3", content) + self.assertTrue(result["changed"]) + + def test_copy_parameter(self): + """Test copy parameter.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(copy=True, copytruncate=False) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("os.makedirs"): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("copy", content) + self.assertNotIn("copytruncate", content) + self.assertTrue(result["changed"]) + + def test_renamecopy_parameter(self): + """Test renamecopy parameter.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(renamecopy=True) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("os.makedirs"): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("renamecopy", content) + self.assertTrue(result["changed"]) + + def test_minsize_parameter(self): + """Test minsize parameter.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(minsize="100k") + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("os.makedirs"): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("minsize 100k", content) + self.assertTrue(result["changed"]) + + def test_dateyesterday_parameter(self): + """Test dateyesterday parameter.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(dateext=True, dateyesterday=True) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("os.makedirs"): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("dateext", content) + self.assertIn("dateyesterday", content) + self.assertTrue(result["changed"]) + + def test_createolddir_parameter(self): + """Test createolddir parameter.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(olddir="/var/log/archives", createolddir=True) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("os.makedirs"): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("olddir /var/log/archives", content) + self.assertIn("createolddir", content) + self.assertTrue(result["changed"]) + + def test_start_parameter(self): + """Test start parameter.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(start=1) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("os.makedirs"): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("start 1", content) + self.assertTrue(result["changed"]) + + def test_syslog_parameter(self): + """Test syslog parameter.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(syslog=True) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("os.makedirs"): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("syslog", content) + self.assertTrue(result["changed"]) + + def test_validation_copy_and_copytruncate_exclusive(self): + """Test validation when both copy and copytruncate are specified.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(copy=True, copytruncate=True) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + with self.assertRaises(Exception) as context: + config.apply() + self.assertIn("fail_json called", str(context.exception)) + + def test_validation_copy_and_renamecopy_exclusive(self): + """Test validation when both copy and renamecopy are specified.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(copy=True, renamecopy=True) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + with self.assertRaises(Exception) as context: + config.apply() + self.assertIn("fail_json called", str(context.exception)) + + def test_validation_shredcycles_positive(self): + """Test validation when shredcycles is not positive.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(shredcycles=0) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + with self.assertRaises(Exception) as context: + config.apply() + self.assertIn("fail_json called", str(context.exception)) + + def test_validation_start_non_negative(self): + """Test validation when start is negative.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(start=-1) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + with self.assertRaises(Exception) as context: + config.apply() + self.assertIn("fail_json called", str(context.exception)) + + def test_validation_olddir_and_noolddir_exclusive(self): + """Test validation when both olddir and noolddir are specified.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(olddir="/var/log/archives", noolddir=True) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + with self.assertRaises(Exception) as context: + config.apply() + self.assertIn("fail_json called", str(context.exception)) + + def test_all_new_parameters_together(self): + """Test all new parameters together in one configuration.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params( + nodelaycompress=True, + shred=True, + shredcycles=3, + copy=True, + minsize="100k", + dateext=True, + dateyesterday=True, + olddir="/var/log/archives", + createolddir=True, + start=1, + syslog=True, + renamecopy=False, + copytruncate=False, + delaycompress=False, + ) + + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("os.makedirs"): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertTrue(result["changed"]) + + self.assertIn("nodelaycompress", content) + self.assertIn("shred", content) + self.assertIn("shredcycles 3", content) + self.assertIn("copy", content) + self.assertIn("minsize 100k", content) + self.assertIn("dateext", content) + self.assertIn("dateyesterday", content) + self.assertIn("olddir /var/log/archives", content) + self.assertIn("createolddir", content) + self.assertIn("start 1", content) + self.assertIn("syslog", content) + + lines = [line.strip() for line in content.split("\n")] + self.assertNotIn("copytruncate", lines) + self.assertNotIn("renamecopy", lines) + self.assertNotIn("delaycompress", lines) + + def test_parameter_interactions(self): + """Test interactions between related parameters.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(olddir="/var/log/archives", noolddir=True) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + + with self.assertRaises(Exception) as context: + config.apply() + + self.assertIn("fail_json called", str(context.exception)) + + self._setup_module_params(copy=True, renamecopy=True) + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + + with self.assertRaises(Exception) as context: + config.apply() + + self.assertIn("fail_json called", str(context.exception)) + + self._setup_module_params(copy=True, copytruncate=True) + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + + with self.assertRaises(Exception) as context: + config.apply() + + self.assertIn("fail_json called", str(context.exception)) + + def test_size_format_validation(self): + """Test validation of size format parameters.""" + from ansible_collections.community.general.plugins.modules import logrotate + + valid_sizes = ["100k", "100M", "1G", "10", "500K", "2M", "3G"] + + for size in valid_sizes: + with self.subTest(valid_size=size): + self._setup_module_params(size=size) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path, current_config_path=config_path): + if path == self.config_dir: + return True + elif path == current_config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("os.makedirs"): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + + result = config.apply() + self.assertIn(f"size {size}", result["config_content"]) + + invalid_sizes = ["100kb", "M100", "1.5G", "abc", "100 MB"] + + for size in invalid_sizes: + with self.subTest(invalid_size=size): + self._setup_module_params(size=size) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path, current_config_path=config_path): + if path == self.config_dir: + return True + elif path == current_config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + + with self.assertRaises(Exception) as context: + config.apply() + + self.assertIn("fail_json called", str(context.exception)) + + def test_maxsize_format_validation(self): + """Test validation of maxsize format parameters.""" + from ansible_collections.community.general.plugins.modules import logrotate + + valid_sizes = ["100k", "100M", "1G", "10", "500K", "2M", "3G"] + + for size in valid_sizes: + with self.subTest(valid_size=size): + self._setup_module_params(maxsize=size) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path, current_config_path=config_path): + if path == self.config_dir: + return True + elif path == current_config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("os.makedirs"): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + + result = config.apply() + self.assertIn(f"maxsize {size}", result["config_content"]) + + invalid_sizes = ["100kb", "M100", "1.5G", "abc", "100 MB"] + + for size in invalid_sizes: + with self.subTest(invalid_size=size): + self._setup_module_params(maxsize=size) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path, current_config_path=config_path): + if path == self.config_dir: + return True + elif path == current_config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + + with self.assertRaises(Exception) as context: + config.apply() + + self.assertIn("fail_json called", str(context.exception)) + + def test_logrotate_bin_used_in_apply(self): + """Test that logrotate binary path is used in apply method.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params() + + test_logrotate_path = "/usr/local/sbin/logrotate" + self.mock_module.get_bin_path.return_value = test_logrotate_path + + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("os.makedirs"): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + config = logrotate.LogrotateConfig(self.mock_module, test_logrotate_path) + config.apply() + + self.mock_module.run_command.assert_called_once() + call_args = self.mock_module.run_command.call_args[0][0] + self.assertEqual(call_args[0], test_logrotate_path) + + +if __name__ == "__main__": + unittest.main()