1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2026-03-22 05:09:12 +00:00

Merge branch 'main' into ghsecretslist

This commit is contained in:
Thomas Sjögren 2026-03-15 16:58:32 +01:00 committed by GitHub
commit b4c79ffb53
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 1821 additions and 157 deletions

2
.github/BOTMETA.yml vendored
View file

@ -642,6 +642,8 @@ files:
maintainers: adrianmoisey
$modules/github_repo.py:
maintainers: atorrescogollo
$modules/github_secrets.py:
maintainers: konstruktoid
$modules/github_secrets_info.py:
maintainers: konstruktoid
$modules/gitlab_:

View file

@ -0,0 +1,2 @@
bugfixes:
- ipa_dnsrecord - fix idempotency bug when using ``dnsttl`` due to wrong Python types (https://github.com/ansible-collections/community.general/pull/11559).

View file

@ -0,0 +1,3 @@
minor_changes:
- nsupdate - replace ``list(map(...))`` constructs with Python comprehensions (https://github.com/ansible-collections/community.general/pull/11590).
- timezone - replace ``list(map(...))`` constructs with Python comprehensions (https://github.com/ansible-collections/community.general/pull/11590).

View file

@ -0,0 +1,401 @@
#!/usr/bin/python
# Copyright (c) Ansible project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import annotations
DOCUMENTATION = r"""
module: github_secrets
short_description: Manage GitHub repository or organization secrets
description:
- Create, update, or delete secrets in a GitHub repository or organization.
author:
- Thomas Sjögren (@konstruktoid)
version_added: '12.5.0'
requirements:
- pynacl
extends_documentation_fragment:
- community.general.attributes
attributes:
check_mode:
support: full
diff_mode:
support: none
options:
organization:
description:
- The GitHub username or organization name.
type: str
required: true
aliases: ["org", "username"]
repository:
description:
- The name of the repository.
- If not provided, the secret will be managed at the organization level.
type: str
aliases: ["repo"]
key:
description:
- The name of the secret.
type: str
value:
description:
- The value of the secret. Required when O(state=present).
type: str
visibility:
description:
- The visibility of the secret when set at the organization level.
- Required when O(state=present) and O(repository) is not set.
type: str
choices: ["all", "private", "selected"]
state:
description:
- The desired state of the secret.
type: str
choices: ["present", "absent"]
default: "present"
api_url:
description:
- The base URL for the GitHub API.
type: str
default: "https://api.github.com"
token:
description:
- The GitHub token used for authentication.
type: str
required: true
"""
EXAMPLES = r"""
- name: Add Github secret
community.general.github_secrets:
token: "{{ lookup('ansible.builtin.env', 'GITHUB_TOKEN') }}"
repository: "ansible"
organization: "ansible"
key: "TEST_SECRET"
value: "bob"
state: "present"
- name: Delete Github secret
community.general.github_secrets:
token: "{{ lookup('ansible.builtin.env', 'GITHUB_TOKEN') }}"
repository: "ansible"
organization: "ansible"
key: "TEST_SECRET"
state: "absent"
"""
RETURN = r"""
result:
description: The result of the module.
type: dict
returned: always
sample: {
"msg": "OK (2 bytes)",
"response": "Secret created",
"status": 201
}
"""
import json
from http import HTTPStatus
from typing import Any
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.urls import fetch_url
from ansible_collections.community.general.plugins.module_utils import deps
with deps.declare(
"pynacl",
reason="pynacl is a required dependency",
url="https://pypi.org/project/PyNaCl/",
):
from nacl import encoding, public
def get_public_key(
module: AnsibleModule,
api_url: str,
headers: dict[str, str],
organization: str,
repository: str,
) -> tuple[str, str]:
"""Retrieve the GitHub Actions public key used to encrypt secrets."""
if repository:
url = f"{api_url}/repos/{organization}/{repository}/actions/secrets/public-key"
else:
url = f"{api_url}/orgs/{organization}/actions/secrets/public-key"
resp, info = fetch_url(module, url, headers=headers)
if info["status"] != HTTPStatus.OK:
module.fail_json(msg=f"Failed to get public key: {info}")
data = json.loads(resp.read())
return data["key_id"], data["key"]
def encrypt_secret(public_key: str, secret_value: str) -> str:
"""Encrypt a secret value using GitHub's public key."""
key = public.PublicKey(
public_key.encode("utf-8"),
encoding.Base64Encoder(), # type: ignore [arg-type]
)
sealed_box = public.SealedBox(key)
encrypted = sealed_box.encrypt(secret_value.encode("utf-8"))
return encoding.Base64Encoder.encode(encrypted).decode("utf-8")
def check_secret(
module: AnsibleModule,
api_url: str,
headers: dict[str, str],
organization: str,
repository: str,
key: str,
) -> dict[str, int]:
url = (
f"{api_url}/repos/{organization}/{repository}/actions/secrets/{key}"
if repository
else f"{api_url}/orgs/{organization}/actions/secrets/{key}"
)
resp, info = fetch_url(module, url, headers=headers)
if info["status"] in (HTTPStatus.OK, HTTPStatus.NOT_FOUND):
return {"status": info["status"]}
else:
module.fail_json(msg=f"Failed to check secret: {info}")
def upsert_secret(
module: AnsibleModule,
api_url: str,
headers: dict[str, str],
organization: str,
repository: str,
key: str,
encrypted_value: str,
key_id: str,
) -> dict[str, Any]:
"""Create or update a GitHub Actions secret."""
url = (
f"{api_url}/repos/{organization}/{repository}/actions/secrets/{key}"
if repository
else f"{api_url}/orgs/{organization}/actions/secrets/{key}"
)
payload = {
"encrypted_value": encrypted_value,
"key_id": key_id,
}
if not repository and module.params.get("visibility"):
payload["visibility"] = module.params["visibility"]
if module.check_mode:
secret_present = check_secret(module, api_url, headers, organization, repository, key)
if secret_present["status"] == HTTPStatus.NOT_FOUND:
check_mode_msg = "OK (2 bytes)"
check_mode_status = HTTPStatus.CREATED.value
info = {
"msg": check_mode_msg,
"status": check_mode_status,
}
else:
check_mode_msg = "OK (unknown bytes)"
check_mode_status = HTTPStatus.NO_CONTENT
info = {
"msg": check_mode_msg,
"status": check_mode_status,
}
else:
resp, info = fetch_url(
module,
url,
headers=headers,
data=json.dumps(payload).encode("utf-8"),
method="PUT",
)
if info["status"] not in (HTTPStatus.CREATED, HTTPStatus.NO_CONTENT):
module.fail_json(msg=f"Failed to upsert secret: {info}")
return info
def delete_secret(
module: AnsibleModule,
api_url: str,
headers: dict[str, str],
organization: str,
repository: str,
key: str,
) -> dict[str, Any]:
"""Delete a GitHub Actions secret."""
url = (
f"{api_url}/repos/{organization}/{repository}/actions/secrets/{key}"
if repository
else f"{api_url}/orgs/{organization}/actions/secrets/{key}"
)
if module.check_mode:
secret_present = check_secret(module, api_url, headers, organization, repository, key)
info = {
"msg": (
"HTTP Error 404: Not Found"
if secret_present["status"] == HTTPStatus.NOT_FOUND
else "OK (unknown bytes)"
),
"status": (
HTTPStatus.NO_CONTENT if secret_present["status"] == HTTPStatus.OK else secret_present["status"]
),
}
else:
resp, info = fetch_url(
module,
url,
headers=headers,
method="DELETE",
)
if info["status"] not in (HTTPStatus.NO_CONTENT, HTTPStatus.NOT_FOUND):
module.fail_json(msg=f"Failed to delete secret: {info}")
return info
def main() -> None:
"""Ansible module entry point."""
argument_spec = {
"organization": {
"type": "str",
"aliases": ["org", "username"],
"required": True,
},
"repository": {"type": "str", "aliases": ["repo"]},
"key": {"type": "str", "no_log": False},
"value": {"type": "str", "no_log": True},
"visibility": {"type": "str", "choices": ["all", "private", "selected"]},
"state": {
"type": "str",
"choices": ["present", "absent"],
"default": "present",
},
"api_url": {"type": "str", "default": "https://api.github.com"},
"token": {"type": "str", "required": True, "no_log": True},
}
module = AnsibleModule(
argument_spec=argument_spec,
required_if=[("state", "present", ["value"])],
required_by={"value": "key"},
supports_check_mode=True,
)
deps.validate(module)
organization: str = module.params["organization"]
repository: str = module.params["repository"]
key: str = module.params["key"]
value: str = module.params["value"]
visibility: str = module.params.get("visibility")
state: str = module.params["state"]
api_url: str = module.params["api_url"]
token: str = module.params["token"]
if state == "present" and value is None:
module.fail_json(
msg="Invalid parameters",
details="The 'value' parameter cannot be empty",
params=module.params,
)
if state == "present" and not repository and not visibility:
module.fail_json(
msg="Invalid parameters",
details="When 'state' is 'present' and 'repository' is not set, 'visibility' must be provided",
params=module.params,
)
result: dict[str, Any] = {}
headers = {
"Accept": "application/vnd.github+json",
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
if state == "present":
key_id, public_key = get_public_key(
module,
api_url,
headers,
organization,
repository,
)
encrypted_value = encrypt_secret(public_key, value)
upsert = upsert_secret(
module,
api_url,
headers,
organization,
repository,
key,
encrypted_value,
key_id,
)
response_msg = "Secret created" if upsert["status"] == HTTPStatus.CREATED else "Secret updated"
result["changed"] = True
result.update(
result={
"status": upsert["status"],
"msg": upsert.get("msg"),
"response": response_msg,
},
)
if state == "absent":
delete = delete_secret(
module,
api_url,
headers,
organization,
repository,
key,
)
if delete["status"] == HTTPStatus.NO_CONTENT:
result["changed"] = True
result.update(
result={
"status": delete["status"],
"msg": delete.get("msg"),
"response": "Secret deleted",
},
)
if delete["status"] == HTTPStatus.NOT_FOUND:
result["changed"] = False
result.update(
result={
"status": delete["status"],
"msg": delete.get("msg"),
"response": "Secret not found",
},
)
module.exit_json(**result)
if __name__ == "__main__":
main()

View file

@ -206,9 +206,12 @@ class DNSRecordIPAClient(IPAClient):
def dnsrecord_find(self, zone_name, record_name):
if record_name == "@":
return self._post_json(method="dnsrecord_show", name=zone_name, item={"idnsname": record_name, "all": True})
method = "dnsrecord_show"
else:
return self._post_json(method="dnsrecord_find", name=zone_name, item={"idnsname": record_name, "all": True})
method = "dnsrecord_find"
result = self._post_json(method=method, name=zone_name, item={"idnsname": record_name, "all": True})
result["dnsttl"] = [int(v) for v in result["dnsttl"]]
return result
def dnsrecord_add(self, zone_name=None, record_name=None, details=None):
item = dict(idnsname=record_name)

View file

@ -268,7 +268,7 @@ class RecordManager:
self.fqdn = module.params["record"]
if self.module.params["type"].lower() == "txt" and self.module.params["value"] is not None:
self.value = list(map(self.txt_helper, self.module.params["value"]))
self.value = [self.txt_helper(x) for x in self.module.params["value"]]
else:
self.value = self.module.params["value"]

View file

@ -660,7 +660,7 @@ class DarwinTimezone(Timezone):
# Lookup the list of supported timezones via `systemsetup -listtimezones`.
# Note: Skip the first line that contains the label 'Time Zones:'
out = self.execute(self.systemsetup, "-listtimezones").splitlines()[1:]
tz_list = list(map(lambda x: x.strip(), out))
tz_list = [x.strip() for x in out]
if tz not in tz_list:
self.abort(f'given timezone "{tz}" is not available')
return tz

View file

@ -138,10 +138,6 @@ def get_option(option):
return None
def serialize_groups(groups):
return list(map(str, groups))
@pytest.fixture(scope="module")
def inventory():
r = InventoryModule()

View file

@ -0,0 +1,360 @@
# Copyright (c) Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import annotations
import json
from unittest.mock import MagicMock, patch
import pytest
from ansible_collections.community.internal_test_tools.tests.unit.plugins.modules.utils import (
AnsibleExitJson,
AnsibleFailJson,
exit_json,
fail_json,
set_module_args,
)
from ansible_collections.community.general.plugins.modules import github_secrets
PUBLIC_KEY_PAYLOAD = {
"key_id": "100",
"key": "j6gSA9mu5bO8ig+YBNU6oXRhGwd3Z4EFiS9rVmU9gwo=",
}
def make_fetch_url_response(body, status=200):
response = MagicMock()
response.read.return_value = json.dumps(body).encode("utf-8")
info = {"status": status, "msg": f"OK ({len(json.dumps(body))} bytes)"}
return (response, info)
@pytest.fixture(autouse=True)
def patch_module():
with patch.multiple(
"ansible.module_utils.basic.AnsibleModule",
exit_json=exit_json,
fail_json=fail_json,
):
yield
@pytest.fixture
def fetch_url_mock():
with patch.object(github_secrets, "fetch_url") as mock:
yield mock
def test_encrypt_secret():
result = github_secrets.encrypt_secret(PUBLIC_KEY_PAYLOAD["key"], "ansible")
assert isinstance(result, str)
assert result != "ansible"
assert len(result) > 70
def test_fail_without_parameters():
with pytest.raises(AnsibleFailJson):
with set_module_args({}):
github_secrets.main()
def test_fail_present_without_value():
with pytest.raises(AnsibleFailJson) as exc:
with set_module_args(
{
"organization": "myorg",
"repository": "myrepo",
"key": "MY_SECRET",
"value": None,
"state": "present",
"token": "ghp_test_token",
}
):
github_secrets.main()
assert "value' parameter cannot be empty" in exc.value.args[0]["details"]
def test_fail_org_secret_present_without_visibility():
with pytest.raises(AnsibleFailJson) as exc:
with set_module_args(
{
"organization": "myorg",
"key": "ORG_SECRET",
"value": "org_value",
"state": "present",
"token": "ghp_test_token",
}
):
github_secrets.main()
assert "'visibility' must be provided" in exc.value.args[0]["details"]
def test_create_repo_secret(fetch_url_mock):
fetch_url_mock.side_effect = [
make_fetch_url_response(PUBLIC_KEY_PAYLOAD),
make_fetch_url_response({}, status=201),
]
with set_module_args(
{
"organization": "myorg",
"repository": "myrepo",
"key": "MY_SECRET",
"value": "secret_value",
"state": "present",
"token": "ghp_test_token",
}
):
with pytest.raises(AnsibleExitJson) as exc:
github_secrets.main()
result = exc.value.args[0]
assert result["changed"] is True
assert result["result"]["status"] == 201
assert result["result"]["response"] == "Secret created"
def test_update_repo_secret(fetch_url_mock):
fetch_url_mock.side_effect = [
make_fetch_url_response(PUBLIC_KEY_PAYLOAD),
make_fetch_url_response({}, status=204),
]
with set_module_args(
{
"organization": "myorg",
"repository": "myrepo",
"key": "MY_SECRET",
"value": "new_value",
"state": "present",
"token": "ghp_test_token",
}
):
with pytest.raises(AnsibleExitJson) as exc:
github_secrets.main()
result = exc.value.args[0]
assert result["changed"] is True
assert result["result"]["response"] == "Secret updated"
def test_create_repo_secret_check_mode(fetch_url_mock):
fetch_url_mock.side_effect = [
make_fetch_url_response(PUBLIC_KEY_PAYLOAD),
make_fetch_url_response({}, status=404),
]
with set_module_args(
{
"organization": "myorg",
"repository": "myrepo",
"key": "MY_SECRET",
"value": "new_value",
"state": "present",
"token": "ghp_test_token",
"_ansible_check_mode": True,
}
):
with pytest.raises(AnsibleExitJson) as exc:
github_secrets.main()
result = exc.value.args[0]
assert result["changed"] is True
assert result["result"]["status"] == 201
assert result["result"]["response"] == "Secret created"
def test_update_repo_secret_check_mode(fetch_url_mock):
fetch_url_mock.side_effect = [
make_fetch_url_response(PUBLIC_KEY_PAYLOAD),
make_fetch_url_response({}, status=200),
]
with set_module_args(
{
"organization": "myorg",
"repository": "myrepo",
"key": "MY_SECRET",
"value": "new_value",
"state": "present",
"token": "ghp_test_token",
"_ansible_check_mode": True,
}
):
with pytest.raises(AnsibleExitJson) as exc:
github_secrets.main()
result = exc.value.args[0]
assert result["changed"] is True
assert result["result"]["status"] == 204
assert result["result"]["response"] == "Secret updated"
def test_delete_repo_secret_check_mode(fetch_url_mock):
fetch_url_mock.side_effect = [
make_fetch_url_response(PUBLIC_KEY_PAYLOAD),
make_fetch_url_response({}, status=200),
]
with set_module_args(
{
"organization": "myorg",
"repository": "myrepo",
"key": "MY_SECRET",
"state": "absent",
"token": "ghp_test_token",
"_ansible_check_mode": True,
}
):
with pytest.raises(AnsibleExitJson) as exc:
github_secrets.main()
result = exc.value.args[0]
assert result["changed"] is True
assert result["result"]["status"] == 204
assert result["result"]["response"] == "Secret deleted"
def test_update_empty_repo_secret(fetch_url_mock):
fetch_url_mock.side_effect = [
make_fetch_url_response(PUBLIC_KEY_PAYLOAD),
make_fetch_url_response({}, status=204),
]
with set_module_args(
{
"organization": "myorg",
"repository": "myrepo",
"key": "MY_SECRET",
"value": "",
"state": "present",
"token": "ghp_test_token",
}
):
with pytest.raises(AnsibleExitJson) as exc:
github_secrets.main()
result = exc.value.args[0]
assert result["changed"] is True
assert result["result"]["response"] == "Secret updated"
def test_update_missing_value_repo_secret(fetch_url_mock):
fetch_url_mock.side_effect = [
make_fetch_url_response(PUBLIC_KEY_PAYLOAD),
make_fetch_url_response({}, status=204),
]
with set_module_args(
{
"organization": "myorg",
"repository": "myrepo",
"key": "MY_SECRET",
"state": "present",
"token": "ghp_test_token",
}
):
with pytest.raises(AnsibleFailJson) as exc:
github_secrets.main()
assert "the following are missing: value" in exc.value.args[0]["msg"]
def test_delete_repo_secret(fetch_url_mock):
fetch_url_mock.return_value = make_fetch_url_response({}, status=204)
with set_module_args(
{
"organization": "myorg",
"repository": "myrepo",
"key": "MY_SECRET",
"state": "absent",
"token": "ghp_test_token",
}
):
with pytest.raises(AnsibleExitJson) as exc:
github_secrets.main()
result = exc.value.args[0]
assert result["changed"] is True
assert result["result"]["status"] == 204
assert result["result"]["response"] == "Secret deleted"
def test_delete_dne_repo_secret(fetch_url_mock):
fetch_url_mock.return_value = make_fetch_url_response({}, status=404)
with set_module_args(
{
"organization": "myorg",
"repository": "myrepo",
"key": "DOES_NOT_EXIST",
"state": "absent",
"token": "ghp_test_token",
}
):
with pytest.raises(AnsibleExitJson) as exc:
github_secrets.main()
result = exc.value.args[0]
assert result["changed"] is False
assert result["result"]["status"] == 404
assert result["result"]["response"] == "Secret not found"
def test_fail_get_public_key(fetch_url_mock):
fetch_url_mock.return_value = make_fetch_url_response({}, status=403)
with set_module_args(
{
"organization": "myorg",
"repository": "myrepo",
"key": "MY_SECRET",
"value": "secret_value",
"state": "present",
"token": "ghp_test_token",
}
):
with pytest.raises(AnsibleFailJson) as exc:
github_secrets.main()
assert "Failed to get public key" in exc.value.args[0]["msg"]
def test_fail_upsert_secret(fetch_url_mock):
fetch_url_mock.side_effect = [
make_fetch_url_response(PUBLIC_KEY_PAYLOAD),
make_fetch_url_response({}, status=422),
]
with set_module_args(
{
"organization": "myorg",
"repository": "myrepo",
"key": "MY_SECRET",
"value": "secret_value",
"state": "present",
"token": "ghp_test_token",
}
):
with pytest.raises(AnsibleFailJson) as exc:
github_secrets.main()
assert "Failed to upsert secret" in exc.value.args[0]["msg"]
def test_fail_delete_secret(fetch_url_mock):
fetch_url_mock.return_value = make_fetch_url_response({}, status=503)
with set_module_args(
{
"organization": "myorg",
"repository": "myrepo",
"key": "MY_SECRET",
"state": "absent",
"token": "ghp_test_token",
}
):
with pytest.raises(AnsibleFailJson) as exc:
github_secrets.main()
assert "Failed to delete secret" in exc.value.args[0]["msg"]

View file

@ -7,7 +7,6 @@ from __future__ import annotations
import unittest
from unittest import mock
import pytest
from ansible_collections.community.internal_test_tools.tests.unit.plugins.modules.utils import (
AnsibleExitJson,
AnsibleFailJson,
@ -15,12 +14,9 @@ from ansible_collections.community.internal_test_tools.tests.unit.plugins.module
from ansible_collections.community.general.plugins.modules import monit
TEST_OUTPUT = """
%s '%s'
status %s
monitoring status Not monitored
monitoring mode active
"""
from .uthelper import RunCommandMock, UTHelper
UTHelper.from_module(monit, __name__, mocks=[RunCommandMock])
class MonitTest(unittest.TestCase):
@ -39,29 +35,11 @@ class MonitTest(unittest.TestCase):
side_effect = [side_effect]
return mock.patch.object(self.monit, "get_status", side_effect=side_effect)
def test_change_state_success(self):
with self.patch_status([monit.Status("OK"), monit.Status("NOT_MONITORED")]):
with self.assertRaises(AnsibleExitJson):
self.monit.stop()
self.module.fail_json.assert_not_called()
self.module.run_command.assert_called_with(["monit", "stop", "processX"], check_rc=True)
def test_change_state_fail(self):
with self.patch_status([monit.Status("OK")] * 3):
with self.assertRaises(AnsibleFailJson):
self.monit.stop()
def test_reload_fail(self):
self.module.run_command.return_value = (1, "stdout", "stderr")
with self.assertRaises(AnsibleFailJson):
self.monit.reload()
def test_reload(self):
self.module.run_command.return_value = (0, "", "")
with self.patch_status(monit.Status("OK")):
with self.assertRaises(AnsibleExitJson):
self.monit.reload()
def test_wait_for_status_to_stop_pending(self):
status = [
monit.Status("MISSING"),
@ -74,11 +52,6 @@ class MonitTest(unittest.TestCase):
self.monit.wait_for_monit_to_stop_pending()
self.assertEqual(get_status.call_count, len(status))
def test_wait_for_status_change(self):
with self.patch_status([monit.Status("NOT_MONITORED"), monit.Status("OK")]) as get_status:
self.monit.wait_for_status_change(monit.Status("NOT_MONITORED"))
self.assertEqual(get_status.call_count, 2)
def test_wait_for_status_change_fail(self):
with self.patch_status([monit.Status("OK")] * 3):
with self.assertRaises(AnsibleFailJson):
@ -93,76 +66,3 @@ class MonitTest(unittest.TestCase):
with self.patch_status([monit.Status("NOT_MONITORED")] * 3):
with self.assertRaises(AnsibleFailJson):
self.monit.monitor()
def test_timeout(self):
self.monit.timeout = 0
with self.patch_status(monit.Status("NOT_MONITORED").pending()):
with self.assertRaises(AnsibleFailJson):
self.monit.wait_for_monit_to_stop_pending()
BASIC_OUTPUT_CASES = [
(TEST_OUTPUT % ("Process", "processX", member.value), monit.Status(member.name)) for member in monit.StatusValue
]
@pytest.mark.parametrize(
"output, expected",
BASIC_OUTPUT_CASES
+ [
("", monit.Status("MISSING")),
(TEST_OUTPUT % ("Process", "processY", "OK"), monit.Status("MISSING")),
(TEST_OUTPUT % ("Process", "processX", "Not Monitored - start pending"), monit.Status("OK", is_pending=True)),
(
TEST_OUTPUT % ("Process", "processX", "Monitored - stop pending"),
monit.Status("NOT_MONITORED", is_pending=True),
),
(TEST_OUTPUT % ("Process", "processX", "Monitored - restart pending"), monit.Status("OK", is_pending=True)),
(TEST_OUTPUT % ("Process", "processX", "Not Monitored - monitor pending"), monit.Status("OK", is_pending=True)),
(TEST_OUTPUT % ("Process", "processX", "Does not exist"), monit.Status("DOES_NOT_EXIST")),
(TEST_OUTPUT % ("Process", "processX", "Not monitored"), monit.Status("NOT_MONITORED")),
(TEST_OUTPUT % ("Process", "processX", "Running"), monit.Status("OK")),
(TEST_OUTPUT % ("Process", "processX", "Execution failed | Does not exist"), monit.Status("EXECUTION_FAILED")),
(TEST_OUTPUT % ("Process", "processX", "Some Unknown Status"), monit.Status("EXECUTION_FAILED")),
],
)
def test_parse_status(output, expected):
module = mock.MagicMock()
status = monit.Monit(module, "", "processX", 0)._parse_status(output, "")
assert status == expected
@pytest.mark.parametrize(
"output, expected",
BASIC_OUTPUT_CASES
+ [
(TEST_OUTPUT % ("Process", "processX", "OK"), monit.Status("OK")),
(TEST_OUTPUT % ("File", "processX", "OK"), monit.Status("OK")),
(TEST_OUTPUT % ("Fifo", "processX", "OK"), monit.Status("OK")),
(TEST_OUTPUT % ("Filesystem", "processX", "OK"), monit.Status("OK")),
(TEST_OUTPUT % ("Directory", "processX", "OK"), monit.Status("OK")),
(TEST_OUTPUT % ("Remote host", "processX", "OK"), monit.Status("OK")),
(TEST_OUTPUT % ("System", "processX", "OK"), monit.Status("OK")),
(TEST_OUTPUT % ("Program", "processX", "OK"), monit.Status("OK")),
(TEST_OUTPUT % ("Network", "processX", "OK"), monit.Status("OK")),
(TEST_OUTPUT % ("Unsupported", "processX", "OK"), monit.Status("MISSING")),
],
)
def test_parse_status_supports_all_services(output, expected):
status = monit.Monit(None, "", "processX", 0)._parse_status(output, "")
assert status == expected
@pytest.mark.parametrize(
"output, expected",
[
("This is monit version 5.18.1", "5.18.1"),
("This is monit version 12.18", "12.18"),
("This is monit version 5.1.12", "5.1.12"),
],
)
def test_parse_version(output, expected):
module = mock.MagicMock()
module.run_command.return_value = (0, output, "")
raw_version, version_tuple = monit.Monit(module, "", "processX", 0)._get_monit_version()
assert raw_version == expected

View file

@ -0,0 +1,996 @@
# Copyright (c) Ansible project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
---
anchors:
# version commands (version > 5.18 → adds -B flag to status/summary commands)
version_526: &version_526
command: [/testbin/monit, -V]
environ: {check_rc: true}
rc: 0
out: "This is monit version 5.26.0\n"
err: ''
version_518: &version_518
command: [/testbin/monit, -V]
environ: {check_rc: true}
rc: 0
out: "This is monit version 5.18.0\n"
err: ''
version_1218: &version_1218
command: [/testbin/monit, -V]
environ: {check_rc: true}
rc: 0
out: "This is monit version 12.18\n"
err: ''
version_5181: &version_5181
command: [/testbin/monit, -V]
environ: {check_rc: true}
rc: 0
out: "This is monit version 5.18.1\n"
err: ''
version_5112: &version_5112
command: [/testbin/monit, -V]
environ: {check_rc: true}
rc: 0
out: "This is monit version 5.1.12\n"
err: ''
# summary commands (presence check)
summary_present_526: &summary_present_526
command: [/testbin/monit, summary, -B]
environ: {check_rc: true}
rc: 0
out: "processX\n"
err: ''
summary_not_present_526: &summary_not_present_526
command: [/testbin/monit, summary, -B]
environ: {check_rc: true}
rc: 0
out: "other_process\n"
err: ''
summary_present_518: &summary_present_518
command: [/testbin/monit, summary]
environ: {check_rc: true}
rc: 0
out: "processX\n"
err: ''
# status commands with -B (version > 5.18)
status_ok_526: &status_ok_526
command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Process 'processX'\n status OK\n"
err: ''
status_not_monitored_526: &status_not_monitored_526
command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Process 'processX'\n status Not monitored\n"
err: ''
# status commands without -B (version <= 5.18)
status_ok_no_b: &status_ok_no_b
command: [/testbin/monit, status, processX]
environ: {check_rc: true}
rc: 0
out: "Process 'processX'\n status OK\n"
err: ''
status_not_monitored_no_b: &status_not_monitored_no_b
command: [/testbin/monit, status, processX]
environ: {check_rc: true}
rc: 0
out: "Process 'processX'\n status Not monitored\n"
err: ''
# status text variants for parse_status coverage (with -B)
status_ok_raw_526: &status_ok_raw_526
command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Process 'processX'\n status ok\n"
err: ''
status_running_526: &status_running_526
command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Process 'processX'\n status Running\n"
err: ''
status_not_monitored_raw_526: &status_not_monitored_raw_526
command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Process 'processX'\n status not_monitored\n"
err: ''
status_missing_raw_526: &status_missing_raw_526
command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Process 'processX'\n status missing\n"
err: ''
status_initializing_526: &status_initializing_526
command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Process 'processX'\n status initializing\n"
err: ''
status_does_not_exist_raw_526: &status_does_not_exist_raw_526
command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Process 'processX'\n status does_not_exist\n"
err: ''
status_does_not_exist_text_526: &status_does_not_exist_text_526
command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Process 'processX'\n status Does not exist\n"
err: ''
status_exec_failed_raw_526: &status_exec_failed_raw_526
command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Process 'processX'\n status execution_failed\n"
err: ''
status_exec_failed_pipe_526: &status_exec_failed_pipe_526
command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Process 'processX'\n status Execution failed | Does not exist\n"
err: ''
status_unknown_526: &status_unknown_526
command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Process 'processX'\n status Some Unknown Status\n"
err: ''
status_start_pending_526: &status_start_pending_526
command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Process 'processX'\n status Not Monitored - start pending\n"
err: ''
status_stop_pending_526: &status_stop_pending_526
command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Process 'processX'\n status Monitored - stop pending\n"
err: ''
status_restart_pending_526: &status_restart_pending_526
command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Process 'processX'\n status Monitored - restart pending\n"
err: ''
status_monitor_pending_526: &status_monitor_pending_526
command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Process 'processX'\n status Not Monitored - monitor pending\n"
err: ''
status_wrong_name_526: &status_wrong_name_526
command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Process 'otherProcess'\n status OK\n"
err: ''
test_cases:
# ------------------------------------------------------------------ no-change cases
- id: state_present_already_present
input:
name: processX
state: present
output:
changed: false
name: processX
state: present
mocks:
run_command:
- *version_526
- *summary_present_526
- id: state_present_already_present_old_monit
input:
name: processX
state: present
output:
changed: false
name: processX
state: present
mocks:
run_command:
- *version_518
- *summary_present_518
- id: state_started_already_running
# main(): is_process_present → wait_for_pending (get_status) → is_process_running (get_status)
# running=True, state in [started, monitored] → exit changed=False
input:
name: processX
state: started
output:
changed: false
name: processX
state: started
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_ok_526 # wait_for_monit_to_stop_pending
- *status_ok_526 # is_process_running
- id: state_monitored_already_running
input:
name: processX
state: monitored
output:
changed: false
name: processX
state: monitored
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_ok_526
- *status_ok_526
# ------------------------------------------------------------------ state changes
- id: state_stopped_currently_running
# change_state: get_status → stop → wait_for_status_change (status changes on 1st check)
input:
name: processX
state: stopped
output:
changed: true
name: processX
monit_version: "5.26.0"
state: stopped
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_ok_526 # wait_for_monit_to_stop_pending
- *status_ok_526 # is_process_running (running=True)
- *status_ok_526 # change_state: current_status = get_status()
- command: [/testbin/monit, stop, processX]
environ: {check_rc: true}
rc: 0
out: ''
err: ''
- *status_not_monitored_526 # wait_for_status_change: status changed → done
- id: state_started_not_running
# process is not running → monit.start() → status changes to OK on 1st check
input:
name: processX
state: started
output:
changed: true
name: processX
monit_version: "5.26.0"
state: started
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_not_monitored_526 # wait_for_monit_to_stop_pending
- *status_not_monitored_526 # is_process_running (running=False)
- *status_not_monitored_526 # change_state: current_status = get_status()
- command: [/testbin/monit, start, processX]
environ: {check_rc: true}
rc: 0
out: ''
err: ''
- *status_ok_526 # wait_for_status_change: status changed → done
- id: state_unmonitored_currently_running
input:
name: processX
state: unmonitored
output:
changed: true
name: processX
monit_version: "5.26.0"
state: unmonitored
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_ok_526 # wait_for_monit_to_stop_pending
- *status_ok_526 # is_process_running (running=True)
- *status_ok_526 # change_state: current_status = get_status()
- command: [/testbin/monit, unmonitor, processX]
environ: {check_rc: true}
rc: 0
out: ''
err: ''
- *status_not_monitored_526 # wait_for_status_change: status changed → done
- id: state_monitored_not_running
# process not running → monit.monitor() → status changes to OK (invert_expected=True → success)
input:
name: processX
state: monitored
output:
changed: true
name: processX
monit_version: "5.26.0"
state: monitored
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_not_monitored_526 # wait_for_monit_to_stop_pending
- *status_not_monitored_526 # is_process_running (running=False)
- *status_not_monitored_526 # change_state: current_status = get_status()
- command: [/testbin/monit, monitor, processX]
environ: {check_rc: true}
rc: 0
out: ''
err: ''
- *status_ok_526 # wait_for_status_change: changed → done; match inverted → success
- id: state_restarted
# restart works regardless of running state; process not running → restart → becomes OK
input:
name: processX
state: restarted
output:
changed: true
name: processX
monit_version: "5.26.0"
state: restarted
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_not_monitored_526 # wait_for_monit_to_stop_pending
- *status_not_monitored_526 # is_process_running (running=False)
- *status_not_monitored_526 # change_state: current_status = get_status()
- command: [/testbin/monit, restart, processX]
environ: {check_rc: true}
rc: 0
out: ''
err: ''
- *status_ok_526 # wait_for_status_change: changed → done; OK == OK → success
# ------------------------------------------------------------------ reload
- id: state_reloaded
# reload exits before any presence/status checks; no version check either
input:
name: processX
state: reloaded
output:
changed: true
state: reloaded
mocks:
run_command:
- command: [/testbin/monit, reload]
environ: {}
rc: 0
out: ''
err: ''
# ------------------------------------------------------------------ present (not yet)
- id: state_present_not_yet_present
# not present in summary → Monit.present(): reload + summary check (present on 1st try)
input:
name: processX
state: present
output:
changed: true
name: processX
monit_version: "5.26.0"
state: present
mocks:
run_command:
- *version_526
- *summary_not_present_526 # initial presence check → not present
- command: [/testbin/monit, reload, processX]
environ: {check_rc: true}
rc: 0
out: ''
err: ''
- *summary_present_526 # while loop: present on first check → exit
# ------------------------------------------------------------------ failure cases
- id: state_reloaded_fail
# reload returns rc=1 → fail_json("monit reload failed")
input:
name: processX
state: reloaded
output:
failed: true
mocks:
run_command:
- command: [/testbin/monit, reload]
environ: {}
rc: 1
out: "stdout"
err: "stderr"
- id: state_started_pending_timeout
# timeout=0 + status=MISSING (empty output) → wait_for_monit_to_stop_pending times out
input:
name: processX
state: started
timeout: 0
output:
failed: true
mocks:
run_command:
- *version_526
- *summary_present_526
- command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: ''
err: ''
- id: state_stopped_status_change_on_retry
# same as state_stopped_currently_running but status is unchanged on the first check
# in wait_for_status_change; changes only on the second check (one retry loop)
input:
name: processX
state: stopped
output:
changed: true
name: processX
monit_version: "5.26.0"
state: stopped
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_ok_526 # wait_for_monit_to_stop_pending
- *status_ok_526 # is_process_running (running=True)
- *status_ok_526 # change_state: current_status = get_status()
- command: [/testbin/monit, stop, processX]
environ: {check_rc: true}
rc: 0
out: ''
err: ''
- *status_ok_526 # wait_for_status_change: initial (same → enters loop)
- *status_not_monitored_526 # loop_count=1, validate=False → changed!
# ------------------------------------------------------------------ check mode
- id: state_stopped_check_mode
# check mode: presence + pending + running checks happen, then exit_if_check_mode fires
flags:
check: true
input:
name: processX
state: stopped
output:
changed: true
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_ok_526 # wait_for_monit_to_stop_pending
- *status_ok_526 # is_process_running (running=True) → exit_if_check_mode
# ------------------------------------------------------------------ parse_status coverage
# Each case targets a specific status string → parsed Status → verifiable module behavior.
# "ok", "running" → OK → state=started, already running → changed=false (4 commands)
# "not_monitored_raw" → NOT_MONITORED → start → changed=true (7 commands)
# in waiting_status (missing, initializing, does_not_exist) → timeout=0 → fail (3 commands)
# EXECUTION_FAILED → not running → special fast path in wait_for_status_change (7 commands)
# pending → is_pending=True → timeout=0 → fail (3 commands)
- id: parse_status_ok_raw
# "ok" (raw enum value, lowercase) → Status("OK") → running → no change
input:
name: processX
state: started
output:
changed: false
state: started
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_ok_raw_526
- *status_ok_raw_526
- id: parse_status_running
# "Running" → Status("OK") via the RUNNING→OK alias → running → no change
input:
name: processX
state: started
output:
changed: false
state: started
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_running_526
- *status_running_526
- id: parse_status_not_monitored_raw
# "not_monitored" (raw, underscore) → Status("NOT_MONITORED") → not running → start
input:
name: processX
state: started
output:
changed: true
state: started
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_not_monitored_raw_526 # wait_for_monit_to_stop_pending
- *status_not_monitored_raw_526 # is_process_running (False)
- *status_not_monitored_raw_526 # change_state: current_status
- command: [/testbin/monit, start, processX]
environ: {check_rc: true}
rc: 0
out: ''
err: ''
- *status_ok_526
- id: parse_status_missing_raw
# "missing" (raw enum value) → Status("MISSING") → in waiting_status → timeout
input:
name: processX
state: started
timeout: 0
output:
failed: true
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_missing_raw_526
- id: parse_status_wrong_name
# process name not found in output → Status("MISSING") → timeout
input:
name: processX
state: started
timeout: 0
output:
failed: true
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_wrong_name_526
- id: parse_status_initializing
# "initializing" → Status("INITIALIZING") → in waiting_status → timeout
input:
name: processX
state: started
timeout: 0
output:
failed: true
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_initializing_526
- id: parse_status_does_not_exist_raw
# "does_not_exist" (raw) → Status("DOES_NOT_EXIST") → in waiting_status → timeout
input:
name: processX
state: started
timeout: 0
output:
failed: true
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_does_not_exist_raw_526
- id: parse_status_does_not_exist_text
# "Does not exist" → DOES_NOT_EXIST (via space→underscore normalisation) → timeout
input:
name: processX
state: started
timeout: 0
output:
failed: true
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_does_not_exist_text_526
- id: parse_status_exec_failed_raw
# "execution_failed" → EXECUTION_FAILED → not running, but wait_for_status_change
# returns immediately (special case: current_status.value == EXECUTION_FAILED)
input:
name: processX
state: started
output:
changed: true
state: started
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_exec_failed_raw_526 # wait_for_monit_to_stop_pending (not in waiting_status)
- *status_exec_failed_raw_526 # is_process_running (False)
- *status_exec_failed_raw_526 # change_state: current_status
- command: [/testbin/monit, start, processX]
environ: {check_rc: true}
rc: 0
out: ''
err: ''
- *status_ok_526 # wait_for_status_change returns immediately (EXECUTION_FAILED)
- id: parse_status_exec_failed_pipe
# "Execution failed | Does not exist" → EXECUTION_FAILED (regex stops before |)
input:
name: processX
state: started
output:
changed: true
state: started
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_exec_failed_pipe_526
- *status_exec_failed_pipe_526
- *status_exec_failed_pipe_526
- command: [/testbin/monit, start, processX]
environ: {check_rc: true}
rc: 0
out: ''
err: ''
- *status_ok_526
- id: parse_status_unknown
# "Some Unknown Status" → EXECUTION_FAILED (via module.warn + fallback)
input:
name: processX
state: started
output:
changed: true
state: started
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_unknown_526
- *status_unknown_526
- *status_unknown_526
- command: [/testbin/monit, start, processX]
environ: {check_rc: true}
rc: 0
out: ''
err: ''
- *status_ok_526
- id: parse_status_start_pending
# "Not Monitored - start pending" → Status("OK", is_pending=True) → timeout
input:
name: processX
state: started
timeout: 0
output:
failed: true
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_start_pending_526
- id: parse_status_stop_pending
# "Monitored - stop pending" → Status("NOT_MONITORED", is_pending=True) → timeout
input:
name: processX
state: started
timeout: 0
output:
failed: true
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_stop_pending_526
- id: parse_status_restart_pending
# "Monitored - restart pending" → Status("OK", is_pending=True) → timeout
input:
name: processX
state: started
timeout: 0
output:
failed: true
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_restart_pending_526
- id: parse_status_monitor_pending
# "Not Monitored - monitor pending" → Status("OK", is_pending=True) → timeout
input:
name: processX
state: started
timeout: 0
output:
failed: true
mocks:
run_command:
- *version_526
- *summary_present_526
- *status_monitor_pending_526
# ------------------------------------------------------------------ parse_service_types coverage
# Each recognised service type: process name found → status OK → running → changed=false
# "Unsupported" is not in MONIT_SERVICES → MISSING → timeout
- id: parse_service_file
input:
name: processX
state: started
output:
changed: false
state: started
mocks:
run_command:
- *version_526
- *summary_present_526
- command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "File 'processX'\n status OK\n"
err: ''
- command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "File 'processX'\n status OK\n"
err: ''
- id: parse_service_fifo
input:
name: processX
state: started
output:
changed: false
state: started
mocks:
run_command:
- *version_526
- *summary_present_526
- command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Fifo 'processX'\n status OK\n"
err: ''
- command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Fifo 'processX'\n status OK\n"
err: ''
- id: parse_service_filesystem
input:
name: processX
state: started
output:
changed: false
state: started
mocks:
run_command:
- *version_526
- *summary_present_526
- command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Filesystem 'processX'\n status OK\n"
err: ''
- command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Filesystem 'processX'\n status OK\n"
err: ''
- id: parse_service_directory
input:
name: processX
state: started
output:
changed: false
state: started
mocks:
run_command:
- *version_526
- *summary_present_526
- command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Directory 'processX'\n status OK\n"
err: ''
- command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Directory 'processX'\n status OK\n"
err: ''
- id: parse_service_remote_host
input:
name: processX
state: started
output:
changed: false
state: started
mocks:
run_command:
- *version_526
- *summary_present_526
- command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Remote host 'processX'\n status OK\n"
err: ''
- command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Remote host 'processX'\n status OK\n"
err: ''
- id: parse_service_system
input:
name: processX
state: started
output:
changed: false
state: started
mocks:
run_command:
- *version_526
- *summary_present_526
- command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "System 'processX'\n status OK\n"
err: ''
- command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "System 'processX'\n status OK\n"
err: ''
- id: parse_service_program
input:
name: processX
state: started
output:
changed: false
state: started
mocks:
run_command:
- *version_526
- *summary_present_526
- command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Program 'processX'\n status OK\n"
err: ''
- command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Program 'processX'\n status OK\n"
err: ''
- id: parse_service_network
input:
name: processX
state: started
output:
changed: false
state: started
mocks:
run_command:
- *version_526
- *summary_present_526
- command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Network 'processX'\n status OK\n"
err: ''
- command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Network 'processX'\n status OK\n"
err: ''
- id: parse_service_unsupported
# "Unsupported" not in MONIT_SERVICES → _parse_status returns MISSING → timeout
input:
name: processX
state: started
timeout: 0
output:
failed: true
mocks:
run_command:
- *version_526
- *summary_present_526
- command: [/testbin/monit, status, -B, processX]
environ: {check_rc: true}
rc: 0
out: "Unsupported 'processX'\n status OK\n"
err: ''
# ------------------------------------------------------------------ parse_version coverage
# state=stopped (running process) exercises exit_success() which includes monit_version.
# Commands with/without -B reveal whether the version tuple was parsed correctly.
- id: parse_version_1218
# "12.18" → (12, 18) > (5, 18) → -B added; monit_version="12.18" in output
input:
name: processX
state: stopped
output:
changed: true
monit_version: "12.18"
state: stopped
mocks:
run_command:
- *version_1218
- *summary_present_526 # -B because (12,18) > (5,18)
- *status_ok_526
- *status_ok_526
- *status_ok_526
- command: [/testbin/monit, stop, processX]
environ: {check_rc: true}
rc: 0
out: ''
err: ''
- *status_not_monitored_526
- id: parse_version_5181
# "5.18.1" → (5, 18) not > (5, 18) → no -B; monit_version="5.18.1" in output
input:
name: processX
state: stopped
output:
changed: true
monit_version: "5.18.1"
state: stopped
mocks:
run_command:
- *version_5181
- *summary_present_518 # no -B because (5,18) == (5,18)
- *status_ok_no_b
- *status_ok_no_b
- *status_ok_no_b
- command: [/testbin/monit, stop, processX]
environ: {check_rc: true}
rc: 0
out: ''
err: ''
- *status_not_monitored_no_b
- id: parse_version_5112
# "5.1.12" → (5, 1) not > (5, 18) → no -B; monit_version="5.1.12" in output
input:
name: processX
state: stopped
output:
changed: true
monit_version: "5.1.12"
state: stopped
mocks:
run_command:
- *version_5112
- *summary_present_518 # no -B because (5,1) < (5,18)
- *status_ok_no_b
- *status_ok_no_b
- *status_ok_no_b
- command: [/testbin/monit, stop, processX]
environ: {check_rc: true}
rc: 0
out: ''
err: ''
- *status_not_monitored_no_b

View file

@ -2505,7 +2505,7 @@ def test_create_bridge(mocked_generic_connection_create, capfd):
assert args[0][5] == "con-name"
assert args[0][6] == "non_existent_nw_device"
args_text = list(map(to_text, args[0]))
args_text = [to_text(x) for x in args[0]]
for param in [
"ipv4.addresses",
"10.10.10.10/24",
@ -2542,7 +2542,7 @@ def test_mod_bridge(mocked_generic_connection_modify, capfd):
assert args[0][2] == "modify"
assert args[0][3] == "non_existent_nw_device"
args_text = list(map(to_text, args[0]))
args_text = [to_text(x) for x in args[0]]
for param in [
"ipv4.addresses",
"10.10.10.10/24",
@ -2596,7 +2596,7 @@ def test_create_bridge_slave(mocked_generic_connection_create, capfd):
assert args[0][5] == "con-name"
assert args[0][6] == "non_existent_nw_device"
args_text = list(map(to_text, args[0]))
args_text = [to_text(x) for x in args[0]]
for param in ["bridge-port.path-cost", "100"]:
assert param in args_text
@ -2624,7 +2624,7 @@ def test_mod_bridge_slave(mocked_generic_connection_modify, capfd):
assert args[0][2] == "modify"
assert args[0][3] == "non_existent_nw_device"
args_text = list(map(to_text, args[0]))
args_text = [to_text(x) for x in args[0]]
for param in ["bridge-port.path-cost", "100"]:
assert param in args_text
@ -2843,7 +2843,7 @@ def test_create_vlan_con(mocked_generic_connection_create, capfd):
assert args[0][5] == "con-name"
assert args[0][6] == "non_existent_nw_device"
args_text = list(map(to_text, args[0]))
args_text = [to_text(x) for x in args[0]]
for param in ["ipv4.addresses", "10.10.10.10/24", "ipv4.gateway", "10.10.10.1", "vlan.id", "10"]:
assert param in args_text
@ -2871,7 +2871,7 @@ def test_mod_vlan_conn(mocked_generic_connection_modify, capfd):
assert args[0][2] == "modify"
assert args[0][3] == "non_existent_nw_device"
args_text = list(map(to_text, args[0]))
args_text = [to_text(x) for x in args[0]]
for param in ["ipv4.addresses", "10.10.10.10/24", "ipv4.gateway", "10.10.10.1", "vlan.id", "10"]:
assert param in args_text
@ -2915,7 +2915,7 @@ def test_create_vxlan(mocked_generic_connection_create, capfd):
assert args[0][5] == "con-name"
assert args[0][6] == "non_existent_nw_device"
args_text = list(map(to_text, args[0]))
args_text = [to_text(x) for x in args[0]]
for param in [
"connection.interface-name",
"vxlan-existent_nw_device",
@ -2951,7 +2951,7 @@ def test_vxlan_mod(mocked_generic_connection_modify, capfd):
assert args[0][2] == "modify"
assert args[0][3] == "non_existent_nw_device"
args_text = list(map(to_text, args[0]))
args_text = [to_text(x) for x in args[0]]
for param in ["vxlan.local", "192.168.225.5", "vxlan.remote", "192.168.225.6", "vxlan.id", "11"]:
assert param in args_text
@ -2995,7 +2995,7 @@ def test_create_vxlan_multicast(mocked_generic_connection_create, capfd):
assert args[0][5] == "con-name"
assert args[0][6] == "vxlan_multicast_test"
args_text = list(map(to_text, args[0]))
args_text = [to_text(x) for x in args[0]]
for param in [
"connection.interface-name",
"vxlan-device",
@ -3040,7 +3040,7 @@ def test_create_ipip(mocked_generic_connection_create, capfd):
assert args[0][5] == "con-name"
assert args[0][6] == "non_existent_nw_device"
args_text = list(map(to_text, args[0]))
args_text = [to_text(x) for x in args[0]]
for param in [
"connection.interface-name",
"ipip-existent_nw_device",
@ -3078,7 +3078,7 @@ def test_ipip_mod(mocked_generic_connection_modify, capfd):
assert args[0][2] == "modify"
assert args[0][3] == "non_existent_nw_device"
args_text = list(map(to_text, args[0]))
args_text = [to_text(x) for x in args[0]]
for param in ["ip-tunnel.local", "192.168.225.5", "ip-tunnel.remote", "192.168.225.6"]:
assert param in args_text
@ -3122,7 +3122,7 @@ def test_create_sit(mocked_generic_connection_create, capfd):
assert args[0][5] == "con-name"
assert args[0][6] == "non_existent_nw_device"
args_text = list(map(to_text, args[0]))
args_text = [to_text(x) for x in args[0]]
for param in [
"connection.interface-name",
"sit-existent_nw_device",
@ -3160,7 +3160,7 @@ def test_sit_mod(mocked_generic_connection_modify, capfd):
assert args[0][2] == "modify"
assert args[0][3] == "non_existent_nw_device"
args_text = list(map(to_text, args[0]))
args_text = [to_text(x) for x in args[0]]
for param in ["ip-tunnel.local", "192.168.225.5", "ip-tunnel.remote", "192.168.225.6"]:
assert param in args_text
@ -3224,7 +3224,7 @@ def test_create_gre(mocked_generic_connection_create, capfd):
assert args[0][5] == "con-name"
assert args[0][6] == "non_existent_nw_device"
args_text = list(map(to_text, args[0]))
args_text = [to_text(x) for x in args[0]]
for param in [
"connection.interface-name",
"gre-existent_nw_device",
@ -3266,7 +3266,7 @@ def test_gre_mod(mocked_generic_connection_modify, capfd):
assert args[0][2] == "modify"
assert args[0][3] == "non_existent_nw_device"
args_text = list(map(to_text, args[0]))
args_text = [to_text(x) for x in args[0]]
for param in ["ip-tunnel.local", "192.168.225.5", "ip-tunnel.remote", "192.168.225.6"]:
assert param in args_text
@ -3351,7 +3351,7 @@ def test_create_ethernet_static(mocked_generic_connection_create, capfd):
assert add_args[0][5] == "con-name"
assert add_args[0][6] == "non_existent_nw_device"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in [
"connection.interface-name",
"ethernet_non_existant",
@ -3410,7 +3410,7 @@ def test_ethernet_connection_static_ipv4_address_static_route_with_metric_modify
assert add_args[0][2] == "modify"
assert add_args[0][3] == "non_existent_nw_device"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in ["ipv4.routes", "192.168.200.0/24 192.168.1.1", "ipv4.route-metric", "10"]:
assert param in add_args_text
@ -3444,7 +3444,7 @@ def test_ethernet_connection_static_ipv4_address_static_route_with_metric_clear(
assert add_args[0][2] == "modify"
assert add_args[0][3] == "non_existent_nw_device"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in ["ipv4.routes", ""]:
assert param in add_args_text
@ -3483,7 +3483,7 @@ def test_ethernet_connection_static_ipv6_address_static_route_create(
assert add_args[0][5] == "con-name"
assert add_args[0][6] == "non_existent_nw_device"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in [
"connection.interface-name",
@ -3525,7 +3525,7 @@ def test_ethernet_connection_static_ipv6_address_static_route_metric_modify(
assert add_args[0][2] == "modify"
assert add_args[0][3] == "non_existent_nw_device"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in ["ipv6.routes", "fd2e:446f:d85d:5::/64 2001:beef:cafe:10::2", "ipv6.route-metric", "10"]:
assert param in add_args_text
@ -3561,7 +3561,7 @@ def test_ethernet_connection_static_ipv6_address_multiple_static_routes_with_met
assert add_args[0][5] == "con-name"
assert add_args[0][6] == "non_existent_nw_device"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in [
"connection.interface-name",
@ -3603,7 +3603,7 @@ def test_ethernet_connection_sriov_vfs_create(mocked_ethernet_connection_with_sr
assert add_args[0][5] == "con-name"
assert add_args[0][6] == "non_existent_nw_device"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in [
"connection.interface-name",
@ -3647,7 +3647,7 @@ def test_ethernet_connection_static_ipv6_address_static_route_with_metric_create
assert add_args[0][5] == "con-name"
assert add_args[0][6] == "non_existent_nw_device"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in [
"connection.interface-name",
@ -3697,7 +3697,7 @@ def test_ethernet_connection_static_ipv6_address_static_route_create_2(
assert add_args[0][5] == "con-name"
assert add_args[0][6] == "non_existent_nw_device"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in [
"connection.interface-name",
@ -3753,7 +3753,7 @@ def test_create_wireless(mocked_wireless_create, capfd):
assert add_args[0][5] == "con-name"
assert add_args[0][6] == "non_existent_nw_device"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in [
"connection.interface-name",
"wireless_non_existant",
@ -3806,7 +3806,7 @@ def test_create_secure_wireless(mocked_secure_wireless_create, capfd):
assert add_args[0][5] == "con-name"
assert add_args[0][6] == "non_existent_nw_device"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in [
"connection.interface-name",
"wireless_non_existant",
@ -3867,7 +3867,7 @@ def test_create_secure_wireless_failure(mocked_secure_wireless_create_failure, c
assert add_args[0][5] == "con-name"
assert add_args[0][6] == "non_existent_nw_device"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in [
"connection.interface-name",
"wireless_non_existant",
@ -3921,7 +3921,7 @@ def test_modify_secure_wireless(mocked_secure_wireless_modify, capfd):
assert add_args[0][2] == "modify"
assert add_args[0][3] == "non_existent_nw_device"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in [
"connection.interface-name",
"wireless_non_existant",
@ -3986,7 +3986,7 @@ def test_modify_secure_wireless_failure(mocked_secure_wireless_modify_failure, c
assert add_args[0][2] == "modify"
assert add_args[0][3] == "non_existent_nw_device"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in [
"connection.interface-name",
"wireless_non_existant",
@ -4026,7 +4026,7 @@ def test_create_dummy_static(mocked_generic_connection_create, capfd):
assert add_args[0][5] == "con-name"
assert add_args[0][6] == "non_existent_nw_device"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in [
"connection.interface-name",
"dummy_non_existant",
@ -4099,7 +4099,7 @@ def test_dummy_connection_static_with_custom_mtu_modify(mocked_dummy_connection_
assert args[0][2] == "modify"
assert args[0][3] == "non_existent_nw_device"
args_text = list(map(to_text, args[0]))
args_text = [to_text(x) for x in args[0]]
for param in ["802-3-ethernet.mtu", "0"]:
assert param in args_text
@ -4129,7 +4129,7 @@ def test_create_gsm(mocked_generic_connection_create, capfd):
assert args[0][5] == "con-name"
assert args[0][6] == "non_existent_nw_device"
args_text = list(map(to_text, args[0]))
args_text = [to_text(x) for x in args[0]]
for param in [
"connection.interface-name",
"gsm_non_existant",
@ -4167,7 +4167,7 @@ def test_gsm_mod(mocked_generic_connection_modify, capfd):
assert args[0][2] == "modify"
assert args[0][3] == "non_existent_nw_device"
args_text = list(map(to_text, args[0]))
args_text = [to_text(x) for x in args[0]]
for param in ["gsm.username", "t-mobile", "gsm.password", "tm"]:
assert param in args_text
@ -4214,7 +4214,7 @@ def test_create_ethernet_with_multiple_ip4_addresses_static(mocked_generic_conne
assert add_args[0][5] == "con-name"
assert add_args[0][6] == "non_existent_nw_device"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in [
"connection.interface-name",
"ethernet_non_existant",
@ -4262,7 +4262,7 @@ def test_create_ethernet_with_multiple_ip6_addresses_static(mocked_generic_conne
assert add_args[0][5] == "con-name"
assert add_args[0][6] == "non_existent_nw_device"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in [
"connection.interface-name",
"ethernet_non_existant",
@ -4374,7 +4374,7 @@ def test_create_ethernet_addr_gen_mode_and_ip6_privacy_static(mocked_generic_con
assert add_args[0][5] == "con-name"
assert add_args[0][6] == "non_existent_nw_device"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in [
"connection.interface-name",
"ethernet_non_existant",
@ -4442,7 +4442,7 @@ def test_create_wireguard(mocked_generic_connection_create, capfd):
assert add_args[0][5] == "con-name"
assert add_args[0][6] == "non_existent_nw_device"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in [
"connection.interface-name",
"wg_non_existant",
@ -4498,7 +4498,7 @@ def test_wireguard_mod(mocked_generic_connection_modify, capfd):
assert args[0][2] == "modify"
assert args[0][3] == "non_existent_nw_device"
args_text = list(map(to_text, args[0]))
args_text = [to_text(x) for x in args[0]]
for param in ["wireguard.listen-port", "51820"]:
assert param in args_text
@ -4557,7 +4557,7 @@ def test_create_vpn_l2tp(mocked_generic_connection_create, capfd):
assert add_args[0][5] == "con-name"
assert add_args[0][6] == "vpn_l2tp"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in [
"connection.autoconnect",
@ -4608,7 +4608,7 @@ def test_create_vpn_pptp(mocked_generic_connection_create, capfd):
assert add_args[0][5] == "con-name"
assert add_args[0][6] == "vpn_pptp"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in [
"connection.autoconnect",
@ -4666,7 +4666,7 @@ def test_infiniband_connection_static_transport_mode_connected(
assert add_args[0][2] == "modify"
assert add_args[0][3] == "non_existent_nw_device"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in ["infiniband.transport-mode", "connected"]:
assert param in add_args_text
@ -4721,7 +4721,7 @@ def test_create_macvlan(mocked_generic_connection_create, capfd):
assert add_args[0][5] == "con-name"
assert add_args[0][6] == "non_existent_nw_device"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in [
"connection.interface-name",
"macvlan_non_existant",
@ -4777,7 +4777,7 @@ def test_macvlan_mod(mocked_generic_connection_modify, capfd):
assert args[0][2] == "modify"
assert args[0][3] == "non_existent_nw_device"
args_text = list(map(to_text, args[0]))
args_text = [to_text(x) for x in args[0]]
for param in ["macvlan.mode", "2"]:
assert param in args_text
@ -5115,7 +5115,7 @@ def test_create_loopback(mocked_generic_connection_create, capfd):
assert add_args[0][5] == "con-name"
assert add_args[0][6] == "lo"
add_args_text = list(map(to_text, add_args[0]))
add_args_text = [to_text(x) for x in add_args[0]]
for param in ["connection.interface-name", "lo", "ipv4.addresses", "127.0.0.1/8"]:
assert param in add_args_text
@ -5186,7 +5186,7 @@ def test_create_vrf_con(mocked_generic_connection_create, capfd):
assert args[0][5] == "con-name"
assert args[0][6] == "non_existent_nw_device"
args_text = list(map(to_text, args[0]))
args_text = [to_text(x) for x in args[0]]
for param in ["ipv4.addresses", "10.10.10.10/24", "ipv4.gateway", "10.10.10.1", "table", "10"]:
assert param in args_text
@ -5214,7 +5214,7 @@ def test_mod_vrf_conn(mocked_generic_connection_modify, capfd):
assert args[0][2] == "modify"
assert args[0][3] == "non_existent_nw_device"
args_text = list(map(to_text, args[0]))
args_text = [to_text(x) for x in args[0]]
for param in ["ipv4.addresses", "10.10.10.10/24", "ipv4.gateway", "10.10.10.1", "table", "10"]:
assert param in args_text

View file

@ -19,6 +19,7 @@ linode_api4 # APIv4
python-gitlab
PyGithub
httmock
pynacl
# requirement for maven_artifact module
lxml