1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2026-03-22 05:09:12 +00:00

[PR #10841/986118c0 backport][stable-12] keycloak_realm_localization: new module - realm localization control (#11517)

keycloak_realm_localization: new module - realm localization control (#10841)

* add support for management of keycloak localizations

* unit test for keycloak localization support

* keycloak_realm_localization botmeta record

* rev: improvements after code review

(cherry picked from commit 986118c0af)

Co-authored-by: Jakub Danek <danekja@users.noreply.github.com>
This commit is contained in:
patchback[bot] 2026-02-18 07:44:44 +01:00 committed by GitHub
parent 42c20a754b
commit 5dcb3b8f59
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 845 additions and 7 deletions

View file

@ -25,6 +25,9 @@ URL_REALMS = "{url}/admin/realms"
URL_REALM = "{url}/admin/realms/{realm}"
URL_REALM_KEYS_METADATA = "{url}/admin/realms/{realm}/keys"
URL_LOCALIZATIONS = "{url}/admin/realms/{realm}/localization/{locale}"
URL_LOCALIZATION = "{url}/admin/realms/{realm}/localization/{locale}/{key}"
URL_TOKEN = "{url}/realms/{realm}/protocol/openid-connect/token"
URL_CLIENT = "{url}/admin/realms/{realm}/clients/{id}"
URL_CLIENTS = "{url}/admin/realms/{realm}/clients"
@ -386,7 +389,9 @@ class KeycloakAPI:
self.restheaders = connection_header
self.http_agent = self.module.params.get("http_agent")
def _request(self, url: str, method: str, data: str | bytes | None = None):
def _request(
self, url: str, method: str, data: str | bytes | None = None, *, extra_headers: dict[str, str] | None = None
):
"""Makes a request to Keycloak and returns the raw response.
If a 401 is returned, attempts to re-authenticate
using first the module's refresh_token (if provided)
@ -397,17 +402,18 @@ class KeycloakAPI:
:param url: request path
:param method: request method (e.g., 'GET', 'POST', etc.)
:param data: (optional) data for request
:param extra_headers headers to be sent with request, defaults to self.restheaders
:return: raw API response
"""
def make_request_catching_401() -> object | HTTPError:
def make_request_catching_401(headers: dict[str, str]) -> object | HTTPError:
try:
return open_url(
url,
method=method,
data=data,
http_agent=self.http_agent,
headers=self.restheaders,
headers=headers,
timeout=self.connection_timeout,
validate_certs=self.validate_certs,
)
@ -416,7 +422,12 @@ class KeycloakAPI:
raise e
return e
r = make_request_catching_401()
headers = self.restheaders
if extra_headers is not None:
headers = headers.copy()
headers.update(extra_headers)
r = make_request_catching_401(headers)
if isinstance(r, Exception):
# Try to refresh token and retry, if available
@ -426,7 +437,7 @@ class KeycloakAPI:
token = _request_token_using_refresh_token(self.module.params)
self.restheaders["Authorization"] = f"Bearer {token}"
r = make_request_catching_401()
r = make_request_catching_401(headers)
except KeycloakError as e:
# Token refresh returns 400 if token is expired/invalid, so continue on if we get a 400
if e.authError is not None and e.authError.code != 400: # type: ignore # TODO!
@ -440,7 +451,7 @@ class KeycloakAPI:
token = _request_token_using_credentials(self.module.params)
self.restheaders["Authorization"] = f"Bearer {token}"
r = make_request_catching_401()
r = make_request_catching_401(headers)
if isinstance(r, Exception):
# Try to re-auth with client_id and client_secret, if available
@ -451,7 +462,7 @@ class KeycloakAPI:
token = _request_token_using_client_credentials(self.module.params)
self.restheaders["Authorization"] = f"Bearer {token}"
r = make_request_catching_401()
r = make_request_catching_401(headers)
except KeycloakError as e:
# Token refresh returns 400 if token is expired/invalid, so continue on if we get a 400
if e.authError is not None and e.authError.code != 400: # type: ignore # TODO!
@ -590,6 +601,78 @@ class KeycloakAPI:
except Exception as e:
self.fail_request(e, msg=f"Could not delete realm {realm}: {e}", exception=traceback.format_exc())
def get_localization_values(self, locale: str, realm: str = "master") -> dict[str, str]:
"""
Get all localization overrides for a given realm and locale.
:param locale: Locale code (for example, 'en', 'fi', 'de').
:param realm: Realm name. Defaults to 'master'.
:return: Mapping of localization keys to override values.
:raise KeycloakError: Wrapped HTTP/JSON error with context
"""
realm_url = URL_LOCALIZATIONS.format(url=self.baseurl, realm=realm, locale=locale)
try:
return self._request_and_deserialize(realm_url, method="GET")
except Exception as e:
self.fail_request(
e,
msg=f"Could not read localization overrides for realm {realm}, locale {locale}: {e}",
exception=traceback.format_exc(),
)
def set_localization_value(self, locale: str, key: str, value: str, realm: str = "master"):
"""
Create or update a single localization override for the given key.
:param locale: Locale code (for example, 'en').
:param key: Localization message key to set.
:param value: Override value to set.
:param realm: Realm name. Defaults to 'master'.
:return: HTTPResponse: Response object on success.
:raise KeycloakError: Wrapped HTTP error with context
"""
realm_url = URL_LOCALIZATION.format(url=self.baseurl, realm=realm, locale=locale, key=key)
headers = {}
headers["Content-Type"] = "text/plain; charset=utf-8"
try:
return self._request(realm_url, method="PUT", data=to_native(value), extra_headers=headers)
except Exception as e:
self.fail_request(
e,
msg=f"Could not set localization value in realm {realm}, locale {locale}: {key}={value}: {e}",
exception=traceback.format_exc(),
)
def delete_localization_value(self, locale: str, key: str, realm: str = "master"):
"""
Delete a single localization override key for the given locale.
:param locale: Locale code (for example, 'en').
:param key: Localization message key to delete.
:param realm: Realm name. Defaults to 'master'.
:return: HTTPResponse: Response object on success.
:raise KeycloakError: Wrapped HTTP error with context
"""
realm_url = URL_LOCALIZATION.format(url=self.baseurl, realm=realm, locale=locale, key=key)
try:
return self._request(realm_url, method="DELETE")
except Exception as e:
self.fail_request(
e,
msg=f"Could not delete localization value in realm {realm}, locale {locale}, key {key}: {e}",
exception=traceback.format_exc(),
)
def get_clients(self, realm: str = "master", filter=None):
"""Obtains client representations for clients in a realm

View file

@ -0,0 +1,398 @@
# !/usr/bin/python
# Copyright Jakub Danek <danek.ja@gmail.com>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or
# https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import annotations
DOCUMENTATION = r"""
module: keycloak_realm_localization
short_description: Allows management of Keycloak realm localization overrides via the Keycloak API
version_added: 12.4.0
description:
- This module allows you to manage per-locale message overrides for a Keycloak realm using the Keycloak Admin REST API.
- Requires access via OpenID Connect; the connecting user/client must have sufficient privileges.
- The names of module options are snake_cased versions of the names found in the Keycloak API.
attributes:
check_mode:
support: full
diff_mode:
support: full
options:
force:
description:
- If V(false), only the keys listed in the O(overrides) are modified by this module. Any other pre-existing
keys are ignored.
- If V(true), all locale overrides are made to match configuration of this module. For example any keys
missing from the O(overrides) are removed regardless of O(state) value.
type: bool
default: false
locale:
description:
- Locale code for which the overrides apply (for example, V(en), V(fi), V(de)).
type: str
required: true
parent_id:
description:
- Name of the realm that owns the locale overrides.
type: str
required: true
state:
description:
- Desired state of localization overrides for the given locale.
- On V(present), the set of overrides for the locale are made to match O(overrides).
If O(force) is V(true) keys not listed in O(overrides) are removed,
and the listed keys are created or updated.
If O(force) is V(false) keys not listed in O(overrides) are ignored,
and the listed keys are created or updated.
- On V(absent), overrides for the locale is removed. If O(force) is V(true), all keys are removed.
If O(force) is V(false), only the keys listed in O(overrides) are removed.
type: str
choices: ['present', 'absent']
default: present
overrides:
description:
- List of overrides to ensure for the locale when O(state=present). Each item is a mapping with
the record's O(overrides[].key) and its O(overrides[].value).
- Ignored when O(state=absent).
type: list
elements: dict
default: []
suboptions:
key:
description:
- The message key to override.
type: str
required: true
value:
description:
- The override value for the message key. If omitted, value defaults to an empty string.
type: str
default: ""
required: false
seealso:
- module: community.general.keycloak_realm
description: You can specify list of supported locales using O(community.general.keycloak_realm#module:supported_locales).
extends_documentation_fragment:
- community.general.keycloak
- community.general.keycloak.actiongroup_keycloak
- community.general.attributes
author: Jakub Danek (@danekja)
"""
EXAMPLES = r"""
- name: Replace all overrides for locale "en" (credentials auth)
community.general.keycloak_realm_localization:
auth_client_id: admin-cli
auth_keycloak_url: https://auth.example.com/auth
auth_realm: master
auth_username: USERNAME
auth_password: PASSWORD
parent_id: my-realm
locale: en
state: present
force: true
overrides:
- key: greeting
value: "Hello"
- key: farewell
value: "Bye"
delegate_to: localhost
- name: Replace listed overrides for locale "en" (credentials auth)
community.general.keycloak_realm_localization:
auth_client_id: admin-cli
auth_keycloak_url: https://auth.example.com/auth
auth_realm: master
auth_username: USERNAME
auth_password: PASSWORD
parent_id: my-realm
locale: en
state: present
force: false
overrides:
- key: greeting
value: "Hello"
- key: farewell
value: "Bye"
delegate_to: localhost
- name: Ensure only one override exists for locale "fi" (token auth)
community.general.keycloak_realm_localization:
auth_client_id: admin-cli
auth_keycloak_url: https://auth.example.com/auth
token: TOKEN
parent_id: my-realm
locale: fi
state: present
force: true
overrides:
- key: app.title
value: "Sovellukseni"
delegate_to: localhost
- name: Remove all overrides for locale "de"
community.general.keycloak_realm_localization:
auth_client_id: admin-cli
auth_keycloak_url: https://auth.example.com/auth
auth_realm: master
auth_username: USERNAME
auth_password: PASSWORD
parent_id: my-realm
locale: de
state: absent
force: true
delegate_to: localhost
- name: Remove only the listed overrides for locale "de"
community.general.keycloak_realm_localization:
auth_client_id: admin-cli
auth_keycloak_url: https://auth.example.com/auth
auth_realm: master
auth_username: USERNAME
auth_password: PASSWORD
parent_id: my-realm
locale: de
state: absent
force: false
overrides:
- key: app.title
- key: foo
- key: bar
delegate_to: localhost
"""
RETURN = r"""
end_state:
description:
- Final state of localization overrides for the locale after module execution.
- Contains the O(locale) and the list of O(overrides) as key/value items.
returned: on success
type: dict
contains:
locale:
description: The locale code affected.
type: str
sample: en
overrides:
description: The list of overrides that exist after execution.
type: list
elements: dict
sample:
- key: greeting
value: Hello
- key: farewell
value: Bye
"""
from copy import deepcopy
from ansible.module_utils.basic import AnsibleModule
from ansible_collections.community.general.plugins.module_utils.identity.keycloak.keycloak import (
KeycloakAPI,
KeycloakError,
get_token,
keycloak_argument_spec,
)
def _normalize_overrides(current: dict | None) -> list[dict]:
"""
Accepts:
- dict: {'k1': 'v1', ...}
Return a sorted list of {'key', 'value'}.
This helper provides a consistent shape for downstream comparison/diff logic.
"""
if not current:
return []
return [{"key": k, "value": v} for k, v in sorted(current.items())]
def main():
argument_spec = keycloak_argument_spec()
# Single override record structure
overrides_spec = dict(
key=dict(type="str", no_log=False, required=True),
value=dict(type="str", default=""),
)
meta_args = dict(
locale=dict(type="str", required=True),
parent_id=dict(type="str", required=True),
state=dict(type="str", default="present", choices=["present", "absent"]),
overrides=dict(type="list", elements="dict", options=overrides_spec, default=[]),
force=dict(type="bool", default=False),
)
argument_spec.update(meta_args)
module = AnsibleModule(
argument_spec=argument_spec,
supports_check_mode=True,
required_one_of=([["token", "auth_realm", "auth_username", "auth_password"]]),
required_together=([["auth_realm", "auth_username", "auth_password"]]),
)
result = dict(changed=False, msg="", end_state={}, diff=dict(before={}, after={}))
# Obtain access token, initialize API
try:
connection_header = get_token(module.params)
except KeycloakError as e:
module.fail_json(msg=str(e))
kc = KeycloakAPI(module, connection_header)
# Convenience locals for frequently used parameters
locale = module.params["locale"]
state = module.params["state"]
parent_id = module.params["parent_id"]
force = module.params["force"]
desired_raw = module.params["overrides"]
desired_overrides = _normalize_overrides({r["key"]: r.get("value") for r in desired_raw})
old_overrides = _normalize_overrides(kc.get_localization_values(locale, parent_id) or {})
before = {
"locale": locale,
"overrides": deepcopy(old_overrides),
}
# Proposed state used for diff reporting
changeset = {
"locale": locale,
"overrides": [],
}
result["changed"] = False
if state == "present":
changeset["overrides"] = deepcopy(desired_overrides)
# Compute two sets:
# - to_update: keys missing or with different values
# - to_remove: keys existing in current state but not in desired
to_update = []
to_remove = deepcopy(old_overrides)
# Mark updates and remove matched ones from to_remove
for record in desired_overrides:
override_found = False
for override in to_remove:
if override["key"] == record["key"]:
override_found = True
# Value differs -> update needed
if override["value"] != record["value"]:
result["changed"] = True
to_update.append(record)
# Remove processed item so what's left in to_remove are deletions
to_remove.remove(override)
break
if not override_found:
# New key, must be created
to_update.append(record)
result["changed"] = True
# ignore any left-overs in to_remove, force is false
if not force:
changeset["overrides"].extend(to_remove)
to_remove = []
if to_remove:
result["changed"] = True
if result["changed"]:
if module._diff:
result["diff"] = dict(before=before, after=changeset)
if module.check_mode:
result["msg"] = f"Locale {locale} overrides would be updated."
else:
for override in to_remove:
kc.delete_localization_value(locale, override["key"], parent_id)
for override in to_update:
kc.set_localization_value(locale, override["key"], override["value"], parent_id)
result["msg"] = f"Locale {locale} overrides have been updated."
else:
result["msg"] = f"Locale {locale} overrides are in sync."
# For accurate end_state, read back from API unless we are in check_mode
if not module.check_mode:
final_overrides = _normalize_overrides(kc.get_localization_values(locale, parent_id) or {})
else:
final_overrides = ["overrides"]
result["end_state"] = {"locale": locale, "overrides": final_overrides}
elif state == "absent":
if force:
to_remove = old_overrides
else:
# touch only overrides listed in parameters, leave the rest be
to_remove = deepcopy(desired_overrides)
to_keep = deepcopy(old_overrides)
for override in to_remove:
found = False
for keep in to_keep:
if override["key"] == keep["key"]:
to_keep.remove(keep)
found = True
break
if not found:
to_remove.remove(override)
changeset["overrides"] = to_keep
if to_remove:
result["changed"] = True
if module._diff:
result["diff"] = dict(before=before, after=changeset)
if module.check_mode:
if result["changed"]:
result["msg"] = f"{len(to_remove)} overrides for locale {locale} would be deleted."
else:
result["msg"] = f"No overrides for locale {locale} to be deleted."
else:
for override in to_remove:
kc.delete_localization_value(locale, override["key"], parent_id)
if result["changed"]:
result["msg"] = f"{len(to_remove)} overrides for locale {locale} deleted."
else:
result["msg"] = f"No overrides for locale {locale} to be deleted."
result["end_state"] = changeset
module.exit_json(**result)
if __name__ == "__main__":
main()