diff --git a/plugins/module_utils/identity/keycloak/keycloak.py b/plugins/module_utils/identity/keycloak/keycloak.py index 3e19d5abe6..681d2417f3 100644 --- a/plugins/module_utils/identity/keycloak/keycloak.py +++ b/plugins/module_utils/identity/keycloak/keycloak.py @@ -4,16 +4,21 @@ from __future__ import annotations - +import copy import json import traceback -import copy +import typing as t from urllib.parse import urlencode, quote from urllib.error import HTTPError from ansible.module_utils.urls import open_url from ansible.module_utils.common.text.converters import to_native, to_text +if t.TYPE_CHECKING: + from collections.abc import Sequence + from ansible.module_utils.basic import AnsibleModule + + URL_REALM_INFO = "{url}/realms/{realm}" URL_REALMS = "{url}/admin/realms" URL_REALM = "{url}/admin/realms/{realm}" @@ -140,7 +145,7 @@ URL_AUTHZ_CUSTOM_POLICY = "{url}/admin/realms/{realm}/clients/{client_id}/authz/ URL_AUTHZ_CUSTOM_POLICIES = "{url}/admin/realms/{realm}/clients/{client_id}/authz/resource-server/policy" -def keycloak_argument_spec(): +def keycloak_argument_spec() -> dict[str, t.Any]: """ Returns argument_spec of options common to keycloak_*-modules @@ -161,20 +166,20 @@ def keycloak_argument_spec(): ) -def camel(words): +def camel(words: str) -> str: return words.split("_")[0] + "".join(x.capitalize() or "_" for x in words.split("_")[1:]) class KeycloakError(Exception): - def __init__(self, msg, authError=None): + def __init__(self, msg: str, authError: Exception | None = None) -> None: self.msg = msg self.authError = authError - def __str__(self): + def __str__(self) -> str: return str(self.msg) -def _token_request(module_params, payload): +def _token_request(module_params: dict[str, t.Any], payload: dict[str, t.Any]) -> str: """Obtains connection header with token for the authentication, using the provided auth_username/auth_password :param module_params: parameters of the module @@ -189,7 +194,7 @@ def _token_request(module_params, payload): 'refresh_token' for type 'refresh_token'. :return: access token """ - base_url = module_params.get("auth_keycloak_url") + base_url = module_params["auth_keycloak_url"] if not base_url.lower().startswith(("http", "https")): raise KeycloakError(f"auth_url '{base_url}' should either start with 'http' or 'https'.") auth_realm = module_params.get("auth_realm") @@ -219,7 +224,7 @@ def _token_request(module_params, payload): raise KeycloakError(f"Could not obtain access token from {auth_url}: {e}", authError=e) from e -def _request_token_using_credentials(module_params): +def _request_token_using_credentials(module_params: dict[str, t.Any]) -> str: """Obtains connection header with token for the authentication, using the provided auth_username/auth_password :param module_params: parameters of the module. Must include 'auth_username' and 'auth_password'. @@ -243,7 +248,7 @@ def _request_token_using_credentials(module_params): return _token_request(module_params, payload) -def _request_token_using_refresh_token(module_params): +def _request_token_using_refresh_token(module_params: dict[str, t.Any]) -> str: """Obtains connection header with token for the authentication, using the provided refresh_token :param module_params: parameters of the module. Must include 'refresh_token'. @@ -265,7 +270,7 @@ def _request_token_using_refresh_token(module_params): return _token_request(module_params, payload) -def _request_token_using_client_credentials(module_params): +def _request_token_using_client_credentials(module_params: dict[str, t.Any]) -> str: """Obtains connection header with token for the authentication, using the provided auth_client_id and auth_client_secret by grant_type client_credentials. Ensure that the used client uses client authorization @@ -288,7 +293,7 @@ def _request_token_using_client_credentials(module_params): return _token_request(module_params, payload) -def get_token(module_params): +def get_token(module_params: dict[str, t.Any]) -> dict[str, str]: """Obtains connection header with token for the authentication, token already given or obtained from credentials :param module_params: parameters of the module @@ -308,7 +313,7 @@ def get_token(module_params): return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} -def is_struct_included(struct1, struct2, exclude=None): +def is_struct_included(struct1: object, struct2: object, exclude: Sequence[str] | None = None) -> bool: """ This function compare if the first parameter structure is included in the second. The function use every elements of struct1 and validates they are present in the struct2 structure. @@ -372,7 +377,7 @@ class KeycloakAPI: is obtained through OpenID connect """ - def __init__(self, module, connection_header): + def __init__(self, module: AnsibleModule, connection_header: dict[str, str]) -> None: self.module = module self.baseurl = self.module.params.get("auth_keycloak_url") self.validate_certs = self.module.params.get("validate_certs") @@ -380,7 +385,7 @@ class KeycloakAPI: self.restheaders = connection_header self.http_agent = self.module.params.get("http_agent") - def _request(self, url, method, data=None): + def _request(self, url: str, method: str, data: str | bytes | None = None): """Makes a request to Keycloak and returns the raw response. If a 401 is returned, attempts to re-authenticate using first the module's refresh_token (if provided) @@ -394,7 +399,7 @@ class KeycloakAPI: :return: raw API response """ - def make_request_catching_401(): + def make_request_catching_401() -> object | HTTPError: try: return open_url( url, @@ -423,7 +428,7 @@ class KeycloakAPI: r = make_request_catching_401() except KeycloakError as e: # Token refresh returns 400 if token is expired/invalid, so continue on if we get a 400 - if e.authError is not None and e.authError.code != 400: + if e.authError is not None and e.authError.code != 400: # type: ignore # TODO! raise e if isinstance(r, Exception): @@ -448,7 +453,7 @@ class KeycloakAPI: r = make_request_catching_401() except KeycloakError as e: # Token refresh returns 400 if token is expired/invalid, so continue on if we get a 400 - if e.authError is not None and e.authError.code != 400: + if e.authError is not None and e.authError.code != 400: # type: ignore # TODO! raise e if isinstance(r, Exception): @@ -457,7 +462,7 @@ class KeycloakAPI: return r - def _request_and_deserialize(self, url, method, data=None): + def _request_and_deserialize(self, url: str, method: str, data: str | bytes | None = None): """Wraps the _request method with JSON deserialization of the response. :param url: request path @@ -467,7 +472,7 @@ class KeycloakAPI: """ return json.loads(self._request(url, method, data).read()) - def get_realm_info_by_id(self, realm="master"): + def get_realm_info_by_id(self, realm: str = "master") -> dict[str, t.Any] | None: """Obtain realm public info by id :param realm: realm id @@ -491,7 +496,7 @@ class KeycloakAPI: except Exception as e: self.module.fail_json(msg=f"Could not obtain realm {realm}: {e}", exception=traceback.format_exc()) - def get_realm_keys_metadata_by_id(self, realm="master"): + def get_realm_keys_metadata_by_id(self, realm: str = "master") -> dict[str, t.Any] | None: """Obtain realm public info by id :param realm: realm id @@ -522,7 +527,7 @@ class KeycloakAPI: # The Keycloak API expects the realm name (like `master`) not the ID when fetching the realm data. # See the Keycloak API docs: https://www.keycloak.org/docs-api/latest/rest-api/#_realms_admin - def get_realm_by_id(self, realm="master"): + def get_realm_by_id(self, realm: str = "master") -> dict[str, t.Any] | None: """Obtain realm representation by id :param realm: realm id @@ -546,7 +551,7 @@ class KeycloakAPI: except Exception as e: self.module.fail_json(msg=f"Could not obtain realm {realm}: {e}", exception=traceback.format_exc()) - def update_realm(self, realmrep, realm="master"): + def update_realm(self, realmrep, realm: str = "master"): """Update an existing realm :param realmrep: corresponding (partial/full) realm representation with updates :param realm: realm to be updated in Keycloak @@ -571,7 +576,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not create realm {realmrep['id']}: {e}", exception=traceback.format_exc()) - def delete_realm(self, realm="master"): + def delete_realm(self, realm: str = "master"): """Delete a realm from Keycloak :param realm: realm to be deleted @@ -584,7 +589,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not delete realm {realm}: {e}", exception=traceback.format_exc()) - def get_clients(self, realm="master", filter=None): + def get_clients(self, realm: str = "master", filter=None): """Obtains client representations for clients in a realm :param realm: realm to be queried @@ -604,7 +609,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not obtain list of clients for realm {realm}: {e}") - def get_client_by_clientid(self, client_id, realm="master"): + def get_client_by_clientid(self, client_id, realm: str = "master"): """Get client representation by clientId :param client_id: The clientId to be queried :param realm: realm from which to obtain the client representation @@ -616,7 +621,7 @@ class KeycloakAPI: else: return None - def get_client_by_id(self, id, realm="master"): + def get_client_by_id(self, id, realm: str = "master"): """Obtain client representation by id :param id: id (not clientId) of client to be queried @@ -640,7 +645,7 @@ class KeycloakAPI: except Exception as e: self.module.fail_json(msg=f"Could not obtain client {id} for realm {realm}: {e}") - def get_client_id(self, client_id, realm="master"): + def get_client_id(self, client_id, realm: str = "master"): """Obtain id of client by client_id :param client_id: client_id of client to be queried @@ -653,7 +658,7 @@ class KeycloakAPI: else: return None - def update_client(self, id, clientrep, realm="master"): + def update_client(self, id, clientrep, realm: str = "master"): """Update an existing client :param id: id (not clientId) of client to be updated in Keycloak :param clientrep: corresponding (partial/full) client representation with updates @@ -667,7 +672,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not update client {id} in realm {realm}: {e}") - def create_client(self, clientrep, realm="master"): + def create_client(self, clientrep, realm: str = "master"): """Create a client in keycloak :param clientrep: Client representation of client to be created. Must at least contain field clientId. :param realm: realm for client to be created. @@ -680,7 +685,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not create client {clientrep['clientId']} in realm {realm}: {e}") - def delete_client(self, id, realm="master"): + def delete_client(self, id, realm: str = "master"): """Delete a client from Keycloak :param id: id (not clientId) of client to be deleted @@ -694,7 +699,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not delete client {id} in realm {realm}: {e}") - def get_client_roles_by_id(self, cid, realm="master"): + def get_client_roles_by_id(self, cid, realm: str = "master"): """Fetch the roles of the a client on the Keycloak server. :param cid: ID of the client from which to obtain the rolemappings. @@ -707,7 +712,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not fetch rolemappings for client {cid} in realm {realm}: {e}") - def get_client_role_id_by_name(self, cid, name, realm="master"): + def get_client_role_id_by_name(self, cid, name, realm: str = "master"): """Get the role ID of a client. :param cid: ID of the client from which to obtain the rolemappings. @@ -721,7 +726,7 @@ class KeycloakAPI: return role["id"] return None - def get_client_group_rolemapping_by_id(self, gid, cid, rid, realm="master"): + def get_client_group_rolemapping_by_id(self, gid, cid, rid, realm: str = "master"): """Obtain client representation by id :param gid: ID of the group from which to obtain the rolemappings. @@ -742,7 +747,7 @@ class KeycloakAPI: ) return None - def get_client_group_available_rolemappings(self, gid, cid, realm="master"): + def get_client_group_available_rolemappings(self, gid, cid, realm: str = "master"): """Fetch the available role of a client in a specified group on the Keycloak server. :param gid: ID of the group from which to obtain the rolemappings. @@ -760,7 +765,7 @@ class KeycloakAPI: e, msg=f"Could not fetch available rolemappings for client {cid} in group {gid}, realm {realm}: {e}" ) - def get_client_group_composite_rolemappings(self, gid, cid, realm="master"): + def get_client_group_composite_rolemappings(self, gid, cid, realm: str = "master"): """Fetch the composite role of a client in a specified group on the Keycloak server. :param gid: ID of the group from which to obtain the rolemappings. @@ -778,7 +783,7 @@ class KeycloakAPI: e, msg=f"Could not fetch available rolemappings for client {cid} in group {gid}, realm {realm}: {e}" ) - def get_role_by_id(self, rid, realm="master"): + def get_role_by_id(self, rid, realm: str = "master"): """Fetch a role by its id on the Keycloak server. :param rid: ID of the role. @@ -791,7 +796,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not fetch role for id {rid} in realm {realm}: {e}") - def get_client_roles_by_id_composite_rolemappings(self, rid, cid, realm="master"): + def get_client_roles_by_id_composite_rolemappings(self, rid, cid, realm: str = "master"): """Fetch a role by its id on the Keycloak server. :param rid: ID of the composite role. @@ -805,7 +810,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not fetch role for id {rid} and cid {cid} in realm {realm}: {e}") - def add_client_roles_by_id_composite_rolemapping(self, rid, roles_rep, realm="master"): + def add_client_roles_by_id_composite_rolemapping(self, rid, roles_rep, realm: str = "master"): """Assign roles to composite role :param rid: ID of the composite role. @@ -819,7 +824,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not assign roles to composite role {rid} and realm {realm}: {e}") - def add_group_realm_rolemapping(self, gid, role_rep, realm="master"): + def add_group_realm_rolemapping(self, gid, role_rep, realm: str = "master"): """Add the specified realm role to specified group on the Keycloak server. :param gid: ID of the group to add the role mapping. @@ -833,7 +838,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could add realm role mappings for group {gid}, realm {realm}: {e}") - def delete_group_realm_rolemapping(self, gid, role_rep, realm="master"): + def delete_group_realm_rolemapping(self, gid, role_rep, realm: str = "master"): """Delete the specified realm role from the specified group on the Keycloak server. :param gid: ID of the group from which to obtain the rolemappings. @@ -847,7 +852,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not delete realm role mappings for group {gid}, realm {realm}: {e}") - def add_group_rolemapping(self, gid, cid, role_rep, realm="master"): + def add_group_rolemapping(self, gid, cid, role_rep, realm: str = "master"): """Fetch the composite role of a client in a specified group on the Keycloak server. :param gid: ID of the group from which to obtain the rolemappings. @@ -866,7 +871,7 @@ class KeycloakAPI: e, msg=f"Could not fetch available rolemappings for client {cid} in group {gid}, realm {realm}: {e}" ) - def delete_group_rolemapping(self, gid, cid, role_rep, realm="master"): + def delete_group_rolemapping(self, gid, cid, role_rep, realm: str = "master"): """Delete the rolemapping of a client in a specified group on the Keycloak server. :param gid: ID of the group from which to obtain the rolemappings. @@ -885,7 +890,7 @@ class KeycloakAPI: e, msg=f"Could not delete available rolemappings for client {cid} in group {gid}, realm {realm}: {e}" ) - def get_client_user_rolemapping_by_id(self, uid, cid, rid, realm="master"): + def get_client_user_rolemapping_by_id(self, uid, cid, rid, realm: str = "master"): """Obtain client representation by id :param uid: ID of the user from which to obtain the rolemappings. @@ -906,7 +911,7 @@ class KeycloakAPI: ) return None - def get_client_user_available_rolemappings(self, uid, cid, realm="master"): + def get_client_user_available_rolemappings(self, uid, cid, realm: str = "master"): """Fetch the available role of a client for a specified user on the Keycloak server. :param uid: ID of the user from which to obtain the rolemappings. @@ -924,7 +929,7 @@ class KeycloakAPI: e, msg=f"Could not fetch effective rolemappings for client {cid} and user {uid}, realm {realm}: {e}" ) - def get_client_user_composite_rolemappings(self, uid, cid, realm="master"): + def get_client_user_composite_rolemappings(self, uid, cid, realm: str = "master"): """Fetch the composite role of a client for a specified user on the Keycloak server. :param uid: ID of the user from which to obtain the rolemappings. @@ -940,7 +945,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not fetch available rolemappings for user {uid} of realm {realm}: {e}") - def get_realm_user_rolemapping_by_id(self, uid, rid, realm="master"): + def get_realm_user_rolemapping_by_id(self, uid, rid, realm: str = "master"): """Obtain role representation by id :param uid: ID of the user from which to obtain the rolemappings. @@ -958,7 +963,7 @@ class KeycloakAPI: self.fail_request(e, msg=f"Could not fetch rolemappings for user {uid}, realm {realm}: {e}") return None - def get_realm_user_available_rolemappings(self, uid, realm="master"): + def get_realm_user_available_rolemappings(self, uid, realm: str = "master"): """Fetch the available role of a realm for a specified user on the Keycloak server. :param uid: ID of the user from which to obtain the rolemappings. @@ -971,7 +976,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not fetch available rolemappings for user {uid} of realm {realm}: {e}") - def get_realm_user_composite_rolemappings(self, uid, realm="master"): + def get_realm_user_composite_rolemappings(self, uid, realm: str = "master"): """Fetch the composite role of a realm for a specified user on the Keycloak server. :param uid: ID of the user from which to obtain the rolemappings. @@ -984,7 +989,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not fetch effective rolemappings for user {uid}, realm {realm}: {e}") - def get_user_by_username(self, username, realm="master"): + def get_user_by_username(self, username, realm: str = "master"): """Fetch a keycloak user within a realm based on its username. If the user does not exist, None is returned. @@ -1009,7 +1014,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not obtain the user for realm {realm} and username {username}: {e}") - def get_service_account_user_by_client_id(self, client_id, realm="master"): + def get_service_account_user_by_client_id(self, client_id, realm: str = "master"): """Fetch a keycloak service account user within a realm based on its client_id. If the user does not exist, None is returned. @@ -1030,7 +1035,7 @@ class KeycloakAPI: e, msg=f"Could not obtain the service-account-user for realm {realm} and client_id {client_id}: {e}" ) - def add_user_rolemapping(self, uid, cid, role_rep, realm="master"): + def add_user_rolemapping(self, uid, cid, role_rep, realm: str = "master"): """Assign a realm or client role to a specified user on the Keycloak server. :param uid: ID of the user roles are assigned to. @@ -1060,7 +1065,7 @@ class KeycloakAPI: msg=f"Could not map roles to userId {cid} for client {uid}, realm {realm} and roles {json.dumps(role_rep)}: {e}", ) - def delete_user_rolemapping(self, uid, cid, role_rep, realm="master"): + def delete_user_rolemapping(self, uid, cid, role_rep, realm: str = "master"): """Delete the rolemapping of a client in a specified user on the Keycloak server. :param uid: ID of the user from which to remove the rolemappings. @@ -1089,7 +1094,7 @@ class KeycloakAPI: msg=f"Could not remove roles {json.dumps(role_rep)} for client {cid} from userId {uid}, realm {realm}: {e}", ) - def get_client_templates(self, realm="master"): + def get_client_templates(self, realm: str = "master"): """Obtains client template representations for client templates in a realm :param realm: realm to be queried @@ -1106,7 +1111,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not obtain list of client templates for realm {realm}: {e}") - def get_client_template_by_id(self, id, realm="master"): + def get_client_template_by_id(self, id, realm: str = "master"): """Obtain client template representation by id :param id: id (not name) of client template to be queried @@ -1124,7 +1129,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not obtain client template {id} for realm {realm}: {e}") - def get_client_template_by_name(self, name, realm="master"): + def get_client_template_by_name(self, name, realm: str = "master"): """Obtain client template representation by name :param name: name of client template to be queried @@ -1138,7 +1143,7 @@ class KeycloakAPI: return result[0] return None - def get_client_template_id(self, name, realm="master"): + def get_client_template_id(self, name, realm: str = "master"): """Obtain client template id by name :param name: name of client template to be queried @@ -1151,7 +1156,7 @@ class KeycloakAPI: else: return None - def update_client_template(self, id, clienttrep, realm="master"): + def update_client_template(self, id, clienttrep, realm: str = "master"): """Update an existing client template :param id: id (not name) of client template to be updated in Keycloak :param clienttrep: corresponding (partial/full) client template representation with updates @@ -1165,7 +1170,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not update client template {id} in realm {realm}: {e}") - def create_client_template(self, clienttrep, realm="master"): + def create_client_template(self, clienttrep, realm: str = "master"): """Create a client in keycloak :param clienttrep: Client template representation of client template to be created. Must at least contain field name :param realm: realm for client template to be created in @@ -1178,7 +1183,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not create client template {clienttrep['clientId']} in realm {realm}: {e}") - def delete_client_template(self, id, realm="master"): + def delete_client_template(self, id, realm: str = "master"): """Delete a client template from Keycloak :param id: id (not name) of client to be deleted @@ -1192,7 +1197,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not delete client template {id} in realm {realm}: {e}") - def get_clientscopes(self, realm="master"): + def get_clientscopes(self, realm: str = "master"): """Fetch the name and ID of all clientscopes on the Keycloak server. To fetch the full data of the group, make a subsequent call to @@ -1207,7 +1212,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not fetch list of clientscopes in realm {realm}: {e}") - def get_clientscope_by_clientscopeid(self, cid, realm="master"): + def get_clientscope_by_clientscopeid(self, cid, realm: str = "master"): """Fetch a keycloak clientscope from the provided realm using the clientscope's unique ID. If the clientscope does not exist, None is returned. @@ -1228,7 +1233,7 @@ class KeycloakAPI: except Exception as e: self.module.fail_json(msg=f"Could not clientscope group {cid} in realm {realm}: {e}") - def get_clientscope_by_name(self, name, realm="master"): + def get_clientscope_by_name(self, name, realm: str = "master"): """Fetch a keycloak clientscope within a realm based on its name. The Keycloak API does not allow filtering of the clientscopes resource by name. @@ -1251,7 +1256,7 @@ class KeycloakAPI: except Exception as e: self.module.fail_json(msg=f"Could not fetch clientscope {name} in realm {realm}: {e}") - def create_clientscope(self, clientscoperep, realm="master"): + def create_clientscope(self, clientscoperep, realm: str = "master"): """Create a Keycloak clientscope. :param clientscoperep: a ClientScopeRepresentation of the clientscope to be created. Must contain at minimum the field name. @@ -1263,7 +1268,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not create clientscope {clientscoperep['name']} in realm {realm}: {e}") - def update_clientscope(self, clientscoperep, realm="master"): + def update_clientscope(self, clientscoperep, realm: str = "master"): """Update an existing clientscope. :param grouprep: A GroupRepresentation of the updated group. @@ -1277,7 +1282,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not update clientscope {clientscoperep['name']} in realm {realm}: {e}") - def delete_clientscope(self, name=None, cid=None, realm="master"): + def delete_clientscope(self, name=None, cid=None, realm: str = "master"): """Delete a clientscope. One of name or cid must be provided. Providing the clientscope ID is preferred as it avoids a second lookup to @@ -1313,7 +1318,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Unable to delete clientscope {cid}: {e}") - def get_clientscope_protocolmappers(self, cid, realm="master"): + def get_clientscope_protocolmappers(self, cid, realm: str = "master"): """Fetch the name and ID of all clientscopes on the Keycloak server. To fetch the full data of the group, make a subsequent call to @@ -1329,7 +1334,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not fetch list of protocolmappers in realm {realm}: {e}") - def get_clientscope_protocolmapper_by_protocolmapperid(self, pid, cid, realm="master"): + def get_clientscope_protocolmapper_by_protocolmapperid(self, pid, cid, realm: str = "master"): """Fetch a keycloak clientscope from the provided realm using the clientscope's unique ID. If the clientscope does not exist, None is returned. @@ -1352,7 +1357,7 @@ class KeycloakAPI: except Exception as e: self.module.fail_json(msg=f"Could not fetch protocolmapper {cid} in realm {realm}: {e}") - def get_clientscope_protocolmapper_by_name(self, cid, name, realm="master"): + def get_clientscope_protocolmapper_by_name(self, cid, name, realm: str = "master"): """Fetch a keycloak clientscope within a realm based on its name. The Keycloak API does not allow filtering of the clientscopes resource by name. @@ -1378,7 +1383,7 @@ class KeycloakAPI: except Exception as e: self.module.fail_json(msg=f"Could not fetch protocolmapper {name} in realm {realm}: {e}") - def create_clientscope_protocolmapper(self, cid, mapper_rep, realm="master"): + def create_clientscope_protocolmapper(self, cid, mapper_rep, realm: str = "master"): """Create a Keycloak clientscope protocolmapper. :param cid: Id of the clientscope. @@ -1391,7 +1396,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not create protocolmapper {mapper_rep['name']} in realm {realm}: {e}") - def update_clientscope_protocolmappers(self, cid, mapper_rep, realm="master"): + def update_clientscope_protocolmappers(self, cid, mapper_rep, realm: str = "master"): """Update an existing clientscope. :param cid: Id of the clientscope. @@ -1481,7 +1486,7 @@ class KeycloakAPI: if scope_type == "optional": return URL_CLIENT_OPTIONAL_CLIENTSCOPE - def add_default_clientscope(self, id, realm="master", client_id=None): + def add_default_clientscope(self, id, realm: str = "master", client_id=None): """Add a client scope as default either on realm or client level. :param id: Client scope Id. @@ -1490,7 +1495,7 @@ class KeycloakAPI: """ self._action_type_clientscope(id, client_id, "default", realm, "add") - def add_optional_clientscope(self, id, realm="master", client_id=None): + def add_optional_clientscope(self, id, realm: str = "master", client_id=None): """Add a client scope as optional either on realm or client level. :param id: Client scope Id. @@ -1499,7 +1504,7 @@ class KeycloakAPI: """ self._action_type_clientscope(id, client_id, "optional", realm, "add") - def delete_default_clientscope(self, id, realm="master", client_id=None): + def delete_default_clientscope(self, id, realm: str = "master", client_id=None): """Remove a client scope as default either on realm or client level. :param id: Client scope Id. @@ -1508,7 +1513,7 @@ class KeycloakAPI: """ self._action_type_clientscope(id, client_id, "default", realm, "delete") - def delete_optional_clientscope(self, id, realm="master", client_id=None): + def delete_optional_clientscope(self, id, realm: str = "master", client_id=None): """Remove a client scope as optional either on realm or client level. :param id: Client scope Id. @@ -1517,7 +1522,9 @@ class KeycloakAPI: """ self._action_type_clientscope(id, client_id, "optional", realm, "delete") - def _action_type_clientscope(self, id=None, client_id=None, scope_type="default", realm="master", action="add"): + def _action_type_clientscope( + self, id=None, client_id=None, scope_type="default", realm: str = "master", action="add" + ): """Delete or add a clientscope of type. :param name: The name of the clientscope. A lookup will be performed to retrieve the clientscope ID. :param client_id: The ID of the clientscope (preferred to name). @@ -1537,7 +1544,7 @@ class KeycloakAPI: place = "realm" if client_id is None else f"client {client_id}" self.fail_request(e, msg=f"Unable to {action} {scope_type} clientscope {id} @ {place} : {e}") - def create_clientsecret(self, id, realm="master"): + def create_clientsecret(self, id, realm: str = "master"): """Generate a new client secret by id :param id: id (not clientId) of client to be queried @@ -1557,7 +1564,7 @@ class KeycloakAPI: except Exception as e: self.module.fail_json(msg=f"Could not obtain clientsecret of client {id} for realm {realm}: {e}") - def get_clientsecret(self, id, realm="master"): + def get_clientsecret(self, id, realm: str = "master"): """Obtain client secret by id :param id: id (not clientId) of client to be queried @@ -1577,7 +1584,7 @@ class KeycloakAPI: except Exception as e: self.module.fail_json(msg=f"Could not obtain clientsecret of client {id} for realm {realm}: {e}") - def get_groups(self, realm="master"): + def get_groups(self, realm: str = "master"): """Fetch the name and ID of all groups on the Keycloak server. To fetch the full data of the group, make a subsequent call to @@ -1591,7 +1598,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not fetch list of groups in realm {realm}: {e}") - def get_group_by_groupid(self, gid, realm="master"): + def get_group_by_groupid(self, gid, realm: str = "master"): """Fetch a keycloak group from the provided realm using the group's unique ID. If the group does not exist, None is returned. @@ -1611,7 +1618,7 @@ class KeycloakAPI: except Exception as e: self.module.fail_json(msg=f"Could not fetch group {gid} in realm {realm}: {e}") - def get_subgroups(self, parent, realm="master"): + def get_subgroups(self, parent, realm: str = "master"): if "subGroupCount" in parent: # Since version 23, when GETting a group Keycloak does not # return subGroups but only a subGroupCount. @@ -1626,7 +1633,7 @@ class KeycloakAPI: subgroups = parent["subGroups"] return subgroups - def get_group_by_name(self, name, realm="master", parents=None): + def get_group_by_name(self, name, realm: str = "master", parents=None): """Fetch a keycloak group within a realm based on its name. The Keycloak API does not allow filtering of the Groups resource by name. @@ -1668,7 +1675,7 @@ class KeycloakAPI: return (parent["name"], False) - def get_subgroup_by_chain(self, name_chain, realm="master"): + def get_subgroup_by_chain(self, name_chain, realm: str = "master"): """Access a subgroup API object by walking down a given name/id chain. Groups can be given either as by name or by ID, the first element @@ -1710,7 +1717,7 @@ class KeycloakAPI: return tmp - def get_subgroup_direct_parent(self, parents, realm="master", children_to_resolve=None): + def get_subgroup_direct_parent(self, parents, realm: str = "master", children_to_resolve=None): """Get keycloak direct parent group API object for a given chain of parents. To successfully work the API for subgroups we actually don't need @@ -1751,7 +1758,7 @@ class KeycloakAPI: children_to_resolve.append(cp) return self.get_subgroup_direct_parent(parents[1:], realm=realm, children_to_resolve=children_to_resolve) - def create_group(self, grouprep, realm="master"): + def create_group(self, grouprep, realm: str = "master"): """Create a Keycloak group. :param grouprep: a GroupRepresentation of the group to be created. Must contain at minimum the field name. @@ -1763,7 +1770,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not create group {grouprep['name']} in realm {realm}: {e}") - def create_subgroup(self, parents, grouprep, realm="master"): + def create_subgroup(self, parents, grouprep, realm: str = "master"): """Create a Keycloak subgroup. :param parents: list of one or more parent groups @@ -1792,7 +1799,7 @@ class KeycloakAPI: msg=f"Could not create subgroup {grouprep['name']} for parent group {parent_id} in realm {realm}: {e}", ) - def update_group(self, grouprep, realm="master"): + def update_group(self, grouprep, realm: str = "master"): """Update an existing group. :param grouprep: A GroupRepresentation of the updated group. @@ -1805,7 +1812,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not update group {grouprep['name']} in realm {realm}: {e}") - def delete_group(self, name=None, groupid=None, realm="master"): + def delete_group(self, name=None, groupid=None, realm: str = "master"): """Delete a group. One of name or groupid must be provided. Providing the group ID is preferred as it avoids a second lookup to @@ -1840,7 +1847,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Unable to delete group {groupid}: {e}") - def get_realm_roles(self, realm="master"): + def get_realm_roles(self, realm: str = "master"): """Obtains role representations for roles in a realm :param realm: realm to be queried @@ -1856,7 +1863,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not obtain list of roles for realm {realm}: {e}") - def get_realm_role(self, name, realm="master"): + def get_realm_role(self, name, realm: str = "master"): """Fetch a keycloak role from the provided realm using the role's name. If the role does not exist, None is returned. @@ -1874,7 +1881,7 @@ class KeycloakAPI: except Exception as e: self.module.fail_json(msg=f"Could not fetch role {name} in realm {realm}: {e}") - def create_realm_role(self, rolerep, realm="master"): + def create_realm_role(self, rolerep, realm: str = "master"): """Create a Keycloak realm role. :param rolerep: a RoleRepresentation of the role to be created. Must contain at minimum the field name. @@ -1889,7 +1896,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not create role {rolerep['name']} in realm {realm}: {e}") - def update_realm_role(self, rolerep, realm="master"): + def update_realm_role(self, rolerep, realm: str = "master"): """Update an existing realm role. :param rolerep: A RoleRepresentation of the updated role. @@ -1908,7 +1915,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not update role {rolerep['name']} in realm {realm}: {e}") - def get_role_composites(self, rolerep, clientid=None, realm="master"): + def get_role_composites(self, rolerep, clientid=None, realm: str = "master"): composite_url = "" try: if clientid is not None: @@ -1926,7 +1933,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not get role {rolerep['name']} composites in realm {realm}: {e}") - def create_role_composites(self, rolerep, composites, clientid=None, realm="master"): + def create_role_composites(self, rolerep, composites, clientid=None, realm: str = "master"): composite_url = "" try: if clientid is not None: @@ -1945,7 +1952,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not create role {rolerep['name']} composites in realm {realm}: {e}") - def delete_role_composites(self, rolerep, composites, clientid=None, realm="master"): + def delete_role_composites(self, rolerep, composites, clientid=None, realm: str = "master"): composite_url = "" try: if clientid is not None: @@ -1964,7 +1971,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not create role {rolerep['name']} composites in realm {realm}: {e}") - def update_role_composites(self, rolerep, composites, clientid=None, realm="master"): + def update_role_composites(self, rolerep, composites, clientid=None, realm: str = "master"): # Get existing composites existing_composites = self.get_role_composites(rolerep=rolerep, clientid=clientid, realm=realm) composites_to_be_created = [] @@ -2021,7 +2028,7 @@ class KeycloakAPI: rolerep=rolerep, composites=composites_to_be_deleted, clientid=clientid, realm=realm ) - def delete_realm_role(self, name, realm="master"): + def delete_realm_role(self, name, realm: str = "master"): """Delete a realm role. :param name: The name of the role. @@ -2033,7 +2040,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Unable to delete role {name} in realm {realm}: {e}") - def get_client_roles(self, clientid, realm="master"): + def get_client_roles(self, clientid, realm: str = "master"): """Obtains role representations for client roles in a specific client :param clientid: Client id to be queried @@ -2053,7 +2060,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not obtain list of roles for client {clientid} in realm {realm}: {e}") - def get_client_role(self, name, clientid, realm="master"): + def get_client_role(self, name, clientid, realm: str = "master"): """Fetch a keycloak client role from the provided realm using the role's name. :param name: Name of the role to fetch. @@ -2076,7 +2083,7 @@ class KeycloakAPI: except Exception as e: self.module.fail_json(msg=f"Could not fetch role {name} for client {clientid} in realm {realm}: {e}") - def create_client_role(self, rolerep, clientid, realm="master"): + def create_client_role(self, rolerep, clientid, realm: str = "master"): """Create a Keycloak client role. :param rolerep: a RoleRepresentation of the role to be created. Must contain at minimum the field name. @@ -2110,7 +2117,7 @@ class KeycloakAPI: keycloak_compatible_composites["realm"].append(composite["name"]) return keycloak_compatible_composites - def update_client_role(self, rolerep, clientid, realm="master"): + def update_client_role(self, rolerep, clientid, realm: str = "master"): """Update an existing client role. :param rolerep: A RoleRepresentation of the updated role. @@ -2136,7 +2143,7 @@ class KeycloakAPI: e, msg=f"Could not update role {rolerep['name']} for client {clientid} in realm {realm}: {e}" ) - def delete_client_role(self, name, clientid, realm="master"): + def delete_client_role(self, name, clientid, realm: str = "master"): """Delete a role. One of name or roleid must be provided. :param name: The name of the role. @@ -2152,7 +2159,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Unable to delete role {name} for client {clientid} in realm {realm}: {e}") - def get_authentication_flow_by_alias(self, alias, realm="master"): + def get_authentication_flow_by_alias(self, alias, realm: str = "master"): """ Get an authentication flow by its alias :param alias: Alias of the authentication flow to get. @@ -2173,7 +2180,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Unable get authentication flow {alias}: {e}") - def delete_authentication_flow_by_id(self, id, realm="master"): + def delete_authentication_flow_by_id(self, id, realm: str = "master"): """ Delete an authentication flow from Keycloak :param id: id of authentication flow to be deleted @@ -2187,7 +2194,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not delete authentication flow {id} in realm {realm}: {e}") - def copy_auth_flow(self, config, realm="master"): + def copy_auth_flow(self, config, realm: str = "master"): """ Create a new authentication flow from a copy of another. :param config: Representation of the authentication flow to create. @@ -2213,7 +2220,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not copy authentication flow {config['alias']} in realm {realm}: {e}") - def create_empty_auth_flow(self, config, realm="master"): + def create_empty_auth_flow(self, config, realm: str = "master"): """ Create a new empty authentication flow. :param config: Representation of the authentication flow to create. @@ -2239,7 +2246,7 @@ class KeycloakAPI: e, msg=f"Could not create empty authentication flow {config['alias']} in realm {realm}: {e}" ) - def update_authentication_executions(self, flowAlias, updatedExec, realm="master"): + def update_authentication_executions(self, flowAlias, updatedExec, realm: str = "master"): """Update authentication executions :param flowAlias: name of the parent flow @@ -2262,7 +2269,7 @@ class KeycloakAPI: except Exception as e: self.module.fail_json(msg=f"Unable to update executions {updatedExec}: {e}") - def add_authenticationConfig_to_execution(self, executionId, authenticationConfig, realm="master"): + def add_authenticationConfig_to_execution(self, executionId, authenticationConfig, realm: str = "master"): """Add autenticatorConfig to the execution :param executionId: id of execution @@ -2278,7 +2285,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Unable to add authenticationConfig {executionId}: {e}") - def delete_authentication_config(self, configId, realm="master"): + def delete_authentication_config(self, configId, realm: str = "master"): """Delete authenticator config :param configId: id of authentication config @@ -2290,7 +2297,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Unable to delete authentication config {configId}: {e}") - def create_subflow(self, subflowName, flowAlias, realm="master", flowType="basic-flow"): + def create_subflow(self, subflowName, flowAlias, realm: str = "master", flowType="basic-flow"): """Create new sublow on the flow :param subflowName: name of the subflow to create @@ -2312,7 +2319,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Unable to create new subflow {subflowName}: {e}") - def create_execution(self, execution, flowAlias, realm="master"): + def create_execution(self, execution, flowAlias, realm: str = "master"): """Create new execution on the flow :param execution: name of execution to create @@ -2338,7 +2345,7 @@ class KeycloakAPI: except Exception as e: self.module.fail_json(msg=f"Unable to create new execution '{flowAlias}' {execution['providerId']}: {e}") - def change_execution_priority(self, executionId, diff, realm="master"): + def change_execution_priority(self, executionId, diff, realm: str = "master"): """Raise or lower execution priority of diff time :param executionId: id of execution to lower priority @@ -2366,7 +2373,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Unable to change execution priority {executionId}: {e}") - def get_executions_representation(self, config, realm="master"): + def get_executions_representation(self, config, realm: str = "master"): """ Get a representation of the executions for an authentication flow. :param config: Representation of the authentication flow @@ -2399,7 +2406,7 @@ class KeycloakAPI: e, msg=f"Could not get executions for authentication flow {config['alias']} in realm {realm}: {e}" ) - def get_required_actions(self, realm="master"): + def get_required_actions(self, realm: str = "master"): """ Get required actions. :param realm: Realm name (not id). @@ -2415,7 +2422,7 @@ class KeycloakAPI: except Exception: return None - def register_required_action(self, rep, realm="master"): + def register_required_action(self, rep, realm: str = "master"): """ Register required action. :param rep: JSON containing 'providerId', and 'name' attributes. @@ -2434,7 +2441,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Unable to register required action {rep['name']} in realm {realm}: {e}") - def update_required_action(self, alias, rep, realm="master"): + def update_required_action(self, alias, rep, realm: str = "master"): """ Update required action. :param alias: Alias of required action. @@ -2454,7 +2461,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Unable to update required action {alias} in realm {realm}: {e}") - def delete_required_action(self, alias, realm="master"): + def delete_required_action(self, alias, realm: str = "master"): """ Delete required action. :param alias: Alias of required action. @@ -2472,7 +2479,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Unable to delete required action {alias} in realm {realm}: {e}") - def get_identity_providers(self, realm="master"): + def get_identity_providers(self, realm: str = "master"): """Fetch representations for identity providers in a realm :param realm: realm to be queried :return: list of representations for identity providers @@ -2487,7 +2494,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not obtain list of identity providers for realm {realm}: {e}") - def get_identity_provider(self, alias, realm="master"): + def get_identity_provider(self, alias, realm: str = "master"): """Fetch identity provider representation from a realm using the idp's alias. If the identity provider does not exist, None is returned. :param alias: Alias of the identity provider to fetch. @@ -2504,7 +2511,7 @@ class KeycloakAPI: except Exception as e: self.module.fail_json(msg=f"Could not fetch identity provider {alias} in realm {realm}: {e}") - def create_identity_provider(self, idprep, realm="master"): + def create_identity_provider(self, idprep, realm: str = "master"): """Create an identity provider. :param idprep: Identity provider representation of the idp to be created. :param realm: Realm in which this identity provider resides, default "master". @@ -2516,7 +2523,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not create identity provider {idprep['alias']} in realm {realm}: {e}") - def update_identity_provider(self, idprep, realm="master"): + def update_identity_provider(self, idprep, realm: str = "master"): """Update an existing identity provider. :param idprep: Identity provider representation of the idp to be updated. :param realm: Realm in which this identity provider resides, default "master". @@ -2528,7 +2535,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not update identity provider {idprep['alias']} in realm {realm}: {e}") - def delete_identity_provider(self, alias, realm="master"): + def delete_identity_provider(self, alias, realm: str = "master"): """Delete an identity provider. :param alias: Alias of the identity provider. :param realm: Realm in which this identity provider resides, default "master". @@ -2539,7 +2546,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Unable to delete identity provider {alias} in realm {realm}: {e}") - def get_identity_provider_mappers(self, alias, realm="master"): + def get_identity_provider_mappers(self, alias, realm: str = "master"): """Fetch representations for identity provider mappers :param alias: Alias of the identity provider. :param realm: realm to be queried @@ -2557,7 +2564,7 @@ class KeycloakAPI: e, msg=f"Could not obtain list of identity provider mappers for idp {alias} in realm {realm}: {e}" ) - def fetch_idp_endpoints_import_config_url(self, fromUrl, providerId="oidc", realm="master"): + def fetch_idp_endpoints_import_config_url(self, fromUrl, providerId="oidc", realm: str = "master"): """Import an identity provider configuration through Keycloak server from a well-known URL. :param fromUrl: URL to import the identity provider configuration from. "param providerId: Provider ID of the identity provider to import, default 'oidc'. @@ -2571,7 +2578,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not import the IdP config in realm {realm}: {e}") - def get_identity_provider_mapper(self, mid, alias, realm="master"): + def get_identity_provider_mapper(self, mid, alias, realm: str = "master"): """Fetch identity provider representation from a realm using the idp's alias. If the identity provider does not exist, None is returned. :param mid: Unique ID of the mapper to fetch. @@ -2593,7 +2600,7 @@ class KeycloakAPI: msg=f"Could not fetch mapper {mid} for identity provider {alias} in realm {realm}: {e}" ) - def create_identity_provider_mapper(self, mapper, alias, realm="master"): + def create_identity_provider_mapper(self, mapper, alias, realm: str = "master"): """Create an identity provider mapper. :param mapper: IdentityProviderMapperRepresentation of the mapper to be created. :param alias: Alias of the identity provider. @@ -2609,7 +2616,7 @@ class KeycloakAPI: msg=f"Could not create identity provider mapper {mapper['name']} for idp {alias} in realm {realm}: {e}", ) - def update_identity_provider_mapper(self, mapper, alias, realm="master"): + def update_identity_provider_mapper(self, mapper, alias, realm: str = "master"): """Update an existing identity provider. :param mapper: IdentityProviderMapperRepresentation of the mapper to be updated. :param alias: Alias of the identity provider. @@ -2624,7 +2631,7 @@ class KeycloakAPI: e, msg=f"Could not update mapper {mapper['id']} for identity provider {alias} in realm {realm}: {e}" ) - def delete_identity_provider_mapper(self, mid, alias, realm="master"): + def delete_identity_provider_mapper(self, mid, alias, realm: str = "master"): """Delete an identity provider. :param mid: Unique ID of the mapper to delete. :param alias: Alias of the identity provider. @@ -2638,7 +2645,7 @@ class KeycloakAPI: e, msg=f"Unable to delete mapper {mid} for identity provider {alias} in realm {realm}: {e}" ) - def get_components(self, filter=None, realm="master"): + def get_components(self, filter=None, realm: str = "master"): """Fetch representations for components in a realm :param realm: realm to be queried :param filter: search filter @@ -2657,7 +2664,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not obtain list of components for realm {realm}: {e}") - def get_component(self, cid, realm="master"): + def get_component(self, cid, realm: str = "master"): """Fetch component representation from a realm using its cid. If the component does not exist, None is returned. :param cid: Unique ID of the component to fetch. @@ -2674,7 +2681,7 @@ class KeycloakAPI: except Exception as e: self.module.fail_json(msg=f"Could not fetch component {cid} in realm {realm}: {e}") - def create_component(self, comprep, realm="master"): + def create_component(self, comprep, realm: str = "master"): """Create an component. :param comprep: Component representation of the component to be created. :param realm: Realm in which this component resides, default "master". @@ -2690,7 +2697,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not create component in realm {realm}: {e}") - def update_component(self, comprep, realm="master"): + def update_component(self, comprep, realm: str = "master"): """Update an existing component. :param comprep: Component representation of the component to be updated. :param realm: Realm in which this component resides, default "master". @@ -2705,7 +2712,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not update component {cid} in realm {realm}: {e}") - def delete_component(self, cid, realm="master"): + def delete_component(self, cid, realm: str = "master"): """Delete an component. :param cid: Unique ID of the component. :param realm: Realm in which this component resides, default "master". @@ -2757,7 +2764,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not delete scope {id} for client {client_id} in realm {realm}: {e}") - def get_user_by_id(self, user_id, realm="master"): + def get_user_by_id(self, user_id, realm: str = "master"): """ Get a User by its ID. :param user_id: ID of the user. @@ -2771,7 +2778,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not get user {user_id} in realm {realm}: {e}") - def create_user(self, userrep, realm="master"): + def create_user(self, userrep, realm: str = "master"): """ Create a new User. :param userrep: Representation of the user to create @@ -2805,7 +2812,7 @@ class KeycloakAPI: module_attributes_list.append(attr) return module_attributes_list - def update_user(self, userrep, realm="master"): + def update_user(self, userrep, realm: str = "master"): """ Update a User. :param userrep: Representation of the user to update. This representation must include the ID of the user. @@ -2823,7 +2830,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not update user {userrep['username']} in realm {realm}: {e}") - def delete_user(self, user_id, realm="master"): + def delete_user(self, user_id, realm: str = "master"): """ Delete a User. :param user_id: ID of the user to be deleted @@ -2836,7 +2843,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not delete user {user_id} in realm {realm}: {e}") - def get_user_groups(self, user_id, realm="master"): + def get_user_groups(self, user_id, realm: str = "master"): """ Get the group names for a user. :param user_id: User ID @@ -2846,7 +2853,7 @@ class KeycloakAPI: user_groups = self.get_user_group_details(user_id, realm) return [user_group["name"] for user_group in user_groups if "name" in user_group] - def get_user_group_details(self, user_id, realm="master"): + def get_user_group_details(self, user_id, realm: str = "master"): """ Get the group details for a user. :param user_id: User ID @@ -2859,11 +2866,11 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not get groups for user {user_id} in realm {realm}: {e}") - def add_user_in_group(self, user_id, group_id, realm="master"): + def add_user_in_group(self, user_id, group_id, realm: str = "master"): """DEPRECATED: Call add_user_to_group(...) instead. This method is scheduled for removal in community.general 13.0.0.""" return self.add_user_to_group(user_id, group_id, realm) - def add_user_to_group(self, user_id, group_id, realm="master"): + def add_user_to_group(self, user_id, group_id, realm: str = "master"): """ Add a user to a group. :param user_id: User ID @@ -2877,7 +2884,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not add user {user_id} to group {group_id} in realm {realm}: {e}") - def remove_user_from_group(self, user_id, group_id, realm="master"): + def remove_user_from_group(self, user_id, group_id, realm: str = "master"): """ Remove a user from a group for a user. :param user_id: User ID @@ -2891,7 +2898,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not remove user {user_id} from group {group_id} in realm {realm}: {e}") - def update_user_groups_membership(self, userrep, groups, realm="master"): + def update_user_groups_membership(self, userrep, groups, realm: str = "master"): """ Update user's group membership :param userrep: Representation of the user. This representation must include the ID. @@ -2949,7 +2956,7 @@ class KeycloakAPI: groups_to_remove.append(group_name) return groups_to_add, groups_to_remove - def find_group_by_path(self, target, realm="master"): + def find_group_by_path(self, target, realm: str = "master"): """ Finds a realm group by path, e.g. '/my/group'. The path is formed by prepending a '/' character to `target` unless it's already present. @@ -3073,7 +3080,7 @@ class KeycloakAPI: except Exception: return False - def get_client_role_scope_from_client(self, clientid, clientscopeid, realm="master"): + def get_client_role_scope_from_client(self, clientid, clientscopeid, realm: str = "master"): """Fetch the roles associated with the client's scope for a specific client on the Keycloak server. :param clientid: ID of the client from which to obtain the associated roles. :param clientscopeid: ID of the client who owns the roles. @@ -3088,7 +3095,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not fetch roles scope for client {clientid} in realm {realm}: {e}") - def update_client_role_scope_from_client(self, payload, clientid, clientscopeid, realm="master"): + def update_client_role_scope_from_client(self, payload, clientid, clientscopeid, realm: str = "master"): """Update and fetch the roles associated with the client's scope on the Keycloak server. :param payload: List of roles to be added to the scope. :param clientid: ID of the client to update scope. @@ -3107,7 +3114,7 @@ class KeycloakAPI: return self.get_client_role_scope_from_client(clientid, clientscopeid, realm) - def delete_client_role_scope_from_client(self, payload, clientid, clientscopeid, realm="master"): + def delete_client_role_scope_from_client(self, payload, clientid, clientscopeid, realm: str = "master"): """Delete the roles contains in the payload from the client's scope on the Keycloak server. :param payload: List of roles to be deleted. :param clientid: ID of the client to delete roles from scope. @@ -3126,7 +3133,7 @@ class KeycloakAPI: return self.get_client_role_scope_from_client(clientid, clientscopeid, realm) - def get_client_role_scope_from_realm(self, clientid, realm="master"): + def get_client_role_scope_from_realm(self, clientid, realm: str = "master"): """Fetch the realm roles from the client's scope on the Keycloak server. :param clientid: ID of the client from which to obtain the associated realm roles. :param realm: Realm from which to obtain the clients. @@ -3138,7 +3145,7 @@ class KeycloakAPI: except Exception as e: self.fail_request(e, msg=f"Could not fetch roles scope for client {clientid} in realm {realm}: {e}") - def update_client_role_scope_from_realm(self, payload, clientid, realm="master"): + def update_client_role_scope_from_realm(self, payload, clientid, realm: str = "master"): """Update and fetch the realm roles from the client's scope on the Keycloak server. :param payload: List of realm roles to add. :param clientid: ID of the client to update scope. @@ -3154,7 +3161,7 @@ class KeycloakAPI: return self.get_client_role_scope_from_realm(clientid, realm) - def delete_client_role_scope_from_realm(self, payload, clientid, realm="master"): + def delete_client_role_scope_from_realm(self, payload, clientid, realm: str = "master"): """Delete the realm roles contains in the payload from the client's scope on the Keycloak server. :param payload: List of realm roles to delete. :param clientid: ID of the client to delete roles from scope. @@ -3170,7 +3177,7 @@ class KeycloakAPI: return self.get_client_role_scope_from_realm(clientid, realm) - def fail_request(self, e, msg, **kwargs): + def fail_request(self, e: Exception, msg: str, **kwargs: t.Any) -> t.NoReturn: """Triggers a module failure. This should be called when an exception occurs during/after a request. Attempts to parse the exception e as an HTTP error @@ -3179,7 +3186,6 @@ class KeycloakAPI: :param e: exception which triggered the failure :param msg: error message to display to the user :param kwargs: additional arguments to pass to module.fail_json - :return: None """ try: if isinstance(e, HTTPError): @@ -3188,7 +3194,7 @@ class KeycloakAPI: pass self.module.fail_json(msg, **kwargs) - def fail_open_url(self, e, msg, **kwargs): + def fail_open_url(self, e: Exception, msg: str, **kwargs: t.Any) -> t.NoReturn: """DEPRECATED: Use fail_request instead. Triggers a module failure. This should be called @@ -3199,12 +3205,11 @@ class KeycloakAPI: :param e: exception which triggered the failure :param msg: error message to display to the user :param kwargs: additional arguments to pass to module.fail_json - :return: None """ - return self.fail_request(e, msg, **kwargs) + self.fail_request(e, msg, **kwargs) def send_execute_actions_email( - self, user_id, realm="master", client_id=None, data=None, redirect_uri=None, lifespan=None + self, user_id, realm: str = "master", client_id=None, data=None, redirect_uri=None, lifespan=None ): """ Send an email to the user with a link they can click to perform required actions (e.g. reset password). diff --git a/plugins/module_utils/identity/keycloak/keycloak_clientsecret.py b/plugins/module_utils/identity/keycloak/keycloak_clientsecret.py index f3dc2b95d2..b8d6674a4c 100644 --- a/plugins/module_utils/identity/keycloak/keycloak_clientsecret.py +++ b/plugins/module_utils/identity/keycloak/keycloak_clientsecret.py @@ -5,13 +5,17 @@ from __future__ import annotations +import typing as t from ansible.module_utils.basic import AnsibleModule -from ansible_collections.community.general.plugins.module_utils.identity.keycloak.keycloak import keycloak_argument_spec +from ansible_collections.community.general.plugins.module_utils.identity.keycloak.keycloak import ( + keycloak_argument_spec, + KeycloakAPI, +) -def keycloak_clientsecret_module(): +def keycloak_clientsecret_module() -> AnsibleModule: """ Returns an AnsibleModule definition for modules that interact with a client secret. @@ -44,7 +48,7 @@ def keycloak_clientsecret_module(): return module -def keycloak_clientsecret_module_resolve_params(module, kc): +def keycloak_clientsecret_module_resolve_params(module: AnsibleModule, kc: KeycloakAPI) -> tuple[str, dict[str, t.Any]]: """ Given an AnsibleModule definition for keycloak_clientsecret_*, and a KeycloakAPI client, resolve the params needed to interact with the Keycloak diff --git a/plugins/module_utils/mh/base.py b/plugins/module_utils/mh/base.py index 871753f0d3..e8275d631f 100644 --- a/plugins/module_utils/mh/base.py +++ b/plugins/module_utils/mh/base.py @@ -40,7 +40,7 @@ class ModuleHelperBase: def verbosity(self): return self.module._verbosity - def do_raise(self, *args, **kwargs): + def do_raise(self, *args, **kwargs) -> t.NoReturn: raise _MHE(*args, **kwargs) def __getattr__(self, attr): @@ -61,14 +61,14 @@ class ModuleHelperBase: raise NotImplementedError() @property - def changed(self): + def changed(self) -> bool: try: return self.__changed__() except NotImplementedError: return self._changed @changed.setter - def changed(self, value): + def changed(self, value: bool) -> None: self._changed = value def has_changed(self): diff --git a/plugins/module_utils/mh/exceptions.py b/plugins/module_utils/mh/exceptions.py index 0e30b63268..612cfc1c3c 100644 --- a/plugins/module_utils/mh/exceptions.py +++ b/plugins/module_utils/mh/exceptions.py @@ -9,7 +9,7 @@ import typing as t class ModuleHelperException(Exception): - def __init__(self, msg: str, update_output: dict[str, t.Any] | None = None, *args, **kwargs): + def __init__(self, msg: str, update_output: dict[str, t.Any] | None = None, *args, **kwargs) -> None: self.msg: str = msg or f"Module failed with exception: {self}" if update_output is None: update_output = {} diff --git a/plugins/module_utils/mh/mixins/deprecate_attrs.py b/plugins/module_utils/mh/mixins/deprecate_attrs.py index c2991a66f1..81a3683ff8 100644 --- a/plugins/module_utils/mh/mixins/deprecate_attrs.py +++ b/plugins/module_utils/mh/mixins/deprecate_attrs.py @@ -5,12 +5,15 @@ from __future__ import annotations +import typing as t from ansible.module_utils.basic import AnsibleModule class DeprecateAttrsMixin: - def _deprecate_setup(self, attr, target, module): + def _deprecate_setup( + self, attr: str, target: object | None, module: AnsibleModule | None + ) -> tuple[object, AnsibleModule, dict[str, t.Any], dict[str, t.Any]]: if target is None: target = self if not hasattr(target, attr): @@ -37,8 +40,16 @@ class DeprecateAttrsMixin: return target, module, value_dict, trigger_dict def _deprecate_attr( - self, attr, msg, version=None, date=None, collection_name=None, target=None, value=None, module=None - ): + self, + attr: str, + msg: str, + version: str | None = None, + date: str | None = None, + collection_name: str | None = None, + target: object | None = None, + value=None, + module: AnsibleModule | None = None, + ) -> None: target, module, value_dict, trigger_dict = self._deprecate_setup(attr, target, module) value_dict[attr] = getattr(target, attr, value) diff --git a/plugins/module_utils/mh/mixins/state.py b/plugins/module_utils/mh/mixins/state.py index 7382def5b3..dbcf1f0a40 100644 --- a/plugins/module_utils/mh/mixins/state.py +++ b/plugins/module_utils/mh/mixins/state.py @@ -5,16 +5,18 @@ from __future__ import annotations +import typing as t + class StateMixin: state_param: str = "state" default_state: str | None = None - def _state(self): - state = self.module.params.get(self.state_param) + def _state(self) -> str: + state: str = self.module.params.get(self.state_param) # type: ignore[attr-defined] return self.default_state if state is None else state - def _method(self, state): + def _method(self, state: str) -> str: return f"{self.state_param}_{state}" def __run__(self): @@ -34,5 +36,5 @@ class StateMixin: func = getattr(self, method) return func() - def __state_fallback__(self): + def __state_fallback__(self) -> t.NoReturn: raise ValueError(f"Cannot find method: {self._method(self._state())}") diff --git a/plugins/module_utils/mh/module_helper.py b/plugins/module_utils/mh/module_helper.py index 9047de262d..0582dba7f1 100644 --- a/plugins/module_utils/mh/module_helper.py +++ b/plugins/module_utils/mh/module_helper.py @@ -16,6 +16,7 @@ from ansible_collections.community.general.plugins.module_utils.mh.mixins.deprec if t.TYPE_CHECKING: from collections.abc import Sequence + from ansible.module_utils.basic import AnsibleModule class ModuleHelper(DeprecateAttrsMixin, ModuleHelperBase): @@ -25,11 +26,11 @@ class ModuleHelper(DeprecateAttrsMixin, ModuleHelperBase): change_params: Sequence[str] = () facts_params: Sequence[str] = () - def __init__(self, module=None): + def __init__(self, module: AnsibleModule | dict[str, t.Any] | None = None) -> None: super().__init__(module) self.vars = VarDict() - for name, value in self.module.params.items(): + for name, value in self.module.params.items(): # type: ignore[union-attr] self.vars.set( name, value, @@ -39,26 +40,26 @@ class ModuleHelper(DeprecateAttrsMixin, ModuleHelperBase): fact=name in self.facts_params, ) - def update_vars(self, meta=None, **kwargs): + def update_vars(self, meta: dict[str, t.Any] | None = None, **kwargs: t.Any) -> None: if meta is None: meta = {} for k, v in kwargs.items(): self.vars.set(k, v, **meta) - def update_output(self, **kwargs): + def update_output(self, **kwargs: t.Any) -> None: self.update_vars(meta={"output": True}, **kwargs) - def update_facts(self, **kwargs): + def update_facts(self, **kwargs: t.Any) -> None: self.update_vars(meta={"fact": True}, **kwargs) - def _vars_changed(self): + def _vars_changed(self) -> bool: return self.vars.has_changed - def has_changed(self): + def has_changed(self) -> bool: return self.changed or self._vars_changed() @property - def output(self): + def output(self) -> dict[str, t.Any]: result = dict(self.vars.output()) if self.facts_name: facts = self.vars.facts() diff --git a/plugins/module_utils/net_tools/pritunl/api.py b/plugins/module_utils/net_tools/pritunl/api.py index 343dbab707..d6e3b15b25 100644 --- a/plugins/module_utils/net_tools/pritunl/api.py +++ b/plugins/module_utils/net_tools/pritunl/api.py @@ -14,15 +14,19 @@ import hmac import json import time import uuid +import typing as t from ansible.module_utils.urls import open_url +if t.TYPE_CHECKING: + from ansible.module_utils.basic import AnsibleModule + class PritunlException(Exception): pass -def pritunl_argument_spec(): +def pritunl_argument_spec() -> dict[str, t.Any]: return dict( pritunl_url=dict(required=True, type="str"), pritunl_api_token=dict(required=True, type="str", no_log=False), @@ -31,7 +35,7 @@ def pritunl_argument_spec(): ) -def get_pritunl_settings(module): +def get_pritunl_settings(module: AnsibleModule) -> dict[str, t.Any]: """ Helper function to set required Pritunl request params from module arguments. """ @@ -43,7 +47,7 @@ def get_pritunl_settings(module): } -def _get_pritunl_organizations(api_token, api_secret, base_url, validate_certs=True): +def _get_pritunl_organizations(api_token, api_secret, base_url: str, validate_certs: bool = True): return pritunl_auth_request( base_url=base_url, api_token=api_token, @@ -54,7 +58,9 @@ def _get_pritunl_organizations(api_token, api_secret, base_url, validate_certs=T ) -def _delete_pritunl_organization(api_token, api_secret, base_url, organization_id, validate_certs=True): +def _delete_pritunl_organization( + api_token, api_secret, base_url: str, organization_id: str, validate_certs: bool = True +): return pritunl_auth_request( base_url=base_url, api_token=api_token, @@ -65,7 +71,9 @@ def _delete_pritunl_organization(api_token, api_secret, base_url, organization_i ) -def _post_pritunl_organization(api_token, api_secret, base_url, organization_data, validate_certs=True): +def _post_pritunl_organization( + api_token, api_secret, base_url: str, organization_data: object, validate_certs: bool = True +): return pritunl_auth_request( api_token=api_token, api_secret=api_secret, @@ -78,7 +86,7 @@ def _post_pritunl_organization(api_token, api_secret, base_url, organization_dat ) -def _get_pritunl_users(api_token, api_secret, base_url, organization_id, validate_certs=True): +def _get_pritunl_users(api_token, api_secret, base_url: str, organization_id: str, validate_certs: bool = True): return pritunl_auth_request( api_token=api_token, api_secret=api_secret, @@ -89,7 +97,9 @@ def _get_pritunl_users(api_token, api_secret, base_url, organization_id, validat ) -def _delete_pritunl_user(api_token, api_secret, base_url, organization_id, user_id, validate_certs=True): +def _delete_pritunl_user( + api_token, api_secret, base_url: str, organization_id: str, user_id: str, validate_certs: bool = True +): return pritunl_auth_request( api_token=api_token, api_secret=api_secret, @@ -100,7 +110,9 @@ def _delete_pritunl_user(api_token, api_secret, base_url, organization_id, user_ ) -def _post_pritunl_user(api_token, api_secret, base_url, organization_id, user_data, validate_certs=True): +def _post_pritunl_user( + api_token, api_secret, base_url: str, organization_id: str, user_data: object, validate_certs=True +): return pritunl_auth_request( api_token=api_token, api_secret=api_secret, @@ -116,11 +128,11 @@ def _post_pritunl_user(api_token, api_secret, base_url, organization_id, user_da def _put_pritunl_user( api_token, api_secret, - base_url, - organization_id, - user_id, - user_data, - validate_certs=True, + base_url: str, + organization_id: str, + user_id: str, + user_data: object, + validate_certs: bool = True, ): return pritunl_auth_request( api_token=api_token, @@ -134,7 +146,9 @@ def _put_pritunl_user( ) -def list_pritunl_organizations(api_token, api_secret, base_url, validate_certs=True, filters=None): +def list_pritunl_organizations( + api_token, api_secret, base_url: str, validate_certs: bool = True, filters: dict[str, t.Any] | None = None +) -> list: orgs = [] response = _get_pritunl_organizations( @@ -158,7 +172,14 @@ def list_pritunl_organizations(api_token, api_secret, base_url, validate_certs=T return orgs -def list_pritunl_users(api_token, api_secret, base_url, organization_id, validate_certs=True, filters=None): +def list_pritunl_users( + api_token, + api_secret, + base_url: str, + organization_id: str, + validate_certs: bool = True, + filters: dict[str, t.Any] | None = None, +): users = [] response = _get_pritunl_users( @@ -187,9 +208,9 @@ def list_pritunl_users(api_token, api_secret, base_url, organization_id, validat def post_pritunl_organization( api_token, api_secret, - base_url, - organization_name, - validate_certs=True, + base_url: str, + organization_name: str, + validate_certs: bool = True, ): response = _post_pritunl_organization( api_token=api_token, @@ -208,11 +229,11 @@ def post_pritunl_organization( def post_pritunl_user( api_token, api_secret, - base_url, - organization_id, - user_data, - user_id=None, - validate_certs=True, + base_url: str, + organization_id: str, + user_data: object, + user_id: str | None = None, + validate_certs: bool = True, ): # If user_id is provided will do PUT otherwise will do POST if user_id is None: @@ -247,7 +268,9 @@ def post_pritunl_user( return json.loads(response.read()) -def delete_pritunl_organization(api_token, api_secret, base_url, organization_id, validate_certs=True): +def delete_pritunl_organization( + api_token, api_secret, base_url: str, organization_id: str, validate_certs: bool = True +): response = _delete_pritunl_organization( api_token=api_token, api_secret=api_secret, @@ -262,7 +285,9 @@ def delete_pritunl_organization(api_token, api_secret, base_url, organization_id return json.loads(response.read()) -def delete_pritunl_user(api_token, api_secret, base_url, organization_id, user_id, validate_certs=True): +def delete_pritunl_user( + api_token, api_secret, base_url: str, organization_id: str, user_id: str, validate_certs: bool = True +): response = _delete_pritunl_user( api_token=api_token, api_secret=api_secret, @@ -281,12 +306,12 @@ def delete_pritunl_user(api_token, api_secret, base_url, organization_id, user_i def pritunl_auth_request( api_token, api_secret, - base_url, - method, - path, - validate_certs=True, - headers=None, - data=None, + base_url: str, + method: str, + path: str, + validate_certs: bool = True, + headers: dict[str, str] | None = None, + data: bytes | str | None = None, ): """ Send an API call to a Pritunl server. diff --git a/plugins/module_utils/oracle/oci_utils.py b/plugins/module_utils/oracle/oci_utils.py index 2802a8d46a..4281afb0f2 100644 --- a/plugins/module_utils/oracle/oci_utils.py +++ b/plugins/module_utils/oracle/oci_utils.py @@ -14,13 +14,13 @@ import logging import logging.config import os import tempfile +import time +import typing as t # (TODO: remove next line!) from datetime import datetime # noqa: F401, pylint: disable=unused-import from operator import eq -import time - try: import yaml # noqa: F401, pylint: disable=unused-import @@ -48,6 +48,9 @@ except ImportError: from ansible.module_utils.common.text.converters import to_bytes +if t.TYPE_CHECKING: + from ansible.module_utils.basic import AnsibleModule + __version__ = "1.6.0-dev" MAX_WAIT_TIMEOUT_IN_SECONDS = 1200 @@ -81,7 +84,7 @@ DEFAULT_READY_STATES = [ DEFAULT_TERMINATED_STATES = ["TERMINATED", "DETACHED", "DELETED"] -def get_common_arg_spec(supports_create=False, supports_wait=False): +def get_common_arg_spec(supports_create: bool = False, supports_wait: bool = False) -> dict[str, t.Any]: """ Return the common set of module arguments for all OCI cloud modules. :param supports_create: Variable to decide whether to add options related to idempotency of create operation. @@ -125,7 +128,7 @@ def get_common_arg_spec(supports_create=False, supports_wait=False): return common_args -def get_facts_module_arg_spec(filter_by_name=False): +def get_facts_module_arg_spec(filter_by_name: bool = False) -> dict[str, t.Any]: # Note: This method is used by most OCI ansible fact modules during initialization. When making changes to this # method, ensure that no `oci` python sdk dependencies are introduced in this method. This ensures that the modules # can check for absence of OCI Python SDK and fail with an appropriate message. Introducing an OCI dependency in @@ -138,7 +141,7 @@ def get_facts_module_arg_spec(filter_by_name=False): return facts_module_arg_spec -def get_oci_config(module, service_client_class=None): +def get_oci_config(module: AnsibleModule, service_client_class=None): """Return the OCI configuration to use for all OCI API calls. The effective OCI configuration is derived by merging any overrides specified for configuration attributes through Ansible module options or environment variables. The order of precedence for deriving the effective configuration dict is: @@ -241,7 +244,7 @@ def get_oci_config(module, service_client_class=None): return config -def create_service_client(module, service_client_class): +def create_service_client(module: AnsibleModule, service_client_class): """ Creates a service client using the common module options provided by the user. :param module: An AnsibleModule that represents user provided options for a Task @@ -275,7 +278,7 @@ def create_service_client(module, service_client_class): return client -def _is_instance_principal_auth(module): +def _is_instance_principal_auth(module: AnsibleModule): # check if auth type is overridden via module params instance_principal_auth = "auth_type" in module.params and module.params["auth_type"] == "instance_principal" if not instance_principal_auth: @@ -285,7 +288,9 @@ def _is_instance_principal_auth(module): return instance_principal_auth -def _merge_auth_option(config, module, module_option_name, env_var_name, config_attr_name): +def _merge_auth_option( + config, module: AnsibleModule, module_option_name: str, env_var_name: str, config_attr_name: str +) -> None: """Merge the values for an authentication attribute from ansible module options and environment variables with the values specified in a configuration file""" _debug(f"Merging {module_option_name}") @@ -305,7 +310,7 @@ def _merge_auth_option(config, module, module_option_name, env_var_name, config_ config.update({config_attr_name: auth_attribute}) -def bucket_details_factory(bucket_details_type, module): +def bucket_details_factory(bucket_details_type: t.Literal["create", "update"], module: AnsibleModule): bucket_details = None if bucket_details_type == "create": bucket_details = CreateBucketDetails() @@ -320,7 +325,7 @@ def bucket_details_factory(bucket_details_type, module): return bucket_details -def filter_resources(all_resources, filter_params): +def filter_resources(all_resources, filter_params) -> list: if not filter_params: return all_resources filtered_resources = [] @@ -335,7 +340,7 @@ def filter_resources(all_resources, filter_params): return filtered_resources -def list_all_resources(target_fn, **kwargs): +def list_all_resources(target_fn, **kwargs) -> list: """ Return all resources after paging through all results returned by target_fn. If a `display_name` or `name` is provided as a kwarg, then only resources matching the specified name are returned. @@ -371,17 +376,17 @@ def list_all_resources(target_fn, **kwargs): return filter_resources(existing_resources, filter_params) -def _debug(s): +def _debug(s) -> None: get_logger("oci_utils").debug(s) -def get_logger(module_name): +def get_logger(module_name: str): oci_logging = setup_logging() return oci_logging.getLogger(module_name) def setup_logging( - default_level="INFO", + default_level: str = "INFO", ): """Setup logging configuration""" env_log_path = "LOG_PATH" @@ -396,7 +401,7 @@ def setup_logging( return logging -def check_and_update_attributes(target_instance, attr_name, input_value, existing_value, changed): +def check_and_update_attributes(target_instance, attr_name: str, input_value, existing_value, changed: bool) -> bool: """ This function checks the difference between two resource attributes of literal types and sets the attribute value in the target instance type holding the attribute. @@ -422,13 +427,13 @@ def check_and_update_resource( update_fn, primitive_params_update, kwargs_non_primitive_update, - module, + module: AnsibleModule, update_attributes, client=None, sub_attributes_of_update_model=None, wait_applicable=True, states=None, -): +) -> dict[str, t.Any]: """ This function handles update operation on a resource. It checks whether update is required and accordingly returns the resource and the changed status. @@ -481,10 +486,10 @@ def check_and_update_resource( def get_kwargs_update( attributes_to_update, kwargs_non_primitive_update, - module, + module: AnsibleModule, primitive_params_update, sub_attributes_of_update_model=None, -): +) -> dict[str, t.Any]: kwargs_update = dict() for param in primitive_params_update: kwargs_update[param] = module.params[param] @@ -500,7 +505,7 @@ def get_kwargs_update( return kwargs_update -def is_dictionary_subset(sub, super_dict): +def is_dictionary_subset(sub: dict, super_dict: dict) -> bool: """ This function checks if `sub` dictionary is a subset of `super` dictionary. :param sub: subset dictionary, for example user_provided_attr_value. @@ -510,7 +515,7 @@ def is_dictionary_subset(sub, super_dict): return all(sub[key] == super_dict[key] for key in sub) -def are_lists_equal(s, t): +def are_lists_equal(s: list | None, t: list | None): if s is None and t is None: return True @@ -541,7 +546,7 @@ def are_lists_equal(s, t): return not t -def get_attr_to_update(get_fn, kwargs_get, module, update_attributes): +def get_attr_to_update(get_fn, kwargs_get, module: AnsibleModule, update_attributes) -> tuple: try: resource = call_with_backoff(get_fn, **kwargs_get).data except ServiceError as ex: @@ -569,7 +574,7 @@ def get_attr_to_update(get_fn, kwargs_get, module, update_attributes): return attributes_to_update, resource -def get_taggable_arg_spec(supports_create=False, supports_wait=False): +def get_taggable_arg_spec(supports_create: bool = False, supports_wait: bool = False) -> dict[str, t.Any]: """ Returns an arg_spec that is valid for taggable OCI resources. :return: A dict that represents an ansible arg spec that builds over the common_arg_spec and adds free-form and @@ -580,7 +585,7 @@ def get_taggable_arg_spec(supports_create=False, supports_wait=False): return tag_arg_spec -def add_tags_to_model_from_module(model, module): +def add_tags_to_model_from_module(model, module: AnsibleModule): """ Adds free-form and defined tags from an ansible module to a resource model :param model: A resource model instance that supports 'freeform_tags' and 'defined_tags' as attributes @@ -620,7 +625,7 @@ def check_and_create_resource( kwargs_create, list_fn, kwargs_list, - module, + module: AnsibleModule, model, existing_resources=None, exclude_attributes=None, @@ -709,7 +714,7 @@ def check_and_create_resource( return result -def _get_attributes_to_consider(exclude_attributes, model, module): +def _get_attributes_to_consider(exclude_attributes, model, module: AnsibleModule): """ Determine the attributes to detect if an existing resource already matches the requested resource state :param exclude_attributes: Attributes to not consider for matching @@ -770,7 +775,7 @@ def is_attr_assigned_default(default_attribute_values, attr, assigned_value): return True -def create_resource(resource_type, create_fn, kwargs_create, module): +def create_resource(resource_type, create_fn, kwargs_create, module: AnsibleModule): """ Create an OCI resource :param resource_type: Type of the resource to be created. e.g.: "vcn" @@ -791,7 +796,7 @@ def create_resource(resource_type, create_fn, kwargs_create, module): def does_existing_resource_match_user_inputs( existing_resource, - module, + module: AnsibleModule, attributes_to_compare, exclude_attributes, default_attribute_values=None, @@ -858,13 +863,13 @@ def does_existing_resource_match_user_inputs( return True -def tuplize(d): +def tuplize(d) -> list[tuple]: """ This function takes a dictionary and converts it to a list of tuples recursively. :param d: A dictionary. :return: List of tuples. """ - list_of_tuples = [] + list_of_tuples: list[tuple] = [] key_list = sorted(list(d.keys())) for key in key_list: if isinstance(d[key], list): @@ -887,18 +892,18 @@ def tuplize(d): return list_of_tuples -def get_key_for_comparing_dict(d): +def get_key_for_comparing_dict(d) -> list[tuple]: tuple_form_of_d = tuplize(d) return tuple_form_of_d -def sort_dictionary(d): +def sort_dictionary(d: dict) -> dict: """ This function sorts values of a dictionary recursively. :param d: A dictionary. :return: Dictionary with sorted elements. """ - sorted_d = {} + sorted_d: dict = {} for key in d: if isinstance(d[key], list): if d[key] and isinstance(d[key][0], dict): @@ -913,7 +918,7 @@ def sort_dictionary(d): return sorted_d -def sort_list_of_dictionary(list_of_dict): +def sort_list_of_dictionary(list_of_dict: list[dict]) -> list[dict]: """ This functions sorts a list of dictionaries. It first sorts each value of the dictionary and then sorts the list of individually sorted dictionaries. For sorting, each dictionary's tuple equivalent is used. @@ -1103,7 +1108,7 @@ def create_and_wait( kwargs_create, get_fn, get_param, - module, + module: AnsibleModule, states=None, wait_applicable=True, kwargs_get=None, @@ -1151,7 +1156,7 @@ def update_and_wait( kwargs_update, get_fn, get_param, - module, + module: AnsibleModule, states=None, wait_applicable=True, kwargs_get=None, @@ -1196,7 +1201,7 @@ def create_or_update_resource_and_wait( resource_type, function, kwargs_function, - module, + module: AnsibleModule, wait_applicable, get_fn, get_param, @@ -1240,7 +1245,7 @@ def create_or_update_resource_and_wait( def wait_for_resource_lifecycle_state( client, - module, + module: AnsibleModule, wait_applicable, kwargs_get, get_fn, @@ -1291,7 +1296,7 @@ def wait_for_resource_lifecycle_state( return resource -def wait_on_work_request(client, response, module): +def wait_on_work_request(client, response, module: AnsibleModule): try: if module.params.get("wait", None): _debug(f"Waiting for work request with id {response.data.id} to reach SUCCEEDED state.") @@ -1325,11 +1330,11 @@ def delete_and_wait( kwargs_get, delete_fn, kwargs_delete, - module, + module: AnsibleModule, states=None, wait_applicable=True, process_work_request=False, -): +) -> dict[str, t.Any]: """A utility function to delete a resource and wait for the resource to get into the state as specified in the module options. :param wait_applicable: Specifies if wait for delete is applicable for this resource @@ -1348,7 +1353,7 @@ def delete_and_wait( """ states_set = set(["DETACHING", "DETACHED", "DELETING", "DELETED", "TERMINATING", "TERMINATED"]) - result = dict(changed=False) + result: dict[str, t.Any] = dict(changed=False) result[resource_type] = dict() try: resource = to_dict(call_with_backoff(get_fn, **kwargs_get).data) @@ -1412,7 +1417,7 @@ def delete_and_wait( return result -def are_attrs_equal(current_resource, module, attributes): +def are_attrs_equal(current_resource, module: AnsibleModule, attributes): """ Check if the specified attributes are equal in the specified 'model' and 'module'. This is used to check if an OCI Model instance already has the values specified by an Ansible user while invoking an OCI Ansible module and if a @@ -1440,7 +1445,7 @@ def are_attrs_equal(current_resource, module, attributes): return True -def _get_user_provided_value(module, attribute_name): +def _get_user_provided_value(module: AnsibleModule, attribute_name): """ Returns the user provided value for "attribute_name". We consider aliases in the module. """ @@ -1456,7 +1461,7 @@ def _get_user_provided_value(module, attribute_name): return user_provided_value -def update_model_with_user_options(curr_model, update_model, module): +def update_model_with_user_options(curr_model, update_model, module: AnsibleModule): """ Update the 'update_model' with user provided values in 'module' for the specified 'attributes' if they are different from the values in the 'curr_model'. @@ -1520,7 +1525,7 @@ def call_with_backoff(fn, **kwargs): raise -def generic_hash(obj): +def generic_hash(obj) -> int: """ Compute a hash of all the fields in the object :param obj: Object whose hash needs to be computed @@ -1540,7 +1545,7 @@ def generic_hash(obj): return sum -def generic_eq(s, other): +def generic_eq(s, other) -> bool: if other is None: return False return s.__dict__ == other.__dict__ @@ -1640,7 +1645,7 @@ def update_class_type_attr_difference(update_class_details, existing_instance, a return changed -def get_existing_resource(target_fn, module, **kwargs): +def get_existing_resource(target_fn, module: AnsibleModule, **kwargs): """ Returns the requested resource if it exists based on the input arguments. :param target_fn The function which should be used to find the requested resource @@ -1659,7 +1664,9 @@ def get_existing_resource(target_fn, module, **kwargs): return existing_resource -def get_attached_instance_info(module, lookup_attached_instance, list_attachments_fn, list_attachments_args): +def get_attached_instance_info( + module: AnsibleModule, lookup_attached_instance, list_attachments_fn, list_attachments_args +): config = get_oci_config(module) identity_client = create_service_client(module, IdentityClient) @@ -1747,12 +1754,12 @@ def get_component_list_difference(input_component_list, existing_components, pur return None, False -def write_to_file(path, content): +def write_to_file(path: str | bytes, content: bytes) -> None: with open(to_bytes(path), "wb") as dest_file: dest_file.write(content) -def get_target_resource_from_list(module, list_resource_fn, target_resource_id=None, **kwargs): +def get_target_resource_from_list(module: AnsibleModule, list_resource_fn, target_resource_id=None, **kwargs): """ Returns a resource filtered by identifier from a list of resources. This method should be used as an alternative of 'get resource' method when 'get resource' is nor provided by diff --git a/plugins/module_utils/remote_management/lxca/common.py b/plugins/module_utils/remote_management/lxca/common.py index ce3efc2a1d..53cb134b4f 100644 --- a/plugins/module_utils/remote_management/lxca/common.py +++ b/plugins/module_utils/remote_management/lxca/common.py @@ -14,6 +14,7 @@ from __future__ import annotations import traceback +import typing as t try: from pylxca import connect, disconnect @@ -22,11 +23,14 @@ try: except ImportError: HAS_PYLXCA = False +if t.TYPE_CHECKING: + from ansible.module_utils.basic import AnsibleModule + PYLXCA_REQUIRED = "Lenovo xClarity Administrator Python Client (Python package 'pylxca') is required for this module." -def has_pylxca(module): +def has_pylxca(module: AnsibleModule) -> None: """ Check pylxca is installed :param module: @@ -43,17 +47,17 @@ LXCA_COMMON_ARGS = dict( class connection_object: - def __init__(self, module): + def __init__(self, module: AnsibleModule) -> None: self.module = module def __enter__(self): return setup_conn(self.module) - def __exit__(self, type, value, traceback): + def __exit__(self, type, value, traceback) -> None: close_conn() -def setup_conn(module): +def setup_conn(module: AnsibleModule): """ this function create connection to LXCA :param module: @@ -70,7 +74,7 @@ def setup_conn(module): return lxca_con -def close_conn(): +def close_conn() -> None: """ this function close connection to LXCA :param module: diff --git a/plugins/module_utils/source_control/bitbucket.py b/plugins/module_utils/source_control/bitbucket.py index 902d77f6b7..3551d9cb91 100644 --- a/plugins/module_utils/source_control/bitbucket.py +++ b/plugins/module_utils/source_control/bitbucket.py @@ -5,20 +5,24 @@ from __future__ import annotations import json +import typing as t from ansible.module_utils.basic import env_fallback from ansible.module_utils.urls import fetch_url, basic_auth_header +if t.TYPE_CHECKING: + from ansible.module_utils.basic import AnsibleModule + class BitbucketHelper: BITBUCKET_API_URL = "https://api.bitbucket.org" - def __init__(self, module): + def __init__(self, module: AnsibleModule) -> None: self.module = module - self.access_token = None + self.access_token: str | None = None @staticmethod - def bitbucket_argument_spec(): + def bitbucket_argument_spec() -> dict[str, t.Any]: return dict( client_id=dict(type="str", fallback=(env_fallback, ["BITBUCKET_CLIENT_ID"])), client_secret=dict(type="str", no_log=True, fallback=(env_fallback, ["BITBUCKET_CLIENT_SECRET"])), @@ -30,14 +34,14 @@ class BitbucketHelper: ) @staticmethod - def bitbucket_required_one_of(): + def bitbucket_required_one_of() -> list[list[str]]: return [["client_id", "client_secret", "user", "password"]] @staticmethod - def bitbucket_required_together(): + def bitbucket_required_together() -> list[list[str]]: return [["client_id", "client_secret"], ["user", "password"]] - def fetch_access_token(self): + def fetch_access_token(self) -> None: if self.module.params["client_id"] and self.module.params["client_secret"]: headers = { "Authorization": basic_auth_header( @@ -57,7 +61,9 @@ class BitbucketHelper: else: self.module.fail_json(msg=f"Failed to retrieve access token: {info}") - def request(self, api_url, method, data=None, headers=None): + def request( + self, api_url: str, method: str, data: bytes | str | dict | None = None, headers: dict[str, str] | None = None + ): headers = headers or {} if self.access_token: diff --git a/plugins/module_utils/storage/hpe3par/hpe3par.py b/plugins/module_utils/storage/hpe3par/hpe3par.py index 2aeb9c1b40..8ad1ec818e 100644 --- a/plugins/module_utils/storage/hpe3par/hpe3par.py +++ b/plugins/module_utils/storage/hpe3par/hpe3par.py @@ -4,11 +4,12 @@ from __future__ import annotations +import typing as t from ansible.module_utils import basic -def convert_to_binary_multiple(size_with_unit): +def convert_to_binary_multiple(size_with_unit: str | None) -> int: if size_with_unit is None: return -1 valid_units = ["MiB", "GiB", "TiB"] @@ -35,7 +36,7 @@ storage_system_spec = { } -def cpg_argument_spec(): +def cpg_argument_spec() -> dict[str, t.Any]: spec = { "state": {"required": True, "choices": ["present", "absent"], "type": "str"}, "cpg_name": {"required": True, "type": "str"},