1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2026-03-22 05:09:12 +00:00

[PR #11592/2d685e7a backport][stable-12] test(monit): use uthelper (#11593)

test(monit): use uthelper (#11592)

(cherry picked from commit 2d685e7a85)

Co-authored-by: Alexei Znamensky <103110+russoz@users.noreply.github.com>
This commit is contained in:
patchback[bot] 2026-03-14 22:34:22 +01:00 committed by GitHub
parent 99ebbbdf49
commit 86616b1559
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 999 additions and 103 deletions

View file

@ -7,7 +7,6 @@ from __future__ import annotations
import unittest
from unittest import mock
import pytest
from ansible_collections.community.internal_test_tools.tests.unit.plugins.modules.utils import (
AnsibleExitJson,
AnsibleFailJson,
@ -15,12 +14,9 @@ from ansible_collections.community.internal_test_tools.tests.unit.plugins.module
from ansible_collections.community.general.plugins.modules import monit
TEST_OUTPUT = """
%s '%s'
status %s
monitoring status Not monitored
monitoring mode active
"""
from .uthelper import RunCommandMock, UTHelper
UTHelper.from_module(monit, __name__, mocks=[RunCommandMock])
class MonitTest(unittest.TestCase):
@ -39,29 +35,11 @@ class MonitTest(unittest.TestCase):
side_effect = [side_effect]
return mock.patch.object(self.monit, "get_status", side_effect=side_effect)
def test_change_state_success(self):
with self.patch_status([monit.Status("OK"), monit.Status("NOT_MONITORED")]):
with self.assertRaises(AnsibleExitJson):
self.monit.stop()
self.module.fail_json.assert_not_called()
self.module.run_command.assert_called_with(["monit", "stop", "processX"], check_rc=True)
def test_change_state_fail(self):
with self.patch_status([monit.Status("OK")] * 3):
with self.assertRaises(AnsibleFailJson):
self.monit.stop()
def test_reload_fail(self):
self.module.run_command.return_value = (1, "stdout", "stderr")
with self.assertRaises(AnsibleFailJson):
self.monit.reload()
def test_reload(self):
self.module.run_command.return_value = (0, "", "")
with self.patch_status(monit.Status("OK")):
with self.assertRaises(AnsibleExitJson):
self.monit.reload()
def test_wait_for_status_to_stop_pending(self):
status = [
monit.Status("MISSING"),
@ -74,11 +52,6 @@ class MonitTest(unittest.TestCase):
self.monit.wait_for_monit_to_stop_pending()
self.assertEqual(get_status.call_count, len(status))
def test_wait_for_status_change(self):
with self.patch_status([monit.Status("NOT_MONITORED"), monit.Status("OK")]) as get_status:
self.monit.wait_for_status_change(monit.Status("NOT_MONITORED"))
self.assertEqual(get_status.call_count, 2)
def test_wait_for_status_change_fail(self):
with self.patch_status([monit.Status("OK")] * 3):
with self.assertRaises(AnsibleFailJson):
@ -93,76 +66,3 @@ class MonitTest(unittest.TestCase):
with self.patch_status([monit.Status("NOT_MONITORED")] * 3):
with self.assertRaises(AnsibleFailJson):
self.monit.monitor()
def test_timeout(self):
self.monit.timeout = 0
with self.patch_status(monit.Status("NOT_MONITORED").pending()):
with self.assertRaises(AnsibleFailJson):
self.monit.wait_for_monit_to_stop_pending()
BASIC_OUTPUT_CASES = [
(TEST_OUTPUT % ("Process", "processX", member.value), monit.Status(member.name)) for member in monit.StatusValue
]
@pytest.mark.parametrize(
"output, expected",
BASIC_OUTPUT_CASES
+ [
("", monit.Status("MISSING")),
(TEST_OUTPUT % ("Process", "processY", "OK"), monit.Status("MISSING")),
(TEST_OUTPUT % ("Process", "processX", "Not Monitored - start pending"), monit.Status("OK", is_pending=True)),
(
TEST_OUTPUT % ("Process", "processX", "Monitored - stop pending"),
monit.Status("NOT_MONITORED", is_pending=True),
),
(TEST_OUTPUT % ("Process", "processX", "Monitored - restart pending"), monit.Status("OK", is_pending=True)),
(TEST_OUTPUT % ("Process", "processX", "Not Monitored - monitor pending"), monit.Status("OK", is_pending=True)),
(TEST_OUTPUT % ("Process", "processX", "Does not exist"), monit.Status("DOES_NOT_EXIST")),
(TEST_OUTPUT % ("Process", "processX", "Not monitored"), monit.Status("NOT_MONITORED")),
(TEST_OUTPUT % ("Process", "processX", "Running"), monit.Status("OK")),
(TEST_OUTPUT % ("Process", "processX", "Execution failed | Does not exist"), monit.Status("EXECUTION_FAILED")),
(TEST_OUTPUT % ("Process", "processX", "Some Unknown Status"), monit.Status("EXECUTION_FAILED")),
],
)
def test_parse_status(output, expected):
module = mock.MagicMock()
status = monit.Monit(module, "", "processX", 0)._parse_status(output, "")
assert status == expected
@pytest.mark.parametrize(
"output, expected",
BASIC_OUTPUT_CASES
+ [
(TEST_OUTPUT % ("Process", "processX", "OK"), monit.Status("OK")),
(TEST_OUTPUT % ("File", "processX", "OK"), monit.Status("OK")),
(TEST_OUTPUT % ("Fifo", "processX", "OK"), monit.Status("OK")),
(TEST_OUTPUT % ("Filesystem", "processX", "OK"), monit.Status("OK")),
(TEST_OUTPUT % ("Directory", "processX", "OK"), monit.Status("OK")),
(TEST_OUTPUT % ("Remote host", "processX", "OK"), monit.Status("OK")),
(TEST_OUTPUT % ("System", "processX", "OK"), monit.Status("OK")),
(TEST_OUTPUT % ("Program", "processX", "OK"), monit.Status("OK")),
(TEST_OUTPUT % ("Network", "processX", "OK"), monit.Status("OK")),
(TEST_OUTPUT % ("Unsupported", "processX", "OK"), monit.Status("MISSING")),
],
)
def test_parse_status_supports_all_services(output, expected):
status = monit.Monit(None, "", "processX", 0)._parse_status(output, "")
assert status == expected
@pytest.mark.parametrize(
"output, expected",
[
("This is monit version 5.18.1", "5.18.1"),
("This is monit version 12.18", "12.18"),
("This is monit version 5.1.12", "5.1.12"),
],
)
def test_parse_version(output, expected):
module = mock.MagicMock()
module.run_command.return_value = (0, output, "")
raw_version, version_tuple = monit.Monit(module, "", "processX", 0)._get_monit_version()
assert raw_version == expected