1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2026-03-22 05:09:12 +00:00
community.general/plugins/modules/keycloak_authentication.py
patchback[bot] 543a292f4b
[PR #11548/2f33ff10 backport][stable-11] keycloak_authentication: fix TypeError when flow has no authenticationExecutions (#11564)
keycloak_authentication: fix TypeError when flow has no authenticationExecutions (#11548)

* TIAAS-12174: fix(keycloak_authentication): handle None authenticationExecutions

When a flow is defined without authenticationExecutions, module.params.get()
returns None but the key still exists in the config dict. The 'in' check
passes but iterating over None raises TypeError.

Guard the iteration with an explicit None check.

* keycloak_authentication: add changelog fragment for NoneType fix

* keycloak_authentication: update changelog fragment with PR link

* Update plugins/modules/keycloak_authentication.py



* Changelog polishing

---------



(cherry picked from commit 2f33ff1041)

Co-authored-by: Ivan Kokalovic <67540157+koke1997@users.noreply.github.com>
Co-authored-by: Ivan Kokalovic <ivan.kokalovic@example.com>
Co-authored-by: Felix Fontein <felix@fontein.de>
2026-03-10 06:57:43 +01:00

499 lines
19 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2019, INSPQ <philippe.gauthier@inspq.qc.ca>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = r"""
module: keycloak_authentication
short_description: Configure authentication in Keycloak
description:
- This module can currently only make a copy of an existing authentication flow, add an execution to it, and configure it.
- It can also delete the flow.
version_added: "3.3.0"
attributes:
check_mode:
support: full
diff_mode:
support: full
action_group:
version_added: 10.2.0
options:
realm:
description:
- The name of the realm in which the authentication flow is located.
required: true
type: str
alias:
description:
- Alias for the authentication flow.
required: true
type: str
description:
description:
- Description of the flow.
type: str
providerId:
description:
- C(providerId) for the new flow when not copied from an existing flow.
choices: ["basic-flow", "client-flow"]
type: str
copyFrom:
description:
- C(flowAlias) of the authentication flow to use for the copy.
type: str
authenticationExecutions:
description:
- Configuration structure for the executions.
type: list
elements: dict
suboptions:
providerId:
description:
- C(providerId) of the execution to create or update.
type: str
displayName:
description:
- Name of the execution or subflow to create or update.
type: str
requirement:
description:
- Control status of the subflow or execution.
choices: ["REQUIRED", "ALTERNATIVE", "DISABLED", "CONDITIONAL"]
type: str
flowAlias:
description:
- Alias of parent flow.
type: str
authenticationConfig:
description:
- Describe the config of the authentication.
type: dict
index:
description:
- Priority order of the execution.
type: int
subFlowType:
description:
- For new subflows, optionally specify the type.
- Is only used at creation.
choices: ["basic-flow", "form-flow"]
default: "basic-flow"
type: str
version_added: 6.6.0
state:
description:
- Control whether the authentication flow must exist or not.
choices: ["present", "absent"]
default: present
type: str
force:
type: bool
default: false
description:
- If V(true), removes the existing authentication flow and recreates it.
extends_documentation_fragment:
- community.general.keycloak
- community.general.keycloak.actiongroup_keycloak
- community.general.attributes
author:
- Philippe Gauthier (@elfelip)
- Gaëtan Daubresse (@Gaetan2907)
"""
EXAMPLES = r"""
- name: Create an authentication flow from first broker login and add an execution to it.
community.general.keycloak_authentication:
auth_keycloak_url: http://localhost:8080/auth
auth_realm: master
auth_username: admin
auth_password: password
realm: master
alias: "Copy of first broker login"
copyFrom: "first broker login"
authenticationExecutions:
- providerId: "test-execution1"
requirement: "REQUIRED"
authenticationConfig:
alias: "test.execution1.property"
config:
test1.property: "value"
- providerId: "test-execution2"
requirement: "REQUIRED"
authenticationConfig:
alias: "test.execution2.property"
config:
test2.property: "value"
state: present
- name: Re-create the authentication flow
community.general.keycloak_authentication:
auth_keycloak_url: http://localhost:8080/auth
auth_realm: master
auth_username: admin
auth_password: password
realm: master
alias: "Copy of first broker login"
copyFrom: "first broker login"
authenticationExecutions:
- providerId: "test-provisioning"
requirement: "REQUIRED"
authenticationConfig:
alias: "test.provisioning.property"
config:
test.provisioning.property: "value"
state: present
force: true
- name: Create an authentication flow with subflow containing an execution.
community.general.keycloak_authentication:
auth_keycloak_url: http://localhost:8080/auth
auth_realm: master
auth_username: admin
auth_password: password
realm: master
alias: "Copy of first broker login"
copyFrom: "first broker login"
authenticationExecutions:
- providerId: "test-execution1"
requirement: "REQUIRED"
- displayName: "New Subflow"
requirement: "REQUIRED"
- providerId: "auth-cookie"
requirement: "REQUIRED"
flowAlias: "New Subflow"
state: present
- name: Remove authentication.
community.general.keycloak_authentication:
auth_keycloak_url: http://localhost:8080/auth
auth_realm: master
auth_username: admin
auth_password: password
realm: master
alias: "Copy of first broker login"
state: absent
"""
RETURN = r"""
msg:
description: Message as to what action was taken.
returned: always
type: str
end_state:
description: Representation of the authentication after module execution.
returned: on success
type: dict
sample:
{
"alias": "Copy of first broker login",
"authenticationExecutions": [
{
"alias": "review profile config",
"authenticationConfig": {
"alias": "review profile config",
"config": {
"update.profile.on.first.login": "missing"
},
"id": "6f09e4fb-aad4-496a-b873-7fa9779df6d7"
},
"configurable": true,
"displayName": "Review Profile",
"id": "8f77dab8-2008-416f-989e-88b09ccf0b4c",
"index": 0,
"level": 0,
"providerId": "idp-review-profile",
"requirement": "REQUIRED",
"requirementChoices": [
"REQUIRED",
"ALTERNATIVE",
"DISABLED"
]
}
],
"builtIn": false,
"description": "Actions taken after first broker login with identity provider account, which is not yet linked to any Keycloak account",
"id": "bc228863-5887-4297-b898-4d988f8eaa5c",
"providerId": "basic-flow",
"topLevel": true
}
"""
from ansible_collections.community.general.plugins.module_utils.identity.keycloak.keycloak \
import KeycloakAPI, keycloak_argument_spec, get_token, KeycloakError, is_struct_included
from ansible.module_utils.basic import AnsibleModule
def find_exec_in_executions(searched_exec, executions):
    """
    Search if exec is contained in the executions.

    Two executions are considered the same when they share the same
    ``providerId`` or the same ``displayName``. An identifier whose value is
    ``None`` in the searched execution is treated as not provided, so two
    executions never match merely because both lack the same identifier.

    :param searched_exec: Execution to search for.
    :param executions: List of executions.
    :return: Index of the first matching execution, -1 if not found.
    """
    identifying_keys = ("providerId", "displayName")
    for i, existing_exec in enumerate(executions):
        for key in identifying_keys:
            searched_value = searched_exec.get(key)
            # Skip identifiers that were not supplied; comparing None to None
            # would otherwise report a match between unrelated executions.
            if searched_value is not None and existing_exec.get(key) == searched_value:
                return i
    return -1
def create_or_update_executions(kc, config, realm='master'):
    """
    Bring the executions of an authentication flow in line with the requested ones.

    Each requested execution is looked up among the executions currently on the
    server (by ``providerId`` or ``displayName``). Missing ones are created as
    an execution or a subflow; existing ones that differ, or sit at a different
    index, are updated and moved.

    :param kc: Keycloak API access.
    :param config: Representation of the authentication flow including its executions.
    :param realm: Realm
    :return: tuple (changed, dict(before, after))
        WHERE
        bool changed indicates if changes have been made
        dict(str, str) shows state before and after creation/update
    """
    # Keys of a requested execution that are not sent in the update call.
    not_sent_on_update = ("flowAlias", "authenticationConfig", "subFlowType")
    try:
        changed = False
        before = ""
        after = ""
        execution = None

        wanted_executions = config.get("authenticationExecutions")
        if wanted_executions is None:
            # Nothing requested for this flow: report "no change" with empty diffs.
            return changed, dict(before=before, after=after)

        # Executions currently defined on the Keycloak server for this alias
        server_executions = kc.get_executions_representation(config, realm=realm)

        for position, wanted in enumerate(wanted_executions):
            # An explicit index wins over the position in the requested list
            target_index = position if wanted["index"] is None else wanted["index"]
            # Parent flow: the given flowAlias, otherwise the top-level flow
            parent_alias = config["alias"] if wanted["flowAlias"] is None else wanted["flowAlias"]
            needs_update = False

            current_index = find_exec_in_executions(wanted, server_executions)
            if current_index != -1:
                current = server_executions[current_index]
                # Keys that were not provided (None) must not take part in the comparison
                ignored_keys = ["flowAlias", "subFlowType"] + [key for key in wanted if wanted[key] is None]
                if not is_struct_included(wanted, current, ignored_keys) or current_index != target_index:
                    needs_update = True
                    if wanted['index'] is None:
                        target_index = current_index
                    before += str(current) + '\n'
                execution = current.copy()
                # Empty the matched entry so a second requested execution with
                # the same name cannot match it again
                current.clear()
            elif wanted["providerId"] is not None:
                kc.create_execution(wanted, flowAlias=parent_alias, realm=realm)
                # No match was found (index -1), so take the last entry of the
                # refreshed list; presumably the execution just created, verify
                # against the ordering returned by the server.
                execution = kc.get_executions_representation(config, realm=realm)[-1]
                needs_update = True
                current_index = target_index
                after += str(wanted) + '\n'
            elif wanted["displayName"] is not None:
                kc.create_subflow(wanted["displayName"], parent_alias, realm=realm, flowType=wanted["subFlowType"])
                # Same as above: last entry of the refreshed list
                execution = kc.get_executions_representation(config, realm=realm)[-1]
                needs_update = True
                current_index = target_index
                after += str(wanted) + '\n'

            if not needs_update:
                continue
            changed = True
            if current_index == -1:
                continue

            # Update the existing (or just created) execution
            payload = {"id": execution["id"]}
            if wanted["authenticationConfig"] is not None:
                # Replace any configuration already attached to the execution
                if "authenticationConfig" in execution and "id" in execution["authenticationConfig"]:
                    kc.delete_authentication_config(execution["authenticationConfig"]["id"], realm=realm)
                kc.add_authenticationConfig_to_execution(payload["id"], wanted["authenticationConfig"], realm=realm)
            payload.update((key, value) for key, value in wanted.items() if key not in not_sent_on_update)
            if wanted["requirement"] is not None:
                if "priority" in execution:
                    payload["priority"] = execution["priority"]
                kc.update_authentication_executions(parent_alias, payload, realm=realm)
            # Move the execution from its current index to the requested one
            kc.change_execution_priority(payload["id"], current_index - target_index, realm=realm)
            refreshed = kc.get_executions_representation(config, realm=realm)
            after += str(refreshed[target_index]) + '\n'

        return changed, dict(before=before, after=after)
    except Exception as e:
        kc.module.fail_json(msg='Could not create or update executions for authentication flow %s in realm %s: %s'
                            % (config["alias"], realm, str(e)))
def main():
    """
    Module execution

    Builds the argument spec, obtains a Keycloak token and then creates,
    updates, re-creates (``force``) or deletes the authentication flow
    identified by ``alias``, depending on ``state`` and on whether the flow
    already exists. The outcome is reported through ``module.exit_json`` or
    ``module.fail_json``.

    :return:
    """
    argument_spec = keycloak_argument_spec()
    execution_options = dict(
        providerId=dict(type='str'),
        displayName=dict(type='str'),
        requirement=dict(choices=["REQUIRED", "ALTERNATIVE", "DISABLED", "CONDITIONAL"], type='str'),
        flowAlias=dict(type='str'),
        authenticationConfig=dict(type='dict'),
        index=dict(type='int'),
        subFlowType=dict(choices=["basic-flow", "form-flow"], default='basic-flow', type='str'),
    )
    argument_spec.update(dict(
        realm=dict(type='str', required=True),
        alias=dict(type='str', required=True),
        providerId=dict(type='str', choices=["basic-flow", "client-flow"]),
        description=dict(type='str'),
        copyFrom=dict(type='str'),
        authenticationExecutions=dict(type='list', elements='dict', options=execution_options),
        state=dict(choices=["absent", "present"], default='present'),
        force=dict(type='bool', default=False),
    ))

    module = AnsibleModule(argument_spec=argument_spec,
                           supports_check_mode=True,
                           required_one_of=([['token', 'auth_realm', 'auth_username', 'auth_password', 'auth_client_id', 'auth_client_secret']]),
                           required_together=([['auth_username', 'auth_password']]),
                           required_by={'refresh_token': 'auth_realm'},
                           )

    result = dict(changed=False, msg='', flow={})

    # Obtain access token, initialize API
    try:
        connection_header = get_token(module.params)
    except KeycloakError as e:
        module.fail_json(msg=str(e))

    kc = KeycloakAPI(module, connection_header)

    params = module.params
    realm = params.get('realm')
    state = params.get('state')
    force = params.get('force')

    # Requested representation of the flow, in a fixed key order
    new_auth_repr = {
        name: params.get(name)
        for name in ("alias", "copyFrom", "providerId", "authenticationExecutions",
                     "description", "builtIn", "subflow")
    }

    def create_flow():
        """Create the flow (copied or empty) and fail if nothing comes back."""
        if new_auth_repr.get("copyFrom") is not None:
            # copyFrom is defined: create the flow from a copy
            created = kc.copy_auth_flow(config=new_auth_repr, realm=realm)
        else:
            # Create an empty authentication flow
            created = kc.create_empty_auth_flow(config=new_auth_repr, realm=realm)
        if created is None:
            # The flow is still not on the server
            result['msg'] = "Authentication just created not found: " + str(new_auth_repr)
            module.fail_json(**result)
        return created

    def record_end_state(flow):
        """Attach the executions found on the server to the flow and store it as end_state."""
        exec_repr = kc.get_executions_representation(config=new_auth_repr, realm=realm)
        if exec_repr is not None:
            flow["authenticationExecutions"] = exec_repr
        result['end_state'] = flow

    auth_repr = kc.get_authentication_flow_by_alias(alias=new_auth_repr["alias"], realm=realm)

    if not auth_repr:
        # The flow does not exist (an empty dict)
        if state == 'absent':
            # Nothing to do
            if module._diff:
                result['diff'] = dict(before='', after='')
            result.update(changed=False, end_state={}, msg=new_auth_repr["alias"] + ' absent')
            module.exit_json(**result)
        elif state == 'present':
            # Creation
            result['changed'] = True
            if module._diff:
                result['diff'] = dict(before='', after=new_auth_repr)
            if module.check_mode:
                module.exit_json(**result)
            auth_repr = create_flow()
            # Configure the executions for the flow
            create_or_update_executions(kc=kc, config=new_auth_repr, realm=realm)
            record_end_state(auth_repr)
    elif state == 'present':
        # Update
        if force:
            # Delete the current flow and create it again
            result['changed'] = True
            if module._diff:
                result['diff'] = dict(before=auth_repr, after=new_auth_repr)
            if module.check_mode:
                module.exit_json(**result)
            kc.delete_authentication_flow_by_id(id=auth_repr["id"], realm=realm)
            auth_repr = create_flow()
        # Executions are not touched in check mode
        if module.check_mode:
            module.exit_json(**result)
        changed, diff = create_or_update_executions(kc=kc, config=new_auth_repr, realm=realm)
        result['changed'] |= changed
        if module._diff:
            result['diff'] = diff
        record_end_state(auth_repr)
    else:
        # Deletion (state is not 'present')
        result['changed'] = True
        if module._diff:
            result['diff'] = dict(before=auth_repr, after='')
        if module.check_mode:
            module.exit_json(**result)
        kc.delete_authentication_flow_by_id(id=auth_repr["id"], realm=realm)
        result['msg'] = 'Authentication flow: {alias} id: {id} is deleted'.format(alias=new_auth_repr['alias'],
                                                                                  id=auth_repr["id"])

    module.exit_json(**result)
# Run the module only when this file is executed as a script.
if __name__ == '__main__':
    main()