diff --git a/.github/BOTMETA.yml b/.github/BOTMETA.yml index b3873a1c42..c4e8bf2e60 100644 --- a/.github/BOTMETA.yml +++ b/.github/BOTMETA.yml @@ -642,6 +642,8 @@ files: maintainers: adrianmoisey $modules/github_repo.py: maintainers: atorrescogollo + $modules/github_secrets.py: + maintainers: konstruktoid $modules/gitlab_: keywords: gitlab source_control maintainers: $team_gitlab diff --git a/plugins/modules/github_secrets.py b/plugins/modules/github_secrets.py new file mode 100644 index 0000000000..7390d37fa9 --- /dev/null +++ b/plugins/modules/github_secrets.py @@ -0,0 +1,401 @@ +#!/usr/bin/python + +# Copyright (c) Ansible project +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import annotations + +DOCUMENTATION = r""" +module: github_secrets +short_description: Manage GitHub repository or organization secrets +description: + - Create, update, or delete secrets in a GitHub repository or organization. +author: + - Thomas Sjögren (@konstruktoid) +version_added: '12.5.0' +requirements: + - pynacl +extends_documentation_fragment: + - community.general.attributes +attributes: + check_mode: + support: full + diff_mode: + support: none +options: + organization: + description: + - The GitHub username or organization name. + type: str + required: true + aliases: ["org", "username"] + repository: + description: + - The name of the repository. + - If not provided, the secret will be managed at the organization level. + type: str + aliases: ["repo"] + key: + description: + - The name of the secret. + type: str + value: + description: + - The value of the secret. Required when O(state=present). + type: str + visibility: + description: + - The visibility of the secret when set at the organization level. + - Required when O(state=present) and O(repository) is not set. + type: str + choices: ["all", "private", "selected"] + state: + description: + - The desired state of the secret. + type: str + choices: ["present", "absent"] + default: "present" + api_url: + description: + - The base URL for the GitHub API. + type: str + default: "https://api.github.com" + token: + description: + - The GitHub token used for authentication. + type: str + required: true +""" + +EXAMPLES = r""" +- name: Add Github secret + community.general.github_secrets: + token: "{{ lookup('ansible.builtin.env', 'GITHUB_TOKEN') }}" + repository: "ansible" + organization: "ansible" + key: "TEST_SECRET" + value: "bob" + state: "present" + +- name: Delete Github secret + community.general.github_secrets: + token: "{{ lookup('ansible.builtin.env', 'GITHUB_TOKEN') }}" + repository: "ansible" + organization: "ansible" + key: "TEST_SECRET" + state: "absent" +""" + +RETURN = r""" +result: + description: The result of the module. + type: dict + returned: always + sample: { + "msg": "OK (2 bytes)", + "response": "Secret created", + "status": 201 + } +""" + +import json +from http import HTTPStatus +from typing import Any + +from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils.urls import fetch_url + +from ansible_collections.community.general.plugins.module_utils import deps + +with deps.declare( + "pynacl", + reason="pynacl is a required dependency", + url="https://pypi.org/project/PyNaCl/", +): + from nacl import encoding, public + + +def get_public_key( + module: AnsibleModule, + api_url: str, + headers: dict[str, str], + organization: str, + repository: str, +) -> tuple[str, str]: + """Retrieve the GitHub Actions public key used to encrypt secrets.""" + if repository: + url = f"{api_url}/repos/{organization}/{repository}/actions/secrets/public-key" + else: + url = f"{api_url}/orgs/{organization}/actions/secrets/public-key" + + resp, info = fetch_url(module, url, headers=headers) + + if info["status"] != HTTPStatus.OK: + module.fail_json(msg=f"Failed to get public key: {info}") + + data = json.loads(resp.read()) + return data["key_id"], data["key"] + + +def encrypt_secret(public_key: str, secret_value: str) -> str: + """Encrypt a secret value using GitHub's public key.""" + key = public.PublicKey( + public_key.encode("utf-8"), + encoding.Base64Encoder(), # type: ignore [arg-type] + ) + sealed_box = public.SealedBox(key) + encrypted = sealed_box.encrypt(secret_value.encode("utf-8")) + return encoding.Base64Encoder.encode(encrypted).decode("utf-8") + + +def check_secret( + module: AnsibleModule, + api_url: str, + headers: dict[str, str], + organization: str, + repository: str, + key: str, +) -> dict[str, int]: + url = ( + f"{api_url}/repos/{organization}/{repository}/actions/secrets/{key}" + if repository + else f"{api_url}/orgs/{organization}/actions/secrets/{key}" + ) + + resp, info = fetch_url(module, url, headers=headers) + + if info["status"] in (HTTPStatus.OK, HTTPStatus.NOT_FOUND): + return {"status": info["status"]} + else: + module.fail_json(msg=f"Failed to check secret: {info}") + + +def upsert_secret( + module: AnsibleModule, + api_url: str, + headers: dict[str, str], + organization: str, + repository: str, + key: str, + encrypted_value: str, + key_id: str, +) -> dict[str, Any]: + """Create or update a GitHub Actions secret.""" + url = ( + f"{api_url}/repos/{organization}/{repository}/actions/secrets/{key}" + if repository + else f"{api_url}/orgs/{organization}/actions/secrets/{key}" + ) + + payload = { + "encrypted_value": encrypted_value, + "key_id": key_id, + } + + if not repository and module.params.get("visibility"): + payload["visibility"] = module.params["visibility"] + + if module.check_mode: + secret_present = check_secret(module, api_url, headers, organization, repository, key) + if secret_present["status"] == HTTPStatus.NOT_FOUND: + check_mode_msg = "OK (2 bytes)" + check_mode_status = HTTPStatus.CREATED.value + + info = { + "msg": check_mode_msg, + "status": check_mode_status, + } + else: + check_mode_msg = "OK (unknown bytes)" + check_mode_status = HTTPStatus.NO_CONTENT + + info = { + "msg": check_mode_msg, + "status": check_mode_status, + } + else: + resp, info = fetch_url( + module, + url, + headers=headers, + data=json.dumps(payload).encode("utf-8"), + method="PUT", + ) + + if info["status"] not in (HTTPStatus.CREATED, HTTPStatus.NO_CONTENT): + module.fail_json(msg=f"Failed to upsert secret: {info}") + + return info + + +def delete_secret( + module: AnsibleModule, + api_url: str, + headers: dict[str, str], + organization: str, + repository: str, + key: str, +) -> dict[str, Any]: + """Delete a GitHub Actions secret.""" + url = ( + f"{api_url}/repos/{organization}/{repository}/actions/secrets/{key}" + if repository + else f"{api_url}/orgs/{organization}/actions/secrets/{key}" + ) + + if module.check_mode: + secret_present = check_secret(module, api_url, headers, organization, repository, key) + info = { + "msg": ( + "HTTP Error 404: Not Found" + if secret_present["status"] == HTTPStatus.NOT_FOUND + else "OK (unknown bytes)" + ), + "status": ( + HTTPStatus.NO_CONTENT if secret_present["status"] == HTTPStatus.OK else secret_present["status"] + ), + } + else: + resp, info = fetch_url( + module, + url, + headers=headers, + method="DELETE", + ) + + if info["status"] not in (HTTPStatus.NO_CONTENT, HTTPStatus.NOT_FOUND): + module.fail_json(msg=f"Failed to delete secret: {info}") + + return info + + +def main() -> None: + """Ansible module entry point.""" + argument_spec = { + "organization": { + "type": "str", + "aliases": ["org", "username"], + "required": True, + }, + "repository": {"type": "str", "aliases": ["repo"]}, + "key": {"type": "str", "no_log": False}, + "value": {"type": "str", "no_log": True}, + "visibility": {"type": "str", "choices": ["all", "private", "selected"]}, + "state": { + "type": "str", + "choices": ["present", "absent"], + "default": "present", + }, + "api_url": {"type": "str", "default": "https://api.github.com"}, + "token": {"type": "str", "required": True, "no_log": True}, + } + + module = AnsibleModule( + argument_spec=argument_spec, + required_if=[("state", "present", ["value"])], + required_by={"value": "key"}, + supports_check_mode=True, + ) + + deps.validate(module) + + organization: str = module.params["organization"] + repository: str = module.params["repository"] + key: str = module.params["key"] + value: str = module.params["value"] + visibility: str = module.params.get("visibility") + state: str = module.params["state"] + api_url: str = module.params["api_url"] + token: str = module.params["token"] + + if state == "present" and value is None: + module.fail_json( + msg="Invalid parameters", + details="The 'value' parameter cannot be empty", + params=module.params, + ) + + if state == "present" and not repository and not visibility: + module.fail_json( + msg="Invalid parameters", + details="When 'state' is 'present' and 'repository' is not set, 'visibility' must be provided", + params=module.params, + ) + + result: dict[str, Any] = {} + + headers = { + "Accept": "application/vnd.github+json", + "Authorization": f"Bearer {token}", + "Content-Type": "application/json", + } + + if state == "present": + key_id, public_key = get_public_key( + module, + api_url, + headers, + organization, + repository, + ) + + encrypted_value = encrypt_secret(public_key, value) + + upsert = upsert_secret( + module, + api_url, + headers, + organization, + repository, + key, + encrypted_value, + key_id, + ) + + response_msg = "Secret created" if upsert["status"] == HTTPStatus.CREATED else "Secret updated" + + result["changed"] = True + result.update( + result={ + "status": upsert["status"], + "msg": upsert.get("msg"), + "response": response_msg, + }, + ) + + if state == "absent": + delete = delete_secret( + module, + api_url, + headers, + organization, + repository, + key, + ) + + if delete["status"] == HTTPStatus.NO_CONTENT: + result["changed"] = True + result.update( + result={ + "status": delete["status"], + "msg": delete.get("msg"), + "response": "Secret deleted", + }, + ) + + if delete["status"] == HTTPStatus.NOT_FOUND: + result["changed"] = False + result.update( + result={ + "status": delete["status"], + "msg": delete.get("msg"), + "response": "Secret not found", + }, + ) + + module.exit_json(**result) + + +if __name__ == "__main__": + main() diff --git a/tests/unit/plugins/modules/test_github_secrets.py b/tests/unit/plugins/modules/test_github_secrets.py new file mode 100644 index 0000000000..8e24624be6 --- /dev/null +++ b/tests/unit/plugins/modules/test_github_secrets.py @@ -0,0 +1,360 @@ +# Copyright (c) Ansible Project +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import annotations + +import json +from unittest.mock import MagicMock, patch + +import pytest +from ansible_collections.community.internal_test_tools.tests.unit.plugins.modules.utils import ( + AnsibleExitJson, + AnsibleFailJson, + exit_json, + fail_json, + set_module_args, +) + +from ansible_collections.community.general.plugins.modules import github_secrets + +PUBLIC_KEY_PAYLOAD = { + "key_id": "100", + "key": "j6gSA9mu5bO8ig+YBNU6oXRhGwd3Z4EFiS9rVmU9gwo=", +} + + +def make_fetch_url_response(body, status=200): + response = MagicMock() + response.read.return_value = json.dumps(body).encode("utf-8") + info = {"status": status, "msg": f"OK ({len(json.dumps(body))} bytes)"} + return (response, info) + + +@pytest.fixture(autouse=True) +def patch_module(): + with patch.multiple( + "ansible.module_utils.basic.AnsibleModule", + exit_json=exit_json, + fail_json=fail_json, + ): + yield + + +@pytest.fixture +def fetch_url_mock(): + with patch.object(github_secrets, "fetch_url") as mock: + yield mock + + +def test_encrypt_secret(): + result = github_secrets.encrypt_secret(PUBLIC_KEY_PAYLOAD["key"], "ansible") + assert isinstance(result, str) + assert result != "ansible" + assert len(result) > 70 + + +def test_fail_without_parameters(): + with pytest.raises(AnsibleFailJson): + with set_module_args({}): + github_secrets.main() + + +def test_fail_present_without_value(): + with pytest.raises(AnsibleFailJson) as exc: + with set_module_args( + { + "organization": "myorg", + "repository": "myrepo", + "key": "MY_SECRET", + "value": None, + "state": "present", + "token": "ghp_test_token", + } + ): + github_secrets.main() + assert "value' parameter cannot be empty" in exc.value.args[0]["details"] + + +def test_fail_org_secret_present_without_visibility(): + with pytest.raises(AnsibleFailJson) as exc: + with set_module_args( + { + "organization": "myorg", + "key": "ORG_SECRET", + "value": "org_value", + "state": "present", + "token": "ghp_test_token", + } + ): + github_secrets.main() + assert "'visibility' must be provided" in exc.value.args[0]["details"] + + +def test_create_repo_secret(fetch_url_mock): + fetch_url_mock.side_effect = [ + make_fetch_url_response(PUBLIC_KEY_PAYLOAD), + make_fetch_url_response({}, status=201), + ] + + with set_module_args( + { + "organization": "myorg", + "repository": "myrepo", + "key": "MY_SECRET", + "value": "secret_value", + "state": "present", + "token": "ghp_test_token", + } + ): + with pytest.raises(AnsibleExitJson) as exc: + github_secrets.main() + + result = exc.value.args[0] + assert result["changed"] is True + assert result["result"]["status"] == 201 + assert result["result"]["response"] == "Secret created" + + +def test_update_repo_secret(fetch_url_mock): + fetch_url_mock.side_effect = [ + make_fetch_url_response(PUBLIC_KEY_PAYLOAD), + make_fetch_url_response({}, status=204), + ] + + with set_module_args( + { + "organization": "myorg", + "repository": "myrepo", + "key": "MY_SECRET", + "value": "new_value", + "state": "present", + "token": "ghp_test_token", + } + ): + with pytest.raises(AnsibleExitJson) as exc: + github_secrets.main() + + result = exc.value.args[0] + assert result["changed"] is True + assert result["result"]["response"] == "Secret updated" + + +def test_create_repo_secret_check_mode(fetch_url_mock): + fetch_url_mock.side_effect = [ + make_fetch_url_response(PUBLIC_KEY_PAYLOAD), + make_fetch_url_response({}, status=404), + ] + + with set_module_args( + { + "organization": "myorg", + "repository": "myrepo", + "key": "MY_SECRET", + "value": "new_value", + "state": "present", + "token": "ghp_test_token", + "_ansible_check_mode": True, + } + ): + with pytest.raises(AnsibleExitJson) as exc: + github_secrets.main() + + result = exc.value.args[0] + assert result["changed"] is True + assert result["result"]["status"] == 201 + assert result["result"]["response"] == "Secret created" + + +def test_update_repo_secret_check_mode(fetch_url_mock): + fetch_url_mock.side_effect = [ + make_fetch_url_response(PUBLIC_KEY_PAYLOAD), + make_fetch_url_response({}, status=200), + ] + + with set_module_args( + { + "organization": "myorg", + "repository": "myrepo", + "key": "MY_SECRET", + "value": "new_value", + "state": "present", + "token": "ghp_test_token", + "_ansible_check_mode": True, + } + ): + with pytest.raises(AnsibleExitJson) as exc: + github_secrets.main() + + result = exc.value.args[0] + assert result["changed"] is True + assert result["result"]["status"] == 204 + assert result["result"]["response"] == "Secret updated" + + +def test_delete_repo_secret_check_mode(fetch_url_mock): + fetch_url_mock.side_effect = [ + make_fetch_url_response(PUBLIC_KEY_PAYLOAD), + make_fetch_url_response({}, status=200), + ] + + with set_module_args( + { + "organization": "myorg", + "repository": "myrepo", + "key": "MY_SECRET", + "state": "absent", + "token": "ghp_test_token", + "_ansible_check_mode": True, + } + ): + with pytest.raises(AnsibleExitJson) as exc: + github_secrets.main() + + result = exc.value.args[0] + assert result["changed"] is True + assert result["result"]["status"] == 204 + assert result["result"]["response"] == "Secret deleted" + + +def test_update_empty_repo_secret(fetch_url_mock): + fetch_url_mock.side_effect = [ + make_fetch_url_response(PUBLIC_KEY_PAYLOAD), + make_fetch_url_response({}, status=204), + ] + + with set_module_args( + { + "organization": "myorg", + "repository": "myrepo", + "key": "MY_SECRET", + "value": "", + "state": "present", + "token": "ghp_test_token", + } + ): + with pytest.raises(AnsibleExitJson) as exc: + github_secrets.main() + + result = exc.value.args[0] + assert result["changed"] is True + assert result["result"]["response"] == "Secret updated" + + +def test_update_missing_value_repo_secret(fetch_url_mock): + fetch_url_mock.side_effect = [ + make_fetch_url_response(PUBLIC_KEY_PAYLOAD), + make_fetch_url_response({}, status=204), + ] + + with set_module_args( + { + "organization": "myorg", + "repository": "myrepo", + "key": "MY_SECRET", + "state": "present", + "token": "ghp_test_token", + } + ): + with pytest.raises(AnsibleFailJson) as exc: + github_secrets.main() + + assert "the following are missing: value" in exc.value.args[0]["msg"] + + +def test_delete_repo_secret(fetch_url_mock): + fetch_url_mock.return_value = make_fetch_url_response({}, status=204) + + with set_module_args( + { + "organization": "myorg", + "repository": "myrepo", + "key": "MY_SECRET", + "state": "absent", + "token": "ghp_test_token", + } + ): + with pytest.raises(AnsibleExitJson) as exc: + github_secrets.main() + + result = exc.value.args[0] + assert result["changed"] is True + assert result["result"]["status"] == 204 + assert result["result"]["response"] == "Secret deleted" + + +def test_delete_dne_repo_secret(fetch_url_mock): + fetch_url_mock.return_value = make_fetch_url_response({}, status=404) + + with set_module_args( + { + "organization": "myorg", + "repository": "myrepo", + "key": "DOES_NOT_EXIST", + "state": "absent", + "token": "ghp_test_token", + } + ): + with pytest.raises(AnsibleExitJson) as exc: + github_secrets.main() + + result = exc.value.args[0] + assert result["changed"] is False + assert result["result"]["status"] == 404 + assert result["result"]["response"] == "Secret not found" + + +def test_fail_get_public_key(fetch_url_mock): + fetch_url_mock.return_value = make_fetch_url_response({}, status=403) + + with set_module_args( + { + "organization": "myorg", + "repository": "myrepo", + "key": "MY_SECRET", + "value": "secret_value", + "state": "present", + "token": "ghp_test_token", + } + ): + with pytest.raises(AnsibleFailJson) as exc: + github_secrets.main() + assert "Failed to get public key" in exc.value.args[0]["msg"] + + +def test_fail_upsert_secret(fetch_url_mock): + fetch_url_mock.side_effect = [ + make_fetch_url_response(PUBLIC_KEY_PAYLOAD), + make_fetch_url_response({}, status=422), + ] + + with set_module_args( + { + "organization": "myorg", + "repository": "myrepo", + "key": "MY_SECRET", + "value": "secret_value", + "state": "present", + "token": "ghp_test_token", + } + ): + with pytest.raises(AnsibleFailJson) as exc: + github_secrets.main() + assert "Failed to upsert secret" in exc.value.args[0]["msg"] + + +def test_fail_delete_secret(fetch_url_mock): + fetch_url_mock.return_value = make_fetch_url_response({}, status=503) + + with set_module_args( + { + "organization": "myorg", + "repository": "myrepo", + "key": "MY_SECRET", + "state": "absent", + "token": "ghp_test_token", + } + ): + with pytest.raises(AnsibleFailJson) as exc: + github_secrets.main() + assert "Failed to delete secret" in exc.value.args[0]["msg"] diff --git a/tests/unit/requirements.txt b/tests/unit/requirements.txt index 78a082fc16..6a30a9ee84 100644 --- a/tests/unit/requirements.txt +++ b/tests/unit/requirements.txt @@ -19,6 +19,7 @@ linode_api4 # APIv4 python-gitlab PyGithub httmock +pynacl # requirement for maven_artifact module lxml