# Copyright (c) Ansible project # GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) # SPDX-License-Identifier: GPL-3.0-or-later from __future__ import annotations import unittest from unittest import mock import pytest from ansible_collections.community.internal_test_tools.tests.unit.plugins.modules.utils import ( AnsibleExitJson, AnsibleFailJson, ) from ansible_collections.community.general.plugins.modules import monit TEST_OUTPUT = """ %s '%s' status %s monitoring status Not monitored monitoring mode active """ class MonitTest(unittest.TestCase): def setUp(self): self.module = mock.MagicMock() self.module.exit_json.side_effect = AnsibleExitJson self.module.fail_json.side_effect = AnsibleFailJson self.monit = monit.Monit(self.module, "monit", "processX", 1) self.monit._status_change_retry_count = 1 mock_sleep = mock.patch("time.sleep") mock_sleep.start() self.addCleanup(mock_sleep.stop) def patch_status(self, side_effect): if not isinstance(side_effect, list): side_effect = [side_effect] return mock.patch.object(self.monit, "get_status", side_effect=side_effect) def test_change_state_success(self): with self.patch_status([monit.Status("OK"), monit.Status("NOT_MONITORED")]): with self.assertRaises(AnsibleExitJson): self.monit.stop() self.module.fail_json.assert_not_called() self.module.run_command.assert_called_with(["monit", "stop", "processX"], check_rc=True) def test_change_state_fail(self): with self.patch_status([monit.Status("OK")] * 3): with self.assertRaises(AnsibleFailJson): self.monit.stop() def test_reload_fail(self): self.module.run_command.return_value = (1, "stdout", "stderr") with self.assertRaises(AnsibleFailJson): self.monit.reload() def test_reload(self): self.module.run_command.return_value = (0, "", "") with self.patch_status(monit.Status("OK")): with self.assertRaises(AnsibleExitJson): self.monit.reload() def test_wait_for_status_to_stop_pending(self): status = [ monit.Status("MISSING"), monit.Status("DOES_NOT_EXIST"), monit.Status("INITIALIZING"), monit.Status("OK").pending(), monit.Status("OK"), ] with self.patch_status(status) as get_status: self.monit.wait_for_monit_to_stop_pending() self.assertEqual(get_status.call_count, len(status)) def test_wait_for_status_change(self): with self.patch_status([monit.Status("NOT_MONITORED"), monit.Status("OK")]) as get_status: self.monit.wait_for_status_change(monit.Status("NOT_MONITORED")) self.assertEqual(get_status.call_count, 2) def test_wait_for_status_change_fail(self): with self.patch_status([monit.Status("OK")] * 3): with self.assertRaises(AnsibleFailJson): self.monit.wait_for_status_change(monit.Status("OK")) def test_monitor(self): with self.patch_status([monit.Status("NOT_MONITORED"), monit.Status("OK").pending(), monit.Status("OK")]): with self.assertRaises(AnsibleExitJson): self.monit.monitor() def test_monitor_fail(self): with self.patch_status([monit.Status("NOT_MONITORED")] * 3): with self.assertRaises(AnsibleFailJson): self.monit.monitor() def test_timeout(self): self.monit.timeout = 0 with self.patch_status(monit.Status("NOT_MONITORED").pending()): with self.assertRaises(AnsibleFailJson): self.monit.wait_for_monit_to_stop_pending() BASIC_OUTPUT_CASES = [ (TEST_OUTPUT % ("Process", "processX", member.value), monit.Status(member.name)) for member in monit.StatusValue ] @pytest.mark.parametrize( "output, expected", BASIC_OUTPUT_CASES + [ ("", monit.Status("MISSING")), (TEST_OUTPUT % ("Process", "processY", "OK"), monit.Status("MISSING")), (TEST_OUTPUT % ("Process", "processX", "Not Monitored - start pending"), monit.Status("OK", is_pending=True)), ( TEST_OUTPUT % ("Process", "processX", "Monitored - stop pending"), monit.Status("NOT_MONITORED", is_pending=True), ), (TEST_OUTPUT % ("Process", "processX", "Monitored - restart pending"), monit.Status("OK", is_pending=True)), (TEST_OUTPUT % ("Process", "processX", "Not Monitored - monitor pending"), monit.Status("OK", is_pending=True)), (TEST_OUTPUT % ("Process", "processX", "Does not exist"), monit.Status("DOES_NOT_EXIST")), (TEST_OUTPUT % ("Process", "processX", "Not monitored"), monit.Status("NOT_MONITORED")), (TEST_OUTPUT % ("Process", "processX", "Running"), monit.Status("OK")), (TEST_OUTPUT % ("Process", "processX", "Execution failed | Does not exist"), monit.Status("EXECUTION_FAILED")), (TEST_OUTPUT % ("Process", "processX", "Some Unknown Status"), monit.Status("EXECUTION_FAILED")), ], ) def test_parse_status(output, expected): module = mock.MagicMock() status = monit.Monit(module, "", "processX", 0)._parse_status(output, "") assert status == expected @pytest.mark.parametrize( "output, expected", BASIC_OUTPUT_CASES + [ (TEST_OUTPUT % ("Process", "processX", "OK"), monit.Status("OK")), (TEST_OUTPUT % ("File", "processX", "OK"), monit.Status("OK")), (TEST_OUTPUT % ("Fifo", "processX", "OK"), monit.Status("OK")), (TEST_OUTPUT % ("Filesystem", "processX", "OK"), monit.Status("OK")), (TEST_OUTPUT % ("Directory", "processX", "OK"), monit.Status("OK")), (TEST_OUTPUT % ("Remote host", "processX", "OK"), monit.Status("OK")), (TEST_OUTPUT % ("System", "processX", "OK"), monit.Status("OK")), (TEST_OUTPUT % ("Program", "processX", "OK"), monit.Status("OK")), (TEST_OUTPUT % ("Network", "processX", "OK"), monit.Status("OK")), (TEST_OUTPUT % ("Unsupported", "processX", "OK"), monit.Status("MISSING")), ], ) def test_parse_status_supports_all_services(output, expected): status = monit.Monit(None, "", "processX", 0)._parse_status(output, "") assert status == expected @pytest.mark.parametrize( "output, expected", [ ("This is monit version 5.18.1", "5.18.1"), ("This is monit version 12.18", "12.18"), ("This is monit version 5.1.12", "5.1.12"), ], ) def test_parse_version(output, expected): module = mock.MagicMock() module.run_command.return_value = (0, output, "") raw_version, version_tuple = monit.Monit(module, "", "processX", 0)._get_monit_version() assert raw_version == expected