# # Copyright (c) 2021, Andreas Botzner # GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) # SPDX-License-Identifier: GPL-3.0-or-later from __future__ import annotations import traceback import typing as t from ansible.module_utils.basic import AnsibleModule, missing_required_lib if t.TYPE_CHECKING: from ansible.module_utils.basic import AnsibleModule REDIS_IMP_ERR: str | None = None try: from redis import Redis from redis import __version__ as redis_version HAS_REDIS_PACKAGE = True REDIS_IMP_ERR = None except ImportError: REDIS_IMP_ERR = traceback.format_exc() HAS_REDIS_PACKAGE = False CERTIFI_IMPORT_ERROR: str | None = None try: import certifi HAS_CERTIFI_PACKAGE = True except ImportError: CERTIFI_IMPORT_ERROR = traceback.format_exc() HAS_CERTIFI_PACKAGE = False def fail_imports(module: AnsibleModule, needs_certifi: bool = True) -> None: errors: list[str] = [] traceback: list[str] = [] if not HAS_REDIS_PACKAGE: errors.append(missing_required_lib("redis")) traceback.append(REDIS_IMP_ERR) # type: ignore if not HAS_CERTIFI_PACKAGE and needs_certifi: errors.append(missing_required_lib("certifi")) traceback.append(CERTIFI_IMPORT_ERROR) # type: ignore if errors: module.fail_json(msg="\n".join(errors), traceback="\n".join(traceback)) def redis_auth_argument_spec(tls_default: bool = True) -> dict[str, t.Any]: return dict( login_host=dict( type="str", default="localhost", ), login_user=dict(type="str"), login_password=dict(type="str", no_log=True), login_port=dict(type="int", default=6379), tls=dict(type="bool", default=tls_default), validate_certs=dict(type="bool", default=True), ca_certs=dict(type="str"), client_cert_file=dict(type="str"), client_key_file=dict(type="str"), ) def redis_auth_params(module: AnsibleModule) -> dict[str, t.Any]: login_host = module.params["login_host"] login_user = module.params["login_user"] login_password = module.params["login_password"] login_port = module.params["login_port"] tls = module.params["tls"] validate_certs = "required" if module.params["validate_certs"] else None ca_certs = module.params["ca_certs"] if tls and ca_certs is None: ca_certs = str(certifi.where()) client_cert_file = module.params["client_cert_file"] client_key_file = module.params["client_key_file"] if tuple(map(int, redis_version.split("."))) < (3, 4, 0) and login_user is not None: module.fail_json(msg="The option `username` in only supported with redis >= 3.4.0.") params = { "host": login_host, "port": login_port, "password": login_password, "ssl_ca_certs": ca_certs, "ssl_certfile": client_cert_file, "ssl_keyfile": client_key_file, "ssl_cert_reqs": validate_certs, "ssl": tls, } if login_user is not None: params["username"] = login_user return params class RedisAnsible: """Base class for Redis module""" def __init__(self, module: AnsibleModule) -> None: self.module = module self.connection = self._connect() def _connect(self) -> Redis: try: return Redis(**redis_auth_params(self.module)) except Exception as e: self.module.fail_json(msg=f"{e}")