From 79d8c9bd6ef6fdc255f0ae737a860df7e91b21d5 Mon Sep 17 00:00:00 2001 From: "patchback[bot]" <45432694+patchback[bot]@users.noreply.github.com> Date: Fri, 13 Mar 2026 08:01:39 +0100 Subject: [PATCH] [PR #11424/f0e3edc8 backport][stable-12] New module: `logrotate` (#11581) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New module: `logrotate` (#11424) * add module logrotate * add values for start * fix docs * version 12.5.0 and fix test --------- (cherry picked from commit f0e3edc89293ff1bc08ec16e27b04c77b278a2b1) Co-authored-by: Aleksandr Gabidullin <101321307+a-gabidullin@users.noreply.github.com> Co-authored-by: Александр Габидуллин --- .github/BOTMETA.yml | 2 + plugins/modules/logrotate.py | 937 ++++++++++++++++ tests/unit/plugins/modules/test_logrotate.py | 1039 ++++++++++++++++++ 3 files changed, 1978 insertions(+) create mode 100644 plugins/modules/logrotate.py create mode 100644 tests/unit/plugins/modules/test_logrotate.py diff --git a/.github/BOTMETA.yml b/.github/BOTMETA.yml index 7fb8f6129a..b3873a1c42 100644 --- a/.github/BOTMETA.yml +++ b/.github/BOTMETA.yml @@ -930,6 +930,8 @@ files: labels: logentries $modules/logentries_msg.py: maintainers: jcftang + $modules/logrotate.py: + maintainers: a-gabidullin $modules/logstash_plugin.py: maintainers: nerzhul $modules/lvg.py: diff --git a/plugins/modules/logrotate.py b/plugins/modules/logrotate.py new file mode 100644 index 0000000000..ee9d71e8d5 --- /dev/null +++ b/plugins/modules/logrotate.py @@ -0,0 +1,937 @@ +#!/usr/bin/python +# Copyright (c) 2026 Aleksandr Gabidullin +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import annotations + +DOCUMENTATION = r""" +module: logrotate +version_added: 12.5.0 +short_description: Manage logrotate configurations +description: + - Manage C(logrotate) configuration files and settings. + - Create, update, or remove C(logrotate) configurations for applications and services. +author: "Aleksandr Gabidullin (@a-gabidullin)" +requirements: + - logrotate >= 3.8.0 +attributes: + check_mode: + support: full + diff_mode: + support: full +options: + name: + description: + - Name of the logrotate configuration. + - This creates a file in O(config_dir) with this name. + type: str + required: true + aliases: [config_name] + state: + description: + - Whether the configuration should be present or absent. + type: str + choices: [present, absent] + config_dir: + description: + - Directory where logrotate configurations are stored. + - Default is V(/etc/logrotate.d) for system-wide configurations. + - Use V(~/.logrotate.d) for user-specific configurations. + - This directory must exist before using the module. + type: path + paths: + description: + - List of log file paths or patterns to rotate. + - Can include wildcards (for example V(/var/log/app/*.log)). + - Required when creating a new configuration (O(state=present) and config file does not exist). + - Optional when modifying existing configuration (for example to enable/disable). + type: list + elements: path + rotation_period: + description: + - How often to rotate the logs. + - If not specified when modifying an existing configuration, the existing value is preserved. + - When creating a new configuration, this option is only included if specified. + type: str + choices: [daily, weekly, monthly, yearly] + rotate_count: + description: + - Number of rotated log files to keep. + - Set to V(0) to disable rotation (keep only current log). + - Set to V(-1) to keep all rotated logs (not recommended). + type: int + compress: + description: + - Compress rotated log files. + type: bool + compress_options: + description: + - Options to pass to compression program. + - For O(compression_method=gzip), use V(-9) for best compression, V(-1) for fastest. + - For O(compression_method=xz), use V(-9) for best compression. + type: str + compression_method: + description: + - Compression method to use. + - Requires logrotate 3.18.0 or later for V(xz) and V(zstd). + type: str + choices: [gzip, bzip2, xz, zstd, lzma, lz4] + delay_compress: + description: + - Postpone compression of the previous log file to the next rotation cycle. + - Useful for applications that keep writing to the old log file for some time. + type: bool + no_delay_compress: + description: + - Opposite of O(delay_compress). Ensure compression happens immediately. + - Note that in logrotate, V(no_delay_compress) is the default behavior. + type: bool + shred: + description: + - Use C(shred) to securely delete rotated log files. + - Uses C(shred -u) to overwrite files before deleting. + type: bool + shred_cycles: + description: + - Number of times to overwrite files when using O(shred=true). + type: int + missing_ok: + description: + - Do not issue an error if the log file is missing. + type: bool + not_if_empty: + description: + - Do not rotate the log file if it is empty. + - Set to V(false) to rotate even empty log files (equivalent to C(ifempty) in logrotate). + type: bool + create: + description: + - Create new log file with specified permissions after rotation. + - Format is V(mode owner group) (for example V(0640 root adm)). + - Set to V(null) or omit to use V(nocreate). + type: str + copy_truncate: + description: + - Copy the log file and then truncate it in place. + - Useful for applications that cannot be told to close their logfile. + type: bool + copy: + description: + - Copy the log file but do not truncate the original. + - Takes precedence over O(rename_copy) and O(copy_truncate). + type: bool + rename_copy: + description: + - Rename and copy the log file, leaving the original in place. + type: bool + size: + description: + - Rotate log file when it grows bigger than specified size. + - Format is V(number[k|M|G]) (for example V(100M), V(1G)). + - Overrides O(rotation_period) when set. + - If not specified, existing value be preserved when modifying configuration. + - When creating new configuration, this option be omitted if not specified. + type: str + min_size: + description: + - Rotate log file only if it has grown bigger than specified size. + - Format is V(number[k|M|G]) (for example V(100M), V(1G)). + - Used with time-based rotation to avoid rotating too small files. + type: str + max_size: + description: + - Rotate log file when it grows bigger than specified size, but at most once per O(rotation_period). + - Format is V(number[k|M|G]) (for example V(100M), V(1G)). + type: str + max_age: + description: + - Remove rotated logs older than specified number of days. + type: int + date_ext: + description: + - Use date as extension for rotated files (using the date format specified in O(date_format) instead of sequential + numbers). + type: bool + date_yesterday: + description: + - Use yesterday's date for O(date_ext) instead of today's date. + - Useful for rotating logs that span midnight. + type: bool + date_format: + description: + - Format for date extension. + - Use with O(date_ext=true). + - Format specifiers are V(%Y) year, V(%m) month, V(%d) day, V(%s) seconds since epoch. + type: str + shared_scripts: + description: + - Run O(pre_rotate) and O(post_rotate) scripts only once for all matching log files. + type: bool + pre_rotate: + description: + - Commands to execute before rotating the log file. + - Can be a single string or list of commands. + type: list + elements: str + post_rotate: + description: + - Commands to execute after rotating the log file. + - Can be a single string or list of commands. + type: list + elements: str + first_action: + description: + - Commands to execute once before all log files that match the wildcard pattern are rotated. + type: list + elements: str + last_action: + description: + - Commands to execute once after all log files that match the wildcard pattern are rotated. + type: list + elements: str + pre_remove: + description: + - Commands to execute before removing rotated log files. + type: list + elements: str + su: + description: + - Set user and group for rotated files. + - Format is V(user group) (for example, V(www-data adm)). + - Set to V("") (empty string) to remove the directive from existing configurations. + - Set to V(null) or omit to leave the existing value unchanged (when modifying) or not set (when creating). + type: str + old_dir: + description: + - Move rotated logs into specified directory. + type: path + create_old_dir: + description: + - Create O(old_dir) directory if it does not exist. + type: bool + no_old_dir: + description: + - Keep rotated logs in the same directory as the original log. + type: bool + extension: + description: + - Extension to use for rotated log files (including dot). + - Useful when O(compress=false). + type: str + mail: + description: + - Mail logs to specified address when removed. + - Set to V(null) or omit to not mail logs. + type: str + mail_first: + description: + - Mail just-created log file, not the about-to-expire one. + type: bool + mail_last: + description: + - Mail about-to-expire log file (default). + type: bool + include: + description: + - Include additional configuration files from specified directory. + type: path + taboo_ext: + description: + - List of extensions that logrotate should not touch. + - Set to V(null) or empty list to clear defaults. + type: list + elements: str + enabled: + description: + - Whether the configuration should be enabled. + - When V(false), adds V(.disabled) extension to the config file. + type: bool + start: + description: + - Base number for rotated files. Allowed values are from V(0) to V(999). + - For example, V(1) gives files C(.1), C(.2), and so on instead of C(.0), C(.1). + type: int + syslog: + description: + - Send logrotate messages to syslog. + type: bool +extends_documentation_fragment: + - community.general.attributes +""" + +EXAMPLES = r""" +- name: Ensure logrotate config directory exists + ansible.builtin.file: + path: /etc/logrotate.d + state: directory + mode: '0755' + +- name: Configure log rotation for Nginx + community.general.logrotate: + name: nginx + paths: + - /var/log/nginx/*.log + rotation_period: daily + rotate_count: 14 + compress: true + compress_options: "-9" + delay_compress: true + missing_ok: true + not_if_empty: true + create: "0640 www-data adm" + shared_scripts: true + post_rotate: + - "[ -f /var/run/nginx.pid ] && kill -USR1 $(cat /var/run/nginx.pid)" + - "echo 'Nginx logs rotated'" + +- name: Configure size-based rotation for application logs + community.general.logrotate: + name: myapp + paths: + - /var/log/myapp/app.log + - /var/log/myapp/debug.log + size: 100M + rotate_count: 10 + compress: true + compress_options: "-1" + date_ext: true + date_yesterday: true + date_format: -%Y%m%d.%s + missing_ok: true + copy_truncate: true + +- name: Configure log rotation with secure deletion + community.general.logrotate: + name: secure-app + paths: + - /var/log/secure-app/*.log + rotation_period: weekly + rotate_count: 4 + shred: true + shred_cycles: 3 + compress: true + compress_options: "-9" + +- name: Configure log rotation with custom start number + community.general.logrotate: + name: custom-start + paths: + - /var/log/custom/*.log + rotation_period: monthly + rotate_count: 6 + start: 1 + compress: true + +- name: Configure log rotation with old directory + community.general.logrotate: + name: with-old-dir + paths: + - /opt/app/logs/*.log + rotation_period: weekly + rotate_count: 4 + old_dir: /var/log/archives + create_old_dir: true + compress: true + compression_method: zstd + +- name: Disable logrotate configuration + community.general.logrotate: + name: old-service + enabled: false + +- name: Remove logrotate configuration + community.general.logrotate: + name: deprecated-app + state: absent + +- name: Complex configuration with multiple scripts + community.general.logrotate: + name: complex-app + paths: + - /var/log/complex/*.log + rotation_period: monthly + rotate_count: 6 + compress: true + delay_compress: false + pre_rotate: + - "echo 'Starting rotation for complex app'" + - "systemctl stop complex-app" + post_rotate: + - "systemctl start complex-app" + - "echo 'Rotation completed'" + - "logger -t logrotate 'Complex app logs rotated'" + first_action: + - "echo 'First action: Starting batch rotation'" + last_action: + - "echo 'Last action: Batch rotation complete'" + +- name: User-specific logrotate configuration + community.general.logrotate: + name: myuser-apps + config_dir: ~/.logrotate.d + paths: + - ~/app/*.log + - ~/.cache/*/*.log + rotation_period: daily + rotate_count: 30 + compress: true + su: "{{ ansible_user_id }} users" + +- name: Configuration with copy instead of move + community.general.logrotate: + name: copy-config + paths: + - /var/log/copy-app/*.log + rotation_period: daily + rotate_count: 7 + copy: true + +- name: Configuration with syslog notifications + community.general.logrotate: + name: syslog-config + paths: + - /var/log/syslog-app/*.log + rotation_period: daily + rotate_count: 14 + syslog: true + compress: true + +- name: Configuration without compression + community.general.logrotate: + name: nocompress-config + paths: + - /var/log/nocompress/*.log + rotation_period: daily + rotate_count: 7 + compress: false + +- name: Configuration with custom taboo extensions + community.general.logrotate: + name: taboo-config + paths: + - /var/log/taboo/*.log + rotation_period: daily + rotate_count: 7 + taboo_ext: [".backup", ".tmp", ".temp"] +""" + +RETURN = r""" +config_file: + description: Path to the created/updated logrotate configuration file. + type: str + returned: success when O(state=present) + sample: /etc/logrotate.d/nginx +config_content: + description: The generated logrotate configuration content. + type: str + returned: success when O(state=present) + sample: | + /var/log/nginx/*.log { + daily + rotate 14 + compress + compress_options -9 + delay_compress + missing_ok + notifempty + create 0640 www-data adm + shared_scripts + post_rotate + [ -f /var/run/nginx.pid ] && kill -USR1 $(cat /var/run/nginx.pid) + echo 'Nginx logs rotated' + endscript + } +enabled_state: + description: Current enabled state of the configuration. + type: bool + returned: success + sample: true +backup_file: + description: Path to the backup of the original configuration file, if it was backed up. + type: str + returned: success when backup was made + sample: /etc/logrotate.d/nginx.20250101_120000 +""" + +import os +import re + +from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils.common.text.converters import to_native + + +class LogrotateConfig: + """Logrotate configuration manager.""" + + def __init__(self, module: AnsibleModule, logrotate_bin: str) -> None: + self.module = module + self.params = module.params + self.logrotate_bin = logrotate_bin + self.result: dict[str, object] = { + "changed": False, + "config_file": "", + "config_content": "", + "enabled_state": True, + } + + self.config_dir = self.params["config_dir"] + self.config_name = self.params["name"] + self.disabled_suffix = ".disabled" + + self.config_file = self.get_config_path(self.params["enabled"]) + + def get_config_path(self, enabled: bool) -> str: + """Get config file path based on enabled state.""" + base_path = os.path.join(self.config_dir, self.config_name) + if not enabled: + return base_path + self.disabled_suffix + return base_path + + def validate_parameters(self) -> None: + """Validate module parameters.""" + if self.params.get("start") is not None: + if self.params["start"] < 0 or self.params["start"] > 999: + self.module.fail_json(msg="'start' must be between 0 and 999") + + if self.params.get("size") and self.params.get("max_size"): + self.module.fail_json(msg="'size' and 'max_size' parameters are mutually exclusive") + + su_val = self.params.get("su") + if su_val is not None: + if su_val == "": + pass + else: + su_parts = su_val.split() + if len(su_parts) != 2: + self.module.fail_json(msg="'su' parameter must be in format 'user group' or empty string to remove") + + if self.params.get("shred_cycles", 1) < 1: + self.module.fail_json(msg="'shred_cycles' must be a positive integer") + + for size_param in ["size", "min_size", "max_size"]: + if self.params.get(size_param): + if not re.match(r"^\d+[kMG]?$", self.params[size_param], re.I): + self.module.fail_json( + msg=f"'{size_param}' must be in format 'number[k|M|G]' (for example '100M', '1G')" + ) + + if self.params.get("old_dir") and self.params.get("no_old_dir"): + self.module.fail_json(msg="'old_dir' and 'no_old_dir' parameters are mutually exclusive") + + copy_params = [self.params.get("copy"), self.params.get("copy_truncate"), self.params.get("rename_copy")] + if sum(1 for v in copy_params if v) > 1: + self.module.fail_json(msg="'copy', 'copy_truncate', and 'rename_copy' parameters are mutually exclusive") + + if self.params.get("delay_compress") and self.params.get("no_delay_compress"): + self.module.fail_json(msg="'delay_compress' and 'no_delay_compress' parameters are mutually exclusive") + + if self.params["state"] == "present": + existing_content = self.read_existing_config(any_state=True) + if not existing_content and not self.params.get("paths"): + self.module.fail_json(msg="'paths' parameter is required when creating a new configuration") + + def read_existing_config(self, any_state: bool = False) -> str | None: + """Read existing configuration file. + + Args: + any_state: If True, check both enabled and disabled versions. + If False, only check based on current enabled param. + """ + if any_state: + for suffix in ["", self.disabled_suffix]: + config_path = os.path.join(self.config_dir, self.config_name + suffix) + if os.path.exists(config_path): + self.result["enabled_state"] = suffix == "" + try: + with open(config_path, "r") as f: + return f.read() + except Exception as e: + self.module.fail_json(msg=f"Failed to read config file {config_path}: {to_native(e)}") + else: + config_path = self.get_config_path(self.params["enabled"]) + if os.path.exists(config_path): + try: + with open(config_path, "r") as f: + return f.read() + except Exception as e: + self.module.fail_json(msg=f"Failed to read config file {config_path}: {to_native(e)}") + + return None + + def generate_config_content(self) -> str: + """Generate logrotate configuration content.""" + if not self.params.get("paths"): + existing_content = self.read_existing_config(any_state=True) + if existing_content: + lines = existing_content.strip().split("\n") + paths = [] + for line in lines: + line = line.strip() + if ( + line + and not line.startswith("#") + and not line.startswith("{") + and not line.startswith("}") + and "/" in line + ): + if not any( + keyword in line + for keyword in [" ", "\t", "daily", "weekly", "monthly", "yearly", "rotate", "compress"] + ): + paths.append(line) + + if paths: + self.params["paths"] = paths + else: + self.params["paths"] = [] + else: + self.module.fail_json( + msg="Cannot generate configuration: no paths specified and no existing configuration found" + ) + + lines = [] + + paths = self.params["paths"] + if isinstance(paths, str): + paths = [paths] + + for path in paths: + lines.append(path) + lines.append("{") + lines.append("") + + rotation_period = self.params.get("rotation_period") + if rotation_period is not None and not self.params.get("size") and not self.params.get("max_size"): + lines.append(f" {rotation_period}") + + if self.params.get("size") is not None: + lines.append(f" size {self.params['size']}") + elif self.params.get("max_size") is not None: + lines.append(f" maxsize {self.params['max_size']}") + + if self.params.get("min_size") is not None: + lines.append(f" minsize {self.params['min_size']}") + + rotate_count = self.params.get("rotate_count") + if rotate_count is not None: + lines.append(f" rotate {rotate_count}") + + start_val = self.params.get("start") + if start_val is not None: + lines.append(f" start {start_val}") + + compress_val = self.params.get("compress") + if compress_val is not None: + if compress_val: + comp_method = self.params.get("compression_method") + if comp_method is not None and comp_method != "gzip": + lines.append(f" compresscmd /usr/bin/{comp_method}") + if comp_method == "zstd": + lines.append(f" uncompresscmd /usr/bin/{comp_method} -d") + elif comp_method == "lz4": + lines.append(f" uncompresscmd /usr/bin/{comp_method} -d") + else: + lines.append(f" uncompresscmd /usr/bin/{comp_method}un{comp_method}") + lines.append(f" compressext .{comp_method}") + lines.append(" compress") + + if self.params.get("compress_options") is not None: + lines.append(f" compressoptions {self.params['compress_options']}") + else: + lines.append(" nocompress") + + delay_compress_val = self.params.get("delay_compress") + if delay_compress_val is not None: + if delay_compress_val: + lines.append(" delaycompress") + + no_delay_compress_val = self.params.get("no_delay_compress") + if no_delay_compress_val is not None: + if no_delay_compress_val: + lines.append(" nodelaycompress") + + shred_val = self.params.get("shred") + if shred_val is not None and shred_val: + lines.append(" shred") + shred_cycles_val = self.params.get("shred_cycles") + if shred_cycles_val is not None: + lines.append(f" shredcycles {shred_cycles_val}") + + missing_ok_val = self.params.get("missing_ok") + if missing_ok_val is not None: + if not missing_ok_val: + lines.append(" nomissingok") + else: + lines.append(" missingok") + + not_if_empty_val = self.params.get("not_if_empty") + if not_if_empty_val is not None: + if not_if_empty_val: + lines.append(" notifempty") + else: + lines.append(" ifempty") + + create_val = self.params.get("create") + if create_val is not None: + lines.append(f" create {create_val}") + + copy_val = self.params.get("copy") + rename_copy_val = self.params.get("rename_copy") + copy_truncate_val = self.params.get("copy_truncate") + + if copy_val is not None and copy_val: + lines.append(" copy") + elif rename_copy_val is not None and rename_copy_val: + lines.append(" renamecopy") + elif copy_truncate_val is not None and copy_truncate_val: + lines.append(" copytruncate") + + if self.params.get("max_age") is not None: + lines.append(f" maxage {self.params['max_age']}") + + date_ext_val = self.params.get("date_ext") + if date_ext_val is not None and date_ext_val: + lines.append(" dateext") + date_yesterday_val = self.params.get("date_yesterday") + if date_yesterday_val is not None and date_yesterday_val: + lines.append(" dateyesterday") + date_format_val = self.params.get("date_format") + if date_format_val is not None: + lines.append(f" dateformat {date_format_val}") + + shared_scripts_val = self.params.get("shared_scripts") + if shared_scripts_val is not None and shared_scripts_val: + lines.append(" sharedscripts") + + su_val = self.params.get("su") + if su_val is not None: + if su_val != "": + lines.append(f" su {su_val}") + + no_old_dir_val = self.params.get("no_old_dir") + old_dir_val = self.params.get("old_dir") + + if no_old_dir_val is not None and no_old_dir_val: + lines.append(" noolddir") + elif old_dir_val is not None: + lines.append(f" olddir {old_dir_val}") + create_old_dir_val = self.params.get("create_old_dir") + if create_old_dir_val is not None and create_old_dir_val: + lines.append(" createolddir") + + if self.params.get("extension") is not None: + lines.append(f" extension {self.params['extension']}") + + mail_val = self.params.get("mail") + if mail_val is not None: + lines.append(f" mail {mail_val}") + mail_first_val = self.params.get("mail_first") + mail_last_val = self.params.get("mail_last") + if mail_first_val is not None and mail_first_val: + lines.append(" mailfirst") + elif mail_last_val is not None and mail_last_val: + lines.append(" maillast") + + if self.params.get("include") is not None: + lines.append(f" include {self.params['include']}") + + if self.params.get("taboo_ext") is not None: + taboo_ext = self.params["taboo_ext"] + if isinstance(taboo_ext, list): + taboo_ext = " ".join(taboo_ext) + if taboo_ext.strip(): + lines.append(f" tabooext {taboo_ext}") + + syslog_val = self.params.get("syslog") + if syslog_val is not None and syslog_val: + lines.append(" syslog") + + scripts = { + "prerotate": self.params.get("pre_rotate"), + "postrotate": self.params.get("post_rotate"), + "firstaction": self.params.get("first_action"), + "lastaction": self.params.get("last_action"), + "preremove": self.params.get("pre_remove"), + } + + for script_name, script_content in scripts.items(): + if script_content is not None: + lines.append(f" {script_name}") + for command in script_content: + for line in command.strip().split("\n"): + if line.strip(): + lines.append(f" {line}") + lines.append(" endscript") + lines.append("") + + lines.append("}") + + return "\n".join(lines) + + def apply(self) -> dict[str, object]: + """Apply logrotate configuration.""" + self.validate_parameters() + state = self.params["state"] + + if not os.path.exists(self.config_dir): + self.module.fail_json( + msg=f"Config directory '{self.config_dir}' does not exist. " + f"Please create it manually or ensure the correct path is specified." + ) + + if state == "absent": + for suffix in ["", self.disabled_suffix]: + config_path = os.path.join(self.config_dir, self.config_name + suffix) + if os.path.exists(config_path): + if not self.module.check_mode: + os.remove(config_path) + self.result["changed"] = True + self.result["config_file"] = config_path + break + return self.result + + existing_content = self.read_existing_config(any_state=True) + current_enabled = self.result.get("enabled_state", True) + + target_enabled = self.params.get("enabled") + if target_enabled is None: + target_enabled = current_enabled + + only_changing_enabled = ( + existing_content is not None and not self.params.get("paths") and target_enabled != current_enabled + ) + + if only_changing_enabled: + old_path = self.get_config_path(not target_enabled) + new_path = self.get_config_path(target_enabled) + + if os.path.exists(old_path) and not os.path.exists(new_path): + self.result["changed"] = True + if not self.module.check_mode: + self.module.atomic_move(old_path, new_path, unsafe_writes=False) + + self.result["config_file"] = new_path + self.result["enabled_state"] = target_enabled + + try: + with open(new_path, "r") as f: + self.result["config_content"] = f.read() + except Exception: + self.result["config_content"] = existing_content + + return self.result + + new_content = self.generate_config_content() + self.result["config_content"] = new_content + self.result["config_file"] = self.get_config_path(target_enabled) + + needs_update = existing_content is None or existing_content != new_content or target_enabled != current_enabled + + if needs_update: + if not self.module.check_mode: + for suffix in ["", self.disabled_suffix]: + old_path = os.path.join(self.config_dir, self.config_name + suffix) + if os.path.exists(old_path): + backup_path = self.module.backup_local(old_path) + if backup_path: + self.result["backup_file"] = backup_path + os.remove(old_path) + + config_file_path = str(self.result["config_file"]) + try: + with open(config_file_path, "w") as f: + f.write(new_content) + os.chmod(config_file_path, 0o644) + except Exception as e: + self.module.fail_json(msg=f"Failed to write config file {config_file_path}: {to_native(e)}") + + test_cmd = [self.logrotate_bin, "-d", config_file_path] + rc, stdout, stderr = self.module.run_command(test_cmd) + if rc != 0: + self.module.fail_json( + msg="logrotate configuration test failed", + stderr=stderr, + stdout=stdout, + config_file=config_file_path, + ) + + self.result["changed"] = needs_update + self.result["enabled_state"] = target_enabled + return self.result + + +def main() -> None: + """Main function.""" + module = AnsibleModule( + argument_spec=dict( + name=dict(type="str", required=True, aliases=["config_name"]), + state=dict(type="str", choices=["present", "absent"]), + config_dir=dict(type="path"), + paths=dict(type="list", elements="path"), + rotation_period=dict( + type="str", + choices=["daily", "weekly", "monthly", "yearly"], + ), + rotate_count=dict(type="int"), + compress=dict(type="bool"), + compress_options=dict(type="str"), + compression_method=dict( + type="str", + choices=["gzip", "bzip2", "xz", "zstd", "lzma", "lz4"], + ), + delay_compress=dict(type="bool"), + no_delay_compress=dict(type="bool"), + shred=dict(type="bool"), + shred_cycles=dict(type="int"), + missing_ok=dict(type="bool"), + not_if_empty=dict(type="bool"), + create=dict(type="str"), + copy_truncate=dict(type="bool"), + copy=dict(type="bool"), + rename_copy=dict(type="bool"), + size=dict(type="str"), + min_size=dict(type="str"), + max_size=dict(type="str"), + max_age=dict(type="int"), + date_ext=dict(type="bool"), + date_yesterday=dict(type="bool"), + date_format=dict(type="str"), + shared_scripts=dict(type="bool"), + pre_rotate=dict(type="list", elements="str"), + post_rotate=dict(type="list", elements="str"), + first_action=dict(type="list", elements="str"), + last_action=dict(type="list", elements="str"), + pre_remove=dict(type="list", elements="str"), + su=dict(type="str"), + old_dir=dict(type="path"), + create_old_dir=dict(type="bool"), + no_old_dir=dict(type="bool"), + extension=dict(type="str"), + mail=dict(type="str"), + mail_first=dict(type="bool"), + mail_last=dict(type="bool"), + include=dict(type="path"), + taboo_ext=dict(type="list", elements="str"), + enabled=dict(type="bool"), + start=dict(type="int"), + syslog=dict(type="bool"), + ), + mutually_exclusive=[ + ["delay_compress", "no_delay_compress"], + ["old_dir", "no_old_dir"], + ["size", "max_size"], + ["copy", "copy_truncate", "rename_copy"], + ], + supports_check_mode=True, + ) + + logrotate_bin = module.get_bin_path("logrotate", required=True) + + logrotate_config = LogrotateConfig(module, logrotate_bin) + result = logrotate_config.apply() + module.exit_json(**result) + + +if __name__ == "__main__": + main() diff --git a/tests/unit/plugins/modules/test_logrotate.py b/tests/unit/plugins/modules/test_logrotate.py new file mode 100644 index 0000000000..150923d96c --- /dev/null +++ b/tests/unit/plugins/modules/test_logrotate.py @@ -0,0 +1,1039 @@ +# Copyright (c) 2026 Aleksandr Gabidullin +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import annotations + +import os +import shutil +import sys +import tempfile +import unittest +from unittest.mock import Mock, mock_open, patch + + +class TestLogrotateConfig(unittest.TestCase): + """Unit tests for the logrotate_config module.""" + + @classmethod + def setUpClass(cls): + """Set up test environment.""" + cls.test_dir = tempfile.mkdtemp() + cls.mock_ansible_basic = Mock() + cls.mock_ansible_basic.AnsibleModule = Mock() + cls.mock_converters = Mock() + cls.mock_converters.to_native = str + cls.patcher_basic = patch.dict( + "sys.modules", + { + "ansible.module_utils.basic": cls.mock_ansible_basic, + "ansible.module_utils.common.text.converters": cls.mock_converters, + }, + ) + cls.patcher_basic.start() + + @classmethod + def tearDownClass(cls): + """Clean up after all tests.""" + cls.patcher_basic.stop() + if os.path.exists(cls.test_dir): + shutil.rmtree(cls.test_dir) + + def setUp(self): + """Set up test fixtures.""" + self.mock_ansible_basic.AnsibleModule.reset_mock() + self.mock_module = Mock() + self.mock_module.params = {} + self.mock_module.fail_json = Mock(side_effect=Exception("fail_json called")) + self.mock_module.exit_json = Mock() + self.mock_module.check_mode = False + self.mock_module.get_bin_path = Mock(return_value="/usr/sbin/logrotate") + self.mock_module.atomic_move = Mock() + self.mock_module.warn = Mock() + self.mock_module.run_command = Mock(return_value=(0, "", "")) + self.mock_ansible_basic.AnsibleModule.return_value = self.mock_module + self.config_dir = os.path.join(self.test_dir, "logrotate.d") + os.makedirs(self.config_dir, exist_ok=True) + for module_name in list(sys.modules.keys()): + if "logrotate" in module_name or "ansible_collections.community.general.plugins.modules" in module_name: + del sys.modules[module_name] + + def tearDown(self): + """Clean up after test.""" + pass + + def _setup_module_params(self, **params): + """Helper to set up module parameters.""" + default_params = { + "name": "test", + "state": "present", + "config_dir": self.config_dir, + "paths": ["/var/log/test/*.log"], + "rotate_count": 7, + "compress": True, + "compression_method": "gzip", + "delay_compress": False, + "missing_ok": True, + "not_if_empty": True, + "copy_truncate": False, + "date_ext": False, + "date_format": "-%Y%m%d", + "shared_scripts": False, + "enabled": True, + } + default_params.update(params) + self.mock_module.params = default_params + + def test_create_new_configuration(self): + """Test creating a new logrotate configuration.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params() + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open()) as mock_file: + with patch("os.chmod") as mock_chmod: + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + self.assertTrue(result["changed"]) + self.assertIn("config_file", result) + self.assertIn("config_content", result) + self.assertEqual(result["enabled_state"], True) + mock_file.assert_called_once() + mock_chmod.assert_called_once() + + def test_update_existing_configuration(self): + """Test updating an existing logrotate configuration.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(rotate_count=14) + config_path = os.path.join(self.config_dir, "test") + existing_content = """/var/log/test/*.log { + daily + rotate 7 + compress + delaycompress + missingok + notifempty +}""" + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return True + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open(read_data=existing_content)): + with patch("os.remove") as mock_remove: + with patch("os.chmod") as mock_chmod: + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + self.assertTrue(result["changed"]) + self.assertIn("14", result["config_content"]) + mock_remove.assert_called() + mock_chmod.assert_called_once() + + def test_remove_configuration(self): + """Test removing a logrotate configuration.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(state="absent") + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + return path in (config_path, config_path + ".disabled") + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("os.remove") as mock_remove: + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + self.assertTrue(result["changed"]) + self.assertTrue(mock_remove.called) + + def test_disable_configuration(self): + """Test disabling a logrotate configuration.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(enabled=False) + config_path = os.path.join(self.config_dir, "test") + existing_content = """/var/log/test/*.log { + daily + rotate 7 +}""" + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return True + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open(read_data=existing_content)): + with patch("os.remove"): + with patch("os.chmod"): + mock_file_write = mock_open() + with patch("builtins.open", mock_file_write): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + self.assertTrue(result["changed"]) + self.assertEqual(result["enabled_state"], False) + self.assertTrue(result["config_file"].endswith(".disabled")) + + def test_enable_configuration(self): + """Test enabling a disabled logrotate configuration.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(enabled=True) + config_path = os.path.join(self.config_dir, "test") + existing_content = """/var/log/test/*.log { + daily + rotate 7 +}""" + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path + ".disabled": + return True + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open(read_data=existing_content)): + with patch("os.remove"): + with patch("os.chmod"): + self.mock_module.atomic_move = Mock() + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + self.assertTrue(result["changed"]) + self.assertEqual(result["enabled_state"], True) + self.assertFalse(result["config_file"].endswith(".disabled")) + + def test_validation_missing_paths(self): + """Test validation when paths are missing for new configuration.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(paths=None) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + with self.assertRaises(Exception) as context: + config.apply() + self.assertIn("fail_json called", str(context.exception)) + + def test_validation_size_and_max_size_exclusive(self): + """Test validation when both size and max_size are specified.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(size="100M", max_size="200M") + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + with self.assertRaises(Exception) as context: + config.apply() + self.assertIn("fail_json called", str(context.exception)) + + def test_check_mode(self): + """Test that no changes are made in check mode.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params() + self.mock_module.check_mode = True + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open()): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + self.assertTrue(result["changed"]) + + def test_generate_config_with_scripts(self): + """Test generating configuration with pre/post scripts.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params( + pre_rotate=["echo 'Pre-rotation'"], + post_rotate=["systemctl reload test", "logger 'Rotation done'"], + first_action=["echo 'First action'"], + last_action=["echo 'Last action'"], + ) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("prerotate", content) + self.assertIn("postrotate", content) + self.assertIn("firstaction", content) + self.assertIn("lastaction", content) + self.assertIn("systemctl reload test", content) + self.assertIn("echo 'Pre-rotation'", content) + + def test_compression_methods(self): + """Test different compression methods.""" + from ansible_collections.community.general.plugins.modules import logrotate + + compression_methods = ["gzip", "bzip2", "xz", "zstd", "lzma", "lz4"] + for method in compression_methods: + with self.subTest(method=method): + self._setup_module_params(compression_method=method) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path, current_config_path=config_path): + if path == self.config_dir: + return True + elif path == current_config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + if method != "gzip": + self.assertIn(f"compresscmd /usr/bin/{method}", content) + if method == "zstd" or method == "lz4": + self.assertIn(f"uncompresscmd /usr/bin/{method} -d", content) + else: + uncompress_cmd = f"{method}un{method}" + self.assertIn(f"uncompresscmd /usr/bin/{uncompress_cmd}", content) + + def test_size_based_rotation(self): + """Test size-based rotation configuration.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(size="100M", rotation_period="daily") + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("size 100M", content) + self.assertNotIn("daily", content) + + def test_logrotate_not_installed(self): + """Test error when logrotate is not installed.""" + from ansible_collections.community.general.plugins.modules import logrotate + + mock_module_for_test = Mock() + mock_module_for_test.params = { + "name": "test", + "state": "present", + "config_dir": self.config_dir, + "paths": ["/var/log/test/*.log"], + "rotate_count": 7, + "compress": True, + "compression_method": "gzip", + "delay_compress": False, + "missing_ok": True, + "not_if_empty": True, + "copy_truncate": False, + "date_ext": False, + "date_format": "-%Y%m%d", + "shared_scripts": False, + "enabled": True, + } + mock_module_for_test.fail_json = Mock(side_effect=Exception("fail_json called")) + mock_module_for_test.exit_json = Mock() + mock_module_for_test.check_mode = False + + def get_bin_path_side_effect(name, required=False): + if name == "logrotate" and required: + mock_module_for_test.fail_json(msg=f"Failed to find required executable '{name}' in PATH") + return None + + mock_module_for_test.get_bin_path = Mock(side_effect=get_bin_path_side_effect) + mock_module_for_test.atomic_move = Mock() + mock_module_for_test.warn = Mock() + mock_module_for_test.run_command = Mock(return_value=(0, "", "")) + + with patch( + "ansible_collections.community.general.plugins.modules.logrotate.AnsibleModule", + return_value=mock_module_for_test, + ): + with self.assertRaises(Exception) as context: + logrotate.main() + + self.assertIn("fail_json called", str(context.exception)) + + mock_module_for_test.get_bin_path.assert_called_once_with("logrotate", required=True) + mock_module_for_test.fail_json.assert_called_once() + + def test_parse_existing_config_paths(self): + """Test parsing paths from existing configuration.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(paths=None) + config_path = os.path.join(self.config_dir, "test") + existing_content = """/var/log/app1/*.log +{ + daily + rotate 7 + compress +}""" + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return True + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + mock_file_read = mock_open(read_data=existing_content) + with patch("builtins.open", mock_file_read): + with patch("os.remove"): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + self.assertTrue(result["changed"]) + self.assertIn("/var/log/app1/*.log", result["config_content"]) + + def test_no_delay_compress_parameter(self): + """Test no_delay_compress parameter.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(no_delay_compress=True) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("nodelaycompress", content) + self.assertTrue(result["changed"]) + + def test_shred_and_shred_cycles_parameters(self): + """Test shred and shred_cycles parameters.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(shred=True, shred_cycles=3) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("shred", content) + self.assertIn("shredcycles 3", content) + self.assertTrue(result["changed"]) + + def test_copy_parameter(self): + """Test copy parameter.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(copy=True, copy_truncate=False) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("copy", content) + self.assertNotIn("copytruncate", content) + self.assertTrue(result["changed"]) + + def test_rename_copy_parameter(self): + """Test rename_copy parameter.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(rename_copy=True) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("renamecopy", content) + self.assertTrue(result["changed"]) + + def test_min_size_parameter(self): + """Test min_size parameter.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(min_size="100k") + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("minsize 100k", content) + self.assertTrue(result["changed"]) + + def test_date_yesterday_parameter(self): + """Test date_yesterday parameter.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(date_ext=True, date_yesterday=True) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("dateext", content) + self.assertIn("dateyesterday", content) + self.assertTrue(result["changed"]) + + def test_create_old_dir_parameter(self): + """Test create_old_dir parameter.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(old_dir="/var/log/archives", create_old_dir=True) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("olddir /var/log/archives", content) + self.assertIn("createolddir", content) + self.assertTrue(result["changed"]) + + def test_start_parameter(self): + """Test start parameter.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(start=1) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("start 1", content) + self.assertTrue(result["changed"]) + + def test_syslog_parameter(self): + """Test syslog parameter.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(syslog=True) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("syslog", content) + self.assertTrue(result["changed"]) + + def test_validation_copy_and_copy_truncate_exclusive(self): + """Test validation when both copy and copy_truncate are specified.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(copy=True, copy_truncate=True) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + with self.assertRaises(Exception) as context: + config.apply() + self.assertIn("fail_json called", str(context.exception)) + + def test_validation_copy_and_rename_copy_exclusive(self): + """Test validation when both copy and rename_copy are specified.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(copy=True, rename_copy=True) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + with self.assertRaises(Exception) as context: + config.apply() + self.assertIn("fail_json called", str(context.exception)) + + def test_validation_shred_cycles_positive(self): + """Test validation when shred_cycles is not positive.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(shred_cycles=0) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + with self.assertRaises(Exception) as context: + config.apply() + self.assertIn("fail_json called", str(context.exception)) + + def test_validation_start_non_negative(self): + """Test validation when start is negative.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(start=-1) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + with self.assertRaises(Exception) as context: + config.apply() + self.assertIn("fail_json called", str(context.exception)) + + def test_validation_old_dir_and_no_old_dir_exclusive(self): + """Test validation when both old_dir and no_old_dir are specified.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(old_dir="/var/log/archives", no_old_dir=True) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + with self.assertRaises(Exception) as context: + config.apply() + self.assertIn("fail_json called", str(context.exception)) + + def test_all_new_parameters_together(self): + """Test all new parameters together in one configuration.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params( + no_delay_compress=True, + shred=True, + shred_cycles=3, + copy=True, + min_size="100k", + date_ext=True, + date_yesterday=True, + old_dir="/var/log/archives", + create_old_dir=True, + start=1, + syslog=True, + rename_copy=False, + copy_truncate=False, + delay_compress=False, + ) + + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertTrue(result["changed"]) + + self.assertIn("nodelaycompress", content) + self.assertIn("shred", content) + self.assertIn("shredcycles 3", content) + self.assertIn("copy", content) + self.assertIn("minsize 100k", content) + self.assertIn("dateext", content) + self.assertIn("dateyesterday", content) + self.assertIn("olddir /var/log/archives", content) + self.assertIn("createolddir", content) + self.assertIn("start 1", content) + self.assertIn("syslog", content) + + lines = [line.strip() for line in content.split("\n")] + self.assertNotIn("copytruncate", lines) + self.assertNotIn("renamecopy", lines) + self.assertNotIn("delaycompress", lines) + + def test_parameter_interactions(self): + """Test interactions between related parameters.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(old_dir="/var/log/archives", no_old_dir=True) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + + with self.assertRaises(Exception) as context: + config.apply() + + self.assertIn("fail_json called", str(context.exception)) + + self._setup_module_params(copy=True, rename_copy=True) + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + + with self.assertRaises(Exception) as context: + config.apply() + + self.assertIn("fail_json called", str(context.exception)) + + self._setup_module_params(copy=True, copy_truncate=True) + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + + with self.assertRaises(Exception) as context: + config.apply() + + self.assertIn("fail_json called", str(context.exception)) + + def test_size_format_validation(self): + """Test validation of size format parameters.""" + from ansible_collections.community.general.plugins.modules import logrotate + + valid_sizes = ["100k", "100M", "1G", "10", "500K", "2M", "3G"] + + for size in valid_sizes: + with self.subTest(valid_size=size): + self._setup_module_params(size=size) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path, current_config_path=config_path): + if path == self.config_dir: + return True + elif path == current_config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + + result = config.apply() + self.assertIn(f"size {size}", result["config_content"]) + + invalid_sizes = ["100kb", "M100", "1.5G", "abc", "100 MB"] + + for size in invalid_sizes: + with self.subTest(invalid_size=size): + self._setup_module_params(size=size) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path, current_config_path=config_path): + if path == self.config_dir: + return True + elif path == current_config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + + with self.assertRaises(Exception) as context: + config.apply() + + self.assertIn("fail_json called", str(context.exception)) + + def test_max_size_format_validation(self): + """Test validation of max_size format parameters.""" + from ansible_collections.community.general.plugins.modules import logrotate + + valid_sizes = ["100k", "100M", "1G", "10", "500K", "2M", "3G"] + + for size in valid_sizes: + with self.subTest(valid_size=size): + self._setup_module_params(max_size=size) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path, current_config_path=config_path): + if path == self.config_dir: + return True + elif path == current_config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + + result = config.apply() + self.assertIn(f"maxsize {size}", result["config_content"]) + + invalid_sizes = ["100kb", "M100", "1.5G", "abc", "100 MB"] + + for size in invalid_sizes: + with self.subTest(invalid_size=size): + self._setup_module_params(max_size=size) + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path, current_config_path=config_path): + if path == self.config_dir: + return True + elif path == current_config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + + with self.assertRaises(Exception) as context: + config.apply() + + self.assertIn("fail_json called", str(context.exception)) + + def test_logrotate_bin_used_in_apply(self): + """Test that logrotate binary path is used in apply method.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params() + + test_logrotate_path = "/usr/local/sbin/logrotate" + self.mock_module.get_bin_path.return_value = test_logrotate_path + + config_path = os.path.join(self.config_dir, "test") + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return False + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open()): + with patch("os.chmod"): + with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): + config = logrotate.LogrotateConfig(self.mock_module, test_logrotate_path) + config.apply() + + self.mock_module.run_command.assert_called_once() + call_args = self.mock_module.run_command.call_args[0][0] + self.assertEqual(call_args[0], test_logrotate_path) + + +if __name__ == "__main__": + unittest.main()